package com.dangdang.ddframe.job.api;

import com.dangdang.ddframe.job.internal.statistics.ProcessCountStatistics;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dangdang/ddframe/job/api/AbstractSequencePerpetualElasticJob.class */
/**
 * Elastic job that keeps fetching and processing data in a loop.
 *
 * <p>On every round, {@link #fetchData} is called once per sharding item of the execution context.
 * Items that yield data are then processed in parallel, one worker task per sharding item, while
 * the elements belonging to a single sharding item are handed to {@link #processData} one after
 * another in list order. The next round only starts after all workers of the current round have
 * finished.</p>
 *
 * @param <T> type of the data elements fetched and processed by the job
 */
public abstract class AbstractSequencePerpetualElasticJob<T> extends AbstractElasticJob {
    private static final Logger log = LoggerFactory.getLogger(AbstractSequencePerpetualElasticJob.class);

    /** Pool that runs one worker task per sharding item for each round of data. */
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Fetches data for all sharding items and processes it, round after round.
     *
     * <p>The loop ends when a round fetches no data, when {@code isStoped()} returns {@code true},
     * when the sharding service reports {@code isNeedSharding()}, or when the calling thread has
     * been interrupted.</p>
     *
     * @param jobExecutionMultipleShardingContext execution context holding the sharding items to work on
     */
    @Override // com.dangdang.ddframe.job.api.AbstractElasticJob
    protected final void executeJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        Map<Integer, List<T>> data = takeData(jobExecutionMultipleShardingContext);
        log.debug("Elastic job: sequence perpetual elastic job fetch data size: {}.", data.size());
        // The interrupt check matters: once the flag is set, every await() on the latch fails
        // immediately, so without it new rounds would be submitted while earlier ones still run.
        while (!data.isEmpty() && !isStoped() && !getShardingService().isNeedSharding()
                && !Thread.currentThread().isInterrupted()) {
            processDataInMultipleThreads(jobExecutionMultipleShardingContext, data);
            data = takeData(jobExecutionMultipleShardingContext);
            log.debug("Elastic job: sequence perpetual elastic job fetch data size: {}.", data.size());
        }
    }

    /**
     * Calls {@link #fetchData} once for every sharding item of the context.
     *
     * @param jobExecutionMultipleShardingContext execution context holding the sharding items
     * @return map from sharding item to its fetched data; items whose fetch returned {@code null}
     *         or an empty list are left out; never {@code null}
     */
    private Map<Integer, List<T>> takeData(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<Integer> shardingItems = jobExecutionMultipleShardingContext.getShardingItems();
        Map<Integer, List<T>> result = new HashMap<Integer, List<T>>(shardingItems.size());
        for (Integer shardingItem : shardingItems) {
            List<T> fetched = fetchData(
                    jobExecutionMultipleShardingContext.createJobExecutionSingleShardingContext(shardingItem.intValue()));
            if (null != fetched && !fetched.isEmpty()) {
                result.put(shardingItem, fetched);
            }
        }
        return result;
    }

    /**
     * Submits one worker task per map entry and blocks until all of them have finished.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored and the method
     * returns early; workers that were already submitted keep running.</p>
     *
     * @param jobExecutionMultipleShardingContext execution context used to derive the per-item contexts
     * @param map data to process, keyed by sharding item
     */
    private void processDataInMultipleThreads(final JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext, Map<Integer, List<T>> map) {
        final CountDownLatch latch = new CountDownLatch(map.size());
        for (final Map.Entry<Integer, List<T>> entry : map.entrySet()) {
            this.executorService.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        processDataList(
                                jobExecutionMultipleShardingContext.createJobExecutionSingleShardingContext(entry.getKey().intValue()),
                                entry.getValue());
                    } finally {
                        // Always release the latch, even if processing ends with an Error,
                        // so the waiting thread can never hang on this worker.
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processes the elements of one sharding item sequentially and records the outcome of each.
     *
     * <p>An element counts as a success when {@link #processData} returns {@code true}. It counts as
     * a failure when {@code processData} returns {@code false} or throws an {@link Exception}; the
     * exception is logged and processing continues with the next element.</p>
     *
     * @param jobExecutionSingleShardingContext execution context of the sharding item being processed
     * @param list elements to process, in order
     */
    public void processDataList(JobExecutionSingleShardingContext jobExecutionSingleShardingContext, List<T> list) {
        for (T element : list) {
            try {
                if (processData(jobExecutionSingleShardingContext, element)) {
                    ProcessCountStatistics.incrementProcessSuccessCount(jobExecutionSingleShardingContext.getJobName());
                } else {
                    ProcessCountStatistics.incrementProcessFailureCount(jobExecutionSingleShardingContext.getJobName());
                }
            } catch (Exception e) {
                ProcessCountStatistics.incrementProcessFailureCount(jobExecutionSingleShardingContext.getJobName());
                log.error("Elastic job: exception occur in job processing...", e);
            }
        }
    }

    /**
     * Fetches the data to process for a single sharding item.
     *
     * @param jobExecutionSingleShardingContext execution context of the sharding item
     * @return the data to process; {@code null} or an empty list means there is nothing to do for this item
     */
    protected abstract List<T> fetchData(JobExecutionSingleShardingContext jobExecutionSingleShardingContext);

    /**
     * Processes a single data element.
     *
     * @param jobExecutionSingleShardingContext execution context of the sharding item the element belongs to
     * @param t the element to process
     * @return {@code true} if the element was processed successfully, {@code false} to count it as a failure
     */
    protected abstract boolean processData(JobExecutionSingleShardingContext jobExecutionSingleShardingContext, T t);
}
