package com.jzt.wotu.etl.core.datasource.es;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.jzt.wotu.etl.core.DatasourceManager;
import com.jzt.wotu.etl.core.job.JobContext;
import com.jzt.wotu.etl.core.job.LoadData;
import com.jzt.wotu.etl.core.schema.load.AbstractLoad;
import com.jzt.wotu.etl.core.utils.SqlUtils;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Map;

/* loaded from: input_file:com/jzt/wotu/etl/core/datasource/es/KafkaLoad.class */
/**
 * Load step that sends transformed rows through the template handed out by
 * {@link KafkaDataSource}.
 *
 * <p>Topic, message key and the payload builder are all read from the {@link KafkaLoadDsl}
 * configuration. The data source itself is looked up once, in the constructor, from
 * {@link DatasourceManager} using the data source name configured on the DSL.
 */
public class KafkaLoad extends AbstractLoad<KafkaLoadDsl> {
    /** Operation name that is replaced by {@link #INDEX} before it is stored on the job context. */
    private static final String INSERT = "insert";

    /** Operate type reported in place of {@link #INSERT}. */
    private static final String INDEX = "index";

    private final KafkaDataSource kafkaDataSource;

    /**
     * Creates the loader and resolves its Kafka data source.
     *
     * @param kafkaLoadDsl load configuration; its {@code getDatasource()} value selects the data source
     * @param jobContext   context of the running job, passed on to {@link AbstractLoad}
     */
    public KafkaLoad(KafkaLoadDsl kafkaLoadDsl, JobContext<?> jobContext) {
        super(kafkaLoadDsl, jobContext);
        this.kafkaDataSource = DatasourceManager.INSTANCE.getKafkaDatasource(kafkaLoadDsl.getDatasource());
    }

    /**
     * Sends one message per row for every operation in {@code map} that carries data.
     *
     * <p>For each entry whose {@link LoadData} reports a positive size, every row is converted with
     * {@link SqlUtils#toLowerKey}, appended to a per-operation batch, and the configured payload
     * builder is applied to that batch. A non-empty payload is sent to the configured topic under
     * the configured message key; an empty payload is skipped.
     *
     * @param map rows to load, keyed by operation name; {@code null} is treated as "nothing to load",
     *            and {@code null} values are skipped
     */
    @Override
    public void loadDateMapHandler(Map<String, LoadData> map) {
        if (map == null) {
            return;
        }
        KafkaLoadDsl config = (KafkaLoadDsl) this.loadConfig;
        String topic = config.getTopic();
        String msgKey = config.getMsgKey();
        this.kafkaDataSource.execute(kafkaTemplate -> {
            for (Map.Entry<String, LoadData> entry : map.entrySet()) {
                LoadData source = entry.getValue();
                if (source == null || source.getSize() <= 0) {
                    continue;
                }
                // The operate type depends only on the map key, so compute it once per operation.
                String operateType = toOperateType(entry.getKey());
                // NOTE(review): the batch keeps growing, so the payload builder sees every row
                // handled so far for this operation, not just the current one. This mirrors the
                // previous behaviour -- confirm it is intended.
                LoadData batch = new LoadData(new ArrayList<>());
                for (Object rawRow : source.getData()) {
                    // The loader has always treated each row as a Map; only the raw type is gone.
                    @SuppressWarnings("unchecked")
                    Map<String, Object> typedRow = (Map<String, Object>) rawRow;
                    Map<String, Object> row = SqlUtils.toLowerKey(typedRow);
                    batch.getData().add(row);
                    // NOTE(review): console output with an "Es" label inside the Kafka loader;
                    // text kept unchanged, consider routing it through the project's logger.
                    System.out.println("loadToEs  operate is " + operateType + "  pk is " + row.get("pk"));
                    this.context.setOperateType(operateType);
                    Map<String, Object> payload = config.getJsonObject().apply(batch, this.context);
                    if (CollectionUtils.isEmpty(payload)) {
                        continue;
                    }
                    kafkaTemplate.send(topic, msgKey, payload);
                }
            }
        });
    }

    /**
     * No-op in this implementation; sending is done in {@link #loadDateMapHandler(Map)}.
     *
     * @param loadData ignored
     * @param str      ignored
     */
    @Override
    public void handler(LoadData loadData, String str) {
    }

    /** Lower-cases the operation name (locale-independent) and maps {@link #INSERT} to {@link #INDEX}. */
    private static String toOperateType(String operation) {
        String normalized = operation.toLowerCase(Locale.ROOT);
        return INSERT.equalsIgnoreCase(normalized) ? INDEX : normalized;
    }
}
