/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.amqp.outbound;

import java.util.function.BiConsumer;
import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.RabbitMessageFuture;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.handler.ReplyRequiredException;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

public class AsyncAmqpOutboundGateway
extends AbstractAmqpOutboundEndpoint {
    private final AsyncRabbitTemplate template;
    private final MessageConverter messageConverter;

    public AsyncAmqpOutboundGateway(AsyncRabbitTemplate template) {
        Assert.notNull((Object)template, (String)"AsyncRabbitTemplate cannot be null");
        this.template = template;
        this.messageConverter = template.getMessageConverter();
        Assert.notNull((Object)this.messageConverter, (String)"the template's message converter cannot be null");
        this.setConnectionFactory(this.template.getConnectionFactory());
        this.setAsync(true);
    }

    public String getComponentType() {
        return "amqp:outbound-async-gateway";
    }

    @Override
    protected RabbitTemplate getRabbitTemplate() {
        return this.template.getRabbitTemplate();
    }

    @Override
    protected void doStart() {
        super.doStart();
        this.template.start();
    }

    @Override
    protected void doStop() {
        this.template.stop();
        super.doStop();
    }

    protected Object handleRequestMessage(Message<?> requestMessage) {
        org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, this.messageConverter, this.getHeaderMapper(), this.getDefaultDeliveryMode(), this.isHeadersMappedLast());
        this.addDelayProperty(requestMessage, amqpMessage);
        RabbitMessageFuture future = this.template.sendAndReceive(this.generateExchangeName(requestMessage), this.generateRoutingKey(requestMessage), amqpMessage);
        CorrelationData correlationData = this.generateCorrelationData(requestMessage);
        if (correlationData != null && future.getConfirm() != null) {
            future.getConfirm().whenComplete((BiConsumer)new CorrelationCallback(correlationData, future));
        }
        future.whenComplete((BiConsumer)new FutureCallback(requestMessage, correlationData));
        return null;
    }

    private final class CorrelationCallback
    implements BiConsumer<Boolean, Throwable> {
        private final CorrelationData correlationData;
        private final RabbitMessageFuture replyFuture;

        CorrelationCallback(CorrelationData correlationData, RabbitMessageFuture replyFuture) {
            this.correlationData = correlationData;
            this.replyFuture = replyFuture;
        }

        @Override
        public void accept(Boolean result, Throwable throwable) {
            if (result != null) {
                try {
                    AsyncAmqpOutboundGateway.this.handleConfirm(this.correlationData, result, this.replyFuture.getNackCause());
                }
                catch (Exception e) {
                    AsyncAmqpOutboundGateway.this.logger.error((CharSequence)"Failed to send publisher confirm");
                }
            }
        }
    }

    private final class FutureCallback
    implements BiConsumer<org.springframework.amqp.core.Message, Throwable> {
        private final Message<?> requestMessage;
        private final AbstractAmqpOutboundEndpoint.CorrelationDataWrapper correlationData;

        FutureCallback(Message<?> requestMessage, CorrelationData correlationData) {
            this.requestMessage = requestMessage;
            this.correlationData = (AbstractAmqpOutboundEndpoint.CorrelationDataWrapper)correlationData;
        }

        @Override
        public void accept(org.springframework.amqp.core.Message message, Throwable throwable) {
            if (throwable == null) {
                AbstractIntegrationMessageBuilder<?> replyMessageBuilder = null;
                try {
                    replyMessageBuilder = AsyncAmqpOutboundGateway.this.buildReply(AsyncAmqpOutboundGateway.this.messageConverter, message);
                    AsyncAmqpOutboundGateway.this.sendOutputs(replyMessageBuilder, this.requestMessage);
                }
                catch (Exception ex) {
                    Exception exceptionToLogAndSend = ex;
                    if (!(ex instanceof MessagingException)) {
                        exceptionToLogAndSend = new MessageHandlingException(this.requestMessage, "failed to handle a message in the [" + String.valueOf((Object)AsyncAmqpOutboundGateway.this) + "]", (Throwable)ex);
                        if (replyMessageBuilder != null) {
                            exceptionToLogAndSend = new MessagingException(replyMessageBuilder.build(), (Throwable)exceptionToLogAndSend);
                        }
                    }
                    AsyncAmqpOutboundGateway.this.logger.error((Throwable)exceptionToLogAndSend, () -> "Failed to send async reply: " + message.toString());
                    AsyncAmqpOutboundGateway.this.sendErrorMessage(this.requestMessage, exceptionToLogAndSend);
                }
            } else {
                Throwable exceptionToSend = throwable;
                if (throwable instanceof AmqpReplyTimeoutException) {
                    if (AsyncAmqpOutboundGateway.this.getRequiresReply()) {
                        exceptionToSend = new ReplyRequiredException(this.requestMessage, "Timeout on async request/reply", throwable);
                    } else {
                        AsyncAmqpOutboundGateway.this.logger.debug(() -> "Reply not required and async timeout for " + String.valueOf(this.requestMessage));
                        return;
                    }
                }
                if (throwable instanceof AmqpMessageReturnedException) {
                    AmqpMessageReturnedException amre = (AmqpMessageReturnedException)throwable;
                    MessageChannel returnChannel = AsyncAmqpOutboundGateway.this.getReturnChannel();
                    if (returnChannel != null) {
                        Message<?> returnedMessage = AsyncAmqpOutboundGateway.this.buildReturnedMessage(new ReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(), amre.getExchange(), amre.getRoutingKey()), AsyncAmqpOutboundGateway.this.messageConverter);
                        AsyncAmqpOutboundGateway.this.sendOutput(returnedMessage, returnChannel, true);
                    }
                    this.correlationData.setReturned(amre.getReturned());
                    this.correlationData.getFuture().complete(new CorrelationData.Confirm(true, null));
                } else {
                    AsyncAmqpOutboundGateway.this.sendErrorMessage(this.requestMessage, exceptionToSend);
                }
            }
        }
    }
}

