package com.alibaba.hologres.client;

import com.alibaba.hologres.client.exception.ExceptionCode;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.ArrayBuffer;
import com.alibaba.hologres.client.impl.binlog.BinlogEventType;
import com.alibaba.hologres.client.impl.binlog.BinlogRecordCollector;
import com.alibaba.hologres.client.impl.binlog.Committer;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/hologres/client/BinlogShardGroupReader.class */
/**
 * Reads binlog records produced for a set of shards through one shared, bounded queue.
 *
 * <p>Producers hand records to the {@link Collector} returned by {@link #getCollector()}, which
 * pushes them into the queue. A consumer pulls them one at a time with
 * {@link #getBinlogRecord()} and acknowledges progress with {@link #commit(long)} or
 * {@link #commitFlushedLsn(int, long, long)}.
 *
 * <p>NOTE(review): the consumer-side state ({@code buffer}, {@code bufferPosition}) is not
 * synchronized, so {@code getBinlogRecord} presumably expects a single consumer thread —
 * confirm against callers.
 */
public class BinlogShardGroupReader implements Closeable {
    public static final Logger LOGGER = LoggerFactory.getLogger(BinlogShardGroupReader.class);

    /** Longest time, in milliseconds, a single queue offer/poll blocks before state is re-checked. */
    private static final long QUEUE_WAIT_MS = 1000L;

    /** Pause, in milliseconds, between successive attempts to empty the queue while cancelling. */
    private static final long CANCEL_DRAIN_PAUSE_MS = 100L;

    /** Longest time, in milliseconds, {@link #cancel()} waits for one thread before interrupting it. */
    private static final long THREAD_STOP_WAIT_MS = 1000L;

    private final HoloConfig config;
    private final Subscribe subscribe;
    /** Committer per shard id; looked up with the shard id carried by each record. */
    private final Map<Integer, Committer> committerMap;
    /** Shared "running" flag; cleared by {@link #cancel()}. */
    private final AtomicBoolean started;
    /** Hand-off queue between the producers (via {@link Collector}) and the consumer. */
    BlockingQueue<BinlogRecord> queue;
    /** Last failure reported through {@link Collector#exceptionally}; rethrown to the consumer. */
    volatile HoloClientException exception = null;
    /** Threads registered through {@link #addThread(Thread)}; interrupted by {@link #cancel()}. */
    List<Thread> threadList = new ArrayList<>();
    /** Index of the next unread element of {@link #buffer}. */
    int bufferPosition = 0;
    /** Records drained from the queue in bulk and served one by one to the consumer. */
    List<BinlogRecord> buffer = new ArrayList<>();
    Collector collector = new Collector();

    /**
     * Producer-side entry point: moves records into the reader's queue and records failures on
     * the enclosing reader.
     */
    class Collector implements BinlogRecordCollector {
        Collector() {
        }

        /**
         * Moves records from {@code arrayBuffer} into the queue until the buffer reports nothing
         * remaining or one offer fails to complete within {@value #QUEUE_WAIT_MS} ms.
         *
         * @param i not used by this implementation (presumably the shard id — confirm against
         *     {@code BinlogRecordCollector})
         * @param arrayBuffer source of the records; an element is only popped after it has been
         *     accepted by the queue
         * @return the last record that was moved into the queue, or {@code null} if none was
         * @throws InterruptedException if interrupted while waiting for queue space
         */
        @Override // com.alibaba.hologres.client.impl.binlog.BinlogRecordCollector
        public BinlogRecord emit(int i, ArrayBuffer<BinlogRecord> arrayBuffer) throws InterruptedException {
            BinlogRecord lastEmitted = null;
            // Peek first and pop only on success, so a timed-out offer leaves the record in place.
            while (BinlogShardGroupReader.this.queue.offer(arrayBuffer.peek(), QUEUE_WAIT_MS, TimeUnit.MILLISECONDS)) {
                lastEmitted = arrayBuffer.pop();
                if (arrayBuffer.remain() <= 0) {
                    break;
                }
            }
            return lastEmitted;
        }

        /**
         * Logs the failure and stores it on the reader so the consumer sees it on its next read.
         *
         * @param i shard id the failure belongs to
         * @param th the failure; wrapped in a {@link HoloClientException} unless it already is one
         */
        @Override // com.alibaba.hologres.client.impl.binlog.BinlogRecordCollector
        public void exceptionally(int i, Throwable th) {
            String message = "shard id " + i + " fetch binlog fail";
            BinlogShardGroupReader.LOGGER.error(message, th);
            if (th instanceof HoloClientException) {
                BinlogShardGroupReader.this.exception = (HoloClientException) th;
            } else {
                BinlogShardGroupReader.this.exception = new HoloClientException(ExceptionCode.INTERNAL_ERROR, message, th);
            }
        }
    }

    /**
     * Creates a reader whose queue holds at least 1024 records, or half of
     * {@code map.size() * holoConfig.getBinlogReadBatchSize()} if that is larger.
     *
     * @param holoConfig client configuration (batch size and the ignore-delete /
     *     ignore-before-update switches are read from it)
     * @param subscribe subscription; its table name is used in log output
     * @param i not used by this constructor
     * @param map committer per shard id
     * @param atomicBoolean shared running flag, set to {@code false} by {@link #cancel()}
     */
    public BinlogShardGroupReader(HoloConfig holoConfig, Subscribe subscribe, int i, Map<Integer, Committer> map, AtomicBoolean atomicBoolean) {
        this.config = holoConfig;
        this.subscribe = subscribe;
        this.committerMap = map;
        this.queue = new ArrayBlockingQueue<>(Math.max(1024, (map.size() * holoConfig.getBinlogReadBatchSize()) / 2));
        this.started = atomicBoolean;
    }

    /**
     * Refills {@link #buffer} from the queue when every buffered record has been consumed;
     * returns immediately when unread records are still buffered.
     *
     * @param deadlineNanos {@link System#nanoTime()} value after which waiting is abandoned
     * @throws HoloClientException if a producer reported a failure
     * @throws TimeoutException if the deadline passed before a record arrived
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    private void tryFetch(long deadlineNanos) throws InterruptedException, TimeoutException, HoloClientException {
        if (this.buffer.size() > this.bufferPosition) {
            return;
        }
        if (!this.buffer.isEmpty()) {
            this.buffer.clear();
        }
        BinlogRecord head = null;
        while (head == null) {
            // Re-check failure and deadline on every wake-up so neither is delayed by more
            // than one poll interval.
            if (null != this.exception) {
                throw this.exception;
            }
            if (System.nanoTime() > deadlineNanos) {
                throw new TimeoutException("timed out waiting for binlog record");
            }
            head = this.queue.poll(QUEUE_WAIT_MS, TimeUnit.MILLISECONDS);
            if (head != null) {
                this.buffer.add(head);
                // Take everything else that is already queued in one go.
                this.queue.drainTo(this.buffer);
                this.bufferPosition = 0;
            }
        }
    }

    /** @return the collector producers use to feed this reader */
    public Collector getCollector() {
        return this.collector;
    }

    /**
     * Returns the next record, waiting without a time limit.
     *
     * @see #getBinlogRecord(long)
     */
    public BinlogRecord getBinlogRecord() throws HoloClientException, InterruptedException, TimeoutException {
        return getBinlogRecord(-1L);
    }

    /**
     * Returns the next record that is not filtered out by configuration.
     *
     * <p>Every record read — including filtered ones — advances the last-read LSN of its shard's
     * committer. {@code DELETE} records are skipped when {@code getBinlogIgnoreDelete()} is set
     * and {@code BEFORE_UPDATE} records when {@code getBinlogIgnoreBeforeUpdate()} is set.
     *
     * @param j timeout in milliseconds; a value {@code <= 0} means no time limit
     * @return the next non-filtered record, never {@code null}
     * @throws HoloClientException if a producer reported a failure, or a record carries a shard
     *     id with no committer
     * @throws TimeoutException if no record became available within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public BinlogRecord getBinlogRecord(long j) throws HoloClientException, InterruptedException, TimeoutException {
        if (null != this.exception) {
            throw this.exception;
        }
        long deadlineNanos = j > 0 ? System.nanoTime() + (j * 1000000) : Long.MAX_VALUE;
        BinlogRecord record = null;
        while (record == null) {
            tryFetch(deadlineNanos);
            if (this.buffer.size() <= this.bufferPosition) {
                continue;
            }
            record = this.buffer.get(this.bufferPosition++);
            if (record == null) {
                continue;
            }
            Committer committer = this.committerMap.get(Integer.valueOf(record.getShardId()));
            if (committer == null) {
                throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "reader for shard " + record.getShardId() + " is not exists!");
            }
            committer.updateLastReadLsn(record.getBinlogLsn());
            boolean skipDelete = record.getBinlogEventType() == BinlogEventType.DELETE && this.config.getBinlogIgnoreDelete();
            boolean skipBeforeUpdate = record.getBinlogEventType() == BinlogEventType.BEFORE_UPDATE && this.config.getBinlogIgnoreBeforeUpdate();
            if (skipDelete || skipBeforeUpdate) {
                // Filtered by configuration: keep looping for the next record.
                record = null;
            }
        }
        return record;
    }

    /** Equivalent to {@link #cancel()}. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        cancel();
    }

    /**
     * Commits, for every shard, the last-read LSN held by that shard's committer and waits for
     * all commits to finish.
     *
     * @param j overall timeout in milliseconds for waiting on the commits, measured from the
     *     moment all commits have been issued
     * @throws HoloClientException if any commit fails
     * @throws TimeoutException if the commits do not all finish within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public void commit(long j) throws HoloClientException, TimeoutException, InterruptedException {
        List<CompletableFuture<Void>> pending = new ArrayList<>(this.committerMap.size());
        for (Map.Entry<Integer, Committer> entry : this.committerMap.entrySet()) {
            Committer committer = entry.getValue();
            pending.add(commitFlushedLsn(committer, entry.getKey().intValue(), committer.getLastReadLsn(), j));
        }
        long deadlineMs = System.currentTimeMillis() + j;
        for (CompletableFuture<Void> future : pending) {
            // All futures share one deadline, so each wait only gets what is left of it.
            long remainingMs = deadlineMs - System.currentTimeMillis();
            if (remainingMs <= 0) {
                throw new TimeoutException("commit did not finish within " + j + " ms");
            }
            try {
                future.get(remainingMs, TimeUnit.MILLISECONDS);
            } catch (ExecutionException e) {
                throw toHoloClientException(e);
            }
        }
    }

    /**
     * Issues a commit of {@code j} on the given committer, logging before issuing and after
     * completion.
     *
     * @param committer committer to use
     * @param i shard id (used for logging only)
     * @param j LSN to commit
     * @param j2 not used by this method
     * @return a future that completes once the commit has completed and been logged
     */
    public CompletableFuture<Void> commitFlushedLsn(Committer committer, int i, long j, long j2) throws TimeoutException, InterruptedException {
        LOGGER.info("begin commit {} shardId {} flushedLsn to {}", this.subscribe.getTableName(), Integer.valueOf(i), Long.valueOf(j));
        // NOTE(review): the second argument is a fixed 1000 while j2 is ignored; whether j2
        // should be passed instead depends on Committer.commit's contract — confirm.
        return committer.commit(j, 1000L).thenRun(() -> {
            LOGGER.info("done commit {} shardId {} flushedLsn to {}", this.subscribe.getTableName(), Integer.valueOf(i), Long.valueOf(j));
        });
    }

    /**
     * Commits {@code j} for one shard and waits for the commit to finish.
     *
     * @param i shard id
     * @param j LSN to commit
     * @param j2 timeout in milliseconds for waiting on the commit
     * @throws HoloClientException if the shard is unknown or the commit fails
     * @throws TimeoutException if the commit does not finish within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public void commitFlushedLsn(int i, long j, long j2) throws HoloClientException, TimeoutException, InterruptedException {
        Committer committer = this.committerMap.get(Integer.valueOf(i));
        if (committer == null) {
            throw new HoloClientException(ExceptionCode.INVALID_REQUEST, "unknown shard " + i);
        }
        try {
            commitFlushedLsn(committer, i, j, j2).get(j2, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw toHoloClientException(e);
        }
    }

    /** Unwraps a failed commit future into the {@link HoloClientException} callers expect. */
    private static HoloClientException toHoloClientException(ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof HoloClientException) {
            return (HoloClientException) cause;
        }
        return new HoloClientException(ExceptionCode.INTERNAL_ERROR, "commit fail", cause);
    }

    /** Registers a thread to be stopped by {@link #cancel()}. */
    public void addThread(Thread thread) {
        this.threadList.add(thread);
    }

    /**
     * Stops the reader: clears the running flag, empties the queue, and interrupts every
     * registered thread that is still alive after a grace period of up to
     * {@value #THREAD_STOP_WAIT_MS} ms each.
     *
     * <p>If the calling thread is interrupted meanwhile, shutdown still runs to completion and
     * the interrupt status is restored before returning.
     */
    public void cancel() {
        this.started.set(false);
        boolean interrupted = false;
        // Clear repeatedly: the queue can be refilled after a clear() by a producer that was
        // already waiting in offer(), so keep going until it is observed empty.
        while (!this.queue.isEmpty()) {
            this.queue.clear();
            try {
                Thread.sleep(CANCEL_DRAIN_PAUSE_MS);
            } catch (InterruptedException e) {
                LOGGER.warn("interrupted while draining binlog queue, continuing cancel", e);
                interrupted = true;
            }
        }
        for (Thread thread : this.threadList) {
            if (thread.isAlive()) {
                try {
                    // Give the thread a chance to notice the cleared flag and exit on its own;
                    // join returns as soon as it does instead of always waiting the full period.
                    thread.join(THREAD_STOP_WAIT_MS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
                // No-op if the thread has already terminated.
                thread.interrupt();
            }
        }
        if (interrupted) {
            // Hand the interruption back to the caller rather than swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /** @return {@code true} once the shared running flag is {@code false} */
    public boolean isCanceled() {
        return !this.started.get();
    }
}
