package com.volcengine.service.tls.consumer;

import com.volcengine.model.tls.ClientBuilder;
import com.volcengine.model.tls.Const;
import com.volcengine.model.tls.ConsumerGroup;
import com.volcengine.model.tls.consumer.ConsumeShard;
import com.volcengine.model.tls.consumer.ConsumerConfig;
import com.volcengine.model.tls.consumer.ConsumerStatus;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.request.CreateConsumerGroupRequest;
import com.volcengine.model.tls.request.DescribeConsumerGroupsRequest;
import com.volcengine.model.tls.response.DescribeConsumerGroupsResponse;
import com.volcengine.service.tls.TLSLogClient;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/volcengine/service/tls/consumer/ConsumerImpl.class */
/**
 * Default {@link Consumer} implementation.
 *
 * <p>On {@link #start()} it makes sure the configured consumer group exists, starts the
 * {@link HeartbeatTracker}, and launches a polling thread. On every poll cycle that thread
 * reads the shard list from the heartbeat tracker and keeps one {@link LogConsumer} per
 * shard in {@code workerMap}.
 *
 * <p>{@code workerMap} is a plain {@link HashMap}; it is mutated by the polling thread and
 * read by {@link #stop()} only after that thread has been joined.
 */
public class ConsumerImpl implements Consumer {
    private static final Log LOG = LogFactory.getLog(ConsumerImpl.class);
    ConsumerConfig consumerConfig;
    TLSLogClient tlsClient;
    LogProcessor logProcessor;
    ExecutorService executor;
    private Thread runThread;
    private HeartbeatTracker heartbeatTracker;
    private Map<String, LogConsumer> workerMap;
    private volatile boolean isStop;

    /**
     * Validates the configuration and builds the TLS client and heartbeat tracker.
     * Nothing is started here; call {@link #start()} to begin consuming.
     *
     * @param consumerConfig consumer settings; validated via {@code validateConsumerConfig()}
     * @param logProcessor   processor stored for use by the per-shard workers
     * @throws LogException declared by the constructor; may originate from validation or
     *                      client creation
     */
    public ConsumerImpl(ConsumerConfig consumerConfig, LogProcessor logProcessor) throws LogException {
        consumerConfig.validateConsumerConfig();
        this.consumerConfig = consumerConfig;
        this.tlsClient = ClientBuilder.newClient(consumerConfig.getClientConfig());
        this.logProcessor = logProcessor;
        this.heartbeatTracker = new HeartbeatTracker(this);
        LOG.info(String.format("TLS consumer %s is initialized.", this.consumerConfig.getConsumerName()));
    }

    /**
     * Ensures the consumer group exists, starts the heartbeat tracker, creates the worker
     * executor, and launches the polling thread.
     *
     * @throws LogException if describing or creating the consumer group fails
     */
    @Override // com.volcengine.service.tls.consumer.Consumer
    public void start() throws LogException {
        init();
        this.executor = Executors.newCachedThreadPool();
        this.runThread = new Thread(this::run);
        this.runThread.start();
    }

    /**
     * Stops the polling thread, every shard worker, the executor, and the heartbeat tracker.
     *
     * <p>Calling this before {@link #start()} is a no-op. If the calling thread is
     * interrupted while waiting, shutdown still runs to completion and the interrupt flag
     * is restored just before returning. If the executor does not terminate within the
     * configured stop timeout it is shut down forcibly.
     */
    @Override // com.volcengine.service.tls.consumer.Consumer
    public void stop() {
        LOG.info(String.format("TLS consumer %s is ready to stop.", this.consumerConfig.getConsumerName()));
        this.isStop = true;
        if (this.runThread == null) {
            // start() never completed, so there is no thread, worker, or executor to tear down.
            LOG.info(String.format("TLS consumer %s was never started; nothing to stop.", this.consumerConfig.getConsumerName()));
            return;
        }

        // Remember an interrupt instead of restoring it right away: restoring it early
        // would make the awaitTermination() call below fail immediately.
        boolean interrupted = false;
        try {
            this.runThread.join();
        } catch (InterruptedException e) {
            this.runThread.interrupt();
            interrupted = true;
        }

        if (this.workerMap != null) {
            for (LogConsumer worker : this.workerMap.values()) {
                worker.stop();
            }
        }

        if (this.executor != null) {
            this.executor.shutdown();
            try {
                boolean terminated = this.executor.awaitTermination(this.consumerConfig.getStopTimeout(), TimeUnit.SECONDS);
                if (!terminated) {
                    LOG.info("TLS consumer executor did not terminate within the stop timeout; forcing shutdown.");
                    this.executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                LOG.info("TLS consumer executor stop failed.");
                this.executor.shutdownNow();
                interrupted = true;
            }
        }

        this.heartbeatTracker.stop();
        LOG.info(String.format("TLS consumer %s is stopped.", this.consumerConfig.getConsumerName()));

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Forwards the three credential strings unchanged to the underlying TLS client's
     * {@code resetAccessKeyToken}.
     */
    @Override // com.volcengine.service.tls.consumer.Consumer
    public void resetAccessKeyToken(String str, String str2, String str3) {
        this.tlsClient.resetAccessKeyToken(str, str2, str3);
    }

    /**
     * Creates the consumer group when it is not already present, then starts the heartbeat
     * tracker and resets the stop flag and worker map.
     *
     * @throws LogException if the describe call fails, or if the create call fails for any
     *                      reason other than the group already existing
     */
    private void init() throws LogException {
        String projectId = this.consumerConfig.getProjectID();
        List<String> topicIds = this.consumerConfig.getTopicIDList();
        String groupName = this.consumerConfig.getConsumerGroupName();
        // Three times the heartbeat interval; passed as the fourth argument of the create
        // request (presumably the server-side heartbeat timeout; confirm against the API).
        int heartbeatTtl = 3 * this.consumerConfig.getHeartbeatIntervalInSecond();
        boolean orderedConsume = this.consumerConfig.isOrderedConsume();

        DescribeConsumerGroupsRequest describeRequest = new DescribeConsumerGroupsRequest();
        describeRequest.setProjectID(projectId);
        describeRequest.setConsumerGroupName(groupName);
        DescribeConsumerGroupsResponse describeResponse = this.tlsClient.describeConsumerGroups(describeRequest);

        boolean groupExists = false;
        List<ConsumerGroup> existingGroups = describeResponse.getConsumerGroups();
        if (existingGroups != null) {
            for (ConsumerGroup group : existingGroups) {
                // Require an exact name match; a group without a name never matches.
                String existingName = group.getConsumerGroupName();
                if (existingName != null && existingName.equals(groupName)) {
                    groupExists = true;
                    break;
                }
            }
        }

        if (!groupExists) {
            try {
                this.tlsClient.createConsumerGroup(new CreateConsumerGroupRequest(projectId, topicIds, groupName, heartbeatTtl, orderedConsume));
            } catch (LogException e) {
                // Another consumer may have created the group in the meantime; that is fine.
                // A missing error message must not hide the original failure behind an NPE.
                String message = e.getErrorMessage();
                if (message == null || !message.contains(Const.ERROR_CONSUMER_GROUP_ALREADY_EXISTS)) {
                    LOG.error("Calling CreateConsumerGroup failed.", e);
                    throw e;
                }
            }
        }

        this.heartbeatTracker.start();
        this.isStop = false;
        this.workerMap = new HashMap<>();
    }

    /**
     * Polling loop executed on {@code runThread} until {@link #stop()} sets the stop flag.
     *
     * <p>Each cycle: if a worker reports {@link ConsumerStatus#WAIT_FOR_RESTART}, a heartbeat
     * is uploaded (retried once per such worker until one upload succeeds); then the shard
     * list from the heartbeat tracker is reconciled with the workers; then the thread sleeps
     * for the configured data-fetch interval.
     */
    private void run() {
        LOG.info("Consumer starts to work.");
        int fetchIntervalMillis = this.consumerConfig.getDataFetchIntervalInMillisecond();
        while (!this.isStop) {
            for (LogConsumer worker : this.workerMap.values()) {
                if (worker.loadStatus() != ConsumerStatus.WAIT_FOR_RESTART) {
                    continue;
                }
                try {
                    this.heartbeatTracker.uploadHeartbeat();
                    // One successful upload per cycle is enough.
                    break;
                } catch (Exception e) {
                    LOG.error("Upload heartbeat failed when consumer expired.", e);
                }
            }
            handleShards(this.heartbeatTracker.getShards());
            ConsumerUtil.sleep(fetchIntervalMillis);
        }
        LOG.info("Consumer ends to work.");
    }

    /**
     * Reconciles {@code workerMap} with the given shard list and then calls {@code run()} on
     * every remaining worker.
     *
     * <p>Workers whose shard is no longer listed are stopped and removed. A new worker is
     * created for every listed shard that has no worker yet or whose worker reports
     * {@link ConsumerStatus#WAIT_FOR_RESTART}. Workers are keyed by topic ID concatenated
     * with shard ID.
     *
     * @param list shards currently reported by the heartbeat tracker; {@code null} leaves
     *             the workers untouched and runs nothing
     */
    private void handleShards(List<ConsumeShard> list) {
        if (list == null) {
            return;
        }

        Map<String, ConsumeShard> assignedShards = new HashMap<>();
        for (ConsumeShard shard : list) {
            assignedShards.put(shard.getTopicID() + shard.getShardID(), shard);
        }

        // Stop and drop workers for shards that are no longer assigned.
        Iterator<Map.Entry<String, LogConsumer>> workers = this.workerMap.entrySet().iterator();
        while (workers.hasNext()) {
            Map.Entry<String, LogConsumer> entry = workers.next();
            if (!assignedShards.containsKey(entry.getKey())) {
                entry.getValue().stop();
                workers.remove();
            }
        }

        // Create workers for new shards and replace workers that asked for a restart.
        for (Map.Entry<String, ConsumeShard> entry : assignedShards.entrySet()) {
            LogConsumer current = this.workerMap.get(entry.getKey());
            if (current == null || current.loadStatus() == ConsumerStatus.WAIT_FOR_RESTART) {
                this.workerMap.put(entry.getKey(), new LogConsumer(this, entry.getValue()));
            }
        }

        for (LogConsumer worker : this.workerMap.values()) {
            worker.run();
        }
    }
}
