I am using Spring Data Redis 2.5.7 to consume stream messages from Redis. This is what my consumer code looks like:
package com.dolphin.soa.post.common.mq;
import com.alibaba.fastjson.JSON;
import com.dolphin.soa.post.contract.request.ArticleRequest;
import com.dolphin.soa.post.model.entity.SubRelation;
import com.dolphin.soa.post.service.IArticleService;
import com.dolphin.soa.post.service.ISubRelationService;
import lombok.extern.slf4j.Slf4j;
import misc.enumn.user.SubStatus;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
/**
* @author dolphin
*/
@Component
@Slf4j
public class StreamMessageListener implements StreamListener<String, MapRecord<String, String, String>> {
@Value("${dolphin.redis.stream.group}")
private String groupName;
@Value("${dolphin.redis.user.sub.article.key}")
private String subArticleKey;
private final StringRedisTemplate stringRedisTemplate;
private final RedisTemplate<String, Object> articleRedisTemplate;
private final RedisTemplate<String, Long> redisLongTemplate;
private final ISubRelationService subRelationService;
private final IArticleService articleService;
public StreamMessageListener(StringRedisTemplate stringRedisTemplate,
@Qualifier("redisObjectTemplate") RedisTemplate<String, Object> articleRedisTemplate,
ISubRelationService subRelationService,
@Qualifier("redisLongTemplate") RedisTemplate<String, Long> redisLongTemplate,
IArticleService articleService) {
this.stringRedisTemplate = stringRedisTemplate;
this.articleRedisTemplate = articleRedisTemplate;
this.subRelationService = subRelationService;
this.redisLongTemplate = redisLongTemplate;
this.articleService = articleService;
}
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            Map<String, String> body = message.getValue();
            log.debug("receive message from redis:" + JSON.toJSONString(body));
            handleArticle(body);
            this.stringRedisTemplate.opsForStream().acknowledge(groupName, message);
        } catch (Exception e) {
            log.error("handle redis stream message error", e);
        }
    }
    private void handleArticle(Map<String, String> body) {
        Long channelId = MapUtils.getLongValue(body, "sub_source_id", -1L);
        Long articleId = MapUtils.getLongValue(body, "id", -1L);
        if (channelId <= 0L || articleId <= 0L) {
            log.error("id incorrect: {}", body);
            return;
        }
        ArticleRequest articleRequest = new ArticleRequest();
        articleRequest.setChannelId(channelId);
        articleRequest.setSubStatus(SubStatus.SUB);
        // TODO: this query should probably be paged to avoid memory overflow; it becomes dangerous as the number of subscribed users grows
        List<SubRelation> relations = subRelationService.list(articleRequest);
        if (CollectionUtils.isEmpty(relations)) {
            return;
        }
        relations.forEach(item -> {
            var articleIdsSet = new HashSet<ZSetOperations.TypedTuple<Long>>();
            ZSetOperations.TypedTuple<Long> singleId = new DefaultTypedTuple<>(articleId, Double.valueOf(articleId));
            articleIdsSet.add(singleId);
            /*
             * Because we currently have less than 2 GB of memory, the Redis stream only
             * carries the article id and channel id, not the full article, so this step
             * does one extra query against the database to warm the article cache.
             */
            articleService.getArticleFromCache(Arrays.asList(articleId));
            String userSubCacheKey = subArticleKey + item.getUserId();
            Boolean isKeyExists = redisLongTemplate.hasKey(userSubCacheKey);
            if (Boolean.TRUE.equals(isKeyExists)) {
                /*
                 * Only when the user has been active recently does Redis cache that user's
                 * subscription list; in that case we push the newest article to each
                 * subscribed user one by one (maybe we should batch this). As the number
                 * of subscribers per channel grows, this may become a performance bottleneck.
                 */
                redisLongTemplate.opsForZSet().add(userSubCacheKey, articleIdsSet);
            }
        });
    }
}
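For completeness, a StreamListener like this has to be registered with a StreamMessageListenerContainer. My real configuration class is not included in this question, so the following is only a rough sketch of that kind of wiring: the consumer name and the option values are assumptions, and the stream/group names are taken from the commands shown later in this question.
package com.dolphin.soa.post.common.mq;
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
/**
 * Sketch of the container wiring for the listener above (stream key, group and
 * consumer names are assumptions).
 */
@Configuration
public class RedisStreamConfig {

    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(
            RedisConnectionFactory connectionFactory,
            StreamMessageListener streamMessageListener) {

        // Container options: how long one XREADGROUP call blocks and how many records it asks for.
        var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .pollTimeout(Duration.ofSeconds(1))
                .batchSize(10)
                .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(connectionFactory, options);

        // receive(...) (not receiveAutoAck) means the listener must XACK each record itself,
        // which matches the acknowledge(...) call in onMessage above.
        container.receive(
                Consumer.from("pydolphin:stream:group:article", "consumer-1"),
                StreamOffset.create("pydolphin:stream:article", ReadOffset.lastConsumed()),
                streamMessageListener);

        container.start();
        return container;
    }
}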
I am facing a problem where the Redis stream has messages but the Spring Data Redis consumer does not consume them. When I use this command to check the stream:
> XINFO STREAM pydolphin:stream:article
length             45
radix-tree-keys    1
radix-tree-nodes   2
last-generated-id  1652101310592-0
groups             1
first-entry        1652083221122-0  (id 2288687, sub_source_id 4817)
last-entry         1652101310592-0  (id 2288731, sub_source_id 4792)
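The group-level counters can be read from Java as well. A small sketch against the stringRedisTemplate used by the listener, using the same stream key as the command above (equivalent of XINFO GROUPS):
// Sketch: inspect the consumer group (uses org.springframework.data.redis.connection.stream.StreamInfo).
StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups("pydolphin:stream:article");
groups.forEach(g -> log.info("group={} consumers={} pending={} last-delivered-id={}",
        g.groupName(), g.consumerCount(), g.pendingCount(), g.lastDeliveredId()));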
It shows that there are messages, but the consumer does not consume them. Why does this happen, and what should I do to fix it? Sometimes it consumes one or more messages, but never all of them, so there are always many messages left in the queue. I have already tried to check the pending messages with this command:
XPENDING pydolphin:stream:article pydolphin:stream:group:article - + 20
It only returns 1 pending message, but the stream has 40+ messages.
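The same pending check can also be done from Java. A small sketch using the stringRedisTemplate from the listener; the key and group names mirror the XPENDING command above:
// Sketch: summary of the group's pending entries (equivalent of the XPENDING command above).
// Uses org.springframework.data.redis.connection.stream.PendingMessagesSummary.
PendingMessagesSummary summary = stringRedisTemplate.opsForStream()
        .pending("pydolphin:stream:article", "pydolphin:stream:group:article");
if (summary != null) {
    log.info("total pending: {}, pending per consumer: {}",
            summary.getTotalPendingMessages(),
            summary.getPendingMessagesPerConsumer());
}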