/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.job.task.support.dispatch;

import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.support.JobExecutor;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import java.io.Serializable;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import lombok.Generated;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component(value="JobReduceActor")
@Scope(value="prototype")
public class ReduceActor
extends AbstractActor {
    private static final String KEY = "job_generate_reduce_{0}_{1}";
    private final DistributedLockHandler distributedLockHandler;
    private final JobMapper jobMapper;
    private final JobTaskMapper jobTaskMapper;
    private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
    private final JobTaskBatchHandler jobTaskBatchHandler;

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ReduceTaskDTO.class, reduceTask -> {
            SnailJobLog.LOCAL.info("\u6267\u884cReduce, [{}]", new Object[]{JsonUtil.toJsonString((Object)reduceTask)});
            try {
                Assert.notNull((Object)reduceTask.getMrStage(), () -> new SnailJobServerException("mrStage can not be null"));
                Assert.notNull((Object)reduceTask.getJobId(), () -> new SnailJobServerException("jobId can not be null"));
                Assert.notNull((Object)reduceTask.getTaskBatchId(), () -> new SnailJobServerException("taskBatchId can not be null"));
                String key = MessageFormat.format(KEY, reduceTask.getTaskBatchId(), reduceTask.getJobId());
                this.distributedLockHandler.lockWithDisposableAndRetry(() -> this.doReduce((ReduceTaskDTO)reduceTask), key, Duration.ofSeconds(1L), Duration.ofSeconds(2L), 6);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", new Object[]{reduceTask, e});
            }
        }).build();
    }

    private void doReduce(ReduceTaskDTO reduceTask) {
        List jobTasks = this.jobTaskMapper.selectList((IPage)new PageDTO(1L, 1L), (Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{JobTask::getId}).eq(JobTask::getTaskBatchId, (Object)reduceTask.getTaskBatchId())).eq(JobTask::getMrStage, (Object)reduceTask.getMrStage())).orderByAsc(JobTask::getId));
        if (CollUtil.isNotEmpty((Collection)jobTasks)) {
            return;
        }
        Job job = (Job)this.jobMapper.selectById((Serializable)reduceTask.getJobId());
        if (JobTaskTypeEnum.MAP_REDUCE.getType() != job.getTaskType().intValue()) {
            return;
        }
        String argStr = this.jobTaskBatchHandler.getArgStr(reduceTask.getTaskBatchId(), job);
        JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(JobTaskTypeEnum.MAP_REDUCE.getType());
        JobTaskGenerateContext context = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
        context.setTaskBatchId(reduceTask.getTaskBatchId());
        context.setMrStage(reduceTask.getMrStage());
        context.setWfContext(reduceTask.getWfContext());
        context.setArgsStr(argStr);
        List<JobTask> taskList = taskInstance.generate(context);
        if (CollUtil.isEmpty(taskList)) {
            SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", new Object[]{reduceTask.getTaskBatchId()});
            return;
        }
        String wfContext = null;
        if (Objects.nonNull(reduceTask.getWorkflowTaskBatchId())) {
            WorkflowTaskBatch workflowTaskBatch = (WorkflowTaskBatch)this.workflowTaskBatchMapper.selectOne((Wrapper)new LambdaQueryWrapper().select(new SFunction[]{WorkflowTaskBatch::getWfContext, WorkflowTaskBatch::getId}).eq(WorkflowTaskBatch::getId, (Object)reduceTask.getWorkflowTaskBatchId()));
            wfContext = workflowTaskBatch.getWfContext();
        }
        JobExecutor jobExecutor = JobExecutorFactory.getJobExecutor(JobTaskTypeEnum.MAP_REDUCE.getType());
        jobExecutor.execute(ReduceActor.buildJobExecutorContext(reduceTask, job, taskList, wfContext));
    }

    private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTask, Job job, List<JobTask> taskList, String wfContext) {
        JobExecutorContext context = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
        context.setTaskList(taskList);
        context.setTaskBatchId(reduceTask.getTaskBatchId());
        context.setWorkflowTaskBatchId(reduceTask.getWorkflowTaskBatchId());
        context.setWorkflowNodeId(reduceTask.getWorkflowNodeId());
        context.setWfContext(wfContext);
        return context;
    }

    @Generated
    public ReduceActor(DistributedLockHandler distributedLockHandler, JobMapper jobMapper, JobTaskMapper jobTaskMapper, WorkflowTaskBatchMapper workflowTaskBatchMapper, JobTaskBatchHandler jobTaskBatchHandler) {
        this.distributedLockHandler = distributedLockHandler;
        this.jobMapper = jobMapper;
        this.jobTaskMapper = jobTaskMapper;
        this.workflowTaskBatchMapper = workflowTaskBatchMapper;
        this.jobTaskBatchHandler = jobTaskBatchHandler;
    }
}

