package com.lenovo.cloud.framework.mq.redis.core.stream;

import cn.hutool.core.util.TypeUtil;
import com.lenovo.cloud.framework.common.util.json.JsonUtils;
import com.lenovo.cloud.framework.mq.redis.core.RedisMQTemplate;
import com.lenovo.cloud.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.lenovo.cloud.framework.mq.redis.core.message.AbstractRedisMessage;
import com.lenovo.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import java.lang.reflect.Type;
import java.util.List;
import lombok.Generated;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;

/* loaded from: input_file:com/lenovo/cloud/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.class */
public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage> implements StreamListener<String, ObjectRecord<String, String>> {
    private final Class<T> messageType;
    private final String streamKey;

    @Value("${spring.application.name}")
    private String group;
    private RedisMQTemplate redisMQTemplate;
    static final /* synthetic */ boolean $assertionsDisabled;

    protected AbstractRedisStreamMessageListener() {
        this.messageType = getMessageClass();
        this.streamKey = this.messageType.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]).getStreamKey();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onMessage(ObjectRecord<String, String> objectRecord) {
        AbstractRedisStreamMessage abstractRedisStreamMessage = (AbstractRedisStreamMessage) JsonUtils.parseObject((String) objectRecord.getValue(), this.messageType);
        try {
            consumeMessageBefore(abstractRedisStreamMessage);
            onMessage((AbstractRedisStreamMessageListener<T>) abstractRedisStreamMessage);
            this.redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(this.group, objectRecord);
        } finally {
            consumeMessageAfter(abstractRedisStreamMessage);
        }
    }

    public abstract void onMessage(T t);

    private Class<T> getMessageClass() {
        Type typeArgument = TypeUtil.getTypeArgument(getClass(), 0);
        if (typeArgument == null) {
            throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
        }
        return (Class) typeArgument;
    }

    private void consumeMessageBefore(AbstractRedisMessage abstractRedisMessage) {
        if (!$assertionsDisabled && this.redisMQTemplate == null) {
            throw new AssertionError();
        }
        this.redisMQTemplate.getInterceptors().forEach(redisMessageInterceptor -> {
            redisMessageInterceptor.consumeMessageBefore(abstractRedisMessage);
        });
    }

    private void consumeMessageAfter(AbstractRedisMessage abstractRedisMessage) {
        if (!$assertionsDisabled && this.redisMQTemplate == null) {
            throw new AssertionError();
        }
        List<RedisMessageInterceptor> interceptors = this.redisMQTemplate.getInterceptors();
        for (int size = interceptors.size() - 1; size >= 0; size--) {
            interceptors.get(size).consumeMessageAfter(abstractRedisMessage);
        }
    }

    @Generated
    public String getStreamKey() {
        return this.streamKey;
    }

    @Generated
    public String getGroup() {
        return this.group;
    }

    @Generated
    public AbstractRedisStreamMessageListener<T> setRedisMQTemplate(RedisMQTemplate redisMQTemplate) {
        this.redisMQTemplate = redisMQTemplate;
        return this;
    }

    static {
        $assertionsDisabled = !AbstractRedisStreamMessageListener.class.desiredAssertionStatus();
    }
}
