package kd.bos.xdb.eventbus;

import kd.bos.ha.component.KeepAliveService;
import kd.bos.instance.Instance;
import kd.bos.xdb.XDBLogable;
import kd.bos.xdb.util.Pair;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:kd/bos/xdb/eventbus/EventBusDispatcher.class */
public final class EventBusDispatcher implements Runnable, XDBLogable {
    private static final String instanceId = Instance.getInstanceId();
    private static final int pull_interval = 10000;
    private EventStore es;
    private final String channel;
    private final Object snooper = new Object();
    private volatile long consumedEventSeq = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventBusDispatcher(EventStore eventStore) {
        this.es = eventStore;
        this.channel = eventStore.getChannel();
        KeepAliveService.registerKeepAliveListener(new RedisErrorChangeDispatcherListener(this));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.consumedEventSeq = this.es.getMaxEventSeq() + 1;
        this.es.addMaxEventSeqChangeListener(j -> {
            if (this.consumedEventSeq != j) {
                synchronized (this.snooper) {
                    this.snooper.notifyAll();
                }
            }
        });
        Thread thread = new Thread(this);
        thread.setName("XDBEventBus-" + this.channel);
        thread.setDaemon(true);
        thread.start();
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread currentThread = Thread.currentThread();
        while (!currentThread.isInterrupted()) {
            try {
                snoop();
                while (true) {
                    Pair<Long, Event> consumeEvent = this.es.consumeEvent(this.consumedEventSeq);
                    this.consumedEventSeq = consumeEvent.getKey().longValue();
                    Event value = consumeEvent.getValue();
                    if (value == null) {
                        break;
                    } else if (!value.isIgnoreConsumeByMySelf() || !instanceId.equals(value.getInstanceId())) {
                        EventBusImpl.onReceiveEvent(this.channel, value);
                    }
                }
            } catch (Exception e) {
                log.error("EventBusDispatcher error: " + e.getMessage(), e);
            }
        }
    }

    private void snoop() {
        synchronized (this.snooper) {
            try {
                this.snooper.wait(10000L);
            } catch (InterruptedException e) {
            }
        }
    }

    public EventStore getEventStore() {
        return this.es;
    }

    public void setEventStore(EventStore eventStore) {
        this.es = eventStore;
    }
}
