package kd.bos.schedule.executor;

import java.io.IOException;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.TypesContainer;
import kd.bos.dataentity.serialization.SerializationUtils;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.db.ResultSetHandler;
import kd.bos.db.SqlParameter;
import kd.bos.dc.api.model.Account;
import kd.bos.dc.utils.AccountUtils;
import kd.bos.dlock.DLock;
import kd.bos.instance.AppGroup;
import kd.bos.instance.Instance;
import kd.bos.instance.MainGroupChangeListener;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.metric.Counter;
import kd.bos.metric.MetricSystem;
import kd.bos.mq.MessageAcker;
import kd.bos.schedule.api.Executor;
import kd.bos.schedule.api.JobInfo;
import kd.bos.schedule.api.MessageHandler;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.api.MessageWatcher;
import kd.bos.schedule.api.ObjectFactory;
import kd.bos.schedule.api.ScheduleDeployMode;
import kd.bos.schedule.api.StrategyType;
import kd.bos.schedule.message.AbstractService;
import kd.bos.schedule.message.JobProcessor;
import kd.bos.schedule.message.MessageCreator;
import kd.bos.schedule.message.mq.MQObjectFactory;
import kd.bos.schedule.server.ExecutorServerStatistic;
import kd.bos.schedule.server.broadcast.BroadcastJobHandler;
import kd.bos.schedule.utils.RequestContextUtils;
import kd.bos.schedule.utils.ScheduleAccountUtils;
import kd.bos.schedule.zk.ActiveKeyValueStore;
import kd.bos.schedule.zk.ZkConfig;
import kd.bos.thread.ThreadLifeCycleManager;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;
import org.apache.zookeeper.CreateMode;

/* loaded from: input_file:kd/bos/schedule/executor/ExecutorService.class */
public class ExecutorService extends AbstractService implements Executor {
    /** Key string {@code Schedule.Executor.NumOfWorkThread}; not referenced inside this class. */
    public static final String SCHEDULE_EXECUTOR_NUMOFWORKTHREAD_KEY = "Schedule.Executor.NumOfWorkThread";

    /** App id used for broadcast paths when {@code Instance.isAppSplit()} is false. */
    private static final String DEFAULT_APPID = "bos";

    /** Handler instances registered per message type via {@link #registHandler(MessageType, MessageHandler)}. */
    protected Map<MessageType, List<MessageHandler>> messageHandlePool = new EnumMap<>(MessageType.class);

    /** Handler classes registered per message type; a fresh instance is created for each lookup. */
    protected Map<MessageType, List<Class<? extends MessageHandler>>> messageHandleClassPool = new EnumMap<>(MessageType.class);

    /** Worker thread pool per message type, created lazily while holding {@link #lock}. */
    protected EnumMap<MessageType, ThreadPool> threadPools = new EnumMap<>(MessageType.class);

    /** Guards creation of thread pools and message consumers. */
    protected ReentrantLock lock = new ReentrantLock();

    /** True once {@code registerWorkConsumer()} has run; cleared again by {@link #stop()}. */
    private final AtomicBoolean registered = new AtomicBoolean(false);

    /** True between {@link #start()} and {@link #stop()}; keeps the self-check thread looping. */
    private final AtomicBoolean startService = new AtomicBoolean(false);

    private static final Log log = LogFactory.getLog("kd.bos.schedule.executor.ExecutorService");

    /** Path separator used when composing broadcast listener paths. */
    private static final String SEPARATOR = "/";

    /**
     * Most recently started service. Assigned in {@link #start()} and read from other threads through
     * {@link #getInstance()}, hence {@code volatile}.
     */
    private static volatile ExecutorService instance = null;

    /** Counter incremented and decremented around each {@code processMessage} call. */
    private static final Counter activeComsumer = MetricSystem.counter("kd.metrics.schedule.consumer.activeThreads");

    /** Counter increased by the configured maximum thread count whenever a consumer pool is created. */
    private static final Counter maxComsumer = MetricSystem.counter("kd.metrics.schedule.consumer.maxThreads");

    /**
     * Returns the service instance recorded by the most recent {@link #start()} call.
     *
     * @return the started service, or {@code null} if {@code start()} has not been called yet
     */
    public static ExecutorService getInstance() {
        return instance;
    }

    /**
     * Returns the fixed display name of this service.
     *
     * @return the literal {@code "executor service"}
     */
    public String getName() {
        return "executor service";
    }

    /**
     * Builds the object factory for the given message mode.
     *
     * <p>Both {@code "ZooKeeper"} and {@code "MQ"} (case-insensitive) yield an {@link MQObjectFactory}
     * bound to this executor; the ZooKeeper mode additionally logs an error first. Any other mode
     * yields {@code null}.
     *
     * @param str the message mode name; must not be {@code null}
     * @return an MQ-backed factory, or {@code null} for an unrecognised mode
     */
    protected ObjectFactory createObjectFactory(String str) {
        if (str.equalsIgnoreCase("ZooKeeper")) {
            log.error("现在已经不支持消息模式为ZooKeeper!!!!!!!!");
        } else if (!str.equalsIgnoreCase("MQ")) {
            return null;
        }
        // Both recognised modes are served by the same MQ-backed factory.
        MQObjectFactory factory = new MQObjectFactory();
        factory.setExecutor(this);
        return factory;
    }

    /**
     * Starts the executor service.
     *
     * <p>Records this object as the current instance, starts the parent service and, when
     * {@code startExecutorNode()} allows it, registers the work consumers and a group-change
     * listener. The self-check thread is started in every case.
     */
    public void start() {
        instance = this;
        super.start();
        this.startService.set(true);
        if (startExecutorNode()) {
            registerWorkConsumer();
            AppGroup.registGroupListener(new MainGroupChangeListener() {
                public void productionToEliminateNode(String appNumber, String group) {
                    // Intentionally empty: nothing is done for this transition.
                }

                public void grayToProductionNode(String appNumber, String group) {
                    ExecutorService.this.onGrayToProduction(appNumber, group);
                }
            });
        }
        startDaemonThread();
    }

    /**
     * Reacts to a gray-to-production switch: re-creates the consumers for every registered handler
     * class type and, when the {@code schedule.gray.upgrade.cleargrayconfig} system property is true
     * and a group is given, schedules the gray-setting cleanup.
     */
    private void onGrayToProduction(String appNumber, String group) {
        if (!this.registered.get()) {
            return;
        }
        for (MessageType messageType : this.messageHandleClassPool.keySet()) {
            prepareToCustomerMessage(messageType);
        }
        if (Boolean.getBoolean("schedule.gray.upgrade.cleargrayconfig") && StringUtils.isNotBlank(group)) {
            ThreadPools.executeOnce("schedule_graytoproduct", () -> {
                clearGraySettingsForAllAccounts(appNumber, group);
            });
        }
    }

    /**
     * For every account of the current environment: deletes the gray settings while holding a
     * per-account distributed lock, then publishes a gray-changed message. A failure for one
     * account is logged and does not stop the remaining accounts.
     */
    private void clearGraySettingsForAllAccounts(String appNumber, String group) {
        for (Account account : ScheduleAccountUtils.getAllAccountsOfCurrentEnv(false)) {
            try {
                RequestContextUtils.createRequestContext(account.getTenantId(), account.getAccountId(), "0");
                String lockKey = String.format("%s_%s_%s", account.getAccountId(), "schedule", "cleargraysetting");
                // try-with-resources closes the lock on both the normal and the exceptional path.
                try (DLock dLock = DLock.create(lockKey)) {
                    if (dLock.tryLock(0L)) {
                        log.info("清理灰度信息： group=" + group + ",app=" + appNumber);
                        deleteGraySettings(appNumber, group);
                    }
                }
                // The notification is sent whether or not the lock was obtained.
                publishGrayChangedMessage();
            } catch (Exception e) {
                log.error("灰度节点变更，清理灰度数据失败：" + e.getMessage(), e);
            }
        }
    }

    /**
     * Deletes the rows of {@code t_sch_graysetting} for the group: those bound to the app (looked up
     * by number in {@code T_META_BIZAPP}) and those flagged as whole-app.
     */
    private void deleteGraySettings(String appNumber, String group) {
        // The numeric SqlParameter arguments (12, 1) are kept as in the original;
        // presumably java.sql.Types.VARCHAR and java.sql.Types.CHAR - TODO confirm.
        String bizAppId = (String) DB.query(DBRoute.meta, String.format("select FID from %s where FNUMBER = ? ", "T_META_BIZAPP"), new SqlParameter[]{new SqlParameter(":FNUMBER", 12, appNumber)}, new ResultSetHandler<String>() {
            public String handle(ResultSet resultSet) throws Exception {
                if (resultSet.next()) {
                    return resultSet.getString("FID");
                }
                return null;
            }
        });
        if (StringUtils.isNotBlank(bizAppId)) {
            DB.execute(DBRoute.of("sys"), "delete from t_sch_graysetting where FGROUP = ? and  FBIZAPPID = ?", new SqlParameter[]{new SqlParameter(":FGROUP", 12, group), new SqlParameter(":FBIZAPPID", 12, bizAppId)});
        }
        DB.execute(DBRoute.of("sys"), "delete from t_sch_graysetting where FGROUP = ? and  FWHOLEAPP = ?", new SqlParameter[]{new SqlParameter(":FGROUP", 12, group), new SqlParameter(":FWHOLEAPP", 1, "1")});
    }

    /**
     * Writes a persistent-sequential node under the gray-changed message path that carries the
     * tenant, account and user ids of the current request context.
     */
    private void publishGrayChangedMessage() throws Exception {
        RequestContext requestContext = RequestContext.get();
        Map<String, Object> payload = new HashMap<>();
        payload.put("tenantId", requestContext.getTenantId());
        payload.put("accountId", requestContext.getAccountId());
        payload.put("userId", requestContext.getUserId());
        ActiveKeyValueStore.create().write(ZkConfig.getGrayChangedMessagePath() + "/chg_", SerializationUtils.toJsonString(payload), CreateMode.PERSISTENT_SEQUENTIAL);
    }

    /**
     * Registers the built-in job handlers, starts the message watcher and registers this node.
     *
     * <p>Any exception raised while starting the watcher or registering is logged and swallowed;
     * the {@code registered} and {@code isStart} flags are set afterwards in every case.
     * NOTE(review): because the flag is set even after a failure, the self-check thread will not
     * retry a failed registration - confirm that this is intended.
     */
    private void registerWorkConsumer() {
        final String serverName = AbstractService.getRunServerName();
        for (MessageType bizType : new MessageType[]{MessageType.REALTIMEJOB, MessageType.BIZJOB}) {
            registHandler(bizType, BizJobHandler.class);
        }
        registHandler(MessageType.BROADCASTJOB, BroadcastJobHandler.class);
        log.info("Executor server name is " + serverName);

        final MessageWatcher watcher = this.objectFactory.getMessageWatcher();
        watcher.setRunAt(serverName);
        try {
            watcher.start();
            register();
            boolean zooKeeperMode = "ZooKeeper".equalsIgnoreCase(ZkConfig.getMessageMode());
            if (zooKeeperMode) {
                registerBroadcastConsumer(serverName);
            }
        } catch (Exception e) {
            log.error(e);
        }
        this.registered.set(true);
        this.isStart = true;
    }

    /**
     * Starts the daemon self-check thread.
     *
     * <p>While the service is started, the thread parks for {@code ZkConfig.getSelfCheck()} seconds
     * and then registers the work consumers if that has not happened yet and this node is allowed
     * to run as an executor.
     */
    private void startDaemonThread() {
        Thread daemon = new Thread(ThreadLifeCycleManager.wrapRunnable(() -> {
            while (this.startService.get()) {
                // TimeUnit converts in long arithmetic, so a multi-second interval cannot overflow
                // the way an int multiplication by 1_000_000_000 would.
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(ZkConfig.getSelfCheck()));
                // Re-check the started flag: stop() may have run while this thread was parked,
                // and registering after stop() would bring a stopped node back.
                if (this.startService.get() && !this.registered.get() && startExecutorNode()) {
                    registerWorkConsumer();
                }
            }
        }));
        daemon.setDaemon(true);
        daemon.setName("KD_ExecutorServiceDaemo");
        daemon.setUncaughtExceptionHandler((failedThread, error) -> {
            log.error("executorservice.daemon thread error ", error);
        });
        daemon.start();
    }

    /**
     * Adds a broadcast listener for every (account, app id) pair of the current environment.
     *
     * <p>Each listened path has the form {@code <jobRoot>/<accountId>/<appId>/<executorServerName>}.
     * Pairs with a blank account id or app id are skipped. The collected paths are logged at the end.
     *
     * @param str the run-server name, used only in the final log line
     * @throws IOException if adding a broadcast listener fails
     */
    private void registerBroadcastConsumer(String str) throws IOException {
        final String[] appIds;
        if (Instance.isAppSplit()) {
            appIds = Instance.getAppIds();
        } else {
            appIds = new String[]{DEFAULT_APPID};
        }
        List<String> listenedPaths = new ArrayList<>();
        List<?> accounts = AccountUtils.getAllAccountsOfCurrentEnv();
        final String executorName = ZkConfig.getExecutorServerName();
        for (Object entry : accounts) {
            String accountId = ((Account) entry).getAccountId();
            for (String appId : appIds) {
                if (!(StringUtils.isNotBlank(accountId) && StringUtils.isNotBlank(appId))) {
                    continue;
                }
                String path = ZkConfig.getJobRootPath() + SEPARATOR + accountId + SEPARATOR + appId + SEPARATOR + executorName;
                listenedPaths.add(path);
                new BroadcastMessageCustomer().addBroadcastListener(path);
            }
        }
        log.info("Executor server: " + str + "注册监听：" + StringUtils.join(listenedPaths.toArray(), ","));
    }

    /**
     * Decides whether this node may run as an executor (worker).
     *
     * <p>Returns {@code false} when the deploy mode is {@code DISABLE} or {@code SCHEDULE_NODE},
     * when the system property {@code Schedule.disableToWork} is true, or when the node serves only
     * the {@code qing} app and {@code Schedule.executor.qingnode.defaultstart} is not true.
     *
     * @return {@code true} if the executor should be started on this node
     */
    private boolean startExecutorNode() {
        ScheduleDeployMode deployMode = ZkConfig.getDeployMode();
        boolean workerDisabledByMode = deployMode == ScheduleDeployMode.DISABLE
                || deployMode == ScheduleDeployMode.SCHEDULE_NODE;
        if (workerDisabledByMode) {
            log.info("当前节点不启用worker，deploymode = " + ZkConfig.getDeployMode());
            return false;
        }
        if (Boolean.getBoolean("Schedule.disableToWork")) {
            log.info("当前节点禁用所有调度服务");
            return false;
        }
        boolean qingStartsByDefault = Boolean.getBoolean("Schedule.executor.qingnode.defaultstart");
        String[] appIds = Instance.getAppIds();
        boolean qingOnlyNode = appIds != null && appIds.length == 1 && "qing".equalsIgnoreCase(appIds[0]);
        if (!qingStartsByDefault && qingOnlyNode) {
            log.info("轻分析节点默认不启用调度");
            return false;
        }
        return true;
    }

    /**
     * Stops the executor service.
     *
     * <p>Clears the started flag so the self-check thread leaves its loop. If this node had
     * registered, the message watcher is stopped, the node entry under the executor server path is
     * deleted and the registered flag is cleared. The parent service is stopped last.
     */
    public void stop() {
        this.startService.set(false);
        boolean wasRegistered = this.registered.get();
        if (wasRegistered) {
            this.objectFactory.getMessageWatcher().stop();
            String nodePath = ZkConfig.getExecutorServerPath() + "/" + ZkConfig.getExecutorServerName();
            this.zkStore.delete(nodePath);
            this.registered.set(false);
        }
        super.stop();
    }

    /**
     * Registers this executor node and publishes its statistics.
     *
     * <p>An existing node at the same path is deleted first, then an ephemeral node is created.
     * Afterwards an {@link ExecutorServerStatistic} with a random id, the server name, the start
     * time and a detail map (host, instance id, app ids, app-split flag, election server, current
     * app group) is written through {@code writeZk()}.
     */
    private void register() {
        final String serverName = ZkConfig.getExecutorServerName();
        final String nodePath = ZkConfig.getExecutorServerPath() + "/" + serverName;
        boolean staleNodePresent = this.zkStore.exists(nodePath) != null;
        if (staleNodePresent) {
            this.zkStore.delete(nodePath);
            log.info("Executor Server启动时，前面的session没有来的及释放，自动删除。");
        }
        this.zkStore.create(nodePath, new String(new byte[]{1}), CreateMode.EPHEMERAL);

        ExecutorServerStatistic statistic = new ExecutorServerStatistic();
        statistic.setId(UUID.randomUUID().toString());
        statistic.setName(serverName);
        statistic.setStartTime(System.currentTimeMillis());

        HashMap detail = new HashMap();
        String[] appIds = Instance.getAppIds();
        detail.put("host", ZkConfig.getHostIpAddress());
        detail.put("instanceid", Instance.getInstanceId());
        // A missing app-id array is published as an empty JSON list.
        List appIdList = appIds == null ? Collections.EMPTY_LIST : Arrays.asList(appIds);
        detail.put("appids", SerializationUtils.toJsonString(appIdList));
        detail.put("appsplit", String.valueOf(Instance.isAppSplit()));
        detail.put("electionserver", ZkConfig.getHosts());
        detail.put("curAppGroup", AppGroup.getCurAppGroup());
        statistic.setDetail(detail);
        statistic.writeZk();
        log.info("executor Server register is successful!");
    }

    /**
     * Returns the worker thread pool for a message type, creating a fixed-size pool on first use.
     *
     * <p>The first lookup is done without the lock; only when no pool exists is the lock taken and
     * the lookup repeated before a pool is created.
     *
     * @param messageType the message type whose pool is wanted
     * @return the existing or newly created pool, never {@code null}
     */
    private ThreadPool getThreadPool(MessageType messageType) {
        ThreadPool pool = this.threadPools.get(messageType);
        if (pool != null) {
            return pool;
        }
        // Acquire the lock before entering try, so finally never unlocks a lock that is not held.
        this.lock.lock();
        try {
            pool = this.threadPools.get(messageType);
            if (pool == null) {
                pool = ThreadPools.newFixedThreadPool("scheduleTask-" + messageType.name(), ZkConfig.getNumOfWorkThread());
                this.threadPools.put(messageType, pool);
            }
            return pool;
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts consuming messages of the given type and makes sure a worker pool exists for it.
     *
     * <p>If the type is not among {@code ZkConfig.getAccessJobMsgType()}, only a warning is logged.
     * Otherwise the message watcher is told to watch the type and, when no pool is registered yet,
     * a cached thread pool is created and the max-consumer counter is raised by the configured
     * maximum thread count. The whole operation runs under {@code lock}.
     *
     * @param messageType the message type to start consuming
     */
    public void prepareToCustomerMessage(MessageType messageType) {
        // Acquire the lock before entering try, so finally never unlocks a lock that is not held.
        this.lock.lock();
        try {
            log.info("开始注册消费");
            if (!ZkConfig.getAccessJobMsgType().contains(messageType)) {
                log.warn("配置中不支持这种类型的消息,请检查");
                return;
            }
            log.info("创建消息类型：" + messageType.name() + "的消费者和对应的线程池");
            this.objectFactory.getMessageWatcher().startToWatchMessage(messageType);
            if (!this.threadPools.containsKey(messageType)) {
                log.info("核心线程数:" + ZkConfig.getNumOfWorkThread() + ";最大线程数:" + ZkConfig.getMaxNumOfWorkThread(messageType) + ";系统限制最大线程数：" + Integer.getInteger("threadpool.cached.maxthread.size", 10000));
                ThreadPool cachedPool = ThreadPools.newCachedThreadPool("scheduleTask-" + messageType.name(), ZkConfig.getNumOfWorkThread(), ZkConfig.getMaxNumOfWorkThread(messageType));
                getMaxComsumer().inc(ZkConfig.getMaxNumOfWorkThread(messageType));
                this.threadPools.put(messageType, cachedPool);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Registers a handler instance for a message type and starts consuming that type.
     *
     * <p>Nothing is registered when this node is not allowed to run as an executor. A handler that
     * is already registered for the type is ignored.
     *
     * @param messageType    the message type the handler serves
     * @param messageHandler the handler instance; it receives this service's object factory
     */
    public void registHandler(MessageType messageType, MessageHandler messageHandler) {
        log.info("handler在执行中注册：" + messageHandler.getClass().getSimpleName());
        if (!startExecutorNode()) {
            log.info("当前节点不启用调度，禁止注册handler");
            return;
        }
        messageHandler.setObjectFactory(this.objectFactory);
        if (!this.messageHandlePool.containsKey(messageType)) {
            // First handler for this type: start with an empty list.
            this.messageHandlePool.put(messageType, new ArrayList<>());
        }
        List<MessageHandler> handlers = this.messageHandlePool.get(messageType);
        if (!handlers.contains(messageHandler)) {
            handlers.add(messageHandler);
            prepareToCustomerMessage(messageType);
        }
    }

    /**
     * Removes a previously registered handler instance; does nothing if the type has no entry.
     *
     * @param messageType    the message type the handler was registered for
     * @param messageHandler the handler instance to remove
     */
    public void unRegistHandler(MessageType messageType, MessageHandler messageHandler) {
        if (!this.messageHandlePool.containsKey(messageType)) {
            return;
        }
        List<MessageHandler> handlers = this.messageHandlePool.get(messageType);
        handlers.remove(messageHandler);
    }

    /**
     * Registers a handler class for a message type and starts consuming that type.
     *
     * <p>Nothing is registered when this node is not allowed to run as an executor. A class that is
     * already registered for the type is ignored.
     *
     * @param messageType the message type the handler class serves
     * @param cls         the handler class
     */
    public void registHandler(MessageType messageType, Class<? extends MessageHandler> cls) {
        log.info("handler在执行中注册：" + cls.getSimpleName());
        if (!startExecutorNode()) {
            log.info("当前节点不启用调度，禁止注册handler");
            return;
        }
        if (!this.messageHandleClassPool.containsKey(messageType)) {
            // First handler class for this type: start with an empty list.
            this.messageHandleClassPool.put(messageType, new ArrayList<>());
        }
        List<Class<? extends MessageHandler>> handlerClasses = this.messageHandleClassPool.get(messageType);
        if (!handlerClasses.contains(cls)) {
            handlerClasses.add(cls);
            prepareToCustomerMessage(messageType);
        }
    }

    /**
     * Removes a previously registered handler class; does nothing if the type has no entry.
     *
     * @param messageType the message type the class was registered for
     * @param cls         the handler class to remove
     */
    public void unRegistHandler(MessageType messageType, Class<? extends MessageHandler> cls) {
        if (!this.messageHandleClassPool.containsKey(messageType)) {
            return;
        }
        List<Class<? extends MessageHandler>> handlerClasses = this.messageHandleClassPool.get(messageType);
        handlerClasses.remove(cls);
    }

    /**
     * Collects the handlers for a message type.
     *
     * <p>A new instance is created for every registered handler class (a class that fails to
     * instantiate is logged and skipped), followed by the registered handler instances.
     *
     * @param messageType the message type to look up
     * @return a new list of handlers; empty when nothing is registered for the type
     */
    public List<MessageHandler> getMessageHandler(MessageType messageType) {
        List<MessageHandler> handlers = new ArrayList<>();
        List<Class<? extends MessageHandler>> handlerClasses = this.messageHandleClassPool.get(messageType);
        if (handlerClasses != null) {
            for (Class<? extends MessageHandler> handlerClass : handlerClasses) {
                try {
                    handlers.add((MessageHandler) TypesContainer.createInstance(handlerClass));
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }
        List<MessageHandler> registeredInstances = this.messageHandlePool.get(messageType);
        if (registeredInstances != null) {
            handlers.addAll(registeredInstances);
        }
        return handlers;
    }

    /** Returns the counter registered as {@code kd.metrics.schedule.consumer.activeThreads}. */
    private static Counter getActiveComsumer() {
        return activeComsumer;
    }

    /** Returns the counter registered as {@code kd.metrics.schedule.consumer.maxThreads}. */
    private static Counter getMaxComsumer() {
        return maxComsumer;
    }

    /**
     * Runs every handler registered for the message's type on the worker pool and waits for each one.
     *
     * <p>The active-consumer counter is incremented on entry and decremented on every exit path.
     * When no handler is registered the message is denied through the acker (if one is given).
     * One {@link JobProcessor} is reused for all handlers. The wait per handler depends on the
     * job's timeout: {@code 0} uses the {@code Schedule.task.default.timeout} system property
     * (86400 seconds when blank), {@code -1} waits without limit, any other value is taken as
     * seconds.
     *
     * <p>Any exception raised while waiting is handled as a timeout: the processor is flagged as
     * cancelled and timed out, a TIMEOUT status message is sent, the job lock is force-released for
     * the {@code WAITBEFORETASK} strategy, and the task is cancelled.
     * NOTE(review): this also covers interruption and task failures, not only real timeouts.
     *
     * @param messageInfo  the message to process
     * @param messageAcker the acknowledger for the message; may be {@code null}
     */
    public void processMessage(MessageInfo messageInfo, MessageAcker messageAcker) {
        try {
            getActiveComsumer().inc();
            JobProcessor processor = new JobProcessor();
            processor.setObjectFactory(getObjectFactory());
            List<MessageHandler> handlers = getMessageHandler(messageInfo.getMessageType());
            if (handlers.isEmpty()) {
                log.error("没有注册对应的handler,无法处理这个类型的消息:" + messageInfo.toString());
                if (messageAcker != null) {
                    messageAcker.deny(messageInfo.getMessageMQId());
                }
                return;
            }
            for (MessageHandler handler : handlers) {
                processor.setHandler(handler);
                processor.setMessage(messageInfo);
                processor.setAcker(messageAcker);
                Future<?> pending = getThreadPool(messageInfo.getMessageType()).submit(processor);
                JobInfo jobInfo = processor.convertMessage(messageInfo);
                try {
                    int timeout = jobInfo.getTimeout();
                    String defaultTimeout = System.getProperty("Schedule.task.default.timeout");
                    if (timeout == -1) {
                        pending.get();
                    } else {
                        int waitSeconds = timeout;
                        if (timeout == 0) {
                            waitSeconds = StringUtils.isNotBlank(defaultTimeout) ? Integer.parseInt(defaultTimeout) : 86400;
                        }
                        pending.get(waitSeconds, TimeUnit.SECONDS);
                    }
                } catch (Exception e) {
                    log.info("后台事务-超时:", e);
                    processor.setCancle(true);
                    processor.setTimeOut(true);
                    MessageTimeout(messageInfo);
                    if (StrategyType.WAITBEFORETASK.getValue().equals(jobInfo.getStrategy())) {
                        String jobLockKey = JobProcessor.getJobLockKey(jobInfo);
                        log.info("后台事务-超时锁释放:" + jobLockKey);
                        DLock.forceUnlock(new String[]{jobLockKey});
                    }
                    pending.cancel(true);
                }
            }
        } finally {
            getActiveComsumer().dec();
        }
    }

    /**
     * Sends a {@code TIMEOUT} status message for the given message, targeted at this executor
     * server, through the message sender of the current service instance.
     *
     * @param messageInfo the message whose processing timed out
     */
    private void MessageTimeout(MessageInfo messageInfo) {
        MessageInfo timeoutStatus = MessageCreator.createStatusMessage(messageInfo, "TIMEOUT", (String) null);
        timeoutStatus.setTarget(ZkConfig.getExecutorServerName());
        ExecutorService current = getInstance();
        current.getObjectFactory().getMessageSender().send(timeoutStatus);
    }
}
