package com.jzt.cloud.ba.quake.delayqueue;

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSONObject;
import com.jzt.cloud.ba.quake.domain.redis.service.IRedisService;
import com.sun.xml.bind.v2.runtime.reflect.opt.Const;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/com/jzt/cloud/ba/quake/delayqueue/IRedisDelayQueue.class */
public abstract class IRedisDelayQueue<T> {
    private final Class<T> T = null;
    private Executor executor = (Executor) SpringUtil.getBean("taskExecutor", Executor.class);
    private static final Logger log = LoggerFactory.getLogger((Class<?>) IRedisDelayQueue.class);
    private static IRedisService redisService = (IRedisService) SpringUtil.getBean(IRedisService.class);

    public void produce(T t, Long l) {
        String jSONString = JSONObject.toJSONString(t);
        log.info("延时任务添加：{}，exetime:{}", jSONString, l);
        redisService.zAdd(getKey(), jSONString, l.longValue());
    }

    @PostConstruct
    public void consumer() {
        Executors.newSingleThreadExecutor().submit(() -> {
            while (true) {
                Set rangeByScore = redisService.rangeByScore(getKey(), Const.default_value_double, System.currentTimeMillis());
                if (!CollectionUtils.isEmpty(rangeByScore)) {
                    rangeByScore.forEach(str -> {
                        if (redisService.zRemove(getKey(), str).longValue() > 0) {
                            this.executor.execute(() -> {
                                log.info("从延时队列中获取到任务消费，message:{}", str);
                                handleMessage(JSONObject.parseObject(str, messageClass()));
                            });
                        }
                    });
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public abstract String getKey();

    public abstract void handleMessage(T t);

    public abstract Class<T> messageClass();
}
