package kd.bos.archive.task.taskgroup;

import kd.bos.archive.task.config.BaseConfiguration;
import kd.bos.archive.task.enums.State;
import kd.bos.archive.task.taskgroup.communication.Communication;
import kd.bos.archive.task.taskgroup.runner.ReaderRunner;
import kd.bos.archive.task.taskgroup.runner.WriterRunner;
import kd.bos.archive.transport.channel.Channel;
import kd.bos.archive.transport.channel.memory.MemoryChannel;
import kd.bos.archive.transport.exchanger.BufferdRecordExchanger;
import kd.bos.xdb.util.Threads;

/* loaded from: input_file:kd/bos/archive/task/taskgroup/TaskGroupExecutor.class */
public class TaskGroupExecutor {
    private BaseConfiguration taskConfig;
    private String taskGroupMark;
    private Channel channel;
    private Thread threadWriter;
    private Thread threadReader;
    private WriterRunner writerRunner;
    private ReaderRunner readerRunner;
    private Communication taskCommunication;

    public static TaskGroupExecutor taskGroup(Writer writer, Reader reader, BaseConfiguration baseConfiguration, Communication communication, String str) {
        return new TaskGroupExecutor(writer, reader, baseConfiguration, communication, str, false);
    }

    public static TaskGroupExecutor taskGroup(Writer writer, Reader reader, BaseConfiguration baseConfiguration, Communication communication, String str, boolean z) {
        return new TaskGroupExecutor(writer, reader, baseConfiguration, communication, str, z);
    }

    public TaskGroupExecutor(Writer writer, Reader reader, BaseConfiguration baseConfiguration, Communication communication, String str, boolean z) {
        this.writerRunner = new WriterRunner(writer);
        this.readerRunner = new ReaderRunner(reader);
        this.taskConfig = baseConfiguration;
        this.taskCommunication = communication;
        this.taskGroupMark = str;
        this.channel = new MemoryChannel(z);
        this.readerRunner.setRecordSender(new BufferdRecordExchanger(this.channel, z));
        this.readerRunner.setRunnerCommunication(this.taskCommunication);
        this.readerRunner.setTaskId(this.taskConfig.getTaskEntity().getId());
        this.writerRunner.setRecordReceiver(new BufferdRecordExchanger(this.channel, z));
        this.writerRunner.setRunnerCommunication(this.taskCommunication);
        this.writerRunner.setTaskId(this.taskConfig.getTaskEntity().getId());
        this.threadWriter = new Thread(Threads.wrapRunnable(this.writerRunner), String.format("archive-writer-%s-%d", this.taskGroupMark, Long.valueOf(this.writerRunner.getTaskId())));
        this.threadReader = new Thread(Threads.wrapRunnable(this.readerRunner), String.format("archive-reader-%s-%d", this.taskGroupMark, Long.valueOf(this.readerRunner.getTaskId())));
    }

    public void doStart() throws InterruptedException {
        this.threadWriter.start();
        if (!this.threadWriter.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw new RuntimeException(this.taskCommunication.getThrowable());
        }
        this.threadReader.start();
        if (!this.threadReader.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw new RuntimeException(this.taskCommunication.getThrowable());
        }
        while (!this.taskCommunication.isFinished()) {
            Thread.sleep(2000L);
        }
    }
}
