package kd.fi.pa.engine.task.status;

import com.alibaba.fastjson.JSONArray;
import java.util.function.Consumer;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.fi.pa.common.cache.DistributeCache;
import kd.fi.pa.common.cache.IDataCacheModule;
import kd.fi.pa.common.event.IWorkTaskStatusEvent;
import kd.fi.pa.stream.datablock.IAsyncStreamDataBlock;
import kd.fi.pa.stream.pipe.AsyncStreamPipe;
import kd.fi.pa.utils.IDataBeanUtil;
import kd.fi.pa.utils.IDataJsonUtil;
import kd.fi.pa.utils.IDataValueUtil;

/* loaded from: input_file:kd/fi/pa/engine/task/status/AbstractIDataWorkTaskStatusConsumer.class */
public abstract class AbstractIDataWorkTaskStatusConsumer implements Consumer<IAsyncStreamDataBlock<IWorkTaskStatusEvent>>, IDataWorkTaskStatusMgr {
    private static final Log logger = LogFactory.getLog(AbstractIDataWorkTaskStatusConsumer.class);
    private static final int cacheTimeOut = 1800;
    protected AsyncStreamPipe<IWorkTaskStatusEvent> processPipe = new AsyncStreamPipe<>();
    protected DistributeCache cache;

    protected abstract IDataCacheModule getCacheModel();

    protected int getCacheTimeOut() {
        return cacheTimeOut;
    }

    public DistributeCache getDistributeCache() {
        if (this.cache == null) {
            this.cache = DistributeCache.getCache(getCacheModel());
        }
        return this.cache;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractIDataWorkTaskStatusConsumer() {
        this.processPipe.attach(this);
        logger.info(String.format("IDataWorkTaskStatusConsumer Started! [%s]", getClass().getName()));
    }

    @Override // kd.fi.pa.engine.task.status.IDataWorkTaskStatusMgr
    public void updateTaskStatus(IWorkTaskStatusEvent iWorkTaskStatusEvent) throws InterruptedException {
        this.processPipe.putToQueue((AsyncStreamPipe<IWorkTaskStatusEvent>) iWorkTaskStatusEvent);
    }

    @Override // kd.fi.pa.engine.task.status.IDataWorkTaskStatusMgr
    public boolean checkTaskCancelStatus(Object obj, boolean z) {
        DistributeCache distributeCache = getDistributeCache();
        String taskCacheKey = getTaskCacheKey(obj, true);
        boolean booleanValue = IDataValueUtil.getBoolean(distributeCache.get(taskCacheKey)).booleanValue();
        if (z) {
            getDistributeCache().remove(taskCacheKey);
        }
        return booleanValue;
    }

    @Override // kd.fi.pa.engine.task.status.IDataWorkTaskStatusMgr
    public void markTaskCancel(Object obj) {
        getDistributeCache().put(getTaskCacheKey(obj, true), "y", getCacheTimeOut());
    }

    protected void putCacheValue(Object obj, Object obj2) {
        getDistributeCache().put(getTaskCacheKey(obj, false), IDataJsonUtil.toJSONString(obj2), getCacheTimeOut());
    }

    @Override // kd.fi.pa.engine.task.status.IDataWorkTaskStatusMgr
    public final IWorkTaskStatusEvent getCachedWorkTaskStatusEvent(Object obj) {
        JSONArray jSONArray = IDataValueUtil.toJSONArray(getDistributeCache().get(getTaskCacheKey(obj, false)));
        if (jSONArray == null || jSONArray.isEmpty()) {
            return null;
        }
        return (IWorkTaskStatusEvent) IDataBeanUtil.getClassNewInstance(IDataBeanUtil.getClassByName(String.valueOf(jSONArray.get(0))), jSONArray);
    }

    protected String getTaskCacheKey(Object obj, boolean z) {
        return z ? getCacheModel() + "||" + obj : getCacheModel() + "|" + obj;
    }

    @Override // java.util.function.Consumer
    public void accept(IAsyncStreamDataBlock<IWorkTaskStatusEvent> iAsyncStreamDataBlock) {
        IWorkTaskStatusEvent data;
        if (iAsyncStreamDataBlock == null || (data = iAsyncStreamDataBlock.getData()) == null) {
            return;
        }
        Object taskId = data.getTaskId();
        if (data.needMergeStatus()) {
            data.mergeTaskStatus(getCachedWorkTaskStatusEvent(taskId));
        }
        putCacheValue(taskId, data);
    }
}
