package com.sinosoftgz.starter.rabbitmq.utils;

import com.rabbitmq.client.Channel;
import com.sinosoftgz.starter.rabbitmq.constant.MessageDelayLevel;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:com/sinosoftgz/starter/rabbitmq/utils/RabbitMqUtils.class */
/**
 * Convenience wrapper around {@link RabbitAdmin} and {@link RabbitTemplate}.
 *
 * <p>Groups three kinds of operations:
 * <ul>
 *   <li>topology management (declare/delete exchanges, queues and bindings) via {@link RabbitAdmin};</li>
 *   <li>manual acknowledgement helpers operating on a raw {@link Channel};</li>
 *   <li>message publishing helpers delegating to {@link RabbitTemplate#convertAndSend}.</li>
 * </ul>
 */
public class RabbitMqUtils {
    private static final Logger log = LoggerFactory.getLogger(RabbitMqUtils.class);
    private final RabbitAdmin rabbitAdmin;
    private final RabbitTemplate rabbitTemplate;

    /**
     * Listener configuration, populated by field injection only. It is not set by the
     * constructor, so it stays {@code null} when no injection is performed on the instance.
     */
    @Autowired
    RabbitProperties rabbitProperties;

    /**
     * Creates the helper.
     *
     * @param rabbitAdmin    admin used for all declare/delete operations
     * @param rabbitTemplate template used for all publishing operations
     */
    public RabbitMqUtils(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate) {
        this.rabbitAdmin = rabbitAdmin;
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Declares the given exchange through the admin.
     *
     * @param abstractExchange exchange definition to declare
     */
    public void addExchange(AbstractExchange abstractExchange) {
        this.rabbitAdmin.declareExchange(abstractExchange);
    }

    /**
     * Deletes an exchange.
     *
     * @param str name of the exchange to delete
     * @return the result reported by {@link RabbitAdmin#deleteExchange(String)}
     */
    public boolean deleteExchange(String str) {
        return this.rabbitAdmin.deleteExchange(str);
    }

    /**
     * Declares a queue using the admin's no-argument {@code declareQueue()}.
     *
     * @return the queue returned by the admin
     */
    public Queue addQueue() {
        return this.rabbitAdmin.declareQueue();
    }

    /**
     * Declares the given queue.
     *
     * @param queue queue definition to declare
     * @return the value returned by {@link RabbitAdmin#declareQueue(Queue)}
     */
    public String addQueue(Queue queue) {
        return this.rabbitAdmin.declareQueue(queue);
    }

    /**
     * Deletes a queue, forwarding both flags unchanged to
     * {@link RabbitAdmin#deleteQueue(String, boolean, boolean)}.
     *
     * @param str name of the queue to delete
     * @param z   second argument of the admin call (the "unused" flag in the Spring AMQP API)
     * @param z2  third argument of the admin call (the "empty" flag in the Spring AMQP API)
     */
    public void deleteQueue(String str, boolean z, boolean z2) {
        this.rabbitAdmin.deleteQueue(str, z, z2);
    }

    /**
     * Deletes a queue unconditionally.
     *
     * @param str name of the queue to delete
     * @return the result reported by {@link RabbitAdmin#deleteQueue(String)}
     */
    public boolean deleteQueue(String str) {
        return this.rabbitAdmin.deleteQueue(str);
    }

    /**
     * Binds a queue to a topic exchange and declares the binding.
     *
     * @param queue         queue to bind
     * @param topicExchange exchange the queue is bound to
     * @param str           routing key (pattern) of the binding
     */
    public void addBinding(Queue queue, TopicExchange topicExchange, String str) {
        Binding binding = BindingBuilder.bind(queue).to(topicExchange).with(str);
        this.rabbitAdmin.declareBinding(binding);
    }

    /**
     * Binds an exchange to a topic exchange and declares the binding.
     *
     * @param exchange      exchange passed to {@code BindingBuilder.bind(...)}
     * @param topicExchange exchange passed to {@code to(...)}
     * @param str           routing key (pattern) of the binding
     */
    public void addBinding(Exchange exchange, TopicExchange topicExchange, String str) {
        Binding binding = BindingBuilder.bind(exchange).to(topicExchange).with(str);
        this.rabbitAdmin.declareBinding(binding);
    }

    /**
     * Removes an existing binding.
     *
     * @param binding binding to remove
     */
    public void removeBinding(Binding binding) {
        this.rabbitAdmin.removeBinding(binding);
    }

    /**
     * Acknowledges a single message (multiple = {@code false}) on the given channel.
     *
     * <p>An {@link IOException} raised by the ack is logged and not rethrown.
     *
     * @param message message whose delivery tag is acknowledged
     * @param channel channel the message was delivered on
     * @throws RuntimeException if an acknowledge mode other than {@code MANUAL} is configured
     */
    public void basicAck(Message message, Channel channel) {
        checkAcknowledgeMode();
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("通知 MQ 消息已被成功消费,可以ACK了异常信息", e);
        }
    }

    /**
     * Calls {@code basicRecover()} on the channel, logging (not rethrowing) any {@link IOException}.
     *
     * @param channel channel to recover
     */
    private void basicRecover(Channel channel) {
        try {
            channel.basicRecover();
        } catch (IOException e) {
            log.error("处理失败,重新压入MQ发生异常，异常信息", e);
        }
    }

    /**
     * Acknowledges a single message and, if the ack fails with an {@link IOException},
     * logs the failure and asks the channel to recover.
     *
     * @param message message whose delivery tag is acknowledged
     * @param channel channel the message was delivered on
     * @throws RuntimeException if an acknowledge mode other than {@code MANUAL} is configured
     */
    public void basicAckAndRecover(Message message, Channel channel) {
        checkAcknowledgeMode();
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("通知 MQ 消息已被成功消费,可以ACK发生异常，异常信息", e);
            basicRecover(channel);
        }
    }

    /**
     * Verifies that manual acknowledgement is permitted by the simple listener container
     * configuration.
     *
     * <ul>
     *   <li>No mode configured (or no {@link RabbitProperties} injected): logs a notice and returns.</li>
     *   <li>Mode is {@code MANUAL}: returns silently.</li>
     *   <li>Any other mode: logs an error and throws.</li>
     * </ul>
     *
     * @throws RuntimeException if a mode other than {@code MANUAL} is explicitly configured
     */
    private void checkAcknowledgeMode() {
        // rabbitProperties is field-injected only; treat a missing instance as "not configured"
        // rather than failing every ack with a NullPointerException.
        // NOTE(review): only the "simple" listener settings are consulted — confirm whether
        // the "direct" container type needs the same check.
        AcknowledgeMode acknowledgeMode = this.rabbitProperties == null
                ? null
                : this.rabbitProperties.getListener().getSimple().getAcknowledgeMode();

        if (ObjectUtils.isEmpty(acknowledgeMode)) {
            log.info("未配置的消息确认模式，默认为：{} 。请确保消费方没有手动确认消息，否则将抛出如下异常：Caused by: org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed", AcknowledgeMode.AUTO);
            return;
        }

        // Every configured mode other than MANUAL (AUTO included) is rejected here, so no
        // separate AUTO handling is needed afterwards.
        if (AcknowledgeMode.MANUAL != acknowledgeMode) {
            log.error("配置的消息确认模式不是手动模式，不允许手动确认，配置的消息确认模式为：{} ", acknowledgeMode);
            throw new RuntimeException("配置的消息确认模式不是手动模式，不允许手动确认。");
        }
    }

    /**
     * Publishes a message via {@link RabbitTemplate#convertAndSend(String, Object)}.
     *
     * @param str value passed as the first argument of {@code convertAndSend} (logged as the queue name)
     * @param obj payload to convert and send
     */
    public void directQueue(String str, Object obj) {
        log.debug("directQueue queueName:{},message:{}", str, obj);
        this.rabbitTemplate.convertAndSend(str, obj);
    }

    /**
     * Publishes a message via {@link RabbitTemplate#convertAndSend(String, String, Object)}.
     *
     * @param str  first argument of {@code convertAndSend} (the exchange in the Spring AMQP API;
     *             note the debug log labels it "queueName")
     * @param str2 routing key
     * @param obj  payload to convert and send
     */
    public void fanoutExchange(String str, String str2, Object obj) {
        log.debug("fanoutExchange queueName:{},routingKey:{},message:{}", str, str2, obj);
        this.rabbitTemplate.convertAndSend(str, str2, obj);
    }

    /**
     * Publishes a message via {@link RabbitTemplate#convertAndSend(String, String, Object)}.
     *
     * @param str  first argument of {@code convertAndSend} (the exchange in the Spring AMQP API;
     *             note the debug log labels it "queueName")
     * @param str2 routing key
     * @param obj  payload to convert and send
     */
    public void topicExchange(String str, String str2, Object obj) {
        log.debug("topicExchange queueName:{},routingKey:{},message:{}", str, str2, obj);
        this.rabbitTemplate.convertAndSend(str, str2, obj);
    }

    /**
     * Publishes a message with an {@code x-delay} header taken from the given delay level.
     *
     * <p>NOTE(review): the {@code x-delay} header is presumably interpreted by a
     * delayed-message exchange on the broker — confirm the target exchange type.
     *
     * @param str               exchange to publish to
     * @param str2              second argument of {@code convertAndSend} (logged as the queue name)
     * @param obj               payload to convert and send
     * @param messageDelayLevel delay level whose value is written to the {@code x-delay} header
     */
    public void delayQueue(String str, String str2, Object obj, MessageDelayLevel messageDelayLevel) {
        log.debug("delayQueue exchange:{},queueName:{},message:{}", str, str2, obj);
        this.rabbitTemplate.convertAndSend(str, str2, obj, message -> {
            message.getMessageProperties().setHeader("x-delay", Integer.valueOf(messageDelayLevel.getValue()));
            return message;
        });
    }
}
