package com.odianyun.mq.common.inner.dao.impl.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.QueryOperators;
import com.mongodb.WriteConcern;
import com.odianyun.mq.common.consumer.MessageFilter;
import com.odianyun.mq.common.inner.dao.MessageDAO;
import com.odianyun.mq.common.inner.message.MqMessage;
import com.odianyun.mq.common.inner.util.MongoUtil;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.bson.types.BSONTimestamp;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/omq-real-client-2.0.17.6.RELEASE.jar:com/odianyun/mq/common/inner/dao/impl/mongodb/MessageDAOImpl.class */
/**
 * {@link MessageDAO} implementation backed by MongoDB collections.
 *
 * <p>Every operation resolves its target collection through
 * {@code MongoNewClient#getMessageCollection(str, index)}; overloads that do not take an index
 * delegate with index {@code 0}. Message ids are stored in the {@link #UUID} field as
 * {@link BSONTimestamp} values and exposed to callers as {@code Long} via {@link MongoUtil}.
 *
 * <p>All cursors opened by this class are closed before the method returns, whether it returns
 * normally or throws.
 */
public class MessageDAOImpl implements MessageDAO {
    private static final Logger LOG = LoggerFactory.getLogger(MessageDAOImpl.class);
    public static final String OID = "oid";
    public static final String CONTENT = "c";
    public static final String VERSION = "v";
    public static final String SHA1 = "s";
    public static final String GENERATED_TIME = "gt";
    public static final String PROPERTIES = "p";
    public static final String INTERNAL_PROPERTIES = "_p";
    public static final String TYPE = "t";
    public static final String SOURCE_IP = "si";
    public static final String PROTOCOLTYPE = "pt";
    public static final String UUID = "uid";

    /** Message logged whenever a stored document cannot be mapped; text kept unchanged. */
    private static final String CONVERT_ERROR = "Error when convert resultset to mqMessage.";

    private MongoNewClient mongoNewClient;

    /**
     * Injects the client used to look up message collections.
     *
     * @param mongoNewClient client providing {@code getMessageCollection}
     */
    public void setMongoNewClient(MongoNewClient mongoNewClient) {
        this.mongoNewClient = mongoNewClient;
    }

    /** Same as {@link #getMessage(String, Long, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public MqMessage getMessage(String str, Long l) {
        return getMessage(str, l, 0);
    }

    /**
     * Loads the single message whose uid equals {@code l}.
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param l   message id to look up
     * @param i   collection index passed to {@code getMessageCollection}
     * @return the message, or {@code null} when no document matches or the document cannot be
     *         converted (the conversion error is logged)
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public MqMessage getMessage(String str, Long l, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject query = BasicDBObjectBuilder.start().add(UUID, MongoUtil.longToBSONTimestamp(l)).get();
        DBObject found = collection.findOne(query);
        if (found == null) {
            return null;
        }
        return toMessageOrNull(found);
    }

    /**
     * Returns the raw document whose uid field equals the given {@link ObjectId}.
     *
     * @param str      first argument passed to {@code getMessageCollection}
     * @param objectId value matched against the uid field
     * @param i        collection index passed to {@code getMessageCollection}
     * @return the matching document, or {@code null} when there is none
     */
    public DBObject getMessageByUUId(String str, ObjectId objectId, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        return collection.findOne(BasicDBObjectBuilder.start().add(UUID, objectId).get());
    }

    /** Same as {@link #getMaxMessageId(String, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getMaxMessageId(String str) {
        return getMaxMessageId(str, 0);
    }

    /**
     * Returns the largest uid stored in the collection.
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param i   collection index passed to {@code getMessageCollection}
     * @return the largest message id, or {@code null} when the collection is empty
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getMaxMessageId(String str, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBCursor cursor = collection.find((DBObject) null, uidProjection()).sort(uidOrder(-1)).limit(1);
        return firstUidAndClose(cursor);
    }

    /** Same as {@link #getMaxMessage(String, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public MqMessage getMaxMessage(String str) {
        return getMaxMessage(str, 0);
    }

    /**
     * Returns the message with the largest uid.
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param i   collection index passed to {@code getMessageCollection}
     * @return the message, or {@code null} when the collection is empty or the document cannot
     *         be converted (the conversion error is logged)
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public MqMessage getMaxMessage(String str, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBCursor cursor = collection.find().sort(uidOrder(-1)).limit(1);
        try {
            if (cursor.hasNext()) {
                return toMessageOrNull(cursor.next());
            }
            return null;
        } finally {
            cursor.close();
        }
    }

    /** Same as {@link #getMessagesGreaterThan(String, Long, int, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<MqMessage> getMessagesGreaterThan(String str, Long l, int i) {
        return getMessagesGreaterThan(str, l, i, 0);
    }

    /**
     * Returns messages whose uid is strictly greater than {@code l}, in ascending uid order.
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param l   exclusive lower bound for the uid
     * @param i   value passed to the cursor's {@code limit}
     * @param i2  collection index passed to {@code getMessageCollection}
     * @return the converted messages; documents that fail conversion are logged and skipped
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<MqMessage> getMessagesGreaterThan(String str, Long l, int i, int i2) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i2);
        DBObject bound = BasicDBObjectBuilder.start().add(QueryOperators.GT, MongoUtil.longToBSONTimestamp(l)).get();
        DBObject query = BasicDBObjectBuilder.start().add(UUID, bound).get();
        DBCursor cursor = collection.find(query).sort(uidOrder(1)).limit(i);
        return drainMessages(cursor);
    }

    /**
     * Copies the fields of a stored document into {@code mqMessage}.
     *
     * <p>Properties maps are copied into fresh {@link HashMap}s; absent maps and an absent
     * protocol type leave the corresponding message field untouched.
     *
     * @throws RuntimeException (e.g. {@link ClassCastException}) when a field has an unexpected type
     */
    // The driver exposes untyped values, so the map copies are necessarily raw/unchecked.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void convert(DBObject dBObject, MqMessage mqMessage) {
        mqMessage.setMessageId(MongoUtil.BSONTimestampToLong((BSONTimestamp) dBObject.get(UUID)));
        mqMessage.setContent(dBObject.get(CONTENT));
        mqMessage.setVersion((String) dBObject.get(VERSION));
        mqMessage.setGeneratedTime((Date) dBObject.get(GENERATED_TIME));
        Map properties = (Map) dBObject.get(PROPERTIES);
        if (properties != null) {
            mqMessage.setProperties(new HashMap(properties));
        }
        Map internalProperties = (Map) dBObject.get(INTERNAL_PROPERTIES);
        if (internalProperties != null) {
            mqMessage.setInternalProperties(new HashMap(internalProperties));
        }
        mqMessage.setSha1((String) dBObject.get(SHA1));
        mqMessage.setType((String) dBObject.get(TYPE));
        mqMessage.setSourceIp((String) dBObject.get(SOURCE_IP));
        String protocolType = (String) dBObject.get(PROTOCOLTYPE);
        if (protocolType != null) {
            mqMessage.setProtocolType(protocolType);
        }
    }

    /**
     * Same as {@link #saveMessage(String, MqMessage, int, int)} with collection index {@code 0};
     * here {@code i} is the write-concern level.
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public void saveMessage(String str, MqMessage mqMessage, int i) {
        saveMessage(str, mqMessage, 0, i);
    }

    /**
     * Inserts a message as a new document.
     *
     * @param str       first argument passed to {@code getMessageCollection}
     * @param mqMessage message to store
     * @param i         collection index passed to {@code getMessageCollection}
     * @param i2        write-concern level, see {@link #toWriteConcern(int)}
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public void saveMessage(String str, MqMessage mqMessage, int i, int i2) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        // NOTE(review): the empty BSONTimestamp is presumably replaced by the server with the
        // insertion timestamp - confirm. It is kept as the first field, as in the original.
        BasicDBObjectBuilder document = BasicDBObjectBuilder.start().add(UUID, new BSONTimestamp());
        convert(mqMessage, document);
        collection.insert(document.get(), toWriteConcern(i2));
    }

    /** Same as {@link #getMessageIDGreaterThan(String, Long, int, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getMessageIDGreaterThan(String str, Long l, int i) {
        return getMessageIDGreaterThan(str, l, i, 0);
    }

    /**
     * Returns the uid of the last document among those with uid {@code >= l}, ordered ascending
     * and capped by {@code limit(i)}.
     *
     * <p>Note that, despite the method name, the lower bound is inclusive ({@code $gte}).
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param l   inclusive lower bound for the uid
     * @param i   value passed to the cursor's {@code limit}
     * @param i2  collection index passed to {@code getMessageCollection}
     * @return the id found, or {@code null} when nothing matches or the uid field is missing
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getMessageIDGreaterThan(String str, Long l, int i, int i2) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i2);
        DBObject bound = BasicDBObjectBuilder.start().add(QueryOperators.GTE, MongoUtil.longToBSONTimestamp(l)).get();
        DBObject query = BasicDBObjectBuilder.start().add(UUID, bound).get();
        DBCursor cursor = collection.find(query, uidProjection()).sort(uidOrder(1)).limit(i);
        try {
            // size() is evaluated before iteration starts, so skip() may still be applied.
            int size = cursor.size();
            if (size <= 0) {
                return null;
            }
            BSONTimestamp last = (BSONTimestamp) cursor.skip(size - 1).next().get(UUID);
            if (last == null) {
                return null;
            }
            return MongoUtil.BSONTimestampToLong(last);
        } finally {
            cursor.close();
        }
    }

    /**
     * Same as {@link #getMessagesGreaterThan(String, Long, Long, MessageFilter, int)} with
     * index {@code 0}.
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<MqMessage> getMessagesGreaterThan(String str, Long l, Long l2, MessageFilter messageFilter) {
        return getMessagesGreaterThan(str, l, l2, messageFilter, 0);
    }

    /**
     * Returns messages with {@code l <= uid < l2}, optionally restricted to the types in the
     * filter, in ascending uid order.
     *
     * @param str           first argument passed to {@code getMessageCollection}
     * @param l             inclusive lower uid bound
     * @param l2            exclusive upper uid bound
     * @param messageFilter optional filter; ignored when {@code null} or when its parameter set
     *                      is {@code null} or empty
     * @param i             collection index passed to {@code getMessageCollection}
     * @return the converted messages; documents that fail conversion are logged and skipped
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<MqMessage> getMessagesGreaterThan(String str, Long l, Long l2, MessageFilter messageFilter, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject query = filteredRangeQuery(l, QueryOperators.LT, l2, messageFilter);
        DBCursor cursor = collection.find(query).sort(uidOrder(1));
        return drainMessages(cursor);
    }

    /**
     * Same as {@link #getMessagesIdGreaterThan(String, Long, Long, MessageFilter, int)} with
     * index {@code 0}.
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<Long> getMessagesIdGreaterThan(String str, Long l, Long l2, MessageFilter messageFilter) {
        return getMessagesIdGreaterThan(str, l, l2, messageFilter, 0);
    }

    /**
     * Returns the ids of messages with {@code l <= uid < l2}, optionally restricted to the types
     * in the filter, in ascending order.
     *
     * @param str           first argument passed to {@code getMessageCollection}
     * @param l             inclusive lower uid bound
     * @param l2            exclusive upper uid bound
     * @param messageFilter optional filter; ignored when {@code null} or when its parameter set
     *                      is {@code null} or empty
     * @param i             collection index passed to {@code getMessageCollection}
     * @return the ids; entries that raise a {@link RuntimeException} are logged and skipped
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<Long> getMessagesIdGreaterThan(String str, Long l, Long l2, MessageFilter messageFilter, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject query = filteredRangeQuery(l, QueryOperators.LT, l2, messageFilter);
        DBCursor cursor = collection.find(query, uidProjection()).sort(uidOrder(1));
        return drainIdsLeniently(cursor);
    }

    /** Same as {@link #getMessageIdBetween(String, Long, Long, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<Long> getMessageIdBetween(String str, Long l, Long l2) {
        return getMessageIdBetween(str, l, l2, 0);
    }

    /**
     * Returns the ids of messages with {@code l <= uid < l2} in ascending order.
     *
     * <p>Unlike the filter-taking overloads, conversion errors are not caught here and
     * propagate to the caller.
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param l   inclusive lower uid bound
     * @param l2  exclusive upper uid bound
     * @param i   collection index passed to {@code getMessageCollection}
     * @return the matching ids, possibly empty
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<Long> getMessageIdBetween(String str, Long l, Long l2, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject query = BasicDBObjectBuilder.start().add(UUID, uidRange(l, QueryOperators.LT, l2)).get();
        DBCursor cursor = collection.find(query, new BasicDBObject(UUID, 1)).sort(uidOrder(1));
        List<Long> ids = new ArrayList<Long>();
        try {
            while (cursor.hasNext()) {
                ids.add(MongoUtil.BSONTimestampToLong((BSONTimestamp) cursor.next().get(UUID)));
            }
        } finally {
            cursor.close();
        }
        return ids;
    }

    /** Same as {@link #getMinIdAfter(String, BSONTimestamp, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getMinIdAfter(String str, BSONTimestamp bSONTimestamp) {
        return getMinIdAfter(str, bSONTimestamp, 0);
    }

    /**
     * Returns the smallest uid that is {@code >= bSONTimestamp}.
     *
     * @param str           first argument passed to {@code getMessageCollection}
     * @param bSONTimestamp inclusive lower bound
     * @param i             collection index passed to {@code getMessageCollection}
     * @return the id found, or {@code null} when nothing matches
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getMinIdAfter(String str, BSONTimestamp bSONTimestamp, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject bound = BasicDBObjectBuilder.start().add(QueryOperators.GTE, bSONTimestamp).get();
        DBObject query = BasicDBObjectBuilder.start().add(UUID, bound).get();
        DBCursor cursor = collection.find(query, new BasicDBObject(UUID, 1)).limit(1).sort(uidOrder(1));
        return firstUidAndClose(cursor);
    }

    /** Same as {@link #countBetween(String, Long, Long, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public int countBetween(String str, Long l, Long l2) {
        return countBetween(str, l, l2, 0);
    }

    /**
     * Counts the messages with {@code l <= uid < l2}.
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param l   inclusive lower uid bound
     * @param l2  exclusive upper uid bound
     * @param i   collection index passed to {@code getMessageCollection}
     * @return number of matching documents
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public int countBetween(String str, Long l, Long l2, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject query = BasicDBObjectBuilder.start().add(UUID, uidRange(l, QueryOperators.LT, l2)).get();
        // A count does not depend on ordering, so no sort is requested.
        return collection.find(query).count();
    }

    /** Same as {@link #getSkippedMessageId(String, Long, int, int)} with index {@code 0}. */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getSkippedMessageId(String str, Long l, int i) {
        return getSkippedMessageId(str, l, i, 0);
    }

    /**
     * Returns the uid found after skipping {@code i} documents among those with uid
     * {@code >= l}, ordered ascending.
     *
     * @param str first argument passed to {@code getMessageCollection}
     * @param l   inclusive lower uid bound
     * @param i   number of documents to skip
     * @param i2  collection index passed to {@code getMessageCollection}
     * @return the id found, or {@code null} when fewer than {@code i + 1} documents match
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public Long getSkippedMessageId(String str, Long l, int i, int i2) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i2);
        DBObject bound = BasicDBObjectBuilder.start().add(QueryOperators.GTE, MongoUtil.longToBSONTimestamp(l)).get();
        DBObject query = BasicDBObjectBuilder.start().add(UUID, bound).get();
        DBCursor cursor = collection.find(query, new BasicDBObject(UUID, 1)).skip(i).limit(1).sort(uidOrder(1));
        return firstUidAndClose(cursor);
    }

    /**
     * Returns messages with {@code l <= uid <= l2}, optionally restricted to the types in the
     * filter, in ascending uid order.
     *
     * @param str           first argument passed to {@code getMessageCollection}
     * @param l             inclusive lower uid bound
     * @param l2            inclusive upper uid bound
     * @param messageFilter optional filter; ignored when {@code null} or when its parameter set
     *                      is {@code null} or empty
     * @param i             collection index passed to {@code getMessageCollection}
     * @return the converted messages; documents that fail conversion are logged and skipped
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<MqMessage> getMessagesBetween(String str, Long l, Long l2, MessageFilter messageFilter, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject query = filteredRangeQuery(l, QueryOperators.LTE, l2, messageFilter);
        DBCursor cursor = collection.find(query).sort(uidOrder(1));
        return drainMessages(cursor);
    }

    /**
     * Returns the ids of messages with {@code l <= uid <= l2}, optionally restricted to the
     * types in the filter, in ascending order.
     *
     * @param str           first argument passed to {@code getMessageCollection}
     * @param l             inclusive lower uid bound
     * @param l2            inclusive upper uid bound
     * @param messageFilter optional filter; ignored when {@code null} or when its parameter set
     *                      is {@code null} or empty
     * @param i             collection index passed to {@code getMessageCollection}
     * @return the ids; entries that raise a {@link RuntimeException} are logged and skipped
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<Long> getMessageIdBetween(String str, Long l, Long l2, MessageFilter messageFilter, int i) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        DBObject query = filteredRangeQuery(l, QueryOperators.LTE, l2, messageFilter);
        DBCursor cursor = collection.find(query, new BasicDBObject(UUID, 1)).sort(uidOrder(1));
        return drainIdsLeniently(cursor);
    }

    /**
     * Same as {@link #getMessagesBetween(String, Long, Long, MessageFilter, int)} with
     * index {@code 0}.
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<MqMessage> getMessagesBetween(String str, Long l, Long l2, MessageFilter messageFilter) {
        return getMessagesBetween(str, l, l2, messageFilter, 0);
    }

    /**
     * Same as {@link #getMessageIdBetween(String, Long, Long, MessageFilter, int)} with
     * index {@code 0}.
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public List<Long> getMessageIdBetween(String str, Long l, Long l2, MessageFilter messageFilter) {
        return getMessageIdBetween(str, l, l2, messageFilter, 0);
    }

    /**
     * Same as {@link #transferMessage(String, MqMessage, int, int)} with collection index
     * {@code 0}; here {@code i} is the write-concern level.
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public void transferMessage(String str, MqMessage mqMessage, int i) {
        transferMessage(str, mqMessage, 0, i);
    }

    /**
     * Inserts a copy of a message, recording its existing id in the {@link #OID} field and
     * giving the new document a fresh uid.
     *
     * @param str       first argument passed to {@code getMessageCollection}
     * @param mqMessage message to copy; its message id is stored under {@link #OID}
     * @param i         collection index passed to {@code getMessageCollection}
     * @param i2        write-concern level, see {@link #toWriteConcern(int)}
     */
    @Override // com.odianyun.mq.common.inner.dao.MessageDAO
    public void transferMessage(String str, MqMessage mqMessage, int i, int i2) {
        DBCollection collection = this.mongoNewClient.getMessageCollection(str, i);
        // Field order (oid, then uid) is kept exactly as in the original document layout.
        BasicDBObjectBuilder document = BasicDBObjectBuilder.start()
                .add(OID, MongoUtil.longToBSONTimestamp(mqMessage.getMessageId()))
                .add(UUID, new BSONTimestamp());
        convert(mqMessage, document);
        collection.insert(document.get(), toWriteConcern(i2));
    }

    /**
     * Appends the message's fields to a document under construction.
     *
     * <p>String fields are written only when they contain non-whitespace text, maps only when
     * non-empty, the generated time and protocol type only when non-null.
     */
    private void convert(MqMessage mqMessage, BasicDBObjectBuilder basicDBObjectBuilder) {
        String content = mqMessage.getContent();
        if (hasText(content)) {
            basicDBObjectBuilder.add(CONTENT, content);
        }
        Date generatedTime = mqMessage.getGeneratedTime();
        if (generatedTime != null) {
            basicDBObjectBuilder.add(GENERATED_TIME, generatedTime);
        }
        String version = mqMessage.getVersion();
        if (hasText(version)) {
            basicDBObjectBuilder.add(VERSION, version);
        }
        Map<String, String> properties = mqMessage.getProperties();
        if (properties != null && !properties.isEmpty()) {
            basicDBObjectBuilder.add(PROPERTIES, properties);
        }
        Map<String, String> internalProperties = mqMessage.getInternalProperties();
        if (internalProperties != null && !internalProperties.isEmpty()) {
            basicDBObjectBuilder.add(INTERNAL_PROPERTIES, internalProperties);
        }
        String sha1 = mqMessage.getSha1();
        if (hasText(sha1)) {
            basicDBObjectBuilder.add(SHA1, sha1);
        }
        String type = mqMessage.getType();
        if (hasText(type)) {
            basicDBObjectBuilder.add(TYPE, type);
        }
        String sourceIp = mqMessage.getSourceIp();
        if (hasText(sourceIp)) {
            basicDBObjectBuilder.add(SOURCE_IP, sourceIp);
        }
        String protocolType = mqMessage.getProtocolType();
        if (protocolType != null) {
            basicDBObjectBuilder.add(PROTOCOLTYPE, protocolType);
        }
    }

    /** Returns {@code true} when the value is non-null and not blank after trimming. */
    private static boolean hasText(String value) {
        return value != null && !"".equals(value.trim());
    }

    /** Maps the numeric level 1-4 to a write concern; 0 and any other value map to NORMAL. */
    private static WriteConcern toWriteConcern(int level) {
        switch (level) {
            case 1:
                return WriteConcern.SAFE;
            case 2:
                return WriteConcern.FSYNC_SAFE;
            case 3:
                return WriteConcern.JOURNAL_SAFE;
            case 4:
                return WriteConcern.REPLICAS_SAFE;
            case 0:
            default:
                return WriteConcern.NORMAL;
        }
    }

    /** Builds the projection that selects only the uid field. */
    private static DBObject uidProjection() {
        return BasicDBObjectBuilder.start().add(UUID, 1).get();
    }

    /** Builds a sort specification on the uid field; 1 is ascending, -1 descending. */
    private static DBObject uidOrder(int direction) {
        return BasicDBObjectBuilder.start().add(UUID, direction).get();
    }

    /** Builds the condition {@code {$gte: lower, <upperOperator>: upper}} for the uid field. */
    private static DBObject uidRange(Long lower, String upperOperator, Long upper) {
        return BasicDBObjectBuilder.start()
                .add(QueryOperators.GTE, MongoUtil.longToBSONTimestamp(lower))
                .add(upperOperator, MongoUtil.longToBSONTimestamp(upper))
                .get();
    }

    /** Builds a uid range query and adds a {@code $in} type restriction when the filter has values. */
    private static DBObject filteredRangeQuery(Long lower, String upperOperator, Long upper, MessageFilter messageFilter) {
        BasicDBObjectBuilder query = BasicDBObjectBuilder.start().add(UUID, uidRange(lower, upperOperator, upper));
        if (messageFilter != null) {
            Set<String> types = messageFilter.getParam();
            if (types != null && !types.isEmpty()) {
                query.add(TYPE, new BasicDBObject(QueryOperators.IN, types));
            }
        }
        return query.get();
    }

    /** Converts a document to a message; logs and returns {@code null} when conversion fails. */
    private MqMessage toMessageOrNull(DBObject document) {
        MqMessage message = new MqMessage();
        try {
            convert(document, message);
            return message;
        } catch (RuntimeException e) {
            LOG.error(CONVERT_ERROR, e);
            return null;
        }
    }

    /** Returns the uid of the cursor's first document (or {@code null} if empty) and closes the cursor. */
    private static Long firstUidAndClose(DBCursor cursor) {
        try {
            if (!cursor.hasNext()) {
                return null;
            }
            return MongoUtil.BSONTimestampToLong((BSONTimestamp) cursor.next().get(UUID));
        } finally {
            cursor.close();
        }
    }

    /** Reads every document as a message, skipping unconvertible ones, then closes the cursor. */
    private List<MqMessage> drainMessages(DBCursor cursor) {
        List<MqMessage> messages = new ArrayList<MqMessage>();
        try {
            while (cursor.hasNext()) {
                MqMessage message = toMessageOrNull(cursor.next());
                if (message != null) {
                    messages.add(message);
                }
            }
        } finally {
            // Closed once, after the whole result has been read.
            cursor.close();
        }
        return messages;
    }

    /** Reads every document's uid, logging and skipping entries that fail, then closes the cursor. */
    private static List<Long> drainIdsLeniently(DBCursor cursor) {
        List<Long> ids = new ArrayList<Long>();
        try {
            while (cursor.hasNext()) {
                try {
                    ids.add(MongoUtil.BSONTimestampToLong((BSONTimestamp) cursor.next().get(UUID)));
                } catch (RuntimeException e) {
                    LOG.error(CONVERT_ERROR, e);
                }
            }
        } finally {
            // Closed once, after the whole result has been read.
            cursor.close();
        }
        return ids;
    }
}
