package clickHouse.queue;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:clickHouse/queue/EventLocalQueue.class */
/**
 * In-process {@link EventQueue} implementation that buffers events in a
 * {@link LinkedBlockingQueue} and hands them in batches to the configured consumer function.
 *
 * <p>Producers enqueue events through {@link #send(Object)} / {@link #send(List)}. Consumption
 * is performed by worker threads started from {@link #initConsumer()}, each of which runs
 * {@link #processConsumer()}.
 *
 * <p>NOTE(review): the backing queue is only created in {@link #initProducer()}; this class
 * assumes the parent type invokes it before any {@code send}/consume call - confirm against
 * {@code EventQueue}.
 *
 * @param <T> type of the queued events
 */
public abstract class EventLocalQueue<T> extends EventQueue<T, Object> {
    private static final Logger log = LoggerFactory.getLogger(EventLocalQueue.class);
    private static final String QUEUE_TYPE = "EventLocalQueue";

    /** Backing buffer; assigned in {@link #initProducer()}. */
    private LinkedBlockingQueue<T> queue;

    /** Consumption switch checked on every polling round; set to {@code true} by the constructor. */
    private boolean isConsumer;

    /**
     * Creates a local queue whose name is {@code str + "-EventLocalQueue-" + <random UUID>}.
     *
     * @param str      prefix used to build the queue name
     * @param function consumer invoked with each drained batch; its result is treated as
     *                 "batch handled successfully"
     */
    public EventLocalQueue(String str, Function<List<T>, Boolean> function) {
        super(QUEUE_TYPE, str + "-" + QUEUE_TYPE + "-" + UUID.randomUUID(), function);
        this.isConsumer = true;
    }

    /**
     * Enqueues every non-null element of {@code list}.
     *
     * <p>Null elements are skipped, mirroring {@link #send(Object)}: the backing
     * {@link LinkedBlockingQueue} rejects nulls, and a bulk {@code addAll} would throw
     * part-way through, leaving the batch half-enqueued.
     *
     * @param list events to enqueue; {@code null} or empty is a no-op
     * @return always {@code true}
     */
    @Override // clickHouse.queue.EventQueue
    public boolean send(List<T> list) {
        if (CollectionUtils.isEmpty(list)) {
            return true;
        }
        for (T event : list) {
            if (event != null) {
                this.queue.add(event);
            }
        }
        return true;
    }

    /**
     * Enqueues a single event.
     *
     * @param t event to enqueue; {@code null} is ignored
     * @return {@code true} (a null event is reported as success without being queued)
     */
    @Override // clickHouse.queue.EventQueue
    public boolean send(T t) {
        if (t != null) {
            return this.queue.add(t);
        }
        return true;
    }

    /** Creates the backing queue. */
    @Override // clickHouse.queue.EventQueue
    public void initProducer() {
        this.queue = new LinkedBlockingQueue<>();
    }

    /** Starts the consumer worker threads. */
    @Override // clickHouse.queue.EventQueue
    public void initConsumer() {
        createConsumerThread(this.consumer);
    }

    /**
     * Moves up to {@code perMaxTakeCount} queued events into {@code list} without blocking.
     *
     * @param list destination the drained events are appended to
     * @param obj  unused by this implementation
     * @return number of events transferred
     */
    @Override // clickHouse.queue.EventQueue
    public int acceptTakeList(List<T> list, Object obj) {
        return this.queue.drainTo(list, this.perMaxTakeCount);
    }

    /**
     * Logs the size of the batch that was just handled and empties it so the list can be reused.
     *
     * @param list the handled batch; {@code null} is tolerated and logged as size 0
     */
    @Override // clickHouse.queue.EventQueue
    public void afterConsumerLoop(List<T> list) {
        int handled = list != null ? list.size() : 0;
        log.info("queue:{},本次处理event size:{}", getQueueName(), handled);
        // The log line above already tolerates null, so the cleanup must as well.
        if (list != null) {
            list.clear();
        }
    }

    /**
     * Failure hook: discards the batch the consumer could not handle.
     *
     * @param list the failed batch; {@code null} is tolerated
     */
    @Override // clickHouse.queue.EventQueue
    public void handFail(List<T> list) {
        if (list != null) {
            list.clear();
        }
    }

    /**
     * Runs the consume loop without a context object.
     *
     * @return see {@link #processConsumer(Object)}
     */
    public boolean processConsumer() {
        return processConsumer(null);
    }

    /**
     * Polling consume loop: sleeps {@code perRoundSleepTime} between rounds and, whenever
     * consumption is enabled and the queue is non-empty, drains and processes batches.
     *
     * <p>Exceptions raised while handling a round are logged and ignored so the loop keeps
     * running. The loop only ends when the executing thread is interrupted.
     *
     * @param obj context object forwarded to {@link #acceptTakeList(List, Object)}
     * @return {@code false} once the thread has been interrupted; never returns otherwise
     */
    @Override // clickHouse.queue.EventQueue
    public boolean processConsumer(Object obj) {
        while (true) {
            try {
                Thread.sleep(this.perRoundSleepTime);
            } catch (InterruptedException e) {
                // Restore the flag and stop: swallowing the interrupt would make this worker
                // impossible to cancel.
                Thread.currentThread().interrupt();
                log.warn("queue:{} consumer thread interrupted, exiting consume loop", this.queueName);
                return false;
            }
            try {
                if (this.isConsumer && !this.queue.isEmpty()) {
                    drainAndConsume(obj);
                }
            } catch (Exception e) {
                log.error("event handle loop ex(will ignore):" + e.getMessage(), e);
            }
        }
    }

    /**
     * Drains batches and feeds them to the consumer until a batch comes back smaller than
     * {@code perMaxTakeCount}, nothing is left to process, or the consumer reports failure.
     */
    private void drainAndConsume(Object obj) {
        List<T> batch = new ArrayList<>();
        int taken;
        do {
            taken = acceptTakeList(batch, obj);
            log.debug("queue:{} take list size:{}", this.queueName, taken);
            if (batch.isEmpty()) {
                // Another worker emptied the queue between the isEmpty() check and the drain;
                // there is nothing to hand to the consumer.
                return;
            }
            if (!applyConsumer(batch)) {
                log.warn("queue consumer handle fail!!!,take list is {}", batch);
                handFail(batch);
                return;
            }
            afterConsumerLoop(batch);
        } while (taken == this.perMaxTakeCount);
    }

    /** Invokes the consumer on one batch; an exception or a non-true result counts as failure. */
    private boolean applyConsumer(List<T> batch) {
        try {
            return Boolean.TRUE.equals(this.consumer.apply(batch));
        } catch (Exception e) {
            log.error("consumer apply ex:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * Starts {@code concurrentCount} worker threads, each running {@link #processConsumer()}.
     *
     * <p>NOTE(review): the pool is never shut down, so the workers live until they are
     * interrupted or the JVM exits.
     *
     * @param function not used here; the workers read the consumer held by this instance
     */
    private void createConsumerThread(Function<List<T>, Boolean> function) {
        ExecutorService workers = Executors.newFixedThreadPool(this.concurrentCount);
        for (int i = 0; i < this.concurrentCount; i++) {
            workers.execute(() -> {
                processConsumer();
            });
        }
    }
}
