package com.jzt.wotu.etl.core.kafkaRetry.config;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jzt.wotu.etl.core.ZookeeperService;
import com.jzt.wotu.etl.core.kafkaRetry.entity.KafkaRetryRecord;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/jzt/wotu/etl/core/kafkaRetry/config/KafkaRetryServiceImpl.class */
public class KafkaRetryServiceImpl {
    private static final Logger log = LoggerFactory.getLogger(KafkaRetryServiceImpl.class);
    private static final int[] RETRY_INTERVAL_SECONDS = {10, 120};
    private static final String RETRY_TOPIC = "ETL_RETRY_TOPPIC";
    private static final String DLT_TOPIC = "ETL_DLT_TOPIC";

    public ProducerFactory<Object, Object> producerFactory(KafkaProperties kafkaProperties) {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", kafkaProperties.getBrokers());
        hashMap.put("retries", kafkaProperties.getRetries());
        hashMap.put("batch.size", kafkaProperties.getBatchSize());
        hashMap.put("linger.ms", kafkaProperties.getLingerMs());
        hashMap.put("buffer.memory", kafkaProperties.getBufferMemory());
        hashMap.put("compression.type", "gzip");
        hashMap.put("key.serializer", StringSerializer.class);
        hashMap.put("value.serializer", JsonSerializer.class);
        hashMap.put("acks", kafkaProperties.getAck());
        return new DefaultKafkaProducerFactory(hashMap);
    }

    public void consumerLater(ConsumerRecord<String, String> consumerRecord, KafkaProperties kafkaProperties) {
        String str = ZookeeperService.ENV_MAP.get("prefix");
        if (str.equalsIgnoreCase("test")) {
            str = kafkaProperties.getPrefix() + "-" + str;
        }
        KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory(kafkaProperties));
        try {
            log.debug("=========================Pre Step 2=============================================");
            KafkaRetryRecord kafkaRetryRecord = new KafkaRetryRecord();
            kafkaRetryRecord.setTopic(consumerRecord.topic());
            kafkaRetryRecord.setKey((String) consumerRecord.key());
            JSONObject parseObject = JSONObject.parseObject((String) consumerRecord.value());
            Integer integer = parseObject.getInteger("size");
            JSONArray jSONArray = parseObject.getJSONArray("data");
            kafkaRetryRecord.setSize(integer);
            kafkaRetryRecord.setData(jSONArray);
            int i = 0;
            if (parseObject.containsKey(KafkaRetryRecord.KEY_RETRY_TIMES)) {
                i = parseObject.getIntValue(KafkaRetryRecord.KEY_RETRY_TIMES) + 1;
            }
            Date nextConsumerTime = getNextConsumerTime(i);
            if (nextConsumerTime == null) {
                JSONObject parseObject2 = JSONObject.parseObject(JSON.toJSONString(kafkaRetryRecord));
                log.info("========>D=====>L=======>T-------retryTimes is -" + i + "=============sendVaule" + parseObject2);
                kafkaTemplate.send(str + "ETL_DLT_TOPIC", parseObject2);
            } else {
                kafkaRetryRecord.setNextTime(Long.valueOf(nextConsumerTime.getTime()));
                kafkaRetryRecord.setRetryTimes(Integer.valueOf(i));
                JSONObject parseObject3 = JSONObject.parseObject(JSON.toJSONString(kafkaRetryRecord));
                log.info("=========================Pre Step 3=异常数据到重试队列============sendVaule=is =" + parseObject3 + "=====投递topic=is =" + kafkaProperties.getRetryTopic() + "=====key=is =" + kafkaProperties.getRetryKey() + "=====");
                kafkaTemplate.send(str + "ETL_RETRY_TOPPIC", kafkaProperties.getRetryKey(), parseObject3);
            }
        } catch (Exception e) {
            log.info("====================>consumerLater--=====Exception=======================------");
            throw e;
        }
    }

    private Date getNextConsumerTime(int i) {
        if (RETRY_INTERVAL_SECONDS.length <= i) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(13, RETRY_INTERVAL_SECONDS[i]);
        return calendar.getTime();
    }
}
