package com.odianyun.mq.producer.impl;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.odianyun.mq.common.inner.strategy.DefaultPullStrategy;
import com.odianyun.mq.common.inner.wrap.Wrap;
import com.odianyun.mq.common.inner.wrap.WrappedMessage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/omq-real-client-2.0.17.1.RELEASE.jar:com/odianyun/mq/producer/impl/DisruptorAsynHandler.class */
public class DisruptorAsynHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) DisruptorAsynHandler.class);
    private final Disruptor<WrappedMessageContainer> disruptor;
    private final RingBuffer<WrappedMessageContainer> ringBuffer;
    private final ProducerImpl producer;
    private final int sendTimes;
    private final DefaultPullStrategy defaultPullStrategy = new DefaultPullStrategy(500, 2500);
    private final ExecutorService es = Executors.newCachedThreadPool();
    private EventHandler<WrappedMessageContainer> messageEventHandler = new EventHandler<WrappedMessageContainer>() { // from class: com.odianyun.mq.producer.impl.DisruptorAsynHandler.1
        @Override // com.lmax.disruptor.EventHandler
        public void onEvent(WrappedMessageContainer wrappedMessageContainer, long j, boolean z) throws Exception {
            int i = DisruptorAsynHandler.this.sendTimes;
            DisruptorAsynHandler.this.defaultPullStrategy.succeess();
            Wrap wrappedMessage = wrappedMessageContainer.getWrappedMessage();
            int i2 = DisruptorAsynHandler.this.sendTimes;
            while (i2 > 0) {
                i2--;
                try {
                    DisruptorAsynHandler.this.producer.getProducerService().sendMessage(wrappedMessage);
                    wrappedMessageContainer.setWrappedMessage(null);
                    return;
                } catch (Exception e) {
                    DisruptorAsynHandler.LOGGER.warn(e.getMessage(), (Throwable) e);
                    if (i2 <= 0) {
                        return;
                    }
                    try {
                        DisruptorAsynHandler.this.defaultPullStrategy.fail(true);
                    } catch (InterruptedException e2) {
                        return;
                    }
                }
            }
        }
    };

    public DisruptorAsynHandler(ProducerImpl producerImpl) {
        this.producer = producerImpl;
        this.sendTimes = this.producer.getProducerConfig().getAsyncRetryTimes() == Integer.MAX_VALUE ? Integer.MAX_VALUE : this.producer.getProducerConfig().getAsyncRetryTimes() + 1;
        this.disruptor = new Disruptor<>(WrappedMessageContainer.EVENT_FACTORY, producerImpl.getProducerConfig().getAsyncQueueSize(), this.es);
        this.disruptor.handleEventsWith(this.messageEventHandler);
        this.ringBuffer = this.disruptor.start();
    }

    public void putMessage(Wrap wrap) {
        for (int i = 0; i < this.producer.getProducerConfig().getAsyncPutSpinCount() + 1; i++) {
            try {
                try {
                    long tryNext = this.ringBuffer.tryNext(1);
                    this.ringBuffer.get(tryNext).setWrappedMessage(wrap);
                    this.ringBuffer.publish(tryNext);
                    return;
                } catch (InsufficientCapacityException e) {
                } catch (Exception e2) {
                    LOGGER.error("Exception on putMessage", (Throwable) e2);
                    throw e2;
                }
            } catch (Exception e3) {
                LOGGER.warn("Buffersize is full,discard message");
                return;
            }
        }
        AsyncRejectionPolicy asyncRejectionPolicy = this.producer.getAsyncRejectionPolicy();
        if (asyncRejectionPolicy != null && (wrap instanceof WrappedMessage)) {
            asyncRejectionPolicy.onRejected(((WrappedMessage) wrap).getContent(), this.producer);
        } else if (asyncRejectionPolicy != null) {
            LOGGER.warn("Not a MqMessage: {}", wrap);
        } else {
            LOGGER.warn("Insufficient capacity when async send, discard it by default: {}", wrap);
        }
    }

    public void shutdown() {
        this.disruptor.shutdown();
        if (this.es.isShutdown()) {
            return;
        }
        this.es.shutdown();
    }
}
