package com.odianyun.mq.common.memeory;

import com.odianyun.mq.common.inner.exceptions.MemeoryQueueException;
import com.odianyun.mq.common.inner.strategy.DefaultPullStrategy;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/omq-real-client-2.0.17.6.RELEASE.jar:com/odianyun/mq/common/memeory/MemeoryQueueImpl.class */
/**
 * File-backed implementation of {@link MemeoryQueue}.
 *
 * <p>{@link #add(Object)} serializes each message to the current data file through an
 * {@link ObjectOutputStream}. A background reader thread (started by the constructor)
 * deserializes messages from the data files and puts them into a bounded in-memory
 * {@link LinkedBlockingQueue}; {@link #get()} and {@link #get(long, TimeUnit)} take from that
 * in-memory queue and record the read position in the queue meta.
 *
 * <p>Writers are serialized by {@code addLock}; consumers are serialized by {@code getLock}.
 *
 * @param <T> type of the messages handed back by the {@code get} methods
 */
public class MemeoryQueueImpl<T> implements MemeoryQueue<T> {
    private static final Logger LOG = LoggerFactory.getLogger(MemeoryQueueImpl.class);

    private final MemeoryQueueConfig config;
    private final String queueName;
    /** Bounded hand-off buffer between the reader thread and the consumers. */
    private final LinkedBlockingQueue<MemCachedMessage> memcacheQueue;
    private final MemeoryQueueMeta meta;
    private final MemeoryQueueDataFile dataFile;
    /** Stream for the file currently being written; only touched while holding {@code addLock}. */
    private ObjectOutputStream dataFileOutputStream;
    /** Path of the data file the reader thread is currently consuming, or {@code null} if none. */
    private String memCacheFileReading;
    private Thread readerTask;
    private final AtomicReference<MemCachedMessage> lastMsg = new AtomicReference<>(null);
    /** Number of objects already read from {@link #memCacheFileReading}. */
    private final AtomicLong memCacheLoadPos = new AtomicLong(0);
    /** Released by the first {@link #add(Object)}; the reader waits on it when there is no file to read yet. */
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final ReentrantLock addLock = new ReentrantLock();
    private final ReentrantLock getLock = new ReentrantLock();
    /** Per-file count of objects written through this instance, keyed by file name. */
    private final ConcurrentMap<String, AtomicLong> fileMap = new ConcurrentHashMap<>();
    /** Writes since the last {@link ObjectOutputStream#reset()}; only touched while holding {@code addLock}. */
    private int objectOutputstreamTimes = 0;
    /** Used only by the reader thread to decide whether to pause briefly after a message. */
    private final Random random = new Random();

    /**
     * A deserialized message together with the file it came from and its 1-based position
     * within that file.
     */
    public static class MemCachedMessage {
        private final Object data;
        private final String file;
        private final boolean isFirstMessage;
        private final long pos;

        /**
         * @param obj the deserialized message
         * @param str name of the data file the message was read from
         * @param z   {@code true} if this is the first message read from that file
         * @param j   position of the message within the file
         */
        public MemCachedMessage(Object obj, String str, boolean z, long j) {
            this.data = obj;
            this.file = str;
            this.isFirstMessage = z;
            this.pos = j;
        }

        public long getPos() {
            return this.pos;
        }

        public Object getData() {
            return this.data;
        }

        public String getFile() {
            return this.file;
        }

        public boolean isFirstMessage() {
            return this.isFirstMessage;
        }
    }

    /**
     * Creates the queue, restores the read position from the meta and starts the background
     * reader thread.
     *
     * @param memeoryQueueConfig queue configuration (in-memory capacity, max data file size, ...)
     * @param str                queue name
     */
    public MemeoryQueueImpl(MemeoryQueueConfig memeoryQueueConfig, String str) {
        this.config = memeoryQueueConfig;
        this.queueName = str;
        this.meta = new MemeoryQueueMeta(this.queueName, memeoryQueueConfig);
        this.dataFile = new MemeoryQueueDataFile(this.queueName, memeoryQueueConfig);
        this.memcacheQueue = new LinkedBlockingQueue<>(memeoryQueueConfig.getMemeoryMaxSize());
        loadMeta();
        this.dataFileOutputStream = null;
        start();
        LOG.info("Inited FileQueue: " + this.queueName + " readingFile: " + this.memCacheFileReading + " pos: " + this.memCacheLoadPos);
    }

    /**
     * Restores the file/position to read from, reconciling what the meta says with the next
     * file the data-file manager reports.
     */
    private void loadMeta() {
        this.memCacheLoadPos.set(this.meta.getReadPos());
        this.memCacheFileReading = this.meta.getFileReading();
        if ("null".equalsIgnoreCase(this.memCacheFileReading) || StringUtils.isBlank(this.memCacheFileReading)) {
            this.memCacheFileReading = null;
        }
        String queueFileName = this.dataFile.getQueueFileName();

        if (queueFileName == null) {
            // No data file is available: whatever the meta says, start from scratch.
            if (this.memCacheFileReading != null) {
                LOG.error("meta file's fileReading: " + this.memCacheFileReading + " not equal to data file remaining: " + queueFileName);
            }
            this.memCacheFileReading = null;
            this.memCacheLoadPos.set(0L);
            return;
        }

        if (this.memCacheFileReading == null) {
            this.memCacheFileReading = queueFileName;
            this.memCacheLoadPos.set(0L);
            return;
        }

        if (this.memCacheFileReading.equals(queueFileName)) {
            return;
        }

        LOG.error("meta file fileReading: " + this.memCacheFileReading + " not equal to " + queueFileName);
        if (new File(this.memCacheFileReading).exists()) {
            // The file named in the meta is still on disk: let the data-file manager realign to it.
            this.dataFile.adjust(this.memCacheFileReading);
        } else {
            this.memCacheFileReading = queueFileName;
            this.memCacheLoadPos.set(0L);
        }
    }

    /** Starts the background thread that runs {@link #loadMessages()}. */
    private void start() {
        this.readerTask = new Thread(new Runnable() {
            @Override
            public void run() {
                MemeoryQueueImpl.this.loadMessages();
            }
        }, "filequeue_cache_reader_thread_" + this.queueName);
        this.readerTask.start();
    }

    /**
     * Moves the reader to the next data file, if the data-file manager has one.
     * When it has none, the current file and stream are left untouched.
     *
     * @param objectInputStream stream of the file being left, closed on a switch; may be {@code null}
     */
    private void switchReadFile(ObjectInputStream objectInputStream) {
        if (this.dataFile.isEmpty()) {
            return;
        }
        if (objectInputStream != null) {
            try {
                objectInputStream.close();
            } catch (IOException e) {
                LOG.error("close file failed when switching file", e);
            }
        }
        this.memCacheFileReading = this.dataFile.getQueueFileName();
        this.memCacheLoadPos.set(0L);
    }

    /**
     * Body of the reader thread: repeatedly opens the current data file, skips the objects that
     * were already consumed, feeds the rest into the in-memory queue and then tries to switch to
     * the next file. Runs until {@link #close()} sets the stopped flag.
     */
    public void loadMessages() {
        if (StringUtils.isBlank(this.memCacheFileReading)) {
            try {
                // Nothing to read yet: wait for the first add() to create a data file.
                this.latch.await();
                switchReadFile(null);
            } catch (InterruptedException e) {
                LOG.error("lacth interruptted", e);
                Thread.currentThread().interrupt();
                return;
            }
        }
        ObjectInputStream objectInputStream = null;
        try {
            while (!this.stopped.get()) {
                try {
                    if (this.memCacheLoadPos.get() == 0 || objectInputStream == null) {
                        // Release the previous stream (if any) before replacing it, so a reopen
                        // at position 0 does not leak a file handle. Closing twice is harmless.
                        closeQuietly(objectInputStream);
                        objectInputStream = null;
                        objectInputStream = openInputStream();
                        seek(objectInputStream, this.memCacheLoadPos.get());
                    }
                    loadFromFile(objectInputStream);
                    switchReadFile(objectInputStream);
                } catch (Throwable th) {
                    // Thread boundary: log and keep the reader alive.
                    LOG.error("read file " + this.memCacheFileReading + " error:", th);
                }
            }
        } finally {
            closeQuietly(objectInputStream);
        }
        LOG.warn("Read MsgFile thread[" + Thread.currentThread().getName() + "] exit");
    }

    /**
     * Skips {@code j} objects on the stream.
     *
     * @return {@code true} if all objects were skipped, {@code false} if reading failed first
     */
    private boolean seek(ObjectInputStream objectInputStream, long j) {
        for (long skipped = 0; skipped < j; skipped++) {
            try {
                objectInputStream.readObject();
            } catch (Exception e) {
                LOG.error("Seek objectInputStream failed.", e);
                return false;
            }
        }
        return true;
    }

    /**
     * Reads objects from the stream into the in-memory queue until the thread is interrupted,
     * the queue is stopped, or reading has failed 10 times.
     *
     * <p>NOTE(review): the failure counter is cumulative for this call and is not reset after a
     * successful read — confirm whether "10 consecutive failures" was intended.
     */
    private void loadFromFile(ObjectInputStream objectInputStream) {
        DefaultPullStrategy backoff = new DefaultPullStrategy(1000, 3000);
        int failures = 0;
        boolean firstMessage = this.memCacheLoadPos.get() == 0;
        while (!Thread.interrupted() && !this.stopped.get()) {
            if (this.dataFile.isEmpty()) {
                // No further file is queued; if the reader has (almost) caught up with the
                // writer's count for this file, wait a little before reading again.
                AtomicLong written = this.fileMap.get(this.memCacheFileReading);
                if (written != null && this.memCacheLoadPos.get() >= written.get() - 1) {
                    delay(200);
                }
            }
            Object message = null;
            try {
                message = objectInputStream.readObject();
            } catch (Exception e) {
                if (e.getMessage() != null) {
                    LOG.warn("retry " + failures + " read file[" + this.memCacheFileReading + "] failed:" + e.getMessage());
                }
                failures++;
                try {
                    backoff.fail(true);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible so the loop condition ends the read promptly.
                    Thread.currentThread().interrupt();
                }
                if (failures >= 10) {
                    if (e.getMessage() != null) {
                        LOG.error("retry " + failures + " read file[" + this.memCacheFileReading + "] failed,reached the max times 10:" + e.getMessage() + "");
                    }
                    return;
                }
            }
            if (message != null) {
                long pos = this.memCacheLoadPos.incrementAndGet();
                addToMemCache(message, firstMessage, pos);
                firstMessage = false;
                delayLoad(this.random.nextInt(2));
            } else {
                delay(20);
            }
        }
        // Thread.interrupted() above cleared the flag; set it again for the caller.
        Thread.currentThread().interrupt();
    }

    /** Wraps the object and blocks until there is room for it in the in-memory queue. */
    private void addToMemCache(Object obj, boolean z, long j) {
        try {
            this.memcacheQueue.put(new MemCachedMessage(obj, this.memCacheFileReading, z, j));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            LOG.error("Exception occured.", e);
        }
    }

    /** Pauses briefly when {@code i} is 0 and no further data file is queued. */
    private void delayLoad(int i) {
        if (i == 0 && this.dataFile.isEmpty()) {
            delay(2);
        }
    }

    /**
     * Opens the current read file, trying up to three times with a growing pause in between.
     *
     * @return an open stream positioned right after the serialization header
     * @throws RuntimeException if all attempts fail; the last failure is the cause
     */
    private ObjectInputStream openInputStream() {
        Exception lastFailure = null;
        for (int attempt = 0; attempt < 3; attempt++) {
            FileInputStream raw = null;
            try {
                raw = new FileInputStream(new File(this.memCacheFileReading));
                return new ObjectInputStream(raw);
            } catch (Exception e) {
                lastFailure = e;
                // The ObjectInputStream constructor reads the stream header and may throw after
                // the file was opened; close the raw stream so retries do not leak handles.
                closeQuietly(raw);
                delay((attempt + 1) * 10);
            }
        }
        String msg = "Open InputStream for background thread failed. file: " + this.memCacheFileReading;
        LOG.error(msg, lastFailure);
        throw new RuntimeException(msg, lastFailure);
    }

    /** Sleeps {@code 5 * (i + 1)} milliseconds, restoring the interrupt flag if interrupted. */
    private void delay(int i) {
        try {
            Thread.sleep(5 * (i + 1));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Closes a resource, logging (not propagating) any failure; {@code null} is ignored. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            LOG.warn("close resource failed", e);
        }
    }

    /**
     * Records the message stored in {@code lastMsg} (if any) as consumed in the meta and, when it
     * was the first message of its file, asks the data-file manager to archive older files.
     */
    private void commitMessage() {
        MemCachedMessage consumed = this.lastMsg.getAndSet(null);
        if (consumed == null) {
            return;
        }
        this.meta.updateRead(consumed.getPos(), consumed.getFile());
        this.meta.incConsumeCount();
        if (consumed.isFirstMessage()) {
            this.dataFile.archiveAndRemoveOldFile(consumed.getFile());
        }
    }

    /**
     * Blocks until a message is available and returns it.
     *
     * @return the next message, or {@code null} if the calling thread was interrupted while waiting
     */
    @Override
    @SuppressWarnings("unchecked") // the queue only ever holds what add(T) serialized
    public T get() {
        MemCachedMessage message = null;
        this.getLock.lock();
        try {
            message = this.memcacheQueue.take();
            this.lastMsg.set(message);
            commitMessage();
        } catch (InterruptedException e) {
            LOG.warn("interrupted when taking messsage from FileQueue");
            Thread.currentThread().interrupt();
        } finally {
            this.getLock.unlock();
        }
        return message != null ? (T) message.getData() : null;
    }

    /**
     * Waits up to the given time for a message.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the next message, or {@code null} on timeout or if the calling thread was interrupted
     */
    @Override
    @SuppressWarnings("unchecked") // the queue only ever holds what add(T) serialized
    public T get(long j, TimeUnit timeUnit) {
        MemCachedMessage message = null;
        this.getLock.lock();
        try {
            message = this.memcacheQueue.poll(j, timeUnit);
            this.lastMsg.set(message);
            commitMessage();
        } catch (InterruptedException e) {
            LOG.warn("interrupted when taking messsage from FileQueue");
            Thread.currentThread().interrupt();
        } finally {
            // Single release point; covers normal return, interruption and unchecked failures.
            this.getLock.unlock();
        }
        return message != null ? (T) message.getData() : null;
    }

    /**
     * Appends a message to the current data file, rolling over to a new file first when the
     * current one has reached the configured maximum size. A {@code null} message is not written
     * and not counted.
     *
     * @param t the message to persist
     * @throws MemeoryQueueException if the queue has been closed
     * @throws RuntimeException      if the data file cannot be created or written
     */
    @Override
    public void add(T t) throws MemeoryQueueException {
        if (this.stopped.get()) {
            throw new MemeoryQueueException("FileQueue has been closed. Queue name: " + this.queueName);
        }
        this.addLock.lock();
        try {
            // close() may have run while this thread was waiting for the lock.
            if (this.stopped.get()) {
                throw new MemeoryQueueException("FileQueue has been closed. Queue name: " + this.queueName);
            }
            checkDataFileOutputStream();
            // Only count messages that were actually written, so the add counter does not
            // drift ahead of the file contents when null is passed.
            if (write(t)) {
                this.meta.incAddCount();
            }
            this.latch.countDown();
        } finally {
            this.addLock.unlock();
        }
    }

    /**
     * Stops the queue: marks it stopped, interrupts the reader thread, closes the data-file
     * output stream and closes the meta.
     */
    public void close() {
        this.stopped.set(true);
        if (this.readerTask != null) {
            this.readerTask.interrupt();
        }
        this.addLock.lock();
        try {
            closeQuietly(this.dataFileOutputStream);
            this.dataFileOutputStream = null;
        } finally {
            this.addLock.unlock();
        }
        this.meta.close();
    }

    /**
     * Ensures there is an open output stream and rolls over to a new data file when the current
     * one has reached {@code config.getMaxDataFileSize()}. Must be called with {@code addLock} held.
     *
     * @throws RuntimeException if the file cannot be inspected, closed or created
     */
    private void checkDataFileOutputStream() {
        try {
            if (this.dataFileOutputStream == null) {
                createNewWriteFile();
            }
            long maxDataFileSize = this.config.getMaxDataFileSize();
            if (currentWriteFileSize() >= maxDataFileSize) {
                LOG.info("current writeFile:" + this.meta.getFileWriting() + " size is > max size:" + maxDataFileSize + ",will write to new file");
                this.dataFileOutputStream.close();
                createNewWriteFile();
            }
        } catch (IOException e) {
            // FileNotFoundException is an IOException, so a missing write file lands here too.
            LOG.error("Create data file failed.", e);
            throw new RuntimeException("create data file failed.", e);
        }
    }

    /**
     * Returns the size in bytes of the file currently being written. Uses the channel size
     * rather than {@code available()}, which is an {@code int} estimate.
     */
    private long currentWriteFileSize() throws IOException {
        FileInputStream probe = new FileInputStream(this.meta.getFileWriting());
        try {
            return probe.getChannel().size();
        } finally {
            try {
                probe.close();
            } catch (IOException e) {
                LOG.error("Close temp inputstream for size cal failed.", e);
            }
        }
    }

    /**
     * Creates the next data file, opens the output stream on it and registers it with the meta,
     * the per-file counter map and the data-file manager.
     *
     * @throws RuntimeException if the file or stream cannot be created
     */
    private void createNewWriteFile() {
        try {
            String newFile = this.dataFile.createQueueFile();
            FileOutputStream raw = new FileOutputStream(newFile);
            try {
                this.dataFileOutputStream = new ObjectOutputStream(raw);
            } catch (IOException e) {
                // Writing the stream header failed; do not leak the freshly opened file handle.
                closeQuietly(raw);
                throw e;
            }
            this.fileMap.remove(this.meta.getFileWriting());
            this.meta.updateWrite(newFile);
            this.fileMap.put(newFile, new AtomicLong(0L));
            this.dataFile.addFile(newFile);
            this.objectOutputstreamTimes = 0;
        } catch (IOException e) {
            LOG.error("Create data file failed.", e);
            throw new RuntimeException("create data file failed.", e);
        }
    }

    /**
     * Serializes one message to the current data file and flushes it. Must be called with
     * {@code addLock} held.
     *
     * @return {@code true} if the message was written, {@code false} if {@code obj} was {@code null}
     * @throws RuntimeException if writing fails
     */
    private boolean write(Object obj) {
        if (obj == null) {
            return false;
        }
        try {
            this.objectOutputstreamTimes++;
            if (this.objectOutputstreamTimes == 2000) {
                // Periodically drop the stream's back-reference table.
                this.dataFileOutputStream.reset();
                this.objectOutputstreamTimes = 1;
            }
            this.dataFileOutputStream.writeObject(obj);
            this.dataFileOutputStream.flush();
            this.fileMap.get(this.meta.getFileWriting()).incrementAndGet();
            return true;
        } catch (IOException e) {
            LOG.error("write message failed", e);
            throw new RuntimeException("write message failed", e);
        }
    }

    /** Returns the unread-message count reported by the queue meta. */
    @Override
    public long size() {
        return this.meta.getUnReadCount();
    }

    public MemeoryQueueConfig getConfig() {
        return this.config;
    }

    public String getQueueName() {
        return this.queueName;
    }
}
