package kd.bos.xdb.eventbus;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import kd.bos.config.client.util.ConfigUtils;
import kd.bos.ha.component.ComponentStatusManager;
import kd.bos.instance.Instance;
import kd.bos.util.ConfigurationUtil;
import kd.bos.xdb.eventbus.db.ClusterDBStore;
import kd.bos.xdb.eventbus.redis.ClusterRedisStore;
import kd.bos.xdb.exception.ExceptionUtil;
import kd.bos.xdb.util.Pair;
import kd.bos.zk.ZKFactory;
import org.apache.curator.framework.CuratorFramework;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:kd/bos/xdb/eventbus/EventStore.class */
public final class EventStore {
    private static final int eventBucketSize = 10000;
    private static final String ZK_URL = System.getProperty("configUrl");
    private static final String ZK_WATCHER_ROOT_PATH = ConfigUtils.getCommonPropertyPath(Instance.getClusterName()) + "/";
    private final String ZK_WATCH_KEY;
    private StoreBase rs;
    private final List<MaxEventSeqChangeListener> cl;
    private final String channel;
    private final int expireSeconds;

    EventStore() {
        this(EventBus.DEFAULT_CHANNEL);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventStore(String str) {
        this(str, 600);
    }

    EventStore(String str, int i) {
        this.cl = new ArrayList();
        this.channel = str;
        this.expireSeconds = i;
        this.ZK_WATCH_KEY = "XDBEventBus#EventMaxSeq#" + str;
        if (ComponentStatusManager.isException(ConfigurationUtil.getString(StoreBase.REDIS_CACHE_URL_KEY))) {
            this.rs = new ClusterDBStore("XDBEventBus#" + str + '#', i);
        } else {
            this.rs = new ClusterRedisStore("XDBEventBus#" + str + '#', i);
        }
        ConfigurationUtil.observeLong(this.ZK_WATCH_KEY, 0L, l -> {
            Iterator<MaxEventSeqChangeListener> it = this.cl.iterator();
            while (it.hasNext()) {
                it.next().onChanged(l.longValue());
            }
        });
    }

    public void setStoreBase(StoreBase storeBase) {
        this.rs = storeBase;
    }

    public StoreBase getStoreBase() {
        return this.rs;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addMaxEventSeqChangeListener(MaxEventSeqChangeListener maxEventSeqChangeListener) {
        this.cl.add(maxEventSeqChangeListener);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getChannel() {
        return this.channel;
    }

    public int getExpireSeconds() {
        return this.expireSeconds;
    }

    private String getEventBucket(long j) {
        return "eventsBucket" + (j % 10000);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publish(Event event) {
        long incMaxEventSeq = incMaxEventSeq();
        this.rs.mapSet(getEventBucket(incMaxEventSeq), String.valueOf(incMaxEventSeq), event);
        setZKWatchValue(this.ZK_WATCH_KEY, String.valueOf(incMaxEventSeq));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Pair<Long, Event> consumeEvent(long j) {
        Event event;
        long maxEventSeq = getMaxEventSeq();
        do {
            long j2 = j + 1;
            j = j2;
            if (j2 > maxEventSeq) {
                return new Pair<>(Long.valueOf(maxEventSeq), null);
            }
            event = (Event) this.rs.mapGet(getEventBucket(j), String.valueOf(j));
        } while (event == null);
        return new Pair<>(Long.valueOf(j), event);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getMaxEventSeq() {
        return this.rs.getInc("maxEventSeq");
    }

    private long incMaxEventSeq() {
        long inc = this.rs.inc("maxEventSeq", 1L, Integer.MAX_VALUE);
        if (inc < 0) {
            inc = 0;
            this.rs.set("maxEventSeq", 0);
        }
        return inc;
    }

    private void setZKWatchValue(String str, String str2) {
        try {
            String str3 = ZK_WATCHER_ROOT_PATH + str;
            CuratorFramework zKClient = ZKFactory.getZKClient(ZK_URL);
            if (zKClient.checkExists().forPath(str3) == null) {
                zKClient.create().forPath(str3, str2.getBytes());
            } else {
                zKClient.setData().forPath(str3, str2.getBytes());
            }
        } catch (Exception e) {
            throw ExceptionUtil.wrap(e);
        }
    }
}
