package com.jzt.wotu.etl.core.kafkaRetry.config;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.jzt.wotu.etl.core.ZookeeperService;
import com.jzt.wotu.etl.core.kafkaRetry.entity.KafkaRetryRecord;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Service;

/**
 * Publishes Kafka messages that failed processing to follow-up topics: an exception topic,
 * a retry topic (with a scheduled next-consumption time) or a dead-letter topic once the
 * retry schedule in {@link #RETRY_INTERVAL_SECONDS} is exhausted.
 *
 * <p>Topic names are read from {@code ZookeeperService.ENV_MAP} on every call. One
 * {@link KafkaTemplate} is kept per distinct broker list and reused across calls, so a new
 * producer factory is not built (and left unclosed) for every message.
 */
@Service
public class KafkaRetryServiceImpl {
    private static final Logger log = LoggerFactory.getLogger(KafkaRetryServiceImpl.class);

    /** Delay in seconds applied before each retry, indexed by the retry count. */
    private static final int[] RETRY_INTERVAL_SECONDS = {10, 120, 300};

    /** Broker address used by {@link #markLog(String, String, Object)}. */
    private static final String LOG_BOOTSTRAP_SERVER = "10.4.9.91:9092";

    /** Templates keyed by the string form of their broker list; entries are never evicted. */
    private final Map<String, KafkaTemplate<Object, Object>> templateCache = new ConcurrentHashMap<>();

    /**
     * Builds a producer factory for the given brokers with String keys, JSON values,
     * gzip compression and {@code acks=-1}.
     *
     * @param list bootstrap server addresses
     * @return a new factory; each invocation returns a separate instance
     */
    public ProducerFactory<Object, Object> producerFactory(List<String> list) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", list);
        props.put("retries", 1);
        props.put("batch.size", 50);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("compression.type", "gzip");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", JsonSerializer.class);
        props.put("acks", "-1");
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Copies {@code size} and {@code data} from the failed record, attaches the given marker
     * message and sends the result to the topic {@code <prefix>-eltExceptionMsg}, where
     * {@code prefix} comes from {@code ZookeeperService.ENV_MAP}.
     *
     * @param consumerRecord the record that failed; its value must be a JSON object string
     * @param list bootstrap server addresses to publish to
     * @param str marker message stored on the outgoing record
     */
    public void markExceptionMsg(ConsumerRecord<String, String> consumerRecord, List<String> list, String str) {
        String prefix = ZookeeperService.ENV_MAP.get("prefix");
        JSONObject source = JSONObject.parseObject(consumerRecord.value());
        KafkaRetryRecord exceptionRecord = new KafkaRetryRecord();
        exceptionRecord.setSize(source.getInteger("size"));
        exceptionRecord.setData(source.getJSONArray("data"));
        exceptionRecord.setMarkMsg(str);
        // The topic suffix is part of the wire contract and is kept exactly as it was.
        templateFor(list).send(prefix + "-eltExceptionMsg", toJson(exceptionRecord));
    }

    /**
     * Sends a log entry to the given topic on the broker {@link #LOG_BOOTSTRAP_SERVER}.
     * With a non-blank {@code str2} the payload is {@code str2 + ":" + JSON(obj)}; otherwise
     * {@code obj} itself is sent. Failures are logged and not propagated.
     *
     * <p>NOTE(review): the guard used to return when the topic was NOT blank, which meant a
     * send was only ever attempted with a blank topic. It now skips blank topics instead.
     * Confirm that publishing to the hard-coded broker is wanted in every environment.
     *
     * @param str target topic; nothing is sent when blank
     * @param str2 optional prefix for the payload
     * @param obj the object to log
     */
    public void markLog(String str, String str2, Object obj) {
        if (!StringUtils.isNotBlank(str)) {
            return;
        }
        List<String> servers = new ArrayList<>();
        servers.add(LOG_BOOTSTRAP_SERVER);
        try {
            KafkaTemplate<Object, Object> template = templateFor(servers);
            if (StringUtils.isNotBlank(str2)) {
                template.send(str, str2 + ":" + JSON.toJSONString(obj));
            } else {
                template.send(str, obj);
            }
        } catch (RuntimeException e) {
            // Callers previously observed a no-op here, so a logging failure must not reach them.
            log.warn("markLog failed for topic {}", str, e);
        }
    }

    /**
     * Re-publishes a failed record to the retry topic, or to the dead-letter topic once all
     * retry intervals have been used. Equivalent to calling
     * {@link #commitSingleMsg(String, String, String, List, String)} with the record's
     * topic, key and value.
     *
     * @param consumerRecord the record that failed; its value must be a JSON object string
     * @param list bootstrap server addresses to publish to
     * @param str marker message stored on the outgoing record (also the dead-letter key)
     */
    public void consumerLater(ConsumerRecord<String, String> consumerRecord, List<String> list, String str) {
        routeFailedMessage(consumerRecord.topic(), consumerRecord.key(), consumerRecord.value(), list, str);
    }

    /**
     * Re-publishes a failed message, given as its individual parts, to the retry topic, or to
     * the dead-letter topic once all retry intervals have been used.
     *
     * @param str original topic of the message
     * @param str2 original key of the message
     * @param str3 original value; must be a JSON object string
     * @param list bootstrap server addresses to publish to
     * @param str4 marker message stored on the outgoing record (also the dead-letter key)
     */
    public void commitSingleMsg(String str, String str2, String str3, List<String> list, String str4) {
        routeFailedMessage(str, str2, str3, list, str4);
    }

    /**
     * Shared implementation of the retry / dead-letter routing.
     *
     * <p>The retry count is 0 when the payload has no {@code KafkaRetryRecord.KEY_RETRY_TIMES}
     * entry, otherwise the stored value plus one. When a retry interval exists for that count
     * the record is sent to the retry topic with key {@code retry_<count>}; when none exists
     * it is sent to the dead-letter topic with the marker message as key.
     */
    private void routeFailedMessage(String topic, String key, String value, List<String> servers, String markMsg) {
        String retryTopic = ZookeeperService.ENV_MAP.get("retryTopic");
        String dltTopic = ZookeeperService.ENV_MAP.get("dltTopic");
        log.debug("retryTopic = {}", retryTopic);
        log.debug("dltTopic = {}", dltTopic);
        KafkaTemplate<Object, Object> template = templateFor(servers);
        try {
            log.debug("=========================Pre Step 2=============================================");
            JSONObject source = JSONObject.parseObject(value);
            KafkaRetryRecord retryRecord = new KafkaRetryRecord();
            retryRecord.setTopic(topic);
            retryRecord.setKey(key);
            retryRecord.setSize(source.getInteger("size"));
            retryRecord.setData(source.getJSONArray("data"));
            retryRecord.setMarkMsg(markMsg);

            int retryTimes = 0;
            if (source.containsKey(KafkaRetryRecord.KEY_RETRY_TIMES)) {
                retryTimes = source.getIntValue(KafkaRetryRecord.KEY_RETRY_TIMES) + 1;
            }
            retryRecord.setRetryTimes(Integer.valueOf(retryTimes));

            Date nextConsumerTime = getNextConsumerTime(retryTimes);
            if (nextConsumerTime == null) {
                // Retry schedule exhausted: hand the message over to the dead-letter topic.
                JSONObject dltPayload = toJson(retryRecord);
                log.info("========>D=====>L=======>T-------retryTimes is {}=============dltMsgStr{}", retryTimes, dltPayload);
                template.send(dltTopic, markMsg, dltPayload);
                return;
            }

            retryRecord.setNextTime(Long.valueOf(nextConsumerTime.getTime()));
            JSONObject retryPayload = toJson(retryRecord);
            log.info("=========================Pre Step 3=异常数据到重试队列============投递topic=is {}=====key=is retry_{}=====sendVaule=is {}",
                    retryTopic, retryTimes, retryPayload);
            template.send(retryTopic, "retry_" + retryTimes, retryPayload);
        } catch (RuntimeException e) {
            // Keep the cause in the log before handing the failure back to the caller.
            log.error("====================>consumerLater--=====Exception=======================------", e);
            throw e;
        }
    }

    /**
     * Returns the template for the given broker list, creating it on first use. The list is
     * copied so that later changes made by the caller do not affect the producer config.
     */
    private KafkaTemplate<Object, Object> templateFor(List<String> servers) {
        return templateCache.computeIfAbsent(String.valueOf(servers), cacheKey -> {
            List<String> snapshot = servers == null ? null : new ArrayList<>(servers);
            return new KafkaTemplate<>(producerFactory(snapshot));
        });
    }

    /** Converts a record to a {@link JSONObject} by serialising it to JSON text and parsing it back. */
    private static JSONObject toJson(KafkaRetryRecord record) {
        return JSONObject.parseObject(JSON.toJSONString(record));
    }

    /**
     * Computes when a message with the given retry count may be consumed again.
     *
     * @param i zero-based retry count
     * @return now plus {@code RETRY_INTERVAL_SECONDS[i]} seconds, or {@code null} when
     *         {@code i} is at or beyond the end of the schedule
     */
    private Date getNextConsumerTime(int i) {
        if (RETRY_INTERVAL_SECONDS.length <= i) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[i]);
        return calendar.getTime();
    }
}
