package com.jzt.wotu.mq.kafka.core.service.impl;

import com.jzt.wotu.Conv;
import com.jzt.wotu.DateUtils;
import com.jzt.wotu.StringUtils;
import com.jzt.wotu.data.enclosure.SpringUtil;
import com.jzt.wotu.mq.kafka.core.entity.KafkaConsumerReport;
import com.jzt.wotu.mq.kafka.core.repository.KafkaConsumerReportRepository;
import com.jzt.wotu.mq.kafka.core.service.ConsumerConvention;
import com.jzt.wotu.mq.kafka.core.service.KafkaConsumerReportService;
import com.jzt.wotu.mvc.Model;
import com.jzt.wotu.util.extension.ExceptionUtil;
import com.jzt.wotu.util.extension.Strings;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Default {@link KafkaConsumerReportService} implementation backed by
 * {@link KafkaConsumerReportRepository}.
 *
 * <p>Besides simple persistence and lookup of {@link KafkaConsumerReport} rows, this service
 * drives "compensation": reports selected by the repository are handed back to the Spring bean
 * named in {@code consumerBean} (when that bean is a {@link ConsumerConvention}) so the message
 * can be consumed again.
 */
@Service
@Primary
public class DefaultKafkaConsumerReportServiceImpl implements KafkaConsumerReportService {
    private static final Logger log = LoggerFactory.getLogger(DefaultKafkaConsumerReportServiceImpl.class);

    /** Retry threshold: a report whose stored retry count is greater than this is not replayed. */
    public static final int MAX_RETRY = 5;

    /** Value of the {@code success} flag for reports that still need compensation. */
    private static final int UNSUCCESSFUL = 0;

    /** Value of the {@code success} flag for reports that must not be replayed. */
    private static final int SUCCESSFUL = 1;

    /** How many days back the parameterless {@link #compensate()} looks. */
    private static final int DEFAULT_LOOKBACK_DAYS = 7;

    /** Upper time bound of every time-window query: "now" minus this many minutes. */
    private static final int CUTOFF_MINUTES = 5;

    /** Maximum number of characters of the failure cause stored in {@code errorReason}. */
    private static final int MAX_ERROR_REASON_LENGTH = 250;

    @Autowired
    private KafkaConsumerReportRepository kafkaConsumerReportRepository;

    /**
     * Saves and flushes the report through the repository.
     *
     * @param kafkaConsumerReport report to persist
     * @return the instance returned by the repository
     */
    @Override
    public KafkaConsumerReport save(KafkaConsumerReport kafkaConsumerReport) {
        return (KafkaConsumerReport) this.kafkaConsumerReportRepository.saveAndFlush(kafkaConsumerReport);
    }

    /**
     * Same as {@link #save(KafkaConsumerReport)} but declared with
     * {@link Propagation#REQUIRES_NEW}, rolling back on any {@link Exception}.
     *
     * @param kafkaConsumerReport report to persist
     * @return the instance returned by the repository
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = {Exception.class})
    public KafkaConsumerReport saveWithNewTx(KafkaConsumerReport kafkaConsumerReport) {
        return (KafkaConsumerReport) this.kafkaConsumerReportRepository.saveAndFlush(kafkaConsumerReport);
    }

    /**
     * Looks up a report by scene and unique key.
     *
     * @param str  scene
     * @param str2 unique key
     * @return whatever the repository returns for that pair
     */
    @Override
    public KafkaConsumerReport getKafkaConsumerReport(String str, String str2) {
        return this.kafkaConsumerReportRepository.findBySceneAndUniqueKey(str, str2);
    }

    /**
     * Tells whether a report already exists for the given scene and unique key.
     *
     * @param str  scene
     * @param str2 unique key
     * @return {@code true} when the lookup returns a non-null report
     */
    @Override
    public Boolean idempotent(String str, String str2) {
        return getKafkaConsumerReport(str, str2) != null;
    }

    /**
     * Replays the reports the repository selects for the given time range.
     *
     * @param date  first date argument forwarded to the repository query
     * @param date2 second date argument forwarded to the repository query
     */
    @Override
    public void compensate(Date date, Date date2) {
        // NOTE(review): the last two arguments look like "max retry count" and "success flag";
        // confirm against KafkaConsumerReportRepository#getCompensateData.
        batchConsumerMessage(this.kafkaConsumerReportRepository.getCompensateData(date, date2, MAX_RETRY, UNSUCCESSFUL));
    }

    /**
     * Replays every report of the list; a failure on one report is logged and does not stop
     * the remaining ones.
     */
    private void batchConsumerMessage(List<KafkaConsumerReport> list) {
        if (list == null) {
            return;
        }
        for (KafkaConsumerReport report : list) {
            try {
                consumerMessage(report);
            } catch (Exception e) {
                log.error("消费补偿错误", e);
            }
        }
    }

    /**
     * Runs {@link #compensate(Date, Date)} from seven days before today up to five minutes
     * before the current system time.
     */
    @Override
    public void compensate() {
        compensate(DateUtils.addDays(DateUtils.today(), -DEFAULT_LOOKBACK_DAYS), compensationCutoff());
    }

    /**
     * Replays the unsuccessful reports of one scene created between {@code date} and five
     * minutes before the current system time.
     *
     * @param date lower bound passed to the repository query
     * @param str  scene
     * @return a success model with an empty list when no queried report is still flagged
     *         unsuccessful afterwards, otherwise a fail model carrying those reports
     */
    @Override
    public Model<List<KafkaConsumerReport>> compensateByCreateTimeAndScene(Date date, String str) {
        List<KafkaConsumerReport> reports = this.kafkaConsumerReportRepository
                .findByCreateTimeBetweenAndSceneAndSuccess(date, compensationCutoff(), str, UNSUCCESSFUL);
        batchConsumerMessage(reports);
        return getConsumerReports(reports);
    }

    /** Builds the result model from the reports whose {@code success} flag is still 0. */
    private Model<List<KafkaConsumerReport>> getConsumerReports(List<KafkaConsumerReport> list) {
        List<KafkaConsumerReport> stillFailing = list.stream()
                .filter(report -> report.getSuccess().intValue() == UNSUCCESSFUL)
                .collect(Collectors.toList());
        if (stillFailing.isEmpty()) {
            List<KafkaConsumerReport> none = new ArrayList<>();
            return Model.newSuccess(none);
        }
        return Model.newFail("消费补偿错误:", stillFailing);
    }

    /**
     * Replays the unsuccessful reports created between {@code date} and five minutes before
     * the current system time, regardless of scene.
     *
     * @param date lower bound passed to the repository query
     * @return a success model with an empty list when no queried report is still flagged
     *         unsuccessful afterwards, otherwise a fail model carrying those reports
     */
    @Override
    public Model<List<KafkaConsumerReport>> compensateByCreateTime(Date date) {
        List<KafkaConsumerReport> reports = this.kafkaConsumerReportRepository
                .findByCreateTimeBetweenAndSuccess(date, compensationCutoff(), UNSUCCESSFUL);
        batchConsumerMessage(reports);
        return getConsumerReports(reports);
    }

    /**
     * Replays the single report identified by scene and unique key.
     *
     * @param str  scene
     * @param str2 unique key
     * @return a fail model with an empty list when no such report exists; otherwise a success
     *         model with an empty list when the report is no longer flagged unsuccessful, or a
     *         fail model carrying the report when it still is
     */
    @Override
    public Model<List<KafkaConsumerReport>> compensateBySceneAndKey(String str, String str2) {
        KafkaConsumerReport report = this.kafkaConsumerReportRepository.findBySceneAndUniqueKey(str, str2);
        List<KafkaConsumerReport> reports = new ArrayList<>();
        if (report == null) {
            // Previously the null was added to the list and dereferenced further down,
            // ending in a NullPointerException instead of a usable answer.
            log.warn("scene: {} uniqueKey: {} has no consumer report, nothing to compensate", str, str2);
            return Model.newFail("消费补偿错误: 未找到对应的消费记录", reports);
        }
        reports.add(report);
        batchConsumerMessage(reports);
        return getConsumerReports(reports);
    }

    /** Upper time bound shared by the time-window queries: current system time minus five minutes. */
    private Date compensationCutoff() {
        return DateUtils.addMinutes(DateUtils.sysdate(), -CUTOFF_MINUTES);
    }

    /**
     * Replays one report through its consumer bean.
     *
     * <p>Nothing happens when the report names no consumer bean or its retry count already
     * exceeds {@link #MAX_RETRY}. Otherwise the retry count is incremented and saved first, and
     * the saved report is passed to the bean unless it is already flagged successful. On any
     * failure the cause is logged, stored (truncated) in {@code errorReason}, and rethrown.
     */
    private void consumerMessage(KafkaConsumerReport kafkaConsumerReport) {
        try {
            if (StringUtils.isBlank(kafkaConsumerReport.getConsumerBean())) {
                log.info("scene: {} not found ConsumerBean. uniqueKey: {}, partition: {}; offset: {}",
                        kafkaConsumerReport.getScene(), kafkaConsumerReport.getUniqueKey(),
                        kafkaConsumerReport.getPartition(), kafkaConsumerReport.getOffset());
                return;
            }
            int retryCount = Conv.asInteger(kafkaConsumerReport.getRetryCount());
            if (retryCount > MAX_RETRY) {
                return;
            }
            // The attempt is counted before consuming, so a failing consumer still uses up a retry.
            kafkaConsumerReport.setRetryCount(Integer.valueOf(retryCount + 1));
            KafkaConsumerReport saved = save(kafkaConsumerReport);
            Object bean = SpringUtil.getBean(saved.getConsumerBean());
            if (!(bean instanceof ConsumerConvention)) {
                log.warn("scene: {} consumerBean: {} is not a ConsumerConvention, skip compensation. uniqueKey: {}",
                        saved.getScene(), saved.getConsumerBean(), saved.getUniqueKey());
                return;
            }
            if (saved.getSuccess().intValue() == SUCCESSFUL) {
                return;
            }
            saved.setCompensation(true);
            ((ConsumerConvention) bean).consume(saved);
        } catch (Exception e) {
            String reason = ExceptionUtil.getAllExceptionMessage(e);
            // The formatted message used to be built and thrown away; log it. The stack trace
            // is left to the caller, which logs the rethrown exception.
            log.error(MessageFormat.format("KafkaConsumerCompensation 补偿失败，Topic：{0}，Scene：{1}，UniqueKey：{2}，原因：{3}",
                    kafkaConsumerReport.getTopic(), kafkaConsumerReport.getScene(),
                    kafkaConsumerReport.getUniqueKey(), reason));
            kafkaConsumerReport.setErrorReason(Strings.substring(reason, 0, MAX_ERROR_REASON_LENGTH));
            try {
                save(kafkaConsumerReport);
            } catch (RuntimeException saveFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(saveFailure);
            }
            throw e;
        }
    }
}
