package com.odianyun.mq.common.inner.dao.impl.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import com.mongodb.QueryOperators;
import com.odianyun.mq.common.inner.dao.AckDAO;
import com.odianyun.mq.common.inner.util.MongoUtil;
import java.util.Date;
import org.bson.types.BSONTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/omq-real-client-2.0.17.RELEASE.jar:com/odianyun/mq/common/inner/dao/impl/mongodb/AckDAOImpl.class */
public class AckDAOImpl implements AckDAO {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AckDAOImpl.class);
    public static final String SRC_CONSUMER_IP = "cip";
    public static final String TICK = "t";
    private static final int MONGO_ORDER_DESC = -1;
    private static final int MONGO_ORDER_ASC = 1;
    private MongoNewClient mongoNewClient;

    public void setMongoNewClient(MongoNewClient mongoNewClient) {
        this.mongoNewClient = mongoNewClient;
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public Long getMaxMessageID(String str, String str2) {
        return getMaxMessageID(str, str2, 0);
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public boolean isAcked(String str, String str2, Long l) {
        return isAcked(str, str2, l, 0);
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public boolean isAcked(String str, String str2, Long l, int i) {
        DBCollection ackCollection = this.mongoNewClient.getAckCollection(str, str2, i);
        BasicDBObject basicDBObject = new BasicDBObject();
        basicDBObject.put("uid", (Object) MongoUtil.longToBSONTimestamp(l));
        BasicDBObject basicDBObject2 = new BasicDBObject();
        basicDBObject2.put("uid", (Object) 1);
        return ackCollection.findOne((DBObject) basicDBObject, (DBObject) basicDBObject2) != null;
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public Long getMaxMessageID(String str, String str2, int i) {
        DBCollection ackCollection = this.mongoNewClient.getAckCollection(str, str2, i);
        DBObject dBObject = BasicDBObjectBuilder.start().add("uid", 1).get();
        DBCursor limit = ackCollection.find(new BasicDBObject(), dBObject).sort(BasicDBObjectBuilder.start().add("uid", -1).get()).limit(1);
        try {
            if (!limit.hasNext()) {
                limit.close();
                return null;
            }
            Long BSONTimestampToLong = MongoUtil.BSONTimestampToLong((BSONTimestamp) limit.next().get("uid"));
            limit.close();
            return BSONTimestampToLong;
        } catch (Throwable th) {
            limit.close();
            throw th;
        }
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public void add(String str, String str2, Long l, String str3) {
        add(str, str2, l, str3, 0);
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public void add(String str, String str2, Long l, String str3, int i) {
        DBCollection ackCollection = this.mongoNewClient.getAckCollection(str, str2, i);
        BSONTimestamp longToBSONTimestamp = MongoUtil.longToBSONTimestamp(l);
        try {
            ackCollection.insert(BasicDBObjectBuilder.start().add("uid", longToBSONTimestamp).add(SRC_CONSUMER_IP, str3).add("t", new Date()).get());
        } catch (MongoException e) {
            if ((e.getMessage() == null || e.getMessage().indexOf("duplicate key") < 0) && e.getCode() != 11000) {
                throw e;
            }
            LOG.warn(e.getMessage() + ": uid is " + longToBSONTimestamp + ",topic=" + str + ",consumer=" + str2);
        }
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public int countBetween(String str, String str2, Long l, Long l2) {
        return countBetween(str, str2, l, l2, 0);
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public int countBetween(String str, String str2, Long l, Long l2, int i) {
        DBCollection ackCollection = this.mongoNewClient.getAckCollection(str, str2, i);
        DBObject dBObject = BasicDBObjectBuilder.start().add("uid", BasicDBObjectBuilder.start().add(QueryOperators.LT, MongoUtil.longToBSONTimestamp(l2)).add(QueryOperators.GTE, MongoUtil.longToBSONTimestamp(l)).get()).get();
        return ackCollection.find(dBObject).sort(BasicDBObjectBuilder.start().add("uid", -1).get()).count();
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public Long getMinMessageID(String str, String str2) {
        return getMinMessageID(str, str2, 0);
    }

    @Override // com.odianyun.mq.common.inner.dao.AckDAO
    public Long getMinMessageID(String str, String str2, int i) {
        DBCollection ackCollection = this.mongoNewClient.getAckCollection(str, str2, i);
        DBObject dBObject = BasicDBObjectBuilder.start().add("uid", 1).get();
        DBCursor limit = ackCollection.find(new BasicDBObject(), dBObject).sort(BasicDBObjectBuilder.start().add("uid", 1).get()).limit(1);
        try {
            if (!limit.hasNext()) {
                limit.close();
                return null;
            }
            Long BSONTimestampToLong = MongoUtil.BSONTimestampToLong((BSONTimestamp) limit.next().get("uid"));
            limit.close();
            return BSONTimestampToLong;
        } catch (Throwable th) {
            limit.close();
            throw th;
        }
    }
}
