package kd.bos.mq.delay;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;

/* loaded from: input_file:kd/bos/mq/delay/RabbitMQDelayManager.class */
public class RabbitMQDelayManager {
    private static final String DELAY_EXCHANGE = "delayQueueExchange";
    private static final String DEAD_LETTER_EXCHANGE = "dlxExchange";
    private static final String TTL_QUEUE_SUFFIX = "_ttlQueue";
    private static final String DELAY_EXCHANGE_BINDING_KEY_SUFFIX = ".#";
    private static final String DEAD_LETTER_EXCHANGE_BINDING_KEY_PREFIX = "*.";
    private static final String PUBLISH_ROUTE_KEY_SEPARATOR = ".";
    private static Set<String> preDealCache = new HashSet();

    private static void initDelayAndDeadLetterExchange(Channel channel) throws IOException {
        channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, (Map) null);
        channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, (Map) null);
    }

    private static void initTTLQueue(Channel channel, MetaTime metaTime) throws IOException {
        HashMap hashMap = new HashMap();
        hashMap.put("x-message-ttl", Integer.valueOf(metaTime.getMillis()));
        hashMap.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        hashMap.put("x-max-length", 1000000);
        channel.queueDeclare(metaTime.getName() + TTL_QUEUE_SUFFIX, true, false, false, hashMap);
        channel.queueBind(metaTime.getName() + TTL_QUEUE_SUFFIX, DELAY_EXCHANGE, metaTime.getName() + DELAY_EXCHANGE_BINDING_KEY_SUFFIX);
    }

    private static void dlxBindTargetQueue(Channel channel, String str) throws IOException {
        channel.queueBind(str, DEAD_LETTER_EXCHANGE, DEAD_LETTER_EXCHANGE_BINDING_KEY_PREFIX + str);
    }

    private static void delayMessagePreDeal(Channel channel, String str, MetaTime metaTime) throws IOException {
        if (preDealCache.add(metaTime.getName() + str)) {
            initDelayAndDeadLetterExchange(channel);
            initTTLQueue(channel, metaTime);
            dlxBindTargetQueue(channel, str);
        }
    }

    private static String assembleRoutingKey(String str, MetaTime metaTime) {
        return metaTime.getName() + PUBLISH_ROUTE_KEY_SEPARATOR + str;
    }

    public static void publishDelayMessage(byte[] bArr, Channel channel, String str, MetaTime metaTime) throws IOException {
        delayMessagePreDeal(channel, str, metaTime);
        channel.basicPublish(DELAY_EXCHANGE, assembleRoutingKey(str, metaTime), new AMQP.BasicProperties().builder().deliveryMode(2).build(), bArr);
    }

    public static void publishDelayMessageConfirmModel(byte[] bArr, Channel channel, String str, MetaTime metaTime) throws IOException, TimeoutException, InterruptedException {
        delayMessagePreDeal(channel, str, metaTime);
        AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        String assembleRoutingKey = assembleRoutingKey(str, metaTime);
        channel.confirmSelect();
        channel.basicPublish(DELAY_EXCHANGE, assembleRoutingKey, build, bArr);
        if (!channel.waitForConfirms(Long.getLong("mq.publish.confirm.timeout", 60000L).longValue())) {
            throw new KDException(BosErrorCode.rabbitmqException, new Object[]{"publish fail: wait for confirm timeout"});
        }
    }
}
