package org.clever.hinny.data.rabbitmq.support;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:org/clever/hinny/data/rabbitmq/support/CanInterruptConsumer.class */
public class CanInterruptConsumer implements Consumer {
    private static final Logger log = LoggerFactory.getLogger(CanInterruptConsumer.class);
    private Channel channel;
    private final boolean autoAck;
    private final int prefetchCount;
    private final String queue;
    private final ConsumerMessages consumerMessages;
    private volatile String consumerTag;
    private volatile boolean interrupted;
    private volatile boolean cancelled;
    private final java.util.function.Consumer<Throwable> onStopConsumer;
    private long lastSleepInterruptedTime;

    public CanInterruptConsumer(Channel channel, String str, boolean z, int i, ConsumerMessages consumerMessages, java.util.function.Consumer<Throwable> consumer) {
        this.interrupted = false;
        this.cancelled = false;
        this.lastSleepInterruptedTime = 0L;
        Assert.notNull(channel, "channel 不能为 null");
        Assert.hasText(str, "queue 不能为空");
        Assert.notNull(consumerMessages, "consumerMessages 不能为 null");
        this.channel = channel;
        this.queue = str;
        this.autoAck = z;
        this.prefetchCount = i;
        this.consumerMessages = consumerMessages;
        this.onStopConsumer = consumer;
    }

    public CanInterruptConsumer(Channel channel, String str, boolean z, int i, ConsumerMessages consumerMessages) {
        this(channel, str, z, i, consumerMessages, null);
    }

    public void handleConsumeOk(String str) {
        this.consumerTag = str;
        this.cancelled = false;
    }

    public void handleCancelOk(String str) {
        onStop(null);
    }

    public void handleCancel(String str) {
        onStop(null);
    }

    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        log.warn("通道Channel或基础连接关闭 [queue={}] [consumerTag={}] handleShutdownSignal", new Object[]{this.queue, str, shutdownSignalException});
        onStop(shutdownSignalException);
    }

    public void handleRecoverOk(String str) {
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        if (!this.cancelled || this.autoAck) {
            try {
                this.consumerMessages.handleDelivery(this.channel, this, str, envelope, basicProperties, bArr);
            } catch (Exception e) {
                log.warn("messages消费失败 [queue={}] [consumerTag={}]", new Object[]{this.queue, str, e});
            }
            if (this.interrupted) {
                doInterrupt();
            }
        }
    }

    public void interrupt() {
        boolean z = this.interrupted;
        this.interrupted = true;
        if (z) {
            return;
        }
        doInterrupt();
    }

    private void doInterrupt() {
        if (!this.channel.isOpen() || this.cancelled) {
            return;
        }
        try {
            this.channel.basicCancel(this.consumerTag);
            log.info("[中断消费] - [queue={}] [consumerTag={}] 消费中断成功", this.queue, this.consumerTag);
            this.channel.close();
            onStop(null);
        } catch (Exception e) {
            log.warn("[中断消费] - [queue={}] [consumerTag={}] 消费中断失败", new Object[]{this.queue, this.consumerTag, e});
            try {
                this.channel.abort();
                log.info("[中断消费] - [queue={}] [consumerTag={}] 强制取消消费成功", this.queue, this.consumerTag);
                this.channel.close();
                onStop(null);
            } catch (Exception e2) {
                log.warn("[中断消费] - [queue={}] [consumerTag={}] 强制取消消费失败", new Object[]{this.queue, this.consumerTag, e});
            }
        }
    }

    public void waitForEnd() {
        while (!this.cancelled) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis - this.lastSleepInterruptedTime < 60000) {
                    log.warn("Consumer Wait For End Error", e);
                }
                this.lastSleepInterruptedTime = currentTimeMillis;
            }
        }
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    public void clearInterrupt() {
        this.interrupted = false;
        this.cancelled = false;
    }

    protected void onStop(Throwable th) {
        boolean z = this.cancelled;
        this.cancelled = true;
        if (z || this.onStopConsumer == null) {
            return;
        }
        this.onStopConsumer.accept(th);
    }

    public Channel getChannel() {
        return this.channel;
    }

    public boolean isAutoAck() {
        return this.autoAck;
    }

    public int getPrefetchCount() {
        return this.prefetchCount;
    }

    public String getQueue() {
        return this.queue;
    }

    public ConsumerMessages getConsumerMessages() {
        return this.consumerMessages;
    }

    public String getConsumerTag() {
        return this.consumerTag;
    }

    public boolean isInterrupted() {
        return this.interrupted;
    }

    public boolean isCancelled() {
        return this.cancelled;
    }
}
