package com.insuranceman.oceanus.handler.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.insuranceman.oceanus.configuration.ConfigService;
import com.insuranceman.oceanus.constant.CommonConstant;
import com.insuranceman.oceanus.handler.canal.handlers.IMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Pulls change batches from a Canal connector and passes each non-empty batch
 * to the injected {@link IMessageHandler}.
 */
@Component
public class CanalService {
    private static final Logger log = LoggerFactory.getLogger(CanalService.class);

    /** Batch id that the polling loop treats as "nothing was fetched". */
    private static final long NO_BATCH_ID = -1L;

    @Autowired
    private IMessageHandler messageHandler;

    @Autowired
    private ConfigService configService;

    /**
     * Connects and subscribes the given connector, then polls it in an endless loop.
     *
     * <p>Each non-empty batch is passed to the message handler; it is acknowledged
     * when the handler returns and rolled back when the handler throws. Empty polls
     * sleep for the configured wait time before the next fetch.
     *
     * <p>The loop has no normal exit: the method ends only when something other
     * than a handler failure throws. Exceptions are logged and swallowed, errors
     * propagate, and the connector is disconnected in every case.
     *
     * <p>The method is annotated {@code @Async("asyncPool")}.
     *
     * @param canalConnector connector to read from; it is disconnected before this
     *                       method returns
     */
    @Async("asyncPool")
    public void doHandler(CanalConnector canalConnector) {
        try {
            canalConnector.connect();
            canalConnector.subscribe();
            log.info("数据同步工程启动成功，开始获取数据");

            // Read once up front; a missing or non-numeric value aborts the run via the catch below.
            int batchSize = Integer.parseInt(this.configService.getString(CommonConstant.Canal.CANAL_ACK_COUNT));
            int idleWaitMillis = Integer.parseInt(this.configService.getString(CommonConstant.Canal.CANAL_WAIT_TIME));

            while (true) {
                Message batch = canalConnector.getWithoutAck(batchSize);
                long batchId = batch.getId();

                if (batchId == NO_BATCH_ID || batch.getEntries().isEmpty()) {
                    // Nothing to process: back off before polling again.
                    Thread.sleep(idleWaitMillis);
                    // An empty batch that still carries a real id must be acknowledged;
                    // the sentinel id identifies no batch, so there is nothing to ack.
                    if (batchId != NO_BATCH_ID) {
                        canalConnector.ack(batchId);
                    }
                    continue;
                }

                try {
                    this.messageHandler.handleMessage(batch);
                    // Acknowledge only after the handler has finished without throwing.
                    canalConnector.ack(batchId);
                } catch (Exception e) {
                    log.error("数据保存失败", e);
                    // NOTE(review): the next iteration fetches again immediately, so a
                    // batch that keeps failing is retried without any backoff.
                    canalConnector.rollback();
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the flag so the executor that owns this thread can observe the interrupt.
                Thread.currentThread().interrupt();
            }
            log.error("canal读取biglog日志报错", e);
        } finally {
            // Single disconnect point for both the logged-exception path and propagating errors.
            canalConnector.disconnect();
        }
    }
}
