/*
 * Decompiled with CFR 0.152.
 */
package com.lenovo.cloud.framework.mq.redis.core.job;

import cn.hutool.core.collection.CollUtil;
import com.lenovo.cloud.framework.mq.redis.core.RedisMQTemplate;
import com.lenovo.cloud.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.Generated;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Scheduled job that re-delivers Redis Stream messages which have been sitting in a
 * consumer group's pending list for too long.
 *
 * <p>On every run the job walks all registered stream listeners, reads the pending
 * entries of each consumer in {@code groupName}, and for each entry whose last delivery
 * is at least {@link #EXPIRE_TIME} seconds old it appends a copy of the message to the
 * same stream and acknowledges the original entry.
 */
public class RedisPendingMessageResendJob {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RedisPendingMessageResendJob.class);

    /** Name of the Redisson lock taken around each run. */
    private static final String LOCK_KEY = "redis:pending:msg:lock";

    /**
     * Minimum time, in seconds, since the last delivery before a pending message is resent.
     * The job fires once per minute, so a message may be picked up to one minute after
     * crossing this threshold.
     */
    private static final int EXPIRE_TIME = 300;

    private final List<AbstractRedisStreamMessageListener<?>> listeners;
    private final RedisMQTemplate redisTemplate;
    private final String groupName;
    private final RedissonClient redissonClient;

    /**
     * Entry point invoked by the scheduler at second 35 of every minute.
     *
     * <p>The work is guarded by a non-blocking {@code tryLock()} on {@link #LOCK_KEY}: when
     * the lock is already held the run is skipped silently. Any exception raised by the
     * run is logged and not rethrown; the lock is released in all cases.
     */
    @Scheduled(cron="35 * * * * ?")
    public void messageResend() {
        RLock rLock = this.redissonClient.getLock(LOCK_KEY);
        if (!rLock.tryLock()) {
            // Lock is held elsewhere; skip this tick.
            return;
        }
        try {
            this.execute();
        }
        catch (Exception exception) {
            log.error("[messageResend][\u6267\u884c\u5f02\u5e38]", exception);
        }
        finally {
            rLock.unlock();
        }
    }

    /**
     * Processes the pending list of every registered listener's stream.
     *
     * <p>Each stream is handled independently: a failure on one stream is logged and the
     * remaining streams are still processed.
     */
    private void execute() {
        // NOTE(review): assumes the template is keyed by String, as the String stream keys
        // and consumer names used below suggest - confirm against RedisMQTemplate.
        StreamOperations<String, Object, Object> ops = this.redisTemplate.getRedisTemplate().opsForStream();
        for (AbstractRedisStreamMessageListener<?> listener : this.listeners) {
            String streamKey = listener.getStreamKey();
            try {
                this.resendPendingMessages(ops, streamKey);
            }
            catch (Exception exception) {
                // Keep going so one broken stream does not block resending for the others.
                log.error("[execute][stream({}) pending message resend failed]", streamKey, exception);
            }
        }
    }

    /**
     * Inspects every consumer of {@code groupName} on the given stream and resends its
     * expired pending messages.
     *
     * @param ops stream operations used for all Redis calls
     * @param streamKey key of the stream to inspect
     * @throws NullPointerException if the pending summary for the stream is {@code null}
     */
    private void resendPendingMessages(StreamOperations<String, Object, Object> ops, String streamKey) {
        PendingMessagesSummary summary = Objects.requireNonNull(ops.pending(streamKey, this.groupName));
        Map<String, Long> pendingPerConsumer = summary.getPendingMessagesPerConsumer();
        pendingPerConsumer.forEach((consumerName, pendingCount) -> {
            log.info("[processPendingMessage][\u6d88\u8d39\u8005({}) \u6d88\u606f\u6570\u91cf({})]", consumerName, pendingCount);
            // Fetch the details of all entries currently pending for this consumer.
            PendingMessages pendingMessages = ops.pending(streamKey, Consumer.from(this.groupName, consumerName), Range.unbounded(), pendingCount);
            if (pendingMessages.isEmpty()) {
                return;
            }
            pendingMessages.forEach(pendingMessage -> this.resendIfExpired(
                    ops,
                    streamKey,
                    pendingMessage.getIdAsString(),
                    pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds()));
        });
    }

    /**
     * Resends a single pending message when its last delivery is at least
     * {@link #EXPIRE_TIME} seconds old.
     *
     * <p>The message body is read back from the stream by id; if no record is found
     * nothing is added or acknowledged. Otherwise a new record with the same value is
     * added to the stream and the original record is acknowledged for {@code groupName}.
     *
     * @param ops stream operations used for all Redis calls
     * @param streamKey key of the stream the message belongs to
     * @param messageId id of the pending message
     * @param elapsedSeconds seconds elapsed since the message was last delivered
     */
    private void resendIfExpired(StreamOperations<String, Object, Object> ops, String streamKey, String messageId, long elapsedSeconds) {
        if (elapsedSeconds < EXPIRE_TIME) {
            return;
        }
        // Closed range [id, id] selects exactly the one record with this id.
        List<MapRecord<String, Object, Object>> records = ops.range(streamKey, Range.of(Range.Bound.inclusive(messageId), Range.Bound.inclusive(messageId)));
        if (CollUtil.isEmpty(records)) {
            return;
        }
        MapRecord<String, Object, Object> original = records.get(0);
        // Append a copy first, then ack the original so it leaves the pending list.
        ops.add(StreamRecords.newRecord().ofObject(original.getValue()).withStreamKey(streamKey));
        ops.acknowledge(this.groupName, original);
        log.info("[processPendingMessage][\u6d88\u606f({})\u91cd\u65b0\u6295\u9012\u6210\u529f]", original.getId());
    }

    /**
     * Creates the job.
     *
     * @param list stream listeners whose stream keys are scanned for pending messages
     * @param redisMQTemplate template providing access to the Redis stream operations
     * @param string name of the consumer group whose pending lists are inspected
     * @param redissonClient client used to obtain the lock guarding each run
     */
    @Generated
    public RedisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> list, RedisMQTemplate redisMQTemplate, String string, RedissonClient redissonClient) {
        this.listeners = list;
        this.redisTemplate = redisMQTemplate;
        this.groupName = string;
        this.redissonClient = redissonClient;
    }
}

