package kd.bos.bec.engine.consumer;

import kd.bos.bec.engine.EvtLogUtils;
import kd.bos.bec.engine.persistence.job.EvtJobEntity;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.bos.workflow.engine.ProcessEngines;
import kd.bos.workflow.engine.WfUtils;
import kd.bos.workflow.engine.delegate.event.ActivitiEventType;
import kd.bos.workflow.engine.delegate.event.impl.ActivitiEventBuilder;
import kd.bos.workflow.engine.impl.asyncexecutor.ExecuteAsyncRunnable;
import kd.bos.workflow.engine.impl.cache.WfCacheHelper;
import kd.bos.workflow.engine.impl.cfg.ProcessEngineConfigurationImpl;
import kd.bos.workflow.engine.impl.cmd.job.JobHandleStrategy;
import kd.bos.workflow.engine.impl.cmd.job.JobHandleStrategyFactory;
import kd.bos.workflow.engine.impl.persistence.entity.event.EventLogConstants;
import kd.bos.workflow.engine.impl.persistence.entity.event.EventLogEntity;
import kd.bos.workflow.engine.impl.persistence.entity.event.EventLogEntityImpl;
import kd.bos.workflow.engine.impl.persistence.entity.job.JobStateEnum;
import kd.bos.workflow.service.WfTraceType;

/* loaded from: input_file:kd/bos/bec/engine/consumer/BizEventConsumer.class */
public class BizEventConsumer implements MessageConsumer {
    private static Log log = LogFactory.getLog(BizEventConsumer.class);
    private ProcessEngineConfigurationImpl processEngineConfiguration = null;

    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) {
        log.info(String.format("事件中心消费者接收到mq：%s", Long.valueOf(System.currentTimeMillis())));
        if (this.processEngineConfiguration == null) {
            this.processEngineConfiguration = (ProcessEngineConfigurationImpl) ProcessEngines.getDefaultProcessEngine().getProcessEngineConfiguration();
        }
        log.info(String.format("prepare to execute schedule task [%s]", str));
        long currentTimeMillis = System.currentTimeMillis();
        EvtJobEntity evtJobEntity = (EvtJobEntity) obj;
        EventLogEntity eventLogEntity = null;
        StringBuilder sb = new StringBuilder();
        try {
            try {
                EventLogEntityImpl create = EventLogEntityImpl.create();
                if (evtJobEntity != null) {
                    sb.append("eventJob【id=").append(evtJobEntity.getId()).append(",type=").append(evtJobEntity.getJobHandlerType()).append(ResManager.loadKDString("】分发过来，开始执行：", "BizEventConsumer_0", "bos-wf-engine", new Object[0]));
                    String jobHandlerConfiguration = evtJobEntity.getJobHandlerConfiguration();
                    if (WfUtils.isNotEmpty(jobHandlerConfiguration)) {
                        WfUtils.restoreRequestContext(jobHandlerConfiguration);
                    }
                    String source = evtJobEntity.getSource();
                    JobHandleStrategy jobHandleStrategy = JobHandleStrategyFactory.getJobHandleStrategy(source, evtJobEntity.getJobType());
                    if (jobHandleStrategy != null) {
                        jobHandleStrategy.setSource(source);
                        jobHandleStrategy.setProcessEngineConfiguration(this.processEngineConfiguration);
                        Long id = evtJobEntity.getId();
                        sb.append(ResManager.loadKDString("再次查询job，恢复信息！", "BizEventConsumer_1", "bos-wf-engine", new Object[0]));
                        evtJobEntity = (EvtJobEntity) jobHandleStrategy.findJob(id, true);
                        if (evtJobEntity != null) {
                            Long rootJobId = evtJobEntity.getRootJobId();
                            if (!WfUtils.isNotEmpty(rootJobId) || WfCacheHelper.isCurrentExecutingJob(rootJobId)) {
                                String format = String.format(ResManager.loadKDString("job【rootId=%1$s,id=%2$s】正在被其他实例机执行，此消息废弃！！！", "BizEventConsumer_7", "bos-wf-engine", new Object[0]), evtJobEntity.getRootJobId(), evtJobEntity.getId());
                                sb.append(format);
                                log.info(format);
                            } else {
                                Long id2 = evtJobEntity.getId();
                                WfTraceType.getOrCreate().setJobInfo(evtJobEntity);
                                evtJobEntity.setRootTraceNo(RequestContext.get().getTraceId());
                                evtJobEntity.setExecutor(WfUtils.getServerHost());
                                create.setBusinesskey(evtJobEntity.getBusinessKey());
                                create.setEntitynumber(evtJobEntity.getEntityNumber());
                                create.setJobid(evtJobEntity.getId());
                                create.setScene(EventLogConstants.SCENE_JOBRECEIVE);
                                if (JobStateEnum.CREATED.getNumber().equals(evtJobEntity.getState())) {
                                    evtJobEntity.setState(JobStateEnum.PREEXECUTING.getNumber());
                                }
                                jobHandleStrategy.updateJobState(evtJobEntity, false);
                                log.info(String.format("try to execute job[%s]", evtJobEntity.getId()));
                                WfCacheHelper.putCurrentExecuteRootJob(evtJobEntity.getRootJobId());
                                ExecuteAsyncRunnable executeAsyncRunnable = new ExecuteAsyncRunnable(evtJobEntity, this.processEngineConfiguration);
                                long currentTimeMillis2 = System.currentTimeMillis();
                                sb.append(ResManager.loadKDString("开始执行 \r\n", "BizEventConsumer_2", "bos-wf-engine", new Object[0]));
                                executeAsyncRunnable.run();
                                sb.append(ResManager.loadKDString("执行完毕！！", "BizEventConsumer_3", "bos-wf-engine", new Object[0]));
                                sb.append(ResManager.loadKDString("花费时间：", "BizEventConsumer_8", "bos-wf-engine", new Object[0])).append(System.currentTimeMillis() - currentTimeMillis2).append("ms。");
                                if (evtJobEntity != null && this.processEngineConfiguration.getEventDispatcher().isEnabled() && WfUtils.isTesting()) {
                                    this.processEngineConfiguration.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityEvent(ActivitiEventType.JOB_EXECUTION_FINISH, evtJobEntity));
                                }
                                log.info(String.format("job[%s],taskId[%s] execute normally,cost [%s]ms", id2, str, Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
                            }
                        } else {
                            log.info(String.format("从MQ获取到的job为空，可能job已经被处理了,消息task[%s]！", str));
                        }
                    } else {
                        sb.append(String.format(ResManager.loadKDString("从MQ获取到的job为非法Job，source错误，无法找到解析类,消息task[%1$s],source[%2$s]！", "BizEventConsumer_4", "bos-wf-engine", new Object[0]), null, source));
                        log.info(String.format("从MQ获取到的job为非法Job，source错误，无法找到解析类,消息task[%s],source[%s]！", str, source));
                    }
                } else {
                    sb.append(String.format(ResManager.loadKDString("从MQ获取到的job为非法Job，未包含消息配置,消息task[%s],调度错了！", "BizEventConsumer_5", "bos-wf-engine", new Object[0]), str));
                    log.info(String.format("从MQ获取到的job为非法Job，未包含消息配置,消息task[%s],调度错了！", str));
                }
                messageAcker.ack(str);
                if (create != null) {
                    create.setContent(sb.toString());
                    EvtLogUtils.saveEvtLog(create);
                }
                if (evtJobEntity != null) {
                    if (WfUtils.isNotEmpty(evtJobEntity.getRootJobId())) {
                        WfCacheHelper.removeCurrentExecuteRootJob(evtJobEntity.getRootJobId());
                    }
                    WfCacheHelper.removeCurrentExecuteJobState(evtJobEntity.getId(), evtJobEntity.getBusinessKey());
                }
            } finally {
            }
        } catch (Throwable th) {
            messageAcker.ack(str);
            if (0 != 0) {
                eventLogEntity.setContent(sb.toString());
                EvtLogUtils.saveEvtLog(null);
            }
            if (evtJobEntity != null) {
                if (WfUtils.isNotEmpty(evtJobEntity.getRootJobId())) {
                    WfCacheHelper.removeCurrentExecuteRootJob(evtJobEntity.getRootJobId());
                }
                WfCacheHelper.removeCurrentExecuteJobState(evtJobEntity.getId(), evtJobEntity.getBusinessKey());
            }
            throw th;
        }
    }
}
