package kd.ssc.task.partask;

import kd.bos.cache.CacheFactory;
import kd.bos.cache.DistributeSessionlessCache;
import kd.bos.exception.KDException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.bos.util.StringUtils;
import kd.ssc.task.partask.service.impl.ParTaskNodeServiceImpl;
import kd.ssc.task.partask.util.CreateNewPartask;

/* loaded from: input_file:kd/ssc/task/partask/ParTaskConsumer.class */
public class ParTaskConsumer implements MessageConsumer {
    private static final int RP_TIMES = 5;
    private static final Log log = LogFactory.getLog(ParTaskConsumer.class);
    private static final DistributeSessionlessCache cache = CacheFactory.getCommonCacheFactory().getDistributeSessionlessCache("parTask-consumer");

    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) throws KDException {
        log.info("成功进入并行任务消费者实现类:" + getClass().getName() + "，mq传入参数: " + obj + ", " + str + ", is resend: " + z);
        if (obj == null || StringUtils.isEmpty(obj.toString())) {
            log.error("消息队列中 message 为空（或为null）,不用消费");
            messageAcker.ack(str);
            return;
        }
        if (!(obj instanceof ParTaskMessage)) {
            log.error("消息队列中 message 类型不匹配 " + ParTaskMessage.class.getName());
            messageAcker.ack(str);
            return;
        }
        ParTaskMessage parTaskMessage = (ParTaskMessage) obj;
        int hashCode = parTaskMessage.toString().hashCode();
        String str2 = (String) cache.get(String.valueOf(hashCode));
        if (str2 != null && str2.equals("exist")) {
            log.error("并行任务消费者实现类" + getClass().getName() + "在1分钟内已消费相同message。mq传入参数: " + obj + ", " + str + ", is resend: " + z + ";message转化为" + ParTaskMessage.class.getName() + "的内容：" + parTaskMessage.toString());
            messageAcker.ack(str);
            return;
        }
        long j = 0;
        int i = 0;
        while (true) {
            if (i >= 5) {
                break;
            }
            try {
                j = CreateNewPartask.createNewTask(parTaskMessage).longValue();
                if (j != -2) {
                    if (j != 0 && j != -1) {
                        log.info("第" + i + "次创建并行任务成功，任务id为:" + j);
                        break;
                    }
                } else {
                    Thread.sleep(5000L);
                }
                i++;
            } catch (InterruptedException e) {
                messageAcker.ack(str);
                log.error("Thread sleep interrupted exception", e);
            } catch (KDException e2) {
                log.error("[实例id:节点id]:[" + parTaskMessage.getInstanceId() + ":" + parTaskMessage.getNextNodeDefId() + "]的并行任务创建过程中产生异常：" + e2.getMessage(), e2);
                new ParTaskNodeServiceImpl().updateTableAfterCreateParTaskFailed(parTaskMessage.getInstanceId(), parTaskMessage.getNextNodeDefId());
                messageAcker.ack(str);
                throw e2;
            }
        }
        if (j == 0) {
            log.error("并行任务创建失败,[实例id:节点id]:[" + parTaskMessage.getInstanceId() + ":" + parTaskMessage.getNextNodeDefId() + "]");
            new ParTaskNodeServiceImpl().updateTableAfterCreateParTaskFailed(parTaskMessage.getInstanceId(), parTaskMessage.getNextNodeDefId());
        }
        cache.put(String.valueOf(hashCode), "exist", 60);
        messageAcker.ack(str);
    }
}
