package com.alibaba.otter.canal.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.example.db.utils.SqlUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.UnsupportedEncodingException;
import java.lang.Thread;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/canal/example/AbstractCanalClientTest.class */
/**
 * Base class for the Canal client examples.
 *
 * <p>{@link #start()} launches a worker thread that runs {@link #process()}: it connects the
 * configured {@link CanalConnector}, subscribes, repeatedly fetches batches with
 * {@code getWithoutAck}, logs a summary plus every entry of each non-empty batch and then
 * acknowledges the batch. {@link #stop()} clears the {@code running} flag and waits for the
 * worker thread to finish.
 */
public class AbstractCanalClientTest {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractCanalClientTest.class);
    protected static final String SEP = SystemUtils.LINE_SEPARATOR;
    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /** Maximum batch size passed to {@code getWithoutAck} on every fetch. */
    private static final int BATCH_SIZE = 5 * 1024;

    /** Pause between a failed connect/fetch cycle and the next attempt. */
    private static final long RETRY_INTERVAL_MILLIS = 1000L;

    /** Set by {@link #start()}, cleared by {@link #stop()}; polled by the worker loop. */
    protected volatile boolean running;
    /** Installed on the worker thread; logs whatever escapes {@link #process()}. */
    protected Thread.UncaughtExceptionHandler handler;
    protected Thread thread;
    protected CanalConnector connector;
    protected static String context_format;
    protected static String row_format;
    protected static String transaction_format;
    /** Value stored in the logging MDC under the key {@code "destination"}. */
    protected String destination;

    /**
     * Creates a client without a connector; one must be supplied through
     * {@link #setConnector(CanalConnector)} before {@link #start()} is called.
     *
     * @param str the destination name put into the logging MDC
     */
    public AbstractCanalClientTest(String str) {
        this(str, null);
    }

    /**
     * Creates a client for the given destination and connector.
     *
     * @param str            the destination name put into the logging MDC
     * @param canalConnector the connector to consume from; may be {@code null} and set later
     */
    public AbstractCanalClientTest(String str, CanalConnector canalConnector) {
        this.running = false;
        this.handler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable th) {
                logger.error("parse events has an error", th);
            }
        };
        this.thread = null;
        this.destination = str;
        this.connector = canalConnector;
    }

    /**
     * Starts the worker thread that runs {@link #process()}.
     *
     * <p>Fails through {@code Assert.notNull} when no connector has been configured.
     */
    public void start() {
        Assert.notNull(this.connector, "connector is null");
        this.thread = new Thread(new Runnable() {
            @Override
            public void run() {
                process();
            }
        });
        this.thread.setUncaughtExceptionHandler(this.handler);
        // Raise the flag before starting so the worker loop does not exit immediately.
        this.running = true;
        this.thread.start();
    }

    /**
     * Asks the worker loop to finish and waits for the worker thread to terminate.
     *
     * <p>Does nothing when the client is not running. If the calling thread is interrupted while
     * waiting, the wait is abandoned and the interrupt flag is restored for the caller.
     */
    public void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        if (this.thread != null) {
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                // Do not swallow the interruption: hand it back to the caller.
                Thread.currentThread().interrupt();
            }
        }
        MDC.remove("destination");
    }

    /**
     * Worker loop: connect, subscribe, then fetch/print/ack batches until {@code running} is
     * cleared.
     *
     * <p>An {@link Exception} raised anywhere in a cycle is logged, the connector is
     * disconnected and, after a short pause, a new cycle starts. The pause keeps a persistently
     * failing connection from spinning and flooding the log. Anything that is not an
     * {@code Exception} propagates after cleanup.
     */
    protected void process() {
        while (this.running) {
            try {
                MDC.put("destination", this.destination);
                this.connector.connect();
                this.connector.subscribe();
                while (this.running) {
                    Message message = this.connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    // Batches with id -1 or without entries are not printed.
                    if (batchId != -1 && size != 0) {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }
                    // The batch id is acknowledged in every case, printed or not.
                    this.connector.ack(batchId);
                }
            } catch (Exception e) {
                logger.error("process error!", e);
                // Skip the pause when a stop was requested so stop() is not delayed.
                if (this.running && !pauseBeforeRetry()) {
                    return;
                }
            } finally {
                try {
                    this.connector.disconnect();
                } finally {
                    // Clear the MDC even when disconnect() itself fails.
                    MDC.remove("destination");
                }
            }
        }
    }

    /**
     * Sleeps for {@link #RETRY_INTERVAL_MILLIS} before the next connect attempt.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread was interrupted
     *         (the interrupt flag is restored in that case)
     */
    private boolean pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_INTERVAL_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Logs one summary record for a batch using {@code context_format}: batch id, entry count,
     * the sum of the entries' event lengths, the current time and the positions of the first and
     * last entry.
     *
     * @param message the fetched batch
     * @param batchId the batch id to report
     * @param size    the entry count to report
     */
    private void printSummary(Message message, long batchId, int size) {
        List<CanalEntry.Entry> entries = message.getEntries();
        long memSize = 0;
        for (CanalEntry.Entry entry : entries) {
            memSize += entry.getHeader().getEventLength();
        }
        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(entries)) {
            startPosition = buildPositionForDump(entries.get(0));
            endPosition = buildPositionForDump(entries.get(entries.size() - 1));
        }
        String now = new SimpleDateFormat(DATE_FORMAT).format(new Date());
        logger.info(context_format, new Object[]{Long.valueOf(batchId), Integer.valueOf(size), Long.valueOf(memSize), now, startPosition, endPosition});
    }

    /**
     * Renders an entry's position as {@code logfile:offset:executeTime(formatted time)},
     * followed by {@code " gtid(...)"} when the header carries a non-empty GTID.
     *
     * @param entry the entry whose header is rendered
     * @return the position string
     */
    protected String buildPositionForDump(CanalEntry.Entry entry) {
        CanalEntry.Header header = entry.getHeader();
        long executeTime = header.getExecuteTime();
        String position = header.getLogfileName() + ":" + header.getLogfileOffset() + ":" + executeTime + "(" + new SimpleDateFormat(DATE_FORMAT).format(new Date(executeTime)) + ")";
        if (StringUtils.isNotEmpty(header.getGtid())) {
            position = position + " gtid(" + header.getGtid() + ")";
        }
        return position;
    }

    /**
     * Logs every transaction-begin, transaction-end and row-data entry of a batch; entries of
     * any other type are skipped.
     *
     * @param list the entries to print
     * @throws RuntimeException if an entry's store value cannot be parsed, or if printing a
     *                          row-data entry fails
     */
    protected void printEntry(List<CanalEntry.Entry> list) {
        // One formatter per call: SimpleDateFormat instances must not be shared across threads.
        SimpleDateFormat formatter = new SimpleDateFormat(DATE_FORMAT);
        for (CanalEntry.Entry entry : list) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delay = System.currentTimeMillis() - executeTime;
            String formattedTime = formatter.format(new Date(executeTime));
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN) {
                printTransactionBegin(entry, formattedTime, delay);
            } else if (entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                printTransactionEnd(entry, formattedTime, delay);
            } else if (entryType == CanalEntry.EntryType.ROWDATA) {
                printRowChange(entry, formattedTime, delay);
            }
        }
    }

    /** Logs the transaction header line, the thread id and any XA info of a BEGIN entry. */
    private void printTransactionBegin(CanalEntry.Entry entry, String formattedTime, long delay) {
        try {
            CanalEntry.TransactionBegin begin = CanalEntry.TransactionBegin.parseFrom(entry.getStoreValue());
            logTransactionHeader(entry, formattedTime, delay);
            logger.info(" BEGIN ----> Thread id: {}", Long.valueOf(begin.getThreadId()));
            printXAInfo(begin.getPropsList());
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }
    }

    /** Logs the transaction id, any XA info and the transaction header line of an END entry. */
    private void printTransactionEnd(CanalEntry.Entry entry, String formattedTime, long delay) {
        try {
            CanalEntry.TransactionEnd end = CanalEntry.TransactionEnd.parseFrom(entry.getStoreValue());
            logger.info("----------------\n");
            logger.info(" END ----> transaction id: {}", end.getTransactionId());
            printXAInfo(end.getPropsList());
            logTransactionHeader(entry, formattedTime, delay);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }
    }

    /** Logs the {@code transaction_format} line shared by BEGIN and END entries. */
    private void logTransactionHeader(CanalEntry.Entry entry, String formattedTime, long delay) {
        CanalEntry.Header header = entry.getHeader();
        logger.info(transaction_format, new Object[]{header.getLogfileName(), String.valueOf(header.getLogfileOffset()), String.valueOf(header.getExecuteTime()), formattedTime, header.getGtid(), String.valueOf(delay)});
    }

    /**
     * Logs the row header line of a ROWDATA entry, then either its SQL (QUERY events and DDL) or
     * its XA info followed by the columns of every row.
     */
    private void printRowChange(CanalEntry.Entry entry, String formattedTime, long delay) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            CanalEntry.Header header = entry.getHeader();
            logger.info(row_format, new Object[]{header.getLogfileName(), String.valueOf(header.getLogfileOffset()), header.getSchemaName(), header.getTableName(), eventType, String.valueOf(header.getExecuteTime()), formattedTime, header.getGtid(), String.valueOf(delay)});
            if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
                logger.info(" sql ----> " + rowChange.getSql() + SEP);
                return;
            }
            printXAInfo(rowChange.getPropsList());
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // Deletes print the before-image; every other event type prints the after-image.
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else {
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }
    }

    /**
     * Logs one line per column: name, value, MySQL type and, for updated columns, the update
     * flag.
     *
     * <p>For columns whose MySQL type contains {@code BLOB} or {@code BINARY} (ignoring case),
     * the value's ISO-8859-1 bytes are re-decoded as UTF-8 before printing.
     *
     * @param list the columns to print
     */
    protected void printColumn(List<CanalEntry.Column> list) {
        for (CanalEntry.Column column : list) {
            String mysqlType = column.getMysqlType();
            String value = column.getValue();
            if (StringUtils.containsIgnoreCase(mysqlType, "BLOB") || StringUtils.containsIgnoreCase(mysqlType, "BINARY")) {
                // Charset constants cannot raise UnsupportedEncodingException, so the column
                // can no longer be dropped from the output by a swallowed exception.
                value = new String(value.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
            }
            StringBuilder line = new StringBuilder();
            line.append(column.getName()).append(" : ").append(value);
            line.append("    type=").append(mysqlType);
            if (column.getUpdated()) {
                line.append("    update=").append(column.getUpdated());
            }
            line.append(SEP);
            logger.info(line.toString());
        }
    }

    /**
     * Logs the XA type and XA xid found in the given properties.
     *
     * <p>Keys are matched by suffix, ignoring case ({@code XA_TYPE}, {@code XA_XID}). Nothing is
     * logged unless both values are present.
     *
     * @param list the properties to scan; {@code null} is ignored
     */
    protected void printXAInfo(List<CanalEntry.Pair> list) {
        if (list == null) {
            return;
        }
        String xaType = null;
        String xaXid = null;
        for (CanalEntry.Pair pair : list) {
            String key = pair.getKey();
            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
                xaType = pair.getValue();
            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
                xaXid = pair.getValue();
            }
        }
        if (xaType == null || xaXid == null) {
            return;
        }
        logger.info(" ------> " + xaType + SqlUtils.REQUIRED_FIELD_NULL_SUBSTITUTE + xaXid);
    }

    /**
     * Replaces the connector used by this client.
     *
     * @param canalConnector the connector to use from now on
     */
    public void setConnector(CanalConnector canalConnector) {
        this.connector = canalConnector;
    }

    /**
     * Returns the value of the header property {@code curtGtid}.
     *
     * @param header the entry header to inspect
     * @return the property value, or an empty string when the property is absent
     */
    public static String getCurrentGtid(CanalEntry.Header header) {
        return findHeaderProperty(header, "curtGtid");
    }

    /**
     * Returns the value of the header property {@code curtGtidSn}.
     *
     * @param header the entry header to inspect
     * @return the property value, or an empty string when the property is absent
     */
    public static String getCurrentGtidSn(CanalEntry.Header header) {
        return findHeaderProperty(header, "curtGtidSn");
    }

    /**
     * Returns the value of the header property {@code curtGtidLct}.
     *
     * @param header the entry header to inspect
     * @return the property value, or an empty string when the property is absent
     */
    public static String getCurrentGtidLct(CanalEntry.Header header) {
        return findHeaderProperty(header, "curtGtidLct");
    }

    /** Returns the value of the first header property with the given key, or {@code ""}. */
    private static String findHeaderProperty(CanalEntry.Header header, String key) {
        List<CanalEntry.Pair> props = header.getPropsList();
        if (props == null || props.isEmpty()) {
            return "";
        }
        for (CanalEntry.Pair pair : props) {
            if (key.equals(pair.getKey())) {
                return pair.getValue();
            }
        }
        return "";
    }

    static {
        // Log templates; every "{}" is filled in by the logger.
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;
        row_format = SEP + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms" + SEP;
        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms" + SEP;
    }
}
