/*
 * Decompiled with CFR 0.152.
 */
package com.weibo.api.motan.transport.netty;

import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.exception.MotanErrorMsgConstant;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.exception.MotanServiceException;
import com.weibo.api.motan.rpc.DefaultResponse;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.rpc.RpcContext;
import com.weibo.api.motan.transport.Channel;
import com.weibo.api.motan.transport.MessageHandler;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanFrameworkUtil;
import com.weibo.api.motan.util.NetUtils;
import com.weibo.api.motan.util.StatisticCallback;
import java.net.SocketAddress;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class NettyChannelHandler
extends SimpleChannelHandler
implements StatisticCallback {
    private ThreadPoolExecutor threadPoolExecutor;
    private MessageHandler messageHandler;
    private Channel serverChannel;
    private AtomicInteger rejectCounter = new AtomicInteger(0);

    public NettyChannelHandler(Channel serverChannel) {
        this.serverChannel = serverChannel;
    }

    public NettyChannelHandler(Channel serverChannel, MessageHandler messageHandler) {
        this.serverChannel = serverChannel;
        this.messageHandler = messageHandler;
    }

    public NettyChannelHandler(Channel serverChannel, MessageHandler messageHandler, ThreadPoolExecutor threadPoolExecutor) {
        this.serverChannel = serverChannel;
        this.messageHandler = messageHandler;
        this.threadPoolExecutor = threadPoolExecutor;
    }

    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        LoggerUtil.info((String)("NettyChannelHandler channelConnected: remote=" + ctx.getChannel().getRemoteAddress() + " local=" + ctx.getChannel().getLocalAddress() + " event=" + e.getClass().getSimpleName()));
    }

    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        LoggerUtil.info((String)("NettyChannelHandler channelDisconnected: remote=" + ctx.getChannel().getRemoteAddress() + " local=" + ctx.getChannel().getLocalAddress() + " event=" + e.getClass().getSimpleName()));
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object message = e.getMessage();
        if (message instanceof Request) {
            this.processRequest(ctx, e);
        } else if (message instanceof Response) {
            this.processResponse(ctx, e);
        } else {
            LoggerUtil.error((String)("NettyChannelHandler messageReceived type not support: class=" + message.getClass()));
            throw new MotanFrameworkException("NettyChannelHandler messageReceived type not support: class=" + message.getClass());
        }
    }

    private void processRequest(final ChannelHandlerContext ctx, MessageEvent e) {
        final Request request = (Request)e.getMessage();
        request.setAttachment(URLParamType.host.getName(), NetUtils.getHostName((SocketAddress)ctx.getChannel().getRemoteAddress()));
        final long processStartTime = System.currentTimeMillis();
        try {
            this.threadPoolExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        MotanFrameworkUtil.logEvent((Request)request, (String)"TRACE_SEXECUTOR_START");
                        RpcContext.init((Request)request);
                        NettyChannelHandler.this.processRequest(ctx, request, processStartTime);
                    }
                    finally {
                        RpcContext.destroy();
                    }
                }
            });
        }
        catch (RejectedExecutionException rejectException) {
            DefaultResponse response = new DefaultResponse();
            response.setRequestId(request.getRequestId());
            response.setException((Exception)((Object)new MotanServiceException("process thread pool is full, reject", MotanErrorMsgConstant.SERVICE_REJECT)));
            response.setProcessTime(System.currentTimeMillis() - processStartTime);
            e.getChannel().write((Object)response);
            LoggerUtil.warn((String)"process thread pool is full, reject, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}", (Object[])new Object[]{this.threadPoolExecutor.getActiveCount(), this.threadPoolExecutor.getPoolSize(), this.threadPoolExecutor.getCorePoolSize(), this.threadPoolExecutor.getMaximumPoolSize(), this.threadPoolExecutor.getTaskCount(), request.getRequestId()});
            this.rejectCounter.incrementAndGet();
        }
    }

    private void processRequest(ChannelHandlerContext ctx, Request request, long processStartTime) {
        ChannelFuture channelFuture;
        Object result;
        try {
            result = this.messageHandler.handle(this.serverChannel, (Object)request);
        }
        catch (Exception e) {
            LoggerUtil.error((String)("NettyChannelHandler processRequest fail!request:" + MotanFrameworkUtil.toString((Request)request)), (Throwable)e);
            result = MotanFrameworkUtil.buildErrorResponse((long)request.getRequestId(), (Exception)((Object)new MotanServiceException("process request fail. errmsg:" + e.getMessage())));
        }
        if (result instanceof Response) {
            MotanFrameworkUtil.logEvent((Response)((Response)result), (String)"TRACE_PROCESS");
        }
        final DefaultResponse response = !(result instanceof DefaultResponse) ? new DefaultResponse(result) : (DefaultResponse)result;
        response.setRequestId(request.getRequestId());
        response.setProcessTime(System.currentTimeMillis() - processStartTime);
        if (ctx.getChannel().isConnected() && (channelFuture = ctx.getChannel().write((Object)response)) != null) {
            channelFuture.addListener(new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) throws Exception {
                    MotanFrameworkUtil.logEvent((Response)response, (String)"TRACE_SSEND", (long)System.currentTimeMillis());
                    response.onFinish();
                }
            });
        }
    }

    private void processResponse(ChannelHandlerContext ctx, MessageEvent e) {
        this.messageHandler.handle(this.serverChannel, e.getMessage());
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        LoggerUtil.error((String)("NettyChannelHandler exceptionCaught: remote=" + ctx.getChannel().getRemoteAddress() + " local=" + ctx.getChannel().getLocalAddress() + " event=" + e.getCause()), (Throwable)e.getCause());
        ctx.getChannel().close();
    }

    public String statisticCallback() {
        int count = this.rejectCounter.getAndSet(0);
        if (count > 0) {
            return String.format("type: motan name: reject_request_pool total_count: %s reject_count: %s", this.threadPoolExecutor.getPoolSize(), count);
        }
        return null;
    }
}

