package io.openjob.worker.master;

import akka.actor.ActorContext;
import com.google.common.collect.Lists;
import io.openjob.common.task.TaskQueue;
import io.openjob.common.util.TaskUtil;
import io.openjob.worker.constant.WorkerConstant;
import io.openjob.worker.context.JobContext;
import io.openjob.worker.dao.TaskDAO;
import io.openjob.worker.dto.JobInstanceDTO;
import io.openjob.worker.entity.Task;
import io.openjob.worker.processor.MapReduceProcessor;
import io.openjob.worker.processor.ProcessResult;
import io.openjob.worker.processor.ProcessorHandler;
import io.openjob.worker.processor.TaskResult;
import io.openjob.worker.request.MasterStartContainerRequest;
import io.openjob.worker.request.ProcessorMapTaskRequest;
import io.openjob.worker.task.MapReduceTaskConsumer;
import io.openjob.worker.util.ProcessorUtil;
import io.openjob.worker.util.ThreadLocalUtil;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openjob/worker/master/MapReduceTaskMaster.class */
public class MapReduceTaskMaster extends AbstractDistributeTaskMaster {
    private static final Logger log = LoggerFactory.getLogger(MapReduceTaskMaster.class);
    protected TaskQueue<MasterStartContainerRequest> childTaskQueue;
    protected MapReduceTaskConsumer childTaskConsumer;

    public MapReduceTaskMaster(JobInstanceDTO jobInstanceDTO, ActorContext actorContext) {
        super(jobInstanceDTO, actorContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.openjob.worker.master.AbstractDistributeTaskMaster, io.openjob.worker.master.AbstractTaskMaster
    public void init() {
        super.init();
        this.childTaskQueue = new TaskQueue<>(this.jobInstanceDTO.getJobInstanceId(), 10240);
        this.childTaskConsumer = new MapReduceTaskConsumer(this.jobInstanceDTO.getJobInstanceId(), 1, 1, "Openjob-mapreduce-consumer", 100, "Openjob-mapreduce-consumer-poll", this.childTaskQueue);
        this.childTaskConsumer.start();
    }

    @Override // io.openjob.worker.master.AbstractTaskMaster, io.openjob.worker.master.TaskMaster
    public void completeTask() throws InterruptedException {
        reduce();
        super.completeTask();
    }

    public void map(ProcessorMapTaskRequest processorMapTaskRequest) {
        try {
            for (byte[] bArr : processorMapTaskRequest.getTasks()) {
                MasterStartContainerRequest masterStartContainerRequest = getMasterStartContainerRequest();
                masterStartContainerRequest.setTask(bArr);
                masterStartContainerRequest.setTaskName(processorMapTaskRequest.getTaskName());
                masterStartContainerRequest.setParentTaskId(Long.valueOf(processorMapTaskRequest.getTaskId()));
                this.childTaskQueue.submit(masterStartContainerRequest);
            }
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }

    @Override // io.openjob.worker.master.AbstractDistributeTaskMaster, io.openjob.worker.master.TaskMaster
    public void submit() {
        MasterStartContainerRequest masterStartContainerRequest = getMasterStartContainerRequest();
        masterStartContainerRequest.setTaskName(WorkerConstant.MAP_TASK_ROOT_NAME);
        dispatchTasks(Lists.newArrayList(new MasterStartContainerRequest[]{masterStartContainerRequest}), false, Collections.emptySet());
    }

    @Override // io.openjob.worker.master.AbstractTaskMaster
    protected Boolean isTaskComplete(Long l, Long l2) {
        return Boolean.valueOf(!this.childTaskConsumer.isActive() && super.isTaskComplete(l, l2).booleanValue());
    }

    @Override // io.openjob.worker.master.AbstractTaskMaster, io.openjob.worker.master.TaskMaster
    public void stop() {
        this.childTaskConsumer.stop();
        this.scheduledService.shutdown();
        super.stop();
    }

    @Override // io.openjob.worker.master.AbstractTaskMaster, io.openjob.worker.master.TaskMaster
    public void destroyTaskContainer() {
        this.childTaskConsumer.stop();
        this.scheduledService.shutdown();
        super.destroyTaskContainer();
    }

    protected void reduce() {
        ProcessorHandler processor = ProcessorUtil.getProcessor(this.jobInstanceDTO.getProcessorInfo());
        if (Objects.isNull(processor) || Objects.isNull(processor.getBaseProcessor())) {
            log.error("Not find processor! processorInfo={}", this.jobInstanceDTO.getProcessorInfo());
            return;
        }
        if (processor.getBaseProcessor() instanceof MapReduceProcessor) {
            MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor.getBaseProcessor();
            JobContext reduceJobContext = getReduceJobContext();
            ProcessResult processResult = new ProcessResult((Boolean) false);
            try {
                try {
                    ThreadLocalUtil.setJobContext(reduceJobContext);
                    processResult = mapReduceProcessor.reduce(reduceJobContext);
                    ThreadLocalUtil.removeJobContext();
                } catch (Throwable th) {
                    processResult.setResult(th.toString());
                    ThreadLocalUtil.removeJobContext();
                }
                persistReduceTask(processResult);
            } catch (Throwable th2) {
                ThreadLocalUtil.removeJobContext();
                throw th2;
            }
        }
    }

    protected JobContext getReduceJobContext() {
        JobContext baseJobContext = getBaseJobContext();
        baseJobContext.setTaskName(WorkerConstant.MAP_TASK_REDUCE_NAME);
        baseJobContext.setTaskResultList(getReduceTaskResultList());
        return baseJobContext;
    }

    protected List<TaskResult> getReduceTaskResultList() {
        return null;
    }

    protected void persistReduceTask(ProcessResult processResult) {
        long longValue = this.jobInstanceDTO.getJobId().longValue();
        long longValue2 = this.jobInstanceDTO.getJobInstanceId().longValue();
        long j = this.circleIdGenerator.get();
        Task task = new Task();
        task.setJobId(this.jobInstanceDTO.getJobId());
        task.setInstanceId(this.jobInstanceDTO.getJobInstanceId());
        task.setCircleId(Long.valueOf(this.circleIdGenerator.get()));
        task.setTaskId(TaskUtil.getRandomUniqueId(Long.valueOf(longValue), Long.valueOf(longValue2), Long.valueOf(j), acquireTaskId()));
        task.setTaskName("reduce");
        task.setWorkerAddress(this.localWorkerAddress);
        task.setTaskParentId(TaskUtil.getReduceParentUniqueId(Long.valueOf(longValue), Long.valueOf(longValue2), Long.valueOf(j)));
        task.setStatus(processResult.getStatus().getStatus());
        task.setResult(processResult.getResult());
        TaskDAO.INSTANCE.batchAdd(Collections.singletonList(task));
    }
}
