package com.alibaba.otter.canal.client.rocketmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/canal.client-1.1.3.jar:com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.class */
/**
 * {@link CanalMQConnector} implementation backed by a RocketMQ orderly push consumer.
 *
 * <p>Messages pushed by RocketMQ are decoded (either as {@link FlatMessage} JSON or through
 * {@link CanalMessageDeserializer}) and handed to the caller through a bounded in-memory queue.
 * The RocketMQ listener thread then waits for the caller to {@link #ack()} or {@link #rollback()}
 * the batch before reporting the consume status back to RocketMQ.
 *
 * <p>Only one fetched batch may be outstanding at a time: a second fetch before the previous batch
 * was acked or rolled back fails with a {@link CanalClientException}.
 */
public class RocketMQCanalConnector implements CanalMQConnector {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);

    /** Sentinel batch size: do not override the consumer's own batch size setting. */
    private static final int UNSET_BATCH_SIZE = -1;
    /** Maximum number of decoded batches buffered between the listener and the caller. */
    private static final int QUEUE_CAPACITY = 1024;
    /** Timeout handed to {@code ConsumerBatchMessage.waitFinish}; presumably milliseconds - TODO confirm. */
    private static final long DEFAULT_BATCH_PROCESS_TIMEOUT = 60000L;

    private final String nameServer;
    private final String topic;
    private final String groupName;
    private volatile boolean connected;
    private DefaultMQPushConsumer rocketMQConsumer;
    private final BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
    private final int batchSize;
    private final long batchProcessTimeout;
    private final boolean flatMessage;
    /** Batch most recently handed to the caller and not yet acked / rolled back, or {@code null}. */
    private volatile ConsumerBatchMessage lastGetBatchMessage;
    private String accessKey;
    private String secretKey;

    /**
     * Creates a connector without access credentials.
     *
     * @param str  name server address; ignored by {@link #connect()} when blank
     * @param str2 topic to subscribe to
     * @param str3 consumer group name
     * @param num  maximum consume batch size; {@code -1} or {@code null} leaves the consumer setting untouched
     * @param z    {@code true} to decode message bodies as {@link FlatMessage} JSON, {@code false} to
     *             decode them with {@link CanalMessageDeserializer}
     */
    public RocketMQCanalConnector(String str, String str2, String str3, Integer num, boolean z) {
        this.nameServer = str;
        this.topic = str2;
        this.groupName = str3;
        this.flatMessage = z;
        this.messageBlockingQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        // A null batch size used to fail with a NullPointerException; treat it as "not configured".
        this.batchSize = num == null ? UNSET_BATCH_SIZE : num.intValue();
        this.batchProcessTimeout = DEFAULT_BATCH_PROCESS_TIMEOUT;
        this.connected = false;
        this.lastGetBatchMessage = null;
    }

    /**
     * Creates a connector that authenticates with the given access/secret key pair.
     *
     * @param str  name server address; ignored by {@link #connect()} when blank
     * @param str2 topic to subscribe to
     * @param str3 consumer group name
     * @param str4 access key; credentials are only used when both keys are non-empty
     * @param str5 secret key; credentials are only used when both keys are non-empty
     * @param num  maximum consume batch size; {@code -1} or {@code null} leaves the consumer setting untouched
     * @param z    {@code true} to decode message bodies as {@link FlatMessage} JSON
     */
    public RocketMQCanalConnector(String str, String str2, String str3, String str4, String str5, Integer num, boolean z) {
        this(str, str2, str3, num, z);
        this.accessKey = str4;
        this.secretKey = str5;
    }

    /**
     * Builds and configures the RocketMQ push consumer. The consumer is not started here;
     * {@link #subscribe(String)} starts it.
     */
    @Override
    public void connect() throws CanalClientException {
        ClientRPCHook rpcHook = null;
        if (StringUtils.isNotEmpty(this.accessKey) && StringUtils.isNotEmpty(this.secretKey)) {
            SessionCredentials credentials = new SessionCredentials();
            credentials.setAccessKey(this.accessKey);
            credentials.setSecretKey(this.secretKey);
            rpcHook = new ClientRPCHook(credentials);
        }
        DefaultMQPushConsumer consumer =
            new DefaultMQPushConsumer(this.groupName, rpcHook, new AllocateMessageQueueAveragely());
        consumer.setVipChannelEnabled(false);
        if (!StringUtils.isBlank(this.nameServer)) {
            consumer.setNamesrvAddr(this.nameServer);
        }
        if (this.batchSize != UNSET_BATCH_SIZE) {
            consumer.setConsumeMessageBatchMaxSize(this.batchSize);
        }
        this.rocketMQConsumer = consumer;
    }

    /**
     * Shuts the consumer down (if one was created) and marks the connector as disconnected.
     */
    @Override
    public void disconnect() throws CanalClientException {
        DefaultMQPushConsumer consumer = this.rocketMQConsumer;
        // Guard against disconnect() being called before connect()/subscribe().
        if (consumer != null) {
            consumer.shutdown();
        }
        this.connected = false;
    }

    /**
     * @return {@code true} once the consumer has been started successfully and not yet disconnected
     */
    @Override
    public boolean checkValid() throws CanalClientException {
        return this.connected;
    }

    /**
     * Subscribes to the configured topic with the {@code "*"} expression, registers the orderly
     * listener and starts the consumer. Calling it again while connected is a no-op.
     *
     * <p>A start failure is logged and leaves the connector in the not-connected state, which
     * callers can observe through {@link #checkValid()}.
     *
     * @param str filter expression; not used by this implementation
     */
    @Override
    public synchronized void subscribe(String str) throws CanalClientException {
        if (this.connected) {
            return;
        }
        try {
            if (this.rocketMQConsumer == null) {
                connect();
            }
            this.rocketMQConsumer.subscribe(this.topic, "*");
            this.rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                    consumeOrderlyContext.setAutoCommit(true);
                    return RocketMQCanalConnector.this.process(list)
                        ? ConsumeOrderlyStatus.SUCCESS
                        : ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            });
            this.rocketMQConsumer.start();
            // Only report a valid connection once start() has actually succeeded.
            this.connected = true;
        } catch (MQClientException e) {
            this.connected = false;
            logger.error("Start RocketMQ consumer error", e);
        }
    }

    /**
     * Decodes a pushed batch, queues it for the caller and waits until the caller finishes it.
     * Invoked by the listener registered in {@link #subscribe(String)}.
     *
     * @param list messages delivered by RocketMQ
     * @return {@code true} when the batch was finished within the timeout and reported success
     * @throws CanalClientException when a message body cannot be decoded
     * @throws RuntimeException     when the thread is interrupted while queueing or waiting
     */
    public boolean process(List<MessageExt> list) {
        logger.info("Get Message:{}", list);
        List<Object> decoded = new ArrayList<>(list.size());
        for (MessageExt messageExt : list) {
            byte[] body = messageExt.getBody();
            if (body == null) {
                logger.warn("Received message data is null");
                continue;
            }
            try {
                if (this.flatMessage) {
                    FlatMessage flat = JSON.parseObject(body, FlatMessage.class, new Feature[0]);
                    decoded.add(flat);
                } else {
                    decoded.add(CanalMessageDeserializer.deserializer(body));
                }
            } catch (Exception e) {
                logger.error("Add message error", e);
                throw new CanalClientException(e);
            }
        }
        if (decoded.isEmpty()) {
            // Nothing to hand to the caller. Queueing an empty batch would block this thread until
            // the timeout because auto-acking fetches cannot tell it apart from a poll timeout.
            return true;
        }

        ConsumerBatchMessage batch = new ConsumerBatchMessage(decoded);
        try {
            this.messageBlockingQueue.put(batch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Put message to queue error", e);
            throw new RuntimeException(e);
        }
        try {
            return batch.waitFinish(this.batchProcessTimeout) && batch.isSuccess();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted when waiting messages to be finished.", e);
            throw new RuntimeException(e);
        }
    }

    /** Equivalent to {@code subscribe(null)}. */
    @Override
    public void subscribe() throws CanalClientException {
        subscribe(null);
    }

    /** Unsubscribes the consumer from the configured topic; does nothing if no consumer exists. */
    @Override
    public void unsubscribe() throws CanalClientException {
        DefaultMQPushConsumer consumer = this.rocketMQConsumer;
        if (consumer != null) {
            consumer.unsubscribe(this.topic);
        }
    }

    /**
     * Fetches the next batch and acks it immediately.
     *
     * @param l        maximum time to wait for a batch
     * @param timeUnit unit of {@code l}
     * @return the batch content, or an empty list when nothing arrived in time
     */
    @Override
    public List<Message> getList(Long l, TimeUnit timeUnit) throws CanalClientException {
        List<Message> messages = getListWithoutAck(l, timeUnit);
        // ack() is a no-op when no batch is pending, so always call it: a fetched batch must never
        // be left un-acked here, otherwise the next fetch would be rejected.
        ack();
        return messages;
    }

    /**
     * Fetches the next batch; the caller must {@link #ack()} or {@link #rollback()} it before
     * fetching again.
     *
     * @param l        maximum time to wait for a batch
     * @param timeUnit unit of {@code l}
     * @return the batch content, or an empty list when nothing arrived in time
     * @throws CanalClientException when a previous batch is still pending or the wait is interrupted
     */
    @Override
    public List<Message> getListWithoutAck(Long l, TimeUnit timeUnit) throws CanalClientException {
        return pollBatchData(l, timeUnit);
    }

    /**
     * Flat-message variant of {@link #getList(Long, TimeUnit)}.
     *
     * @param l        maximum time to wait for a batch
     * @param timeUnit unit of {@code l}
     * @return the batch content, or an empty list when nothing arrived in time
     */
    @Override
    public List<FlatMessage> getFlatList(Long l, TimeUnit timeUnit) throws CanalClientException {
        List<FlatMessage> messages = getFlatListWithoutAck(l, timeUnit);
        // See getList(): always release a fetched batch.
        ack();
        return messages;
    }

    /**
     * Flat-message variant of {@link #getListWithoutAck(Long, TimeUnit)}.
     *
     * @param l        maximum time to wait for a batch
     * @param timeUnit unit of {@code l}
     * @return the batch content, or an empty list when nothing arrived in time
     * @throws CanalClientException when a previous batch is still pending or the wait is interrupted
     */
    @Override
    public List<FlatMessage> getFlatListWithoutAck(Long l, TimeUnit timeUnit) throws CanalClientException {
        return pollBatchData(l, timeUnit);
    }

    /**
     * Shared implementation of the "without ack" fetches: polls the queue, remembers the batch as
     * pending and returns its data.
     */
    @SuppressWarnings("unchecked") // element type is decided by the flatMessage flag at decode time
    private <T> List<T> pollBatchData(Long timeout, TimeUnit unit) {
        if (this.lastGetBatchMessage != null) {
            throw new CanalClientException("mq get/ack not support concurrent & async ack");
        }
        ConsumerBatchMessage batch;
        try {
            batch = this.messageBlockingQueue.poll(timeout.longValue(), unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            logger.warn("Get message timeout", e);
            throw new CanalClientException("Failed to fetch the data after: " + timeout, e);
        }
        if (batch == null) {
            return new ArrayList<>();
        }
        this.lastGetBatchMessage = batch;
        return (List<T>) batch.getData();
    }

    /**
     * Acks the pending batch, if any. If acking itself fails the batch is marked as failed
     * instead. The pending batch is cleared in every case.
     */
    @Override
    public void ack() throws CanalClientException {
        // Work on a snapshot so the same batch is acked/failed even if the field changes meanwhile.
        ConsumerBatchMessage pending = this.lastGetBatchMessage;
        if (pending == null) {
            return;
        }
        try {
            pending.ack();
        } catch (Throwable th) {
            logger.error("Ack batch message error, marking the batch as failed", th);
            pending.fail();
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /** Marks the pending batch, if any, as failed and clears it. */
    @Override
    public void rollback() throws CanalClientException {
        ConsumerBatchMessage pending = this.lastGetBatchMessage;
        if (pending == null) {
            return;
        }
        try {
            pending.fail();
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /** Not supported by the MQ connector. */
    @Override
    public Message get(int i) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector. */
    @Override
    public Message get(int i, Long l, TimeUnit timeUnit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector. */
    @Override
    public Message getWithoutAck(int i) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector. */
    @Override
    public Message getWithoutAck(int i, Long l, TimeUnit timeUnit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector. */
    @Override
    public void ack(long j) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector. */
    @Override
    public void rollback(long j) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }
}
