package kd.mmc.phm.mservice.framework.mq.manager;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import kd.bos.context.RequestContext;
import kd.bos.db.DB;
import kd.bos.exception.KDBizException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.mmc.phm.common.consts.CommonConsts;
import kd.mmc.phm.common.enums.RunningState;
import kd.mmc.phm.common.errorcode.PHMErrorCode;
import kd.mmc.phm.mservice.QueueConsts;
import kd.mmc.phm.mservice.framework.mq.IEventManager;
import kd.mmc.phm.mservice.framework.mq.MQServiceHelper;
import kd.mmc.phm.mservice.framework.mq.consumer.ProcessControlConsumer;
import kd.mmc.phm.mservice.framework.mq.event.PHMEvent;
import kd.mmc.phm.mservice.framework.mq.event.ProcessAutoUpdateEvent;
import kd.mmc.phm.mservice.framework.runner.CalcManager;
import kd.mmc.phm.mservice.model.process.ProcessEventNode;

/* loaded from: input_file:kd/mmc/phm/mservice/framework/mq/manager/ProcessAutoUpdateEventManager.class */
public class ProcessAutoUpdateEventManager implements IEventManager {
    private static final Log log = LogFactory.getLog(ProcessAutoUpdateEventManager.class);
    private final String eventId;
    private final long historyId;
    private final Map<ProcessEventNode, CountDownLatch> node2Lock = new ConcurrentHashMap();
    private final Map<ProcessEventNode, CompletableFuture> allExecutionChain = new ConcurrentHashMap();
    private final Map<ProcessEventNode, CompletableFuture> parallelExecutionChain = new ConcurrentHashMap();
    private volatile boolean flag = true;

    public ProcessAutoUpdateEventManager(String str, long j) {
        this.eventId = str;
        this.historyId = j;
        MQServiceHelper.registControlQueue(MQServiceHelper.getRealControlQueueName(QueueConsts.CONTROL_QUEUE_NAME, str), ProcessControlConsumer.class);
    }

    @Override // kd.mmc.phm.mservice.framework.mq.IEventManager
    public void wait4Response(PHMEvent pHMEvent) {
        ProcessEventNode currentNode = ((ProcessAutoUpdateEvent) pHMEvent).getCurrentNode();
        try {
            if (!getLock(currentNode).await(30L, TimeUnit.MINUTES)) {
                throw new KDBizException(String.format("节点[%s]更新失败: 更新超时", currentNode.getName()));
            }
            log.info("自动更新节点[{}]等待结束, 执行下一个节点, eventId: {}", currentNode, this.eventId);
        } catch (InterruptedException e) {
            throw new KDBizException(e, PHMErrorCode.bizException, new Object[]{String.format("节点[%s]更新失败: 更新超时", currentNode.getName())});
        }
    }

    @Override // kd.mmc.phm.mservice.framework.mq.IEventManager
    public void onMessage(PHMEvent pHMEvent) {
        ProcessAutoUpdateEvent processAutoUpdateEvent = (ProcessAutoUpdateEvent) pHMEvent;
        ProcessEventNode currentNode = processAutoUpdateEvent.getCurrentNode();
        if (processAutoUpdateEvent.isFail()) {
            this.allExecutionChain.get(currentNode).completeExceptionally(new KDBizException(processAutoUpdateEvent.getExceptionMessage()));
        } else {
            this.allExecutionChain.remove(currentNode);
        }
        getLock(currentNode).countDown();
    }

    @Override // kd.mmc.phm.mservice.framework.mq.IEventManager
    public String getEventId() {
        return this.eventId;
    }

    public void addParallelExecutionChain(ProcessEventNode processEventNode, CompletableFuture completableFuture) {
        this.parallelExecutionChain.put(processEventNode, completableFuture);
    }

    public void putAllExecutionChain(Map<ProcessEventNode, CompletableFuture> map) {
        this.allExecutionChain.putAll(map);
    }

    public void complete(ExecutorService executorService, RequestContext requestContext) {
        for (Map.Entry<ProcessEventNode, CompletableFuture> entry : this.parallelExecutionChain.entrySet()) {
            entry.getValue().whenCompleteAsync((obj, obj2) -> {
                if (obj2 == null) {
                    log.info("自动更新节点[{}]执行链执行成功, eventId: {}", entry.getKey(), this.eventId);
                    return;
                }
                Throwable th = (Throwable) obj2;
                if (this.flag) {
                    this.flag = false;
                    RequestContext.copyAndSet(requestContext);
                    DB.update(CommonConsts.ROUTE_PHM, "UPDATE T_PHM_PROCESS_HISTORY SET FSTATUS = ?, FEXCEPTIONDESC = ? WHERE FID = ? AND FSTATUS != ?", new Object[]{RunningState.RUNNING.getValue(), th.getCause() == null ? th.getMessage() : th.getCause().getMessage(), Long.valueOf(this.historyId), RunningState.ERROR.getValue()});
                }
                log.error("自动更新节点[{}]执行链发生异常, eventId: {}", new Object[]{entry.getKey(), this.eventId, th});
            }, (Executor) executorService);
        }
        CompletableFuture.allOf((CompletableFuture[]) this.parallelExecutionChain.values().toArray(new CompletableFuture[0])).whenCompleteAsync((r7, th) -> {
            log.info("自动更新结束, 开始清除资源。eventId: {}", this.eventId);
            RequestContext.copyAndSet(requestContext);
            CalcManager.destoryEventManager(this.eventId);
            MQServiceHelper.destroyControlQueue(MQServiceHelper.getRealControlQueueName(QueueConsts.CONTROL_QUEUE_NAME, this.eventId));
            log.info("自动更新结束, 清除资源完成。eventId: {}", this.eventId);
            if (th == null) {
                log.info("自动更新所有执行链执行成功, eventId: {}", this.eventId);
                return;
            }
            ArrayList arrayList = new ArrayList();
            Iterator<Map.Entry<ProcessEventNode, CompletableFuture>> it = this.allExecutionChain.entrySet().iterator();
            while (it.hasNext()) {
                ProcessEventNode key = it.next().getKey();
                if (!this.node2Lock.containsKey(key)) {
                    arrayList.add(new Object[]{key.getStatus(), Long.valueOf(key.getEntryId())});
                }
            }
            if (arrayList.isEmpty()) {
                return;
            }
            DB.executeBatch(CommonConsts.ROUTE_PHM, "UPDATE T_PHM_PROCESS_NODE SET FSTATUS = ? WHERE FENTRYID = ?", arrayList);
        }, (Executor) executorService);
    }

    private CountDownLatch getLock(ProcessEventNode processEventNode) {
        return this.node2Lock.computeIfAbsent(processEventNode, processEventNode2 -> {
            return new CountDownLatch(1);
        });
    }
}
