package com.jzt.wotu.etl.core.kafkaRetry.retry;

import com.alibaba.fastjson.JSON;
import com.jzt.wotu.etl.core.kafkaRetry.config.KafkaEventTemplate;
import com.jzt.wotu.etl.core.kafkaRetry.retry.entity.KafkaRetryRecord;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Parks Kafka retry messages in Redis and re-publishes them once they are due.
 *
 * <p>{@link #consume} reads records from the topic configured as {@code kafka.topics.retry},
 * stores the raw JSON payload in the Redis hash {@code _retry_value_02} under a random UUID and
 * registers that UUID in the sorted set {@code _retry_key_02}, scored by the record's
 * {@code getNextTime()} value. {@link #retryFormRedis} runs on a cron schedule, looks up every
 * id whose score is not greater than the current time in milliseconds, and sends the
 * corresponding record through the Kafka template.
 */
@Component
public class RetryListener {
    private static final Logger log = LoggerFactory.getLogger(RetryListener.class);
    private static final String RETRY_KEY_ZSET = "_retry_key_02";
    private static final String RETRY_VALUE_MAP = "_retry_value_02";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private KafkaEventTemplate<String, String> kafkaTemplate;

    /**
     * Stores one retry message in Redis so that the scheduled job can re-publish it later.
     *
     * <p>The record is always acknowledged, even when parsing or the Redis writes fail; failures
     * are logged and not rethrown, so a message that cannot be stored is not redelivered by this
     * listener.
     *
     * @param consumerRecord record whose value is the JSON form of a {@link KafkaRetryRecord}
     * @param acknowledgment manual acknowledgment handle for the record
     */
    @KafkaListener(topics = {"${kafka.topics.retry}"}, containerFactory = "singleAckContainerFactory")
    public void consume(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
        try {
            log.info("========>retryTopic<=========={}", consumerRecord);
            String payload = consumerRecord.value();
            KafkaRetryRecord kafkaRetryRecord = JSON.parseObject(payload, KafkaRetryRecord.class);
            if (kafkaRetryRecord == null || kafkaRetryRecord.getNextTime() == null) {
                log.error("===========>Error<=consume============ retry record has no payload or no next time: {}",
                        consumerRecord);
                return;
            }
            // Resolve the score before touching Redis so a bad record cannot leave a hash
            // entry behind that no sorted-set member points to.
            double score = kafkaRetryRecord.getNextTime().longValue();
            String uuid = UUID.randomUUID().toString();
            this.redisTemplate.opsForHash().put(RETRY_VALUE_MAP, uuid, payload);
            this.redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, uuid, score);
        } catch (Exception e) {
            log.error("===========>Error<=consume============", e);
        } finally {
            log.info("===========>ack.acknowledge()<=============");
            acknowledgment.acknowledge();
        }
    }

    /**
     * Re-publishes every stored retry record whose scheduled time has been reached.
     *
     * <p>Runs at second 50 of every minute. Each due id is handled independently: the record is
     * sent first and only removed from Redis afterwards, so an entry whose processing fails
     * stays in Redis and is picked up again by the next run instead of being lost.
     */
    @Scheduled(cron = "50 * * * * ?")
    public void retryFormRedis() {
        log.info("=========retryFormRedis===============");
        long currentTimeMillis = System.currentTimeMillis();
        Set<ZSetOperations.TypedTuple<String>> dueEntries;
        try {
            dueEntries = this.redisTemplate.opsForZSet()
                    .reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0.0d, currentTimeMillis);
        } catch (Exception e) {
            log.error("retryFormRedis: failed to read due retry ids from Redis", e);
            return;
        }
        if (dueEntries == null || dueEntries.isEmpty()) {
            return;
        }
        for (ZSetOperations.TypedTuple<String> entry : dueEntries) {
            String retryId = entry.getValue();
            if (retryId == null) {
                continue;
            }
            try {
                republish(retryId);
            } catch (Exception e) {
                // Keep going: one broken entry must not block the remaining due entries.
                log.error("retryFormRedis: failed to process retry id {}; it stays in Redis for the next run",
                        retryId, e);
            }
        }
    }

    /** Loads the payload stored under {@code retryId}, sends it, then removes it from Redis. */
    private void republish(String retryId) {
        Object stored = this.redisTemplate.opsForHash().get(RETRY_VALUE_MAP, retryId);
        if (stored == null) {
            // Sorted-set member without a payload: nothing can be sent, so drop the dangling id.
            log.warn("retryFormRedis: no payload stored for retry id {}; removing the id", retryId);
            this.redisTemplate.opsForZSet().remove(RETRY_KEY_ZSET, retryId);
            return;
        }
        String payload = stored.toString();
        KafkaRetryRecord kafkaRetryRecord;
        try {
            kafkaRetryRecord = JSON.parseObject(payload, KafkaRetryRecord.class);
        } catch (RuntimeException e) {
            log.error("retryFormRedis: payload of retry id {} cannot be parsed: {}", retryId, payload, e);
            kafkaRetryRecord = null;
        }
        if (kafkaRetryRecord == null) {
            // An unparseable payload can never be sent; keeping it would fail on every run.
            log.error("retryFormRedis: dropping retry id {} because its payload is not a retry record", retryId);
            forget(retryId);
            return;
        }
        send(kafkaRetryRecord);
        forget(retryId);
    }

    /** Converts the retry record into a producer record and hands it to the Kafka template. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void send(KafkaRetryRecord kafkaRetryRecord) {
        // Raw type kept on purpose: the generic signature of KafkaRetryRecord#parse() is not
        // visible from this class.
        ProducerRecord producerRecord = kafkaRetryRecord.parse();
        log.info("=====retryFormRedis===================>{}", producerRecord.topic());
        this.kafkaTemplate.send(producerRecord);
    }

    /** Removes a retry id from the sorted set and its payload from the hash. */
    private void forget(String retryId) {
        this.redisTemplate.opsForZSet().remove(RETRY_KEY_ZSET, retryId);
        this.redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, retryId);
    }
}
