package com.jzt.wotu.etl.core.datasource.es;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.jzt.wotu.etl.core.datasource.AbstractDataSource;
import com.jzt.wotu.etl.core.kafkaRetry.config.KafkaRetryServiceImpl;
import com.jzt.wotu.etl.core.utils.CopyConfigUtils;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/es/KafkaDataSource.class */
/**
 * Data source that hands out {@link KafkaTemplate} instances built on a single
 * {@link ProducerFactory}.
 *
 * <p>The producer factory is created once, in the constructor, from the broker
 * list of the supplied configuration; every {@link #execute(Consumer)} call wraps
 * that same factory in a newly constructed template.
 */
public class KafkaDataSource extends AbstractDataSource {
    private static final Logger log = LoggerFactory.getLogger(KafkaDataSource.class);

    // NOTE(review): the two constants below are not referenced anywhere in this class;
    // they are kept so class initialisation stays exactly as before.
    private static final Cache<String, String> Count_Sql_Cache = CacheBuilder.newBuilder().maximumSize(3000).initialCapacity(500).build();
    private static final AtomicInteger POOL_NAME_ID = new AtomicInteger(0);

    /** Identifier reported by {@link #getDataSourceId()}. */
    private final String dataSourceId;

    /** Configuration object returned by {@code CopyConfigUtils.copyConfig} for the constructor argument. */
    private final KafkaDataSourceDsl kafkaConfig;

    /** Producer factory shared by every template created in {@link #execute(Consumer)}. */
    private final ProducerFactory factory;

    /**
     * Creates the data source and its producer factory.
     *
     * @param str                identifier of this data source; stored in its string form
     *                           (a {@code null} argument is stored as the text {@code "null"})
     * @param kafkaDataSourceDsl Kafka configuration; it is passed through
     *                           {@code CopyConfigUtils.copyConfig} and the result is what this
     *                           instance keeps and reads the brokers from
     */
    public KafkaDataSource(String str, KafkaDataSourceDsl kafkaDataSourceDsl) {
        KafkaDataSourceDsl copiedConfig = (KafkaDataSourceDsl) CopyConfigUtils.copyConfig(kafkaDataSourceDsl);
        this.kafkaConfig = copiedConfig;
        this.dataSourceId = String.valueOf(str);

        KafkaRetryServiceImpl retryService = new KafkaRetryServiceImpl();
        this.factory = retryService.producerFactory(copiedConfig.getBrokers());
    }

    /**
     * Returns the identifier given at construction time.
     *
     * @return the data source id
     */
    @Override // com.jzt.wotu.etl.core.datasource.AbstractDataSource
    public String getDataSourceId() {
        return this.dataSourceId;
    }

    /**
     * Runs the given callback against a Kafka template.
     *
     * <p>A new {@link KafkaTemplate} is constructed on the shared producer factory for
     * each invocation and passed to {@code consumer}.
     *
     * @param consumer callback that receives the template
     */
    public void execute(Consumer<KafkaTemplate> consumer) {
        KafkaTemplate template = new KafkaTemplate(this.factory);
        consumer.accept(template);
    }
}
