package com.odianyun.mq.awssqs;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest;
import com.amazonaws.services.sns.util.Topics;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.odianyun.mq.common.MqUtils;
import com.odianyun.mq.common.consumer.MessageFilter;
import com.odianyun.mq.common.inner.message.MqMessage;
import com.odianyun.mq.common.message.Destination;
import com.odianyun.mq.consumer.BackoutMessageException;
import com.odianyun.mq.consumer.Consumer;
import com.odianyun.mq.consumer.ConsumerConfig;
import com.odianyun.mq.consumer.MessageListener;
import com.odianyun.mq.consumer.NeedResendException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/aws-sqs-client-2.0.17.1.RELEASE.jar:com/odianyun/mq/awssqs/AwsSqsConsumerImpl.class */
/**
 * {@link Consumer} that subscribes an SQS queue to an SNS topic and delivers the
 * received messages to a {@link MessageListener}.
 *
 * <p>{@link #start()} makes sure the topic, the queue and the subscription exist and then
 * starts a polling thread. Every message that can be decoded is handed to the configured
 * thread pool; it is deleted from the queue only after the listener returns normally, so a
 * failed message stays in the queue.
 *
 * <p>Each start/close cycle owns its own stop token. A poller keeps the token it was created
 * with, so {@link #restart()} reliably stops the previous poller even though the new one is
 * started immediately afterwards.
 */
public class AwsSqsConsumerImpl implements Consumer {
    /** Wait time, in seconds, passed to SQS on every receive request. */
    private static final int RECEIVE_WAIT_SECONDS = 1;
    /** Pause after a failed poll so that a persistent failure does not spin the CPU. */
    private static final long ERROR_BACKOFF_MILLIS = 1000L;

    private final Destination dest;
    private final String consumerId;
    private final ConsumerConfig config;
    private volatile MessageListener messagelistener;
    /** Most recently started polling thread. */
    private volatile Thread thread;
    private final ThreadPoolExecutor threadPoolExecutor;
    /** Stop token of the current start/close cycle; replaced (never reopened) by {@link #start()}. */
    private volatile AtomicBoolean closed = new AtomicBoolean(false);
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final AmazonSNSClient snsClient = AwsSqsUtils.getSnsClient();

    /**
     * Creates a consumer; nothing is subscribed or polled until {@link #start()} is called.
     *
     * @param destination    destination whose assembled name is used as the SNS topic name
     * @param str            consumer id, also used as the SQS queue name
     * @param consumerConfig configuration supplying the message filter and the worker pool settings
     */
    public AwsSqsConsumerImpl(Destination destination, String str, ConsumerConfig consumerConfig) {
        this.dest = destination;
        this.consumerId = str;
        this.config = consumerConfig;
        this.threadPoolExecutor = MqUtils.getThreadPoolExecutor(consumerConfig);
    }

    /**
     * Subscribes the named queue to the topic and applies this consumer's filter policy,
     * creating the queue first when it does not exist.
     *
     * @param amazonSNSClient SNS client used for the subscription
     * @param str             SQS queue name
     * @param str2            topic identifier as returned by {@code AwsSqsUtils.createTopicIfNeed}
     */
    private void subscribeSQS(AmazonSNSClient amazonSNSClient, String str, String str2) {
        AmazonSQS sqsClient = AwsSqsUtils.getSqsClient();
        String queueUrl;
        try {
            queueUrl = sqsClient.getQueueUrl(str).getQueueUrl();
        } catch (QueueDoesNotExistException e) {
            // The create result already carries the URL; no second lookup is needed.
            queueUrl = sqsClient.createQueue(str).getQueueUrl();
        }
        String subscription = Topics.subscribeQueue(amazonSNSClient, sqsClient, str2, queueUrl);
        amazonSNSClient.setSubscriptionAttributes(
                new SetSubscriptionAttributesRequest(subscription, "FilterPolicy", getFilterPolicy(this.config)));
    }

    /**
     * Builds the subscription filter policy JSON for the configured message filter.
     *
     * @param consumerConfig configuration holding the message filter
     * @return {@code StrUtil.EMPTY_JSON} when there is no filter, the filter matches everything or
     *         its parameter set is empty; otherwise a JSON object mapping
     *         {@code AwsSqsUtils.ATTRIBUTE_TYPE} to the accepted values
     * @throws RuntimeException if the filter type is not supported
     */
    private String getFilterPolicy(ConsumerConfig consumerConfig) {
        MessageFilter messageFilter = consumerConfig.getMessageFilter();
        if (messageFilter == null) {
            // No filter configured: behave like a match-all filter instead of failing with an NPE.
            return StrUtil.EMPTY_JSON;
        }
        MessageFilter.FilterType type = messageFilter.getType();
        switch (type) {
            case AllMatch:
                return StrUtil.EMPTY_JSON;
            case InSet:
                Set<String> param = messageFilter.getParam();
                if (CollectionUtils.isEmpty(param)) {
                    return StrUtil.EMPTY_JSON;
                }
                HashMap<String, Object> policy = new HashMap<String, Object>();
                policy.put(AwsSqsUtils.ATTRIBUTE_TYPE, param);
                return JSONObject.toJSONString(policy);
            default:
                throw new RuntimeException("不支持的消息过滤类型:" + type);
        }
    }

    /**
     * Starts a thread that polls the named queue until the current start/close cycle is closed.
     *
     * @param str SQS queue name
     */
    public void receiveMessage(final String str) {
        // Capture the token now: a later start() installs a new token and must not revive this poller.
        final AtomicBoolean stopSignal = this.closed;
        Thread poller = new Thread(new Runnable() {
            @Override
            public void run() {
                pollQueue(str, stopSignal);
            }
        }, "aws-sqs-consumer-" + str);
        this.thread = poller;
        poller.start();
    }

    /** Polls the queue and dispatches each batch until {@code stopSignal} is set or the thread is interrupted. */
    private void pollQueue(String queueName, AtomicBoolean stopSignal) {
        AmazonSQS sqsClient;
        String queueUrl;
        try {
            sqsClient = AwsSqsUtils.getSqsClient();
            queueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
        } catch (RuntimeException e) {
            this.logger.error("resolve queue url error, queue=" + queueName, e);
            return;
        }
        while (!stopSignal.get()) {
            try {
                ReceiveMessageRequest request = new ReceiveMessageRequest()
                        .withQueueUrl(queueUrl)
                        .withWaitTimeSeconds(RECEIVE_WAIT_SECONDS);
                List<Message> messages = sqsClient.receiveMessage(request).getMessages();
                this.logger.info("receive:{}:{}", messages.size(), messages);
                for (Message message : messages) {
                    dispatch(sqsClient, queueUrl, message);
                }
            } catch (RuntimeException e) {
                // A failed receive or a rejected task must not end consumption for good.
                this.logger.error("poll queue error, queue=" + queueName, e);
                if (!pauseAfterFailure()) {
                    return;
                }
            }
        }
    }

    /** Decodes one SQS message and forwards it to the worker pool when a listener is registered. */
    private void dispatch(AmazonSQS sqsClient, String queueUrl, Message message) {
        MqMessage mqMessage = null;
        try {
            // The payload is read from the "Message" field of the JSON body.
            mqMessage = MqMessage.getByString(JSONObject.parseObject(message.getBody()).get("Message").toString());
        } catch (Exception e) {
            this.logger.error("getMqMessage error", e);
        }
        if (mqMessage != null && this.messagelistener != null) {
            fireOnMessage(sqsClient, queueUrl, message, mqMessage);
        }
    }

    /**
     * Sleeps for the error back-off period.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored)
     */
    private boolean pauseAfterFailure() {
        try {
            Thread.sleep(ERROR_BACKOFF_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Submits a task that invokes the listener and deletes the message once the listener
     * returns normally. When the listener throws, the message is left in the queue.
     *
     * @param amazonSQS client used to delete the message
     * @param str       URL of the queue the message was received from
     * @param message   raw SQS message, needed for its receipt handle
     * @param mqMessage decoded message passed to the listener
     */
    public void fireOnMessage(final AmazonSQS amazonSQS, final String str, final Message message, final MqMessage mqMessage) {
        this.threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                // Read the volatile field once so a concurrent setListener cannot change it mid-task.
                MessageListener listener = AwsSqsConsumerImpl.this.messagelistener;
                if (listener == null) {
                    AwsSqsConsumerImpl.this.logger.warn("no listener registered, message left in queue: {}", message.getMessageId());
                    return;
                }
                try {
                    listener.onMessage(mqMessage);
                } catch (BackoutMessageException e) {
                    AwsSqsConsumerImpl.this.logger.warn("message backed out, left in queue: " + message.getMessageId(), e);
                    return;
                } catch (NeedResendException e) {
                    AwsSqsConsumerImpl.this.logger.warn("message needs resend, left in queue: " + message.getMessageId(), e);
                    return;
                } catch (RuntimeException e) {
                    // submit() would otherwise hide this failure inside the discarded Future.
                    AwsSqsConsumerImpl.this.logger.error("listener error, message left in queue: " + message.getMessageId(), e);
                    return;
                }
                try {
                    amazonSQS.deleteMessage(str, message.getReceiptHandle());
                } catch (RuntimeException e) {
                    AwsSqsConsumerImpl.this.logger.error("delete message error: " + message.getMessageId(), e);
                }
            }
        });
    }

    /** Ensures topic, queue and subscription exist, then starts polling the queue named by the consumer id. */
    @Override
    public void start() {
        if (this.closed.get()) {
            // Never reopen a closed token: pollers of the previous cycle still hold it and must stay stopped.
            this.closed = new AtomicBoolean(false);
        }
        subscribeSQS(this.snsClient, this.consumerId, AwsSqsUtils.createTopicIfNeed(this.snsClient, this.dest.getAssembleName()));
        receiveMessage(this.consumerId);
    }

    /** Registers the listener that receives decoded messages. */
    @Override
    public void setListener(MessageListener messageListener) {
        this.messagelistener = messageListener;
    }

    /** Signals the pollers of the current cycle to stop; each one exits after its in-flight receive returns. */
    @Override
    public void close() {
        this.closed.set(true);
    }

    /** Always returns {@code null}; this implementation does not report a remote address. */
    @Override
    public String getRemoteAddress() {
        return null;
    }

    /** Always returns {@code null}; this implementation does not report a consumer address. */
    @Override
    public InetSocketAddress getConsumerAddress() {
        return null;
    }

    /** Closes the current cycle and starts a new one. */
    @Override
    public void restart() {
        close();
        start();
    }

    /** Returns the destination this consumer was created for. */
    @Override
    public Destination getDest() {
        return this.dest;
    }
}
