package com.odianyun.horse.common.util;

import com.alibaba.fastjson.JSONObject;
import com.odianyun.exception.factory.OdyExceptionFactory;
import com.odianyun.horse.model.messageBus.DatabusMessage;
import com.odianyun.mq.common.consumer.ConsumerType;
import com.odianyun.mq.common.inner.util.NameCheckUtil;
import com.odianyun.mq.common.message.Destination;
import com.odianyun.mq.common.properties.OmqConfigUtil;
import com.odianyun.mq.consumer.Consumer;
import com.odianyun.mq.consumer.ConsumerConfig;
import com.odianyun.mq.consumer.ConsumerFactory;
import com.odianyun.mq.consumer.impl.ConsumerFactoryImpl;
import com.odianyun.soa.common.util.ZkUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/odianyun/horse/common/util/DatabusUtil.class */
/**
 * Static helpers for subscribing to "databus" messages, either through the OMQ
 * consumer API ({@link #startConsumer}) or directly through a RabbitMQ channel
 * ({@link #createConsumer} and the {@code consume*} convenience methods).
 *
 * <p>Every received payload is converted to a {@link DatabusMessage}, passed through a
 * {@link DatabusFilter}, and, when the filter returns a non-null message, handed to a
 * {@link DatabusHandle}.
 *
 * <p>The RabbitMQ connection settings are read once, in the static initializer, from the
 * {@code omq.rabbitmq.*} properties.
 */
public class DatabusUtil {
    private static final Logger log = LoggerFactory.getLogger(DatabusUtil.class);
    // Not referenced anywhere inside this class; kept as-is.
    private static DatabusUtil instance = null;

    /** Consumer id used by {@link #startConsumer(String, DatabusHandle, DatabusFilter)}. */
    public static final String HORSE_CONSUMER_ID = "horse-core-user-behavior";

    // NOTE(review): presumably the possible values of DatabusMessage#getType() - confirm against the producer.
    public static final String INSERT = "insert";
    public static final String UPDATE = "update";
    public static final String DELETE = "delete";
    public static final String TABLE_CREATE = "table-create";
    public static final String TABLE_ALTER = "table-alter";

    /** RabbitMQ host, taken from the part of {@code omq.rabbitmq.address} before the first ':'. */
    public static String rabbitmqHost;
    /** RabbitMQ port, taken from {@code omq.rabbitmq.address}; 5672 when the address has no port. */
    public static Integer rabbitmqPort;
    /** Value of {@code omq.rabbitmq.password}. */
    public static String rabbitmqPass;
    /** Value of {@code omq.rabbitmq.username}. */
    public static String rabbitmqUser;

    static {
        rabbitmqHost = OmqConfigUtil.getProperty("omq.rabbitmq.address", (String) null);
        rabbitmqPort = 5672;
        if (StringUtils.isNotBlank(rabbitmqHost)) {
            String[] hostAndPort = rabbitmqHost.split(":");
            rabbitmqHost = hostAndPort[0];
            // An address without ":port" keeps the default port instead of failing class
            // initialization with ArrayIndexOutOfBoundsException. A non-numeric port still
            // throws NumberFormatException.
            if (hostAndPort.length > 1) {
                rabbitmqPort = Integer.valueOf(hostAndPort[1].trim());
            }
        }
        rabbitmqUser = OmqConfigUtil.getProperty("omq.rabbitmq.username", (String) null);
        rabbitmqPass = OmqConfigUtil.getProperty("omq.rabbitmq.password", (String) null);
    }

    /**
     * Starts an OMQ consumer on topic {@code "databus_" + str} using {@link #HORSE_CONSUMER_ID}.
     *
     * @param str           topic suffix
     * @param databusHandle receives every message the filter lets through
     * @param databusFilter decides which messages are handled; returns {@code null} to drop one
     */
    public static void startConsumer(String str, DatabusHandle databusHandle, DatabusFilter databusFilter) {
        startConsumer(str, HORSE_CONSUMER_ID, databusHandle, databusFilter);
    }

    /**
     * Starts an OMQ consumer on topic {@code "databus_" + str} with consumer id {@code str2},
     * configured as {@code CLIENT_ACKNOWLEDGE} with a thread pool size of 10.
     *
     * <p>Any exception raised while creating or starting the consumer is logged and swallowed,
     * so callers get no signal that the consumer failed to start.
     *
     * @param str           topic suffix
     * @param str2          consumer id
     * @param databusHandle receives every message the filter lets through
     * @param databusFilter decides which messages are handled; it is called with {@code null}
     *                      when the payload could not be converted to a {@link DatabusMessage}
     */
    public static void startConsumer(String str, String str2, DatabusHandle databusHandle, DatabusFilter databusFilter) {
        try {
            ConsumerConfig consumerConfig = new ConsumerConfig();
            consumerConfig.setConsumerType(ConsumerType.CLIENT_ACKNOWLEDGE);
            consumerConfig.setThreadPoolSize(10);

            ConsumerFactory consumerFactory = ConsumerFactoryImpl.getInstance();
            Consumer consumer = consumerFactory.createLocalConsumer(Destination.topic("databus_" + str), str2, consumerConfig);
            consumer.setListener(message -> {
                DatabusMessage databusMessage = null;
                try {
                    databusMessage = (DatabusMessage) message.transferContentToBean(DatabusMessage.class);
                } catch (Exception e) {
                    OdyExceptionFactory.log(e);
                }
                // A failed conversion is deliberately passed on as null; the filter decides.
                DatabusMessage accepted = databusFilter.doFilter(databusMessage);
                if (accepted != null) {
                    databusHandle.handle(accepted);
                }
            });
            consumer.start();
        } catch (Exception e) {
            OdyExceptionFactory.log(e);
            log.error("start consumer failed==================================================", e);
        }
    }

    /**
     * Consumes only messages whose database, table and type equal the given values.
     * A blank criterion is not checked.
     *
     * @param databusHandle receives the accepted messages
     * @param str           topic suffix, see {@link #createConsumer}
     * @param str2          consumer id, see {@link #createConsumer}
     * @param str3          required database, or blank for any
     * @param str4          required table, or blank for any
     * @param str5          required type, or blank for any
     */
    public static void consumeInclude(DatabusHandle databusHandle, String str, String str2, String str3, String str4, String str5) {
        createConsumer(str, str2, databusHandle, databusMessage -> {
            if (databusMessage == null) {
                return null;
            }
            boolean mismatch = isSetAndDiffers(str3, databusMessage.getDatabase())
                    || isSetAndDiffers(str4, databusMessage.getTable())
                    || isSetAndDiffers(str5, databusMessage.getType());
            return mismatch ? null : databusMessage;
        });
    }

    /**
     * Consumes only messages whose database, table and type are each contained in the
     * corresponding list. A {@code null} or empty list is not checked; {@code null} list
     * entries are ignored when matching.
     *
     * <p>The previous implementation compared each list entry with the list object itself,
     * which is never equal, so every message was dropped as soon as any list was non-empty.
     *
     * @param databusHandle receives the accepted messages
     * @param str           topic suffix, see {@link #createConsumer}
     * @param str2          consumer id, see {@link #createConsumer}
     * @param list          accepted databases, or empty for any
     * @param list2         accepted tables, or empty for any
     * @param list3         accepted types, or empty for any
     */
    public static void consumeIncludeList(DatabusHandle databusHandle, String str, String str2, List<String> list, List<String> list2, List<String> list3) {
        createConsumer(str, str2, databusHandle, databusMessage -> {
            if (databusMessage == null) {
                return null;
            }
            if (CollectionUtils.isNotEmpty(list) && !containsValue(list, databusMessage.getDatabase())) {
                return null;
            }
            if (CollectionUtils.isNotEmpty(list2) && !containsValue(list2, databusMessage.getTable())) {
                return null;
            }
            if (CollectionUtils.isNotEmpty(list3) && !containsValue(list3, databusMessage.getType())) {
                return null;
            }
            return databusMessage;
        });
    }

    /**
     * Drops messages whose database, table or type is contained in the corresponding list
     * and consumes everything else. A {@code null} or empty list excludes nothing;
     * {@code null} list entries are ignored when matching.
     *
     * <p>Despite its name this is the list-based variant; the single-value variant is
     * {@link #consumeExclusiveList}. The names are kept for compatibility with existing callers.
     *
     * <p>The previous implementation compared each list entry with the list object itself,
     * which is never equal, so nothing was ever excluded.
     *
     * @param databusHandle receives the accepted messages
     * @param str           topic suffix, see {@link #createConsumer}
     * @param str2          consumer id, see {@link #createConsumer}
     * @param list          excluded databases
     * @param list2         excluded tables
     * @param list3         excluded types
     */
    public static void consumeExclusive(DatabusHandle databusHandle, String str, String str2, List<String> list, List<String> list2, List<String> list3) {
        createConsumer(str, str2, databusHandle, databusMessage -> {
            if (databusMessage == null) {
                return null;
            }
            if (CollectionUtils.isNotEmpty(list) && containsValue(list, databusMessage.getDatabase())) {
                return null;
            }
            if (CollectionUtils.isNotEmpty(list2) && containsValue(list2, databusMessage.getTable())) {
                return null;
            }
            if (CollectionUtils.isNotEmpty(list3) && containsValue(list3, databusMessage.getType())) {
                return null;
            }
            return databusMessage;
        });
    }

    /**
     * Drops messages whose database, table or type equals the given value and consumes
     * everything else. A blank criterion excludes nothing.
     *
     * <p>Despite its name this is the single-value variant; the list-based variant is
     * {@link #consumeExclusive}. The names are kept for compatibility with existing callers.
     *
     * @param databusHandle receives the accepted messages
     * @param str           topic suffix, see {@link #createConsumer}
     * @param str2          consumer id, see {@link #createConsumer}
     * @param str3          excluded database, or blank for none
     * @param str4          excluded table, or blank for none
     * @param str5          excluded type, or blank for none
     */
    public static void consumeExclusiveList(DatabusHandle databusHandle, String str, String str2, String str3, String str4, String str5) {
        createConsumer(str, str2, databusHandle, databusMessage -> {
            if (databusMessage == null) {
                return null;
            }
            boolean excluded = isSetAndEquals(str3, databusMessage.getDatabase())
                    || isSetAndEquals(str4, databusMessage.getTable())
                    || isSetAndEquals(str5, databusMessage.getType());
            return excluded ? null : databusMessage;
        });
    }

    /**
     * Opens a RabbitMQ connection with the statically configured host, port and credentials
     * on virtual host "/", declares the durable queue {@code "databus_" + str + "->" + str2},
     * binds it to exchange {@code "databus_" + str} with routing key "#", and starts consuming.
     *
     * <p>Deliveries are consumed with auto-ack enabled, so a message whose parsing, filtering
     * or handling throws is logged and not redelivered. Setup failures are logged and swallowed.
     *
     * @param str           exchange suffix
     * @param str2          consumer id, used as the queue name suffix
     * @param databusHandle receives every message the filter lets through
     * @param databusFilter decides which messages are handled; returns {@code null} to drop one
     */
    public static void createConsumer(String str, String str2, final DatabusHandle databusHandle, final DatabusFilter databusFilter) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort.intValue());
        connectionFactory.setUsername(rabbitmqUser);
        connectionFactory.setPassword(rabbitmqPass);
        connectionFactory.setVirtualHost("/");

        Channel channel = null;
        try {
            channel = connectionFactory.newConnection().createChannel();
            String exchangeName = "databus_" + str;
            final String queueName = exchangeName + "->" + str2;
            // durable = true, exclusive = false, autoDelete = false, no extra arguments
            channel.queueDeclare(queueName, true, false, false, (Map<String, Object>) null);
            channel.queueBind(queueName, exchangeName, "#");
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
                    try {
                        DatabusMessage databusMessage = JSONObject.parseObject(new String(body, "utf-8"), DatabusMessage.class);
                        if (databusMessage == null) {
                            return;
                        }
                        DatabusMessage accepted = databusFilter.doFilter(databusMessage);
                        if (accepted != null) {
                            databusHandle.handle(accepted);
                        }
                    } catch (Exception e) {
                        // Pass the throwable itself: with no "{}" placeholder in the message,
                        // a String argument would be dropped and the stack trace lost.
                        DatabusUtil.log.error(queueName + "  消费失败", e);
                    }
                }
            });
        } catch (IOException | TimeoutException e) {
            log.error("databus 消费失败 ", e);
            // Nothing else holds the connection, so release it instead of leaking it.
            // NOTE(review): a failure inside createChannel() itself still leaves the
            // connection open, because no reference to it is available here.
            closeConnectionQuietly(channel);
        }
    }

    /**
     * Prefixes a topic name with the ZooKeeper namespace.
     *
     * @param str topic name to validate and qualify
     * @return {@code ZkUtil.getZkNamespace() + "_" + str}
     * @throws IllegalArgumentException if {@code NameCheckUtil.isTopicNameValid} rejects the name
     */
    public static String topic(String str) {
        if (!NameCheckUtil.isTopicNameValid(str)) {
            throw new IllegalArgumentException("Topic name is illegal, should be [0-9,a-z,A-Z,'_','-'], begin with a letter, and length is 2-30 long：" + str);
        }
        return ZkUtil.getZkNamespace() + "_" + str;
    }

    /** Returns true when {@code expected} is non-blank and does not equal {@code actual}. */
    private static boolean isSetAndDiffers(String expected, Object actual) {
        return StringUtils.isNotBlank(expected) && !expected.equals(actual);
    }

    /** Returns true when {@code expected} is non-blank and equals {@code actual}. */
    private static boolean isSetAndEquals(String expected, Object actual) {
        return StringUtils.isNotBlank(expected) && expected.equals(actual);
    }

    /** Returns true when any non-null entry of {@code candidates} equals {@code value}. */
    private static boolean containsValue(List<String> candidates, Object value) {
        for (String candidate : candidates) {
            if (candidate != null && candidate.equals(value)) {
                return true;
            }
        }
        return false;
    }

    /** Closes the connection behind {@code channel}, if there is one, logging any failure. */
    private static void closeConnectionQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.getConnection().close();
        } catch (Exception e) {
            log.warn("failed to close databus connection after consumer setup error", e);
        }
    }
}
