package com.kingdee.bos.qing.common.distribute.task;

import com.kingdee.bos.qing.common.context.QingContext;
import com.kingdee.bos.qing.common.distribute.exception.TaskTransportException;
import com.kingdee.bos.qing.common.distribute.resource.ClusterCalcServerMonitor;
import com.kingdee.bos.qing.common.distribute.resource.ServerResourceMgr;
import com.kingdee.bos.qing.common.framework.model.QingServiceAsynDispatcherModel;
import com.kingdee.bos.qing.common.strategy.CustomStrategyRegistrar;
import com.kingdee.bos.qing.common.strategy.framework.ITaskDistributeStrategy;
import com.kingdee.bos.qing.common.thread.DelayedRunnable;
import com.kingdee.bos.qing.common.thread.GlobalScheduledExecutor;
import com.kingdee.bos.qing.util.LogUtil;
import com.kingdee.bos.qing.util.NetUtil;
import com.kingdee.bos.qing.util.StringUtils;
import com.kingdee.bos.qing.util.SystemPropertyUtil;
import com.kingdee.bos.qing.util.ThreadPoolManage;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/kingdee/bos/qing/common/distribute/task/DistributeTaskMgr.class */
public class DistributeTaskMgr {
    public static final String TASK_LOG_INFO_PREFIX = "QingDistributeTask--";
    public static final String TASK_STATUS_KEY_PREFIX = "Distribute.QingTask-";
    private static final DistributeTaskMgr instance = new DistributeTaskMgr();
    private static final String DEFAULT_APP_ID = "qing";
    private ClusterCalcServerMonitor serverNodeMonitor;
    private ITaskDistributeStrategy taskStrategy;
    private String localIp;
    private String appId;
    private Map<String, AbstractTaskChannel> taskChannels = new HashMap(10);
    private volatile boolean isOpened = false;
    private boolean ignoreAppId = false;
    private AtomicInteger initializeFlag = new AtomicInteger(1);
    private ServerResourceMgr serverResourceMgr = ServerResourceMgr.getInstance();

    private DistributeTaskMgr() {
    }

    public static DistributeTaskMgr getInstance() {
        return instance;
    }

    public void asyncStartWithNoAppId() {
        this.ignoreAppId = true;
        if (this.initializeFlag.compareAndSet(1, 0)) {
            this.appId = DEFAULT_APP_ID;
            LogUtil.info("QingDistributeTask-- begin initialize task distribution for appId:" + this.appId);
            asyncStart(this.appId);
        }
    }

    public void asyncStartForOnce(String str) {
        if (null == str || str.isEmpty() || !this.initializeFlag.compareAndSet(1, 0)) {
            return;
        }
        this.appId = str;
        if (str.equalsIgnoreCase(DEFAULT_APP_ID)) {
            LogUtil.info("QingDistributeTask-- begin initialize task distribution for appId:" + str);
            asyncStart(str);
        }
    }

    public String getAppId() {
        return this.appId;
    }

    private void asyncStart(final String str) {
        GlobalScheduledExecutor.schedule(new DelayedRunnable() { // from class: com.kingdee.bos.qing.common.distribute.task.DistributeTaskMgr.1
            @Override // com.kingdee.bos.qing.common.thread.DelayedRunnable
            public long getDelayTime() {
                return 100L;
            }

            @Override // java.lang.Runnable
            public void run() {
                if (!SystemPropertyUtil.getBoolean("qing.distribute.task.enable", false)) {
                    LogUtil.info("QingDistributeTask-- distribute not enabled");
                    return;
                }
                DistributeTaskMgr.this.taskStrategy = (ITaskDistributeStrategy) CustomStrategyRegistrar.getStrategy(ITaskDistributeStrategy.class);
                if (null == DistributeTaskMgr.this.taskStrategy) {
                    LogUtil.info("QingDistributeTask-- task strategy impl not found");
                    return;
                }
                DistributeTaskMgr.this.localIp = NetUtil.getLocalHostIp();
                LogUtil.info("QingDistributeTask-- begin init local task queue and consumer,localIp:" + DistributeTaskMgr.this.localIp);
                DistributeTaskMgr.this.taskStrategy.initLocalTaskQueueAndConsumer(str, DistributeTaskMgr.this.localIp);
                LogUtil.info("QingDistributeTask-- begin init server resource manager，localIp:" + DistributeTaskMgr.this.localIp);
                DistributeTaskMgr.this.serverResourceMgr.start();
                DistributeTaskMgr.this.serverNodeMonitor = new ClusterCalcServerMonitor();
                DistributeTaskMgr.this.serverNodeMonitor.addServerNodeListener(DistributeTaskMgr.this.taskStrategy.getServerNodeListener());
                DistributeTaskMgr.this.serverNodeMonitor.addServerNodeListener(DistributeTaskMgr.this.serverResourceMgr);
                LogUtil.info("QingDistributeTask-- begin init server zk node,localIp:" + DistributeTaskMgr.this.localIp);
                if (DistributeTaskMgr.this.serverNodeMonitor.start(str)) {
                    LogUtil.info("QingDistributeTask--qing thread task distribution opened,localIp:" + DistributeTaskMgr.this.localIp);
                    DistributeTaskMgr.this.isOpened = true;
                } else {
                    LogUtil.info("QingDistributeTask-- init server zk node failed,localIp:" + DistributeTaskMgr.this.localIp);
                    DistributeTaskMgr.this.isOpened = false;
                }
            }
        });
    }

    public void regNewTaskChannel(String str, AbstractTaskChannel abstractTaskChannel) {
        synchronized (this.taskChannels) {
            this.taskChannels.put(str, abstractTaskChannel);
        }
    }

    public boolean isTaskChannelExist(String str) {
        boolean containsKey;
        synchronized (this.taskChannels) {
            containsKey = this.taskChannels.containsKey(str);
        }
        return containsKey;
    }

    public AbstractTaskChannel getTaskChannel(String str) {
        AbstractTaskChannel abstractTaskChannel;
        synchronized (this.taskChannels) {
            abstractTaskChannel = this.taskChannels.get(str);
        }
        return abstractTaskChannel;
    }

    public AbstractTaskChannel removeAndCloseChannel(String str) {
        AbstractTaskChannel remove;
        synchronized (this.taskChannels) {
            remove = this.taskChannels.remove(str);
        }
        if (null != remove) {
            remove.close();
        }
        return remove;
    }

    private AbstractTaskChannel getFastestAckedChannel(String str, String str2, long j) {
        HashSet<AbstractTaskChannel> hashSet = new HashSet(5);
        synchronized (this.taskChannels) {
            hashSet.addAll(this.taskChannels.values());
        }
        AbstractTaskChannel abstractTaskChannel = null;
        long j2 = 0;
        for (AbstractTaskChannel abstractTaskChannel2 : hashSet) {
            if (!abstractTaskChannel2.isRecentBlockExist(str, str2)) {
                long selectRecentRemoteCosts = abstractTaskChannel2.selectRecentRemoteCosts(str, str2);
                if (-1 != selectRecentRemoteCosts) {
                    if (null == abstractTaskChannel) {
                        abstractTaskChannel = abstractTaskChannel2;
                        j2 = selectRecentRemoteCosts;
                    } else if (selectRecentRemoteCosts < j2) {
                        j2 = selectRecentRemoteCosts;
                        abstractTaskChannel = abstractTaskChannel2;
                    }
                }
            }
        }
        if (abstractTaskChannel == null || j2 >= j) {
            return null;
        }
        return abstractTaskChannel;
    }

    public boolean canDispatch() {
        int size;
        synchronized (this.taskChannels) {
            size = this.taskChannels.size();
        }
        return this.isOpened && size >= 1;
    }

    public RemoteSubmitState directExecuteTask(QingContext qingContext, QingServiceAsynDispatcherModel qingServiceAsynDispatcherModel, String str) {
        AbstractTaskChannel abstractTaskChannel;
        synchronized (this.taskChannels) {
            abstractTaskChannel = this.taskChannels.get(str);
        }
        if (null == abstractTaskChannel) {
            return RemoteSubmitState.ERR_NO_AVAILABLE_CHANNEL;
        }
        TaskEvent taskEvent = new TaskEvent();
        taskEvent.setEventType(TaskEventType.SUBMIT);
        taskEvent.setTaskId(UUID.randomUUID().toString());
        TaskRequest taskRequest = new TaskRequest();
        taskRequest.setFromServer(this.localIp);
        taskRequest.setTaskModel(qingServiceAsynDispatcherModel);
        taskRequest.setQingContext(qingContext);
        taskEvent.setData(taskRequest);
        try {
            RemoteSubmitState publishTask = abstractTaskChannel.publishTask(qingContext, taskEvent);
            switch (publishTask) {
                case SUCCEED:
                    if (qingServiceAsynDispatcherModel.getSourceServer() == null) {
                        GlobalScheduledExecutor.schedule(new TimeoutTaskReExecutor(taskEvent, abstractTaskChannel));
                        break;
                    }
                    break;
                case ERR_NO_AVAILABLE_CHANNEL:
                    LogUtil.warn("QingDistributeTask--qing remote task data size too big, serviceType= " + qingServiceAsynDispatcherModel.getServiceType() + ",method=" + qingServiceAsynDispatcherModel.getMethodName());
                    qingServiceAsynDispatcherModel.setSizeTooBig(true);
                    break;
            }
            return publishTask;
        } catch (TaskTransportException e) {
            LogUtil.error(StringUtils.EMPTY, e);
            return RemoteSubmitState.ERR_SUBMIT_FAILED;
        }
    }

    public RemoteSubmitState remoteExecuteTask(QingContext qingContext, QingServiceAsynDispatcherModel qingServiceAsynDispatcherModel, ThreadPoolManage.QingThreadPoolName qingThreadPoolName) {
        if (!this.isOpened) {
            return RemoteSubmitState.ERR_NOT_OPEN;
        }
        String appID = qingServiceAsynDispatcherModel.getAppID();
        if (!this.ignoreAppId && (null == appID || !appID.equals(this.appId))) {
            return RemoteSubmitState.ERR_APPID_NOT_MATCH;
        }
        AbstractTaskChannel fastestAckedChannel = getFastestAckedChannel(qingContext.getSessionID(), qingThreadPoolName.name(), 1000L);
        if (null == fastestAckedChannel) {
            String bestResourceServer = this.serverResourceMgr.getBestResourceServer(qingThreadPoolName);
            if (null == bestResourceServer) {
                return RemoteSubmitState.ERR_NO_AVAILABLE_CHANNEL;
            }
            synchronized (this.taskChannels) {
                fastestAckedChannel = this.taskChannels.get(bestResourceServer);
            }
            if (!fastestAckedChannel.isAvailable() || fastestAckedChannel.isRecentBlockExist(qingContext.getSessionID(), qingThreadPoolName.name())) {
                fastestAckedChannel = null;
            }
        }
        if (null == fastestAckedChannel) {
            return RemoteSubmitState.ERR_NO_AVAILABLE_CHANNEL;
        }
        TaskEvent taskEvent = new TaskEvent();
        taskEvent.setEventType(TaskEventType.SUBMIT);
        taskEvent.setTaskId(UUID.randomUUID().toString());
        TaskRequest taskRequest = new TaskRequest();
        taskRequest.setFromServer(this.localIp);
        taskRequest.setTaskModel(qingServiceAsynDispatcherModel);
        taskRequest.setQingContext(qingContext);
        taskEvent.setData(taskRequest);
        try {
            RemoteSubmitState publishTask = fastestAckedChannel.publishTask(qingContext, taskEvent);
            switch (publishTask) {
                case SUCCEED:
                    if (qingServiceAsynDispatcherModel.getSourceServer() == null) {
                        GlobalScheduledExecutor.schedule(new TimeoutTaskReExecutor(taskEvent, fastestAckedChannel));
                        break;
                    }
                    break;
                case ERR_MSG_TOO_BIG:
                    LogUtil.warn("QingDistributeTask--qing remote task data size too big, serviceType= " + qingServiceAsynDispatcherModel.getServiceType() + ",method=" + qingServiceAsynDispatcherModel.getMethodName());
                    qingServiceAsynDispatcherModel.setSizeTooBig(true);
                    break;
            }
            return publishTask;
        } catch (TaskTransportException e) {
            LogUtil.error(StringUtils.EMPTY, e);
            return RemoteSubmitState.ERR_SUBMIT_FAILED;
        }
    }

    public String getLocalIp() {
        return this.localIp;
    }

    public void handleTaskResponse(String str, String str2, TaskResponse taskResponse) {
        AbstractTaskChannel abstractTaskChannel;
        synchronized (this.taskChannels) {
            abstractTaskChannel = this.taskChannels.get(str);
        }
        if (null != abstractTaskChannel) {
            abstractTaskChannel.receiveTaskResponse(str2, taskResponse);
        }
    }

    public void sendTaskResponse(String str, TaskEvent taskEvent) {
        AbstractTaskChannel abstractTaskChannel;
        synchronized (this.taskChannels) {
            abstractTaskChannel = this.taskChannels.get(str);
        }
        if (null != abstractTaskChannel) {
            try {
                abstractTaskChannel.sendTaskResponse(taskEvent);
            } catch (TaskTransportException e) {
                LogUtil.error(StringUtils.EMPTY, e);
            }
        }
    }
}
