/*
 * Decompiled with CFR 0.152.
 */
package net.oschina.j2cache.cluster;

import java.util.List;
import java.util.Properties;
import net.oschina.j2cache.CacheProviderHolder;
import net.oschina.j2cache.Command;
import net.oschina.j2cache.cluster.ClusterPolicy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQClusterPolicy
implements ClusterPolicy,
MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(RocketMQClusterPolicy.class);
    private int LOCAL_COMMAND_ID = Command.genRandomSrc();
    private CacheProviderHolder holder;
    private String hosts;
    private String topic;
    private DefaultMQProducer producer;
    private DefaultMQPushConsumer consumer;

    public RocketMQClusterPolicy(Properties props) {
        this.hosts = props.getProperty("hosts");
        String groupName = props.getProperty("name", "j2cache");
        this.topic = props.getProperty("topic", "j2cache");
        this.producer = new DefaultMQProducer(groupName);
        this.producer.setNamesrvAddr(this.hosts);
        this.consumer = new DefaultMQPushConsumer(groupName);
        this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        this.consumer.setNamesrvAddr(this.hosts);
        this.consumer.setMessageModel(MessageModel.BROADCASTING);
    }

    @Override
    public boolean isLocalCommand(Command cmd) {
        return cmd.getSrc() == this.LOCAL_COMMAND_ID;
    }

    @Override
    public void evict(String region, String ... keys) {
        this.holder.getLevel1Cache(region).evict(keys);
    }

    @Override
    public void clear(String region) {
        this.holder.getLevel1Cache(region).clear();
    }

    @Override
    public void connect(Properties props, CacheProviderHolder holder) {
        this.holder = holder;
        try {
            this.producer.start();
            this.publish(Command.join());
            this.consumer.subscribe(this.topic, "*");
            this.consumer.registerMessageListener((MessageListenerConcurrently)this);
            this.consumer.start();
        }
        catch (MQClientException e) {
            log.error("Failed to start producer", (Throwable)e);
        }
    }

    @Override
    public void publish(Command cmd) {
        cmd.setSrc(this.LOCAL_COMMAND_ID);
        Message msg = new Message(this.topic, "", "", cmd.json().getBytes());
        try {
            this.producer.send(msg);
        }
        catch (Exception e) {
            log.error("Failed to publish {} to RocketMQ", (Object)cmd.json(), (Object)e);
        }
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : list) {
            this.handleCommand(Command.parse(new String(msg.getBody())));
        }
        return null;
    }

    @Override
    public void disconnect() {
        try {
            this.publish(Command.quit());
        }
        finally {
            this.producer.shutdown();
            this.consumer.shutdown();
        }
    }
}

