package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-client-4.3.0.jar:org/apache/rocketmq/client/impl/consumer/RebalanceImpl.class */
/**
 * Keeps the set of message queues this consumer is working on
 * ({@link #processQueueTable}) in step with its subscriptions.
 *
 * <p>For every subscribed topic, {@link #doRebalance(boolean)} recomputes which queues this
 * client should own, drops the ones it should no longer own, and creates a {@link PullRequest}
 * for each newly owned queue. How ownership is computed depends on {@link #messageModel}:
 * with {@code BROADCASTING} every known queue of the topic is owned; with {@code CLUSTERING}
 * the queues are split between the group's consumer ids by
 * {@link #allocateMessageQueueStrategy}.
 *
 * <p>The class also offers broker-side lock/unlock helpers for single queues and for all
 * currently owned queues. Subclasses supply the consumer-specific hooks declared as abstract
 * methods below.
 */
public abstract class RebalanceImpl {
    protected static final InternalLogger log = ClientLogger.getLog();

    /**
     * Value passed as the numeric argument of every {@code lockBatchMQ}/{@code unlockBatchMQ}
     * call. NOTE(review): presumably the request timeout in milliseconds - confirm against
     * the client API.
     */
    private static final long LOCK_REQUEST_TIMEOUT_MILLIS = 1000L;

    /** Queues currently owned by this consumer, with their per-queue processing state. */
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64);
    /** Known queues per topic; read during rebalance, populated elsewhere. */
    protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<>();
    /** Subscriptions keyed by topic; its key set drives {@link #doRebalance(boolean)}. */
    protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected MQClientInstance mQClientFactory;

    /**
     * Creates a rebalancer; the arguments are stored as-is and can be replaced later through
     * the corresponding setters.
     *
     * @param consumerGroup consumer group name sent with lock/unlock requests and used when
     *        looking up the group's consumer ids
     * @param messageModel selects the branch taken when rebalancing a topic
     * @param allocateMessageQueueStrategy strategy used to divide queues in clustering mode
     * @param mQClientFactory client instance used for broker lookups and remote calls
     */
    public RebalanceImpl(String consumerGroup, MessageModel messageModel,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory) {
        this.consumerGroup = consumerGroup;
        this.messageModel = messageModel;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.mQClientFactory = mQClientFactory;
    }

    /**
     * Asks the broker hosting {@code mq} to release this client's lock on it.
     *
     * <p>Does nothing when no broker address can be found. Failures of the remote call are
     * logged and not rethrown. The local {@link ProcessQueue} state is not touched.
     *
     * @param mq the queue to unlock
     * @param oneway forwarded unchanged as the last argument of {@code unlockBatchMQ};
     *        NOTE(review): presumably selects a fire-and-forget request - confirm
     */
    public void unlock(MessageQueue mq, boolean oneway) {
        FindBrokerResult brokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 0L, true);
        if (brokerResult == null) {
            return;
        }

        UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(
                brokerResult.getBrokerAddr(), requestBody, LOCK_REQUEST_TIMEOUT_MILLIS, oneway);
            log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}",
                this.consumerGroup, this.mQClientFactory.getClientId(), mq);
        } catch (Exception e) {
            log.error("unlockBatchMQ exception, " + mq, e);
        }
    }

    /**
     * Releases the broker locks of every queue in {@link #processQueueTable}, one batch
     * request per broker name.
     *
     * <p>After a request returns without throwing, each affected {@link ProcessQueue} is
     * marked unlocked. Brokers whose address cannot be found are skipped; a failing request
     * is logged and the remaining brokers are still processed.
     *
     * @param oneway forwarded unchanged as the last argument of {@code unlockBatchMQ};
     *        NOTE(review): presumably selects a fire-and-forget request - confirm
     */
    public void unlockAll(boolean oneway) {
        for (Map.Entry<String, Set<MessageQueue>> entry : buildProcessQueueTableByBrokerName().entrySet()) {
            String brokerName = entry.getKey();
            Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty()) {
                continue;
            }

            FindBrokerResult brokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, 0L, true);
            if (brokerResult == null) {
                continue;
            }

            UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);

            try {
                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(
                    brokerResult.getBrokerAddr(), requestBody, LOCK_REQUEST_TIMEOUT_MILLIS, oneway);
                for (MessageQueue mq : mqs) {
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue != null) {
                        processQueue.setLocked(false);
                        log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, mq);
                    }
                }
            } catch (Exception e) {
                log.error("unlockBatchMQ exception, " + mqs, e);
            }
        }
    }

    /**
     * Groups the keys of {@link #processQueueTable} by broker name.
     *
     * @return a new map from broker name to the owned queues on that broker
     */
    private HashMap<String, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
        HashMap<String, Set<MessageQueue>> result = new HashMap<>();
        for (MessageQueue mq : this.processQueueTable.keySet()) {
            String brokerName = mq.getBrokerName();
            Set<MessageQueue> mqs = result.get(brokerName);
            if (mqs == null) {
                mqs = new HashSet<>();
                result.put(brokerName, mqs);
            }
            mqs.add(mq);
        }
        return result;
    }

    /**
     * Requests the broker lock for a single queue.
     *
     * <p>Every queue named in the broker's reply that is present in
     * {@link #processQueueTable} is marked locked and gets a fresh lock timestamp, not only
     * {@code mq}.
     *
     * @param mq the queue to lock
     * @return {@code true} if the broker's reply contains {@code mq}; {@code false} if the
     *         broker address cannot be found, the reply omits {@code mq}, or the request
     *         throws (the exception is logged)
     */
    public boolean lock(MessageQueue mq) {
        FindBrokerResult brokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), 0L, true);
        if (brokerResult == null) {
            return false;
        }

        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            Set<MessageQueue> lockedMqs = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(
                brokerResult.getBrokerAddr(), requestBody, LOCK_REQUEST_TIMEOUT_MILLIS);
            for (MessageQueue lockedMq : lockedMqs) {
                ProcessQueue processQueue = this.processQueueTable.get(lockedMq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }

            boolean lockOK = lockedMqs.contains(mq);
            log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
            return false;
        }
    }

    /**
     * Requests the broker locks for every queue in {@link #processQueueTable}, one batch
     * request per broker name.
     *
     * <p>Queues confirmed by the broker are marked locked with a fresh timestamp; queues that
     * were requested but not confirmed are marked unlocked. Brokers whose address cannot be
     * found are skipped; a failing request is logged and the remaining brokers are still
     * processed.
     */
    public void lockAll() {
        for (Map.Entry<String, Set<MessageQueue>> entry : buildProcessQueueTableByBrokerName().entrySet()) {
            String brokerName = entry.getKey();
            Set<MessageQueue> mqs = entry.getValue();
            if (mqs.isEmpty()) {
                continue;
            }

            FindBrokerResult brokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, 0L, true);
            if (brokerResult == null) {
                continue;
            }

            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);

            try {
                Set<MessageQueue> lockedMqs = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(
                    brokerResult.getBrokerAddr(), requestBody, LOCK_REQUEST_TIMEOUT_MILLIS);

                for (MessageQueue mq : lockedMqs) {
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue != null) {
                        // Only log the transition, not every periodic re-lock.
                        if (!processQueue.isLocked()) {
                            log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                        }
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }

                for (MessageQueue mq : mqs) {
                    if (lockedMqs.contains(mq)) {
                        continue;
                    }
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue != null) {
                        processQueue.setLocked(false);
                        log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
                    }
                }
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mqs, e);
            }
        }
    }

    /**
     * Rebalances every subscribed topic and then discards owned queues whose topic is no
     * longer subscribed.
     *
     * <p>A failure while rebalancing one topic does not stop the others: the throwable is
     * logged at WARN level, except for topics starting with
     * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}, for which it is dropped silently.
     *
     * @param isOrder when {@code true}, a newly assigned queue is only added after
     *        {@link #lock(MessageQueue)} succeeds for it; NOTE(review): presumably set for
     *        ordered consumption - confirm with callers
     */
    public void doRebalance(boolean isOrder) {
        Map<String, SubscriptionData> subTable = getSubscriptionInner();
        if (subTable != null) {
            for (String topic : subTable.keySet()) {
                try {
                    rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        truncateMessageQueueNotMyTopic();
    }

    /** @return the live subscription table, keyed by topic */
    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.subscriptionInner;
    }

    /**
     * Recomputes the queue assignment of one topic according to {@link #messageModel}.
     * Models other than {@code BROADCASTING} and {@code CLUSTERING} are ignored.
     */
    private void rebalanceByTopic(String topic, boolean isOrder) {
        switch (this.messageModel) {
            case BROADCASTING:
                rebalanceBroadcasting(topic, isOrder);
                break;
            case CLUSTERING:
                rebalanceClustering(topic, isOrder);
                break;
            default:
                break;
        }
    }

    /** Broadcasting: this consumer owns every known queue of the topic. */
    private void rebalanceBroadcasting(String topic, boolean isOrder) {
        Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
        if (mqSet == null) {
            log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
            return;
        }

        if (updateProcessQueueTableInRebalance(topic, mqSet, isOrder)) {
            messageQueueChanged(topic, mqSet, mqSet);
            log.info("messageQueueChanged {} {} {} {}", this.consumerGroup, topic, mqSet, mqSet);
        }
    }

    /**
     * Clustering: the topic's queues are divided between the group's consumer ids by the
     * allocation strategy, and this consumer owns only its share.
     */
    private void rebalanceClustering(String topic, boolean isOrder) {
        Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
        List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);

        if (mqSet == null && !topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
        }
        if (cidAll == null) {
            log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
        }
        if (mqSet == null || cidAll == null) {
            return;
        }

        // Both lists are sorted before being handed to the strategy.
        List<MessageQueue> mqAll = new ArrayList<>(mqSet);
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
        List<MessageQueue> allocateResult;
        // Only the strategy call is guarded. Failures further down propagate to doRebalance,
        // exactly as they do in the broadcasting branch, instead of being reported as an
        // allocation failure.
        try {
            allocateResult = strategy.allocate(
                this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
        } catch (Throwable e) {
            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                strategy.getName(), e);
            return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<>();
        if (allocateResult != null) {
            allocateResultSet.addAll(allocateResult);
        }

        if (updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder)) {
            log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(),
                mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet);
            messageQueueChanged(topic, mqSet, allocateResultSet);
        }
    }

    /**
     * Removes every owned queue whose topic is not in the subscription table and marks its
     * {@link ProcessQueue} dropped. Does nothing if {@link #getSubscriptionInner()} returns
     * {@code null}, mirroring the null check in {@link #doRebalance(boolean)}.
     */
    private void truncateMessageQueueNotMyTopic() {
        Map<String, SubscriptionData> subTable = getSubscriptionInner();
        if (subTable == null) {
            return;
        }

        for (MessageQueue mq : this.processQueueTable.keySet()) {
            if (subTable.containsKey(mq.getTopic())) {
                continue;
            }
            ProcessQueue removed = this.processQueueTable.remove(mq);
            if (removed != null) {
                removed.setDropped(true);
                log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}",
                    this.consumerGroup, mq);
            }
        }
    }

    /**
     * Brings {@link #processQueueTable} in line with the queues assigned for {@code topic}:
     * first drops owned queues that should go, then adds newly assigned ones and dispatches
     * their pull requests.
     *
     * @param topic the topic being rebalanced
     * @param mqSet the queues this consumer should own for {@code topic}
     * @param isOrder when {@code true}, a new queue must be locked before it is added
     * @return {@code true} if at least one queue was removed or added
     */
    private boolean updateProcessQueueTableInRebalance(String topic, Set<MessageQueue> mqSet, boolean isOrder) {
        boolean removed = dropStaleProcessQueues(topic, mqSet);
        boolean added = addNewProcessQueues(mqSet, isOrder);
        return removed || added;
    }

    /**
     * Drops owned queues of {@code topic} that are either no longer assigned or, for a
     * {@code CONSUME_PASSIVELY} consumer, still assigned but with an expired pull. A queue is
     * marked dropped first and removed only if
     * {@link #removeUnnecessaryMessageQueue(MessageQueue, ProcessQueue)} agrees.
     *
     * @return {@code true} if at least one queue was removed
     */
    private boolean dropStaleProcessQueues(String topic, Set<MessageQueue> mqSet) {
        boolean changed = false;
        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, ProcessQueue> entry = it.next();
            MessageQueue mq = entry.getKey();
            ProcessQueue processQueue = entry.getValue();
            if (!mq.getTopic().equals(topic)) {
                continue;
            }

            if (!mqSet.contains(mq)) {
                processQueue.setDropped(true);
                if (removeUnnecessaryMessageQueue(mq, processQueue)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, mq);
                }
            } else if (processQueue.isPullExpired() && consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                // Still assigned, but pulling has stalled: drop it so that the add phase
                // re-creates it with a fresh pull request.
                processQueue.setDropped(true);
                if (removeUnnecessaryMessageQueue(mq, processQueue)) {
                    it.remove();
                    changed = true;
                    log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                        this.consumerGroup, mq);
                }
            }
        }
        return changed;
    }

    /**
     * Adds every queue of {@code mqSet} that is not yet owned and hands the resulting pull
     * requests to {@link #dispatchPullRequest(List)} (which is called even when the list is
     * empty).
     *
     * @return {@code true} if at least one queue was added
     */
    private boolean addNewProcessQueues(Set<MessageQueue> mqSet, boolean isOrder) {
        boolean changed = false;
        List<PullRequest> pullRequestList = new ArrayList<>();
        for (MessageQueue mq : mqSet) {
            if (this.processQueueTable.containsKey(mq)) {
                continue;
            }
            if (isOrder && !lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", this.consumerGroup, mq);
                continue;
            }

            removeDirtyOffset(mq);
            ProcessQueue processQueue = new ProcessQueue();
            long nextOffset = computePullFromWhere(mq);
            if (nextOffset < 0) {
                log.warn("doRebalance, {}, add new mq failed, {}", this.consumerGroup, mq);
            } else if (this.processQueueTable.putIfAbsent(mq, processQueue) != null) {
                log.info("doRebalance, {}, mq already exists, {}", this.consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", this.consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(this.consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(processQueue);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        }

        dispatchPullRequest(pullRequestList);
        return changed;
    }

    /**
     * Called after a rebalance of {@code topic} added or removed at least one queue.
     *
     * @param topic the rebalanced topic
     * @param mqAll all known queues of the topic
     * @param mqDivided the queues assigned to this consumer (the same set as {@code mqAll}
     *        in broadcasting mode)
     */
    public abstract void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided);

    /**
     * Called for a queue that has just been marked dropped.
     *
     * @param mq the queue being given up
     * @param pq its processing state
     * @return {@code true} to let the rebalance remove the queue from
     *         {@link #processQueueTable}; {@code false} to keep the entry for now
     */
    public abstract boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq);

    /**
     * @return the consume type of this consumer; when it is {@code CONSUME_PASSIVELY},
     *         assigned queues whose pull has expired are dropped during rebalance
     */
    public abstract ConsumeType consumeType();

    /**
     * Called for a newly assigned queue before its start offset is computed.
     *
     * @param mq the queue about to be added
     */
    public abstract void removeDirtyOffset(MessageQueue mq);

    /**
     * Computes the offset from which a newly assigned queue is pulled.
     *
     * @param mq the queue about to be added
     * @return the offset stored in the queue's pull request; a negative value means the
     *         queue is not added in this round
     */
    public abstract long computePullFromWhere(MessageQueue mq);

    /**
     * Receives the pull requests created for the queues added in one rebalance round.
     *
     * @param pullRequestList the new pull requests; may be empty
     */
    public abstract void dispatchPullRequest(List<PullRequest> pullRequestList);

    /**
     * Removes one queue from {@link #processQueueTable}, marks it dropped and notifies
     * {@link #removeUnnecessaryMessageQueue(MessageQueue, ProcessQueue)} (its result is
     * ignored here). Does nothing if the queue is not owned.
     *
     * @param mq the queue to remove
     */
    public void removeProcessQueue(MessageQueue mq) {
        ProcessQueue removed = this.processQueueTable.remove(mq);
        if (removed != null) {
            // Captured before the flag is overwritten so the log shows the previous state.
            boolean wasDropped = removed.isDropped();
            removed.setDropped(true);
            removeUnnecessaryMessageQueue(mq, removed);
            log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", this.consumerGroup, mq, wasDropped);
        }
    }

    /** @return the live table of owned queues */
    public ConcurrentMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
        return this.processQueueTable;
    }

    /** @return the live table of known queues per topic */
    public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
        return this.topicSubscribeInfoTable;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }

    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
        return this.allocateMessageQueueStrategy;
    }

    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    }

    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    /** Marks every owned queue dropped and empties {@link #processQueueTable}. */
    public void destroy() {
        for (ProcessQueue processQueue : this.processQueueTable.values()) {
            processQueue.setDropped(true);
        }
        this.processQueueTable.clear();
    }
}
