0
votes

I am using Spring Data Redis 2.5.7 to consume stream messages from Redis. This is what my consumer code looks like:

package com.dolphin.soa.post.common.mq;

import com.alibaba.fastjson.JSON;
import com.dolphin.soa.post.contract.request.ArticleRequest;
import com.dolphin.soa.post.model.entity.SubRelation;
import com.dolphin.soa.post.service.IArticleService;
import com.dolphin.soa.post.service.ISubRelationService;
import lombok.extern.slf4j.Slf4j;
import misc.enumn.user.SubStatus;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/**
 * @author dolphin
 */
@Component
@Slf4j
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Value("${dolphin.redis.stream.group}")
    private String groupName;

    @Value("${dolphin.redis.user.sub.article.key}")
    private String subArticleKey;

    private final StringRedisTemplate stringRedisTemplate;

    private final RedisTemplate<String, Object> articleRedisTemplate;

    private final RedisTemplate<String, Long> redisLongTemplate;

    private final ISubRelationService subRelationService;

    private final IArticleService articleService;

    public StreamMessageListener(StringRedisTemplate stringRedisTemplate,
                                 @Qualifier("redisObjectTemplate") RedisTemplate<String, Object> articleRedisTemplate,
                                 ISubRelationService subRelationService,
                                 @Qualifier("redisLongTemplate") RedisTemplate<String, Long> redisLongTemplate,
                                 IArticleService articleService) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.articleRedisTemplate = articleRedisTemplate;
        this.subRelationService = subRelationService;
        this.redisLongTemplate = redisLongTemplate;
        this.articleService = articleService;
    }

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            Map<String, String> body = message.getValue();
            log.debug("receive message from redis:" + JSON.toJSONString(body));
            handleArticle(body);
            this.stringRedisTemplate.opsForStream().acknowledge(groupName, message);
        } catch (Exception e) {
            log.error("handle redis stream message error", e);
        }
    }

    private void handleArticle(Map<String, String> body) {
        Long channelId = MapUtils.getLongValue(body, "sub_source_id", -1L);
        Long articleId = MapUtils.getLongValue(body, "id", -1L);
        if (channelId <= 0L || articleId <= 0L) {
            log.error("id incorrect", body);
            return;
        }
        ArticleRequest articleRequest = new ArticleRequest();
        articleRequest.setChannelId(channelId);
        articleRequest.setSubStatus(SubStatus.SUB);
        // TODO: may be page should avoid memory overflow, this is dangerous when the subscribe user increase
        List<SubRelation> relations = subRelationService.list(articleRequest);
        if (CollectionUtils.isEmpty(relations)) {
            return;
        }
        relations.forEach(item -> {
            var articleIdsSet = new HashSet<ZSetOperations.TypedTuple<Long>>();
            ZSetOperations.TypedTuple<Long> singleId = new DefaultTypedTuple<>(articleId, Double.valueOf(articleId));
            articleIdsSet.add(singleId);
            /**
             *  because now we only have less than 2GB memory
             *  the redis stream only pass the article id and channel id not the full article
             *  at this procedure has a extern stop 1 query from database
             */
            articleService.getArticleFromCache(Arrays.asList(articleId));
            String userSubCacheKey = subArticleKey + item.getUserId();
            Boolean isKeyExists = redisLongTemplate.hasKey(userSubCacheKey);
            if (isKeyExists) {
                /**
                 * only the user is active recently
                 * the redis will cache the user subscribe list then we push the newest article
                 * to the subscribe user one by one(may be we should make the operation less)
                 * when the channel subscribe user increment
                 * this may become a performance neck bottle
                 */
                redisLongTemplate.opsForZSet().add(userSubCacheKey, articleIdsSet);
            }
        });
    }
}

I am facing a problem: the Redis stream has messages, but the Spring Data Redis consumer does not consume them. When I use this command to inspect the stream:

> XINFO STREAM pydolphin:stream:article
length
45
radix-tree-keys
1
radix-tree-nodes
2
last-generated-id
1652101310592-0
groups
1
first-entry
1652083221122-0
id
2288687
sub_source_id
4817
last-entry
1652101310592-0
id
2288731
sub_source_id
4792

It shows that there are messages in the stream, but the consumer did not consume them. Why does this happen, and what should I do to fix it? Sometimes it consumes one or more messages, but never all of them, so there are always many messages left in the stream. I have already tried this command to check the pending messages:

XPENDING pydolphin:stream:article pydolphin:stream:group:article - + 20

It returns only 1 pending message, but the stream holds 40+ messages.