/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.RedisStateMachine;
import java.util.concurrent.BlockingQueue;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class CommandHandler<K, V>
extends SimpleChannelHandler {
    protected BlockingQueue<Command<K, V, ?>> queue;
    protected ChannelBuffer buffer;
    protected RedisStateMachine<K, V> rsm;

    public CommandHandler(BlockingQueue<Command<K, V, ?>> queue) {
        this.queue = queue;
    }

    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.buffer = ChannelBuffers.dynamicBuffer((ChannelBufferFactory)ctx.getChannel().getConfig().getBufferFactory());
        this.rsm = new RedisStateMachine();
    }

    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Command cmd = (Command)e.getMessage();
        Channel channel = ctx.getChannel();
        ChannelBuffer buf = ChannelBuffers.dynamicBuffer((ChannelBufferFactory)channel.getConfig().getBufferFactory());
        cmd.encode(buf);
        Channels.write((ChannelHandlerContext)ctx, (ChannelFuture)e.getFuture(), (Object)buf);
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer input = (ChannelBuffer)e.getMessage();
        if (!input.readable()) {
            return;
        }
        this.buffer.discardReadBytes();
        this.buffer.writeBytes(input);
        this.decode(ctx, this.buffer);
    }

    protected void decode(ChannelHandlerContext ctx, ChannelBuffer buffer) throws InterruptedException {
        while (!this.queue.isEmpty() && this.rsm.decode(buffer, ((Command)this.queue.peek()).getOutput())) {
            Command<K, V, ?> cmd = this.queue.take();
            cmd.complete();
        }
    }
}

