package kd.bos.mq.rocket;

import java.util.Date;
import java.util.Iterator;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.mq.MessageConsumer;
import kd.bos.mq.rabbit.ExceptionLogger;
import kd.bos.mq.support.Consumer;
import kd.bos.mq.support.ConsumerManager;
import kd.bos.mq.support.QueueManager;
import kd.bos.mq.support.TranscationSupport;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/* loaded from: input_file:kd/bos/mq/rocket/RocketConsumer.class */
public class RocketConsumer implements Consumer {
    private String queue;
    private boolean autoAck;
    private int concurrency;
    private MessageConsumer mc;
    private String region;
    private DefaultMQPushConsumer defaultRocketConsumer;
    private volatile boolean isStartedFlag;
    private Date startAt;

    public RocketConsumer(String str, String str2, boolean z, int i, MessageConsumer messageConsumer, int i2) {
        this.region = str;
        this.queue = str2;
        this.concurrency = i;
        this.mc = messageConsumer;
        this.autoAck = z;
        ConsumerManager.getConsumerounter().inc();
    }

    @Override // kd.bos.mq.support.Consumer
    public String getRegion() {
        return this.region;
    }

    @Override // kd.bos.mq.support.Consumer
    public void setConcurrency(int i) {
        this.concurrency = i;
    }

    @Override // kd.bos.mq.support.Consumer
    public int getConcurrency() {
        return this.concurrency;
    }

    @Override // kd.bos.mq.support.Consumer
    public String getQueueName() {
        return this.queue;
    }

    @Override // kd.bos.mq.support.Consumer
    public boolean isAutoAck() {
        return this.autoAck;
    }

    @Override // kd.bos.mq.support.Consumer
    public MessageConsumer getMessageConsumer() {
        return this.mc;
    }

    @Override // kd.bos.mq.support.Consumer
    public Date getStartAt() {
        return this.startAt;
    }

    @Override // kd.bos.mq.support.Consumer
    public void start() {
        try {
            this.isStartedFlag = true;
            this.startAt = new Date();
            this.defaultRocketConsumer = ConsumerFactory.getConsumer(this.region, this.queue);
            this.defaultRocketConsumer.setConsumeThreadMin(this.concurrency);
            this.defaultRocketConsumer.setConsumeThreadMax(this.concurrency);
            this.defaultRocketConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
                RocketAcker rocketAcker = new RocketAcker();
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    handleDelivery((MessageExt) it.next(), rocketAcker);
                }
                return getConsumerStatus(Integer.valueOf(rocketAcker.getAckStatus()));
            });
            this.defaultRocketConsumer.start();
        } catch (MQClientException e) {
            ExceptionLogger.log("Can't init consumer for queue " + this.queue, e);
            throw new KDException(e, BosErrorCode.mqException, new Object[]{"Can't init consumer for queue " + this.queue});
        }
    }

    private void handleDelivery(MessageExt messageExt, RocketAcker rocketAcker) {
        ConsumerManager.innerHandleDelivery(rocketAcker, this.mc, this.region, this.queue, messageExt.getMsgId(), messageExt.getReconsumeTimes(), messageExt.getBody(), messageExt.getTopic());
    }

    private ConsumeConcurrentlyStatus getConsumerStatus(Integer num) {
        if (this.autoAck) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        switch (num.intValue()) {
            case QueueManager.DEFAULT_CONSUMER_CONCURRENCY /* 1 */:
            case TranscationSupport.CONSUMER_DELAY_MIN_LEVEL /* 3 */:
            default:
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            case 2:
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    @Override // kd.bos.mq.support.Consumer
    public void $$stop() {
        try {
            this.isStartedFlag = false;
            this.defaultRocketConsumer.shutdown();
        } catch (Exception e) {
            ExceptionLogger.log("error when stop mqchannel" + this.queue, e);
        }
    }

    @Override // kd.bos.mq.support.Consumer
    public boolean isStarted() {
        return this.isStartedFlag;
    }
}
