package kd.mmc.phm.mservice.service.impl;

import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.db.DB;
import kd.bos.dlock.DLock;
import kd.bos.exception.KDBizException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.orm.query.QFilter;
import kd.bos.servicehelper.BusinessDataServiceHelper;
import kd.bos.servicehelper.operation.SaveServiceHelper;
import kd.bos.threads.ThreadPools;
import kd.mmc.phm.common.consts.CommonConsts;
import kd.mmc.phm.common.errorcode.PHMErrorCode;
import kd.mmc.phm.mservice.factory.ResourceAutoUpdateServiceFactory;
import kd.mmc.phm.mservice.framework.mq.MQServiceHelper;
import kd.mmc.phm.mservice.framework.mq.event.ProcessAutoUpdateEvent;
import kd.mmc.phm.mservice.model.process.ProcessEventNode;
import kd.mmc.phm.mservice.service.ConsumerService;
import org.apache.commons.lang3.exception.ExceptionUtils;

/* loaded from: input_file:kd/mmc/phm/mservice/service/impl/ProcessAutoUpdateServiceImpl.class */
public class ProcessAutoUpdateServiceImpl implements ConsumerService {
    private static final long RESOURCE_TIMEOUT = 300000;
    private static final String PREFIX = "mmc/phm/autoUpdateService/";
    private static final Log log = LogFactory.getLog(ProcessAutoUpdateServiceImpl.class);
    private static final ExecutorService executorService = ThreadPools.newCachedExecutorService("PHM_PROCESS_AUTOUPDATE_EXECUTE_WORKER", 1, 6);

    @Override // kd.mmc.phm.mservice.service.ConsumerService
    public void execute(Object obj) {
        ProcessAutoUpdateEvent processAutoUpdateEvent = (ProcessAutoUpdateEvent) obj;
        ProcessEventNode currentNode = processAutoUpdateEvent.getCurrentNode();
        long entryId = currentNode.getEntryId();
        try {
            log.info("开始自动更新节点[{}], 上节点集合{}", currentNode, processAutoUpdateEvent.getPreviousNodes());
            DynamicObject[] load = BusinessDataServiceHelper.load("phm_process_resources", "resource_type, resource_number, resource_datatype, resource_data, resource_role, resource_processhistory, resource_modifytime", new QFilter("entry_node", "=", Long.valueOf(entryId)).toArray());
            int length = load.length;
            if (length == 0) {
                log.info("自动更新节点[{}]成功, 资源为空", currentNode);
                MQServiceHelper.publishControlEvent(processAutoUpdateEvent);
                DB.update(CommonConsts.ROUTE_PHM, "UPDATE T_PHM_PROCESS_NODE SET FSTATUS = ? WHERE FENTRYID = ?", new Object[]{currentNode.getStatus(), Long.valueOf(entryId)});
                return;
            }
            RequestContext requestContext = RequestContext.get();
            CompletableFuture[] completableFutureArr = new CompletableFuture[length];
            for (int i = 0; i < length; i++) {
                int i2 = i;
                completableFutureArr[i] = CompletableFuture.runAsync(() -> {
                    autoUpdateResourceData(load[i2], processAutoUpdateEvent, requestContext);
                }, executorService);
            }
            try {
                try {
                    CompletableFuture.allOf(completableFutureArr).get(30L, TimeUnit.MINUTES);
                    log.info("自动更新节点[{}]成功", currentNode);
                } catch (InterruptedException | TimeoutException e) {
                    processAutoUpdateEvent.setFail(true);
                    processAutoUpdateEvent.setExceptionMessage(String.format("节点[%s]更新失败: 并行计算超时", currentNode.getName()));
                }
            } catch (ExecutionException e2) {
                processAutoUpdateEvent.setFail(true);
                Throwable cause = e2.getCause();
                String message = cause == null ? e2.getMessage() : cause.getMessage();
                if (StringUtils.isBlank(message)) {
                    message = "程序出现异常, 请联系管理员处理。";
                }
                processAutoUpdateEvent.setExceptionMessage(message);
            }
            MQServiceHelper.publishControlEvent(processAutoUpdateEvent);
            DB.update(CommonConsts.ROUTE_PHM, "UPDATE T_PHM_PROCESS_NODE SET FSTATUS = ? WHERE FENTRYID = ?", new Object[]{currentNode.getStatus(), Long.valueOf(entryId)});
        } catch (Throwable th) {
            MQServiceHelper.publishControlEvent(processAutoUpdateEvent);
            DB.update(CommonConsts.ROUTE_PHM, "UPDATE T_PHM_PROCESS_NODE SET FSTATUS = ? WHERE FENTRYID = ?", new Object[]{currentNode.getStatus(), Long.valueOf(entryId)});
            throw th;
        }
    }

    private void autoUpdateResourceData(DynamicObject dynamicObject, ProcessAutoUpdateEvent processAutoUpdateEvent, RequestContext requestContext) {
        RequestContext.copyAndSet(requestContext);
        ProcessEventNode currentNode = processAutoUpdateEvent.getCurrentNode();
        DynamicObject dynamicObject2 = dynamicObject.getDynamicObject("resource_number");
        DynamicObject dynamicObject3 = dynamicObject.getDynamicObject("resource_data");
        DynamicObject dynamicObject4 = dynamicObject.getDynamicObject("resource_role");
        String string = dynamicObject2 == null ? "" : dynamicObject2.getString("number");
        String string2 = dynamicObject2 == null ? "" : dynamicObject2.getString("name");
        String string3 = dynamicObject3 == null ? "" : dynamicObject3.getString("number");
        String string4 = dynamicObject4 == null ? "" : dynamicObject4.getString("name");
        DLock create = DLock.create(PREFIX + dynamicObject.getPkValue());
        if (!create.tryLock(RESOURCE_TIMEOUT)) {
            throw new KDBizException(String.format("节点[%s]更新资源[%s]失败: 获取分布式锁超时", currentNode.getName(), string3));
        }
        try {
            try {
                ResourceAutoUpdateServiceFactory.get(dynamicObject.getString("resource_type")).execute(processAutoUpdateEvent, dynamicObject);
                dynamicObject.set("resource_modifytime", new Date());
                SaveServiceHelper.save(new DynamicObject[]{dynamicObject});
                log.info("自动更新节点资源[{}]成功, 更新资源编码[{}], 更新资源版本号[{}]", new Object[]{currentNode, string, string3});
                create.unlock();
            } catch (Exception e) {
                String format = String.format("更新失败, 节点[%s]更新资源[%s]失败, 资源编码[%s], 资源名称[%s], 处理角色[%s]", currentNode.getName(), string3, string, string2, string4);
                insertEntryLog(processAutoUpdateEvent, e, format);
                throw new KDBizException(e, PHMErrorCode.bizException, new Object[]{format});
            }
        } catch (Throwable th) {
            create.unlock();
            throw th;
        }
    }

    private void insertEntryLog(ProcessAutoUpdateEvent processAutoUpdateEvent, Exception exc, String str) {
        DynamicObject loadSingle = BusinessDataServiceHelper.loadSingle(Long.valueOf(processAutoUpdateEvent.getLogId()), "phm_process_calclog");
        DynamicObject addNew = loadSingle.getDynamicObjectCollection("entryentity").addNew();
        addNew.set("exceptionmsg", str);
        addNew.set("exceptionmsg_tag", ExceptionUtils.getStackTrace(exc));
        SaveServiceHelper.save(new DynamicObject[]{loadSingle});
    }
}
