package com.volcengine.service.tls;

import com.volcengine.model.tls.Const;
import com.volcengine.model.tls.LogItem;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.producer.CallBack;
import com.volcengine.model.tls.producer.ProducerConfig;
import com.volcengine.model.tls.util.AdaptorUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/volcengine/service/tls/ProducerImpl.class */
public class ProducerImpl implements Producer {
    private ProducerConfig producerConfig;
    private final LogDispatcher dispatcher;
    private final String name;
    private final Semaphore memoryLock;
    private final BatchHandler successHandler;
    private final BatchHandler failHandler;
    private final RetryManager retryManager;
    private final AtomicInteger batchCount = new AtomicInteger(0);
    private final Mover mover;
    private static final Log LOG = LogFactory.getLog(ProducerImpl.class);
    private static final AtomicInteger INSTANCE_ID = new AtomicInteger(0);

    public ProducerImpl(ProducerConfig producerConfig) throws LogException {
        producerConfig.validConfig();
        this.producerConfig = producerConfig;
        this.name = "TLS-" + INSTANCE_ID.incrementAndGet();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
        this.memoryLock = new Semaphore(producerConfig.getTotalSizeInBytes());
        this.retryManager = new RetryManager();
        this.dispatcher = new LogDispatcher(producerConfig, this.name, linkedBlockingQueue, linkedBlockingQueue2, this.memoryLock, this.batchCount, this.retryManager);
        this.successHandler = new BatchHandler("success batch handler-" + this.name, this.memoryLock, linkedBlockingQueue, this.batchCount);
        this.failHandler = new BatchHandler("fail batch handler-" + this.name, this.memoryLock, linkedBlockingQueue2, this.batchCount);
        this.mover = new Mover(this.name + "-mover", producerConfig, this.dispatcher, this.retryManager, linkedBlockingQueue, linkedBlockingQueue2);
    }

    public static Producer defaultProducer(String str, String str2, String str3, String str4, String str5) throws LogException {
        return new ProducerImpl(new ProducerConfig(str, str2, str3, str4, str5));
    }

    @Override // com.volcengine.service.tls.Producer
    @Deprecated
    public void sendLog(String str, String str2, String str3, String str4, PutLogRequest.Log log, CallBack callBack) throws InterruptedException, LogException {
        if (str2 == null || log == null) {
            throw new LogException(Const.INVALID_ARGUMENT, String.format("topic id: %s, log: %s", str2, log), null);
        }
        sendLogGroup(str, str2, str3, str4, PutLogRequest.LogGroup.newBuilder().setFileName(str4).setSource(str3).addLogs(log).build(), callBack);
    }

    @Override // com.volcengine.service.tls.Producer
    @Deprecated
    public void sendLogGroup(String str, String str2, String str3, String str4, PutLogRequest.LogGroup logGroup, CallBack callBack) throws InterruptedException, LogException {
        if (str2 == null || logGroup == null || logGroup.getLogsList() == null || logGroup.getLogsList().size() == 0) {
            throw new LogException(Const.INVALID_ARGUMENT, String.format("topic id: %s, log group is empty", str2), null);
        }
        if (logGroup.getLogsList().size() > 10000) {
            throw new LogException(Const.INVALID_ARGUMENT, String.format("log list size %d is greater than threshold %d", Integer.valueOf(logGroup.getLogsList().size()), 10000), null);
        }
        this.dispatcher.addBatch(str, str2, str3, str4, logGroup, callBack);
    }

    @Override // com.volcengine.service.tls.Producer
    public void sendLogV2(String str, String str2, String str3, String str4, LogItem logItem, CallBack callBack) throws InterruptedException, LogException {
        if (logItem == null) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(logItem);
        sendLogsV2(str, str2, str3, str4, arrayList, callBack);
    }

    @Override // com.volcengine.service.tls.Producer
    public void sendLogsV2(String str, String str2, String str3, String str4, List<LogItem> list, CallBack callBack) throws InterruptedException, LogException {
        if (str2 == null || list == null || list.size() == 0) {
            throw new LogException(Const.INVALID_ARGUMENT, String.format("topic id: %s, log group: %s", str2, list), null);
        }
        if (list.size() > 10000) {
            throw new LogException(Const.INVALID_ARGUMENT, String.format("log list size %d is greater than threshold %d", Integer.valueOf(list.size()), 10000), null);
        }
        this.dispatcher.addBatch(str, str2, str3, str4, AdaptorUtil.logItems2PbGroup(str4, str3, list), callBack);
    }

    @Override // com.volcengine.service.tls.Producer
    public void resetAccessKeyToken(String str, String str2, String str3) throws LogException {
        if (StringUtils.isEmpty(str) || StringUtils.isEmpty(str2)) {
            throw new LogException(Const.INVALID_ARGUMENT, String.format("reset producer %s access key failed, accessKey is %s, secretKey is %s, token is %s", this.name, str, str2, str3), null);
        }
        this.dispatcher.resetAccessKeyToken(str, str2, str3);
    }

    @Override // com.volcengine.service.tls.Producer
    public void start() throws LogException {
        this.dispatcher.start();
        this.retryManager.start();
        this.successHandler.start();
        this.failHandler.start();
        this.mover.start();
        LOG.info(String.format("producer %s started", this.name));
    }

    @Override // com.volcengine.service.tls.Producer
    public void close() throws InterruptedException, LogException {
        close(30000L);
    }

    @Override // com.volcengine.service.tls.Producer
    public void close(long j) throws InterruptedException, LogException {
        LogException logException = null;
        try {
            j = closeMover(j);
        } catch (LogException e) {
            logException = e;
        }
        try {
            j = closeExecutorService(j);
        } catch (LogException e2) {
            if (logException == null) {
                logException = e2;
            }
        }
        try {
            j = closeSuccessHandler(j);
        } catch (LogException e3) {
            if (logException == null) {
                logException = e3;
            }
        }
        try {
            closeFailureHandler(j);
        } catch (LogException e4) {
            if (logException == null) {
                logException = e4;
            }
        }
        if (logException != null) {
            throw logException;
        }
        LOG.info(String.format("producer %s closed", this.name));
    }

    private long closeMover(long j) throws InterruptedException, LogException {
        long currentTimeMillis = System.currentTimeMillis();
        this.dispatcher.close();
        this.retryManager.close();
        this.mover.close();
        this.mover.join(j);
        if (this.mover.isAlive()) {
            LOG.warn("producer mover thread is still alive");
            throw new LogException("Producer Error", "producer mover thread is still alive", null);
        }
        LOG.info("producer mover is closed");
        return Math.max(0L, (j - System.currentTimeMillis()) + currentTimeMillis);
    }

    private long closeExecutorService(long j) throws InterruptedException, LogException {
        long currentTimeMillis = System.currentTimeMillis();
        ExecutorService executorService = this.dispatcher.getExecutorService();
        executorService.shutdown();
        if (executorService.awaitTermination(j, TimeUnit.MILLISECONDS)) {
            LOG.info("producer executor service is closed");
            return Math.max(0L, (j - System.currentTimeMillis()) + currentTimeMillis);
        }
        LOG.warn("producer executor is not terminated normally");
        executorService.shutdownNow();
        throw new LogException("Producer Error", "producer executor is not terminated normally", null);
    }

    private long closeSuccessHandler(long j) throws InterruptedException, LogException {
        long currentTimeMillis = System.currentTimeMillis();
        this.successHandler.close();
        if (Thread.currentThread() == this.successHandler) {
            LOG.warn("Skip join success batch handler since you have incorrectly invoked close from the producer callback");
            return j;
        }
        this.successHandler.join(j);
        if (this.successHandler.isAlive()) {
            LOG.warn("producer success handler thread is still alive");
            throw new LogException("Producer Error", "producer success handler thread is still alive", null);
        }
        LOG.info("producer success handler is closed");
        return Math.max(0L, (j - System.currentTimeMillis()) + currentTimeMillis);
    }

    private long closeFailureHandler(long j) throws InterruptedException, LogException {
        long currentTimeMillis = System.currentTimeMillis();
        this.failHandler.close();
        if (Thread.currentThread() == this.successHandler || Thread.currentThread() == this.failHandler) {
            LOG.warn("Skip join failure batch handler since you have incorrectly invoked close from the producer callback");
            return j;
        }
        this.failHandler.join(j);
        if (this.failHandler.isAlive()) {
            LOG.warn("producer failure handler thread is still alive");
            throw new LogException("Producer Error", "producer failure handler thread is still alive", null);
        }
        LOG.info("producer failure handler is closed");
        return Math.max(0L, (j - System.currentTimeMillis()) + currentTimeMillis);
    }

    @Override // com.volcengine.service.tls.Producer
    public void closeNow() throws InterruptedException, LogException {
        this.dispatcher.closeNow();
        this.retryManager.close();
        this.mover.close();
        this.successHandler.close();
        this.failHandler.close();
        LOG.info(String.format("producer %s closed now", this.name));
    }

    @Override // com.volcengine.service.tls.Producer
    public void config(ProducerConfig producerConfig) throws LogException {
        if (producerConfig != null) {
            this.producerConfig = producerConfig;
            producerConfig.validConfig();
            LOG.info(String.format("producer %s configured, config: %s", this.name, producerConfig));
        }
    }
}
