Posted by lily's blog on May 9, 2025

本文基于Feed流架构参考,记录从学习到实现一个可以承载小流量的仿微博Feed流系统的过程。

Feed流

参考文章:

  1. 掘金: 使用redis实现feed流
  2. 即刻:自主实现了一个feed流系统
  3. 微博feed流系统架构
  4. Feed流系统架构理论
  5. IM开发技术学习:揭秘微信朋友圈这种信息推流背后的系统设计

    什么是 feed 流

信息推流(以下简称“Feed流”)这种功能在我们手机APP中几乎无处不在(尤其是社交/社群产品中),最常用的就是微信朋友圈、新浪微博等。

  • Feed:Feed 流中的每一条状态或者消息都是 Feed,比如微博中的一条微博就是一个 Feed。
  • Feed流:持续更新并呈现给用户内容的信息流。每个人的朋友圈,微博关注页等等都是一个 Feed 流。
    • 对Feed流的释义,可以简单理解为只要大拇指不停地往下划手机屏幕,就有一条条的信息不断涌现出来。就像给牲畜喂饲料一样,只要它吃光了就要不断再往里加,故此得名Feed(饲养)。

feed 流分类

按帖子类型分类:

  • Timeline:按发布的时间顺序排序,产品如果选择 Timeline 类型,那么就是认为 Feed 流中的 Feed 不多,但是每个 Feed 都很重要,都需要用户看到。类似于微信朋友圈,微博等。
  • Rank:按某个非时间的因子排序,一般是按照用户的喜好度排序,一般用于新闻推荐类、商品推荐等。

按Feed流效果分类:

  • 基于算法:即个人主页feed流面依赖动态算法推荐,比如今日头条、抖音短视频;
  • 基于关注:即社交/好友关系,比如微信、知乎。

Feed流推送行为

关于feed流推送,一般有两种 推/拉

拉模式

每一个内容发布者都有一个自己的发件箱(“我发布的内容”),每当我们发出一个新帖子,都存入自己的发件箱中。当我们的粉丝来阅读时,系统首先需要拿到粉丝关注的所有人,然后遍历所有发布者的发件箱,取出他们所发布的帖子,然后依据发布时间排序,展示给阅读者(聚合到个人主页广场)。

这种设计: 阅读者读一次Feed流,后台会扩散为N次读操作(N等于关注的人数)以及一次聚合操作,因此称为读扩散。每次读Feed流相当于去关注者的收件箱主动拉取帖子,因此也得名——拉模式。

优缺点:

  • 好处是:底层存储简单,没有空间浪费。
  • 坏处是:每次读操作会非常重,读操作多。 设想一下:如果我关注的人数非常多,遍历一遍我所关注的所有人,并且再聚合一下,这个系统开销会非常大,时延上可能达到无法忍受的地步。
    拉模式还有一个比较大的缺点:就是分页不方便,我们刷微博或朋友圈,肯定是随着大拇指在屏幕不断划动,内容一页一页的从后台拉取。如果不做其他优化,只采用实时聚合的方式,下滑到比较靠后的页码时会非常麻烦。 因此:读扩散主要适用系统中阅读者关注的人没那么多,并且刷Feed流并不频繁的场景

推模式

系统中每个用户除了有发件箱,也会有自己的收件箱。当发布者发表一篇帖子的时候,除了往自己发件箱记录一下之外,还会遍历发布者的所有粉丝,往这些粉丝的收件箱也投放一份相同内容。这样阅读者来读Feed流时,直接从自己的收件箱读取即可。

这种设计:每次发表帖子,都会扩散为M次写操作(M等于自己的粉丝数),因此成为写扩散。每篇帖子都会主动推送到所有粉丝的收件箱,因此也得名推模式

特点:

  • 可优化为异步化处理:发一篇帖子,背后会涉及到很多次的写操作。通常为了发帖人的用户体验,当发布的帖子写到自己发件箱时,就可以返回发布成功。后台另外起一个异步任务,不慌不忙地往粉丝收件箱投递帖子即可。
    • 但由于写扩散是异步操作,写的太慢会导致帖子发出去半天,有些粉丝依然没能看见,这种体验也不太好。
  • 以空间换时间:写扩散的好处在于通过数据冗余(一篇帖子会被存储M份副本),减少了拉模式获取消息的时间,提升了阅读者的用户体验。通常适当的数据冗余不是什么问题,但是到了微博明星这里,完全行不通。比如目前微博粉丝量Top2的谢娜与何炅,两个人微博粉丝过亿。

    推拉结合模式(重点)

【在本项目中,将用户账号区分为普通用户账号和官方账号(类似于微博中的大V账号,例如学生会官号、学院消息官号【学校消息会推送每日是否需要早点到/晨读的消息】都为大V账号,特定学生群体都会关注),对于不同类型的账号分别使用推/拉结合模式】

当用户发布一条新的 Feed 时,处理流程如下:

  1. 先从关注列表中读取到自己的粉丝列表,以及判断自己是否是大V。
  2. 将自己的Feed消息写入个人页Timeline。
    1. 如果是大V,写入流程到此就结束了。(对于大V用户发帖,普通用户读取时采用拉模式
    2. 如果是普通用户,还需要将自己的Feed消息写给自己的粉丝,如果有100个粉丝,那么就要写给100个用户。(普通用户发帖采用推模式)

当刷新自己的Feed流的时候,处理流程如下:

  1. 先去读取自己关注的大V列表,如果有关注的大V,则需要按截止的Timeline并发读取每一个大V的发件箱,如果关注了10个大V,那么则需要10次访问。
  2. 去读取自己的收件箱。
  3. 合并前2步的结果,然后按时间排序,返回给用户。

至此,使用推拉结合方式的发布、读取Feed流的流程都结束了。

设计一个 Feed 流系统

设计一个 Feed 流系统,两个关键步骤,一个是 Feed 流的 初始化,一个是 推送。关于 Feed 流的存储其实也是一个核心的点,普通企业级别feed流系统持久化可以使用 MySQL,后续可以考虑优化。

Feed 流初始化

Feed 流的初始化指的是,当用户的 Feed 流还不存在的时候,为该用户创建一个属于他自己的关注页 Feed 流。

维护活跃用户列表
  1. 我们使用redis维护一个在线用户列表,当用户打开APP后三个小时内均算在线(三个小时外的用户被移除列表,用zset实现)。
  2. 当用户打开app后查看自己是不是在在线列表里,如果是的话直接返回feed流中的内容(已经初始化过)。如果判断为离线,这时需要进行feed流初始化。
redis初始化
  1. 初始化数据:在线用户的关注用户id列表需要从数据库中加载出来。
  2. key 值:sortSet 的 key 值需要使用当前用户的 id 做标识。
  3. score 值:如果是 Timeline 类型,直接取每条帖子创建的时间戳即可。(如果是 rank 类型,则把你的业务对应的权重值设进去。)(项目中直接使用timeline类型)
  4. 如何初始化呢?首先先遍历一遍个人关注列表,分页取出所有关注用户的帖子发件箱中,一定数量的最新帖子 ,将每条帖子的 feedId 存放到 redis 的 sortSet 中,这样有个好处,如果用户在线就不用遍历关注者列表获取动态了(尤其是当关注者很多的时候,这是个很费时间的过程)。
组装帖子
  1. 接下来,再根据 Feed ID 列表获取对应的 Feed 内容,如帖子的文字、视频、发表时间、源帖子 ID 等。
  2. 再进一步获取所有微博的发表者 user 详细信息、源帖子内容等信息,并进行内容组装。
  3. 再根据帖子ID获取用户对这些帖子的收藏、赞等状态,并设置到对应帖子中。最后,获取这些 Feed 的转发数、评论数、赞数等,并进行计数组装。至此,Feed 流获取处理完毕,Feed 列表以 JSON 形式返回给前端,用户刷新个人首页成功完成。

这里面有几个关键点:

  1. feed流不是无限长的(主要是穷,其次是没必要),例如类似朋友圈功能的feed系统可以保存500个,500个也不少了,一次返回10个要往下刷新50次才能到。超过500的也不存数据库了直接舍弃。【5000用户每人一个长度500的zset作为feed流,key 30B,score 13B,value 18B ,占用大小为958.93MB】
  2. 需要每隔一段时间主动维护在线用户zset。

    推送

经过上面的初始化,已经把 feed 流放在了 redis 缓存中了,当用户点进个人feed流主页时就可以通过。接下来就是需要更新维护 feed 流了,在下面四种情况需要维护redis 中每个用户收件箱的 sortSet 列表:

  1. 关注的用户发布新的 feed。
  2. 关注的用户删除 feed。
  3. 用户新增关注。
  4. 用户取消关注。

仿微博示例demo

我们在初始化feed流结束之后,还会根据用户关注列表以及自己新增/删除帖子的情况动态维护每个在线用户的feed流zset。

初始化 Feed 流

当用户第一进来刷新Feed 流,且 Feed 流还不存在时,我们需要进行初始化,初始化的具体代码如下:核心思想就是从数据库中load出 feed 信息,塞到 zSet 中,然后分页返回。

    /**
     * 获取关注的人的信息流
     */
    public List<FeedDto> listFocusFeed(Long userId, Integer page, Integer size) {
        String focusFeedKey = "focusFeedKey" + userId;

        // 如果 zset 为空,先初始化
        if (!zSetRedisTemplate.exists(focusFeedKey)) {
            initFocusIdeaSet(userId);
        }

        // 如果 zset 存在,但是存在 0 值
        Double zscore = zSetRedisTemplate.zscore(focusFeedKey, "0");
        if (zscore != null && zscore > 0) {
            return null;
        }

        //分页
        int offset = (page - 1) * size;

        long score = System.currentTimeMillis();
        // 按 score 值从大到小从 zSet 中取出 FeedId 集合
        List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey, score, 0, offset, size);

        List<FeedDto> result = new ArrayList<>();
        if (QlchatUtil.isNotEmpty(list)) {
            for (String s : list) {
                // 根据 feedId 从缓存中 load 出 feed
                FeedDto feedDto = this.loadCache(Long.valueOf(s));
                if (feedDto != null) {
                    result.add(feedDto);
                }
            }
        }
        return result;
    }

    /**
     * 初始化关注的人的信息流 zSet
     */
    private void initFocusFeedSet( Long userId) {
        String focusFeedKey = "focusFeedKey" + userId;
        zSetRedisTemplate.del(focusIdeaKey);

        // 从数据库中加载当前用户关注的人发布过的 Feed
        List<Feed> list = this.feedMapper.listFocusFeed(userId);

        if (QlchatUtil.isEmpty(list)) {
            //保存0,避免空数据频繁查库
            zSetRedisTemplate.zadd(focusFeedKey, 1, "0");
            zSetRedisTemplate.expire(focusFeedKey, RedisKeyConstants.ONE_MINUTE * 5);
            return;
        }

        // 遍历 FeedList,把 FeedId 存到 zSet 中
        for (Feed feed : list) {
            zSetRedisTemplate.zadd(focusFeedKey, feed.getCreateTime().getTime(), feed.getId().toString());
        }

        zSetRedisTemplate.expire(focusFeedKey, 60 * 60 * 60);
    }

关注的用户发布/删除新的 feed

每当用户发布/删除新的 feed,我们需要更新该用户所有的粉丝的feed流zset列表,该步骤一般比较耗时,所以建议异步处理,逻辑删除后就返回成功,为了避免一次性load出太多的粉丝数据,这里采用循环分页查询。为了避免粉丝的 Feed流过大,我们会限制 Feed 流的长度为500,当Feed流长度超过500时,会移除最旧的 Feed。


/**
 * 新增/删除 feed时,处理粉丝 feed 流
 *
 * @param userId 新增/删除 feed的用户id
 * @param feedId 新增/删除 的feedId
 * @param type   feed_add = 新增feed feed_sub = 删除feed
 */
public void handleFeed(Long userId, Long feedId, String type) {

    Integer currentPage = 1;
    Integer size = 1000;
    List<FansDto> fansDtos;

    while (true) {
        Page page = new Page();
        page.setSize(size);
        page.setPage(currentPage);
        fansDtos = this.fansService.listFans(userId, page);

        for (FansDto fansDto : fansDtos) {
            String focusFeedKey = "focusFeedKey" + userId;

            // 如果粉丝 zSet 不存在,退出
            if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
                continue;
            }

            // 新增Feed
            if ("feed_add".equals(type)) {
                this.removeOldestZset(focusFeedKey);
                zSetRedisTemplate.zadd(focusFeedKey, System.currentTimeMillis(), feedId);
            }
            // 删除Feed
            else if ("feed_sub".equals(type)) {
                zSetRedisTemplate.zrem(focusFeedKey, feedId);
            }

        }

        if (fansDtos.size() < size) {
            break;
        }
        currentPage++;
    }

}

/**
 * 删除 zSet 中最旧的数据
 */
private void removeOldestZset(String focusFeedKey){
    // 如果当前 zSet 大于1000,删除最旧的数据
    if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {
        // 获取当前 zSet 中 score 值最小的
        List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey, -1, -1, String.class);
        if (QlchatUtil.isNotEmpty(zrevrange)) {
            this.zSetRedisTemplate.zrem(focusFeedKey, zrevrange.get(0));
        }
    }
}

用户新增关注/取消关注

这里比较简单,新增/取消关注,把新关注的 Feed 往自己的 Feed流中增加/删除 对应帖子id即可,但是同样需要异步处理。

/**
 * 关注/取关 时,处理followerId的zSet
 *
 * @param followId   被关注的人
 * @param followerId 当前用户
 * @param type       focus = 关注 unfocus = 取关
 */
public void handleFocus( Long followId, Long followerId, String type) {

    String focusFeedKey = "focusFeedKey" + userId;

    // 如果粉丝 zSet 不存在,退出
    if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
        return;
    }
    List<FeedDto> FeedDtos = this.listFeedByFollowId(source, followId);
    for (FeedDto feedDto : FeedDtos) {

        // 关注
        if ("focus".equals(type)) {
            this.removeOldestZset(focusFeedKey);
            this.zSetRedisTemplate.zadd(focusFeedKey, feedDto.getCreateTime().getTime(), feedDto.getId());
        }
        // 取关
        else if ("unfocus".equals(type)) {
            this.zSetRedisTemplate.zrem(focusFeedKey, feedDto.getId());
        }


    }
}

redis pipeline

参考文章:

  1. Redis管道传输
  2. pipeline优化feed流
  3. redis性能优化:理解&使用redis pipeline

    简介

通常情况下,客户端与Redis通信时采用的是Ping-pong网络交互模式,Ping-pong模式是指客户端(Client)发送一个命令后会等待命令的执行结果,在客户端收到服务器端(Server)返回的结果后,再发送下一个命令,以此类推。

Redis也支持Pipeline模式,不同于Ping-pong模式,Pipeline模式类似流水线的工作模式:客户端发送一个命令后无需等待执行结果,会继续发送其他命令;在全部请求发送完毕后,客户端关闭请求,开始接收响应,收到执行结果后再与之前发送的命令按顺序进行一一匹配。在Pipeline模式的具体实现中,大部分Redis客户端采用批处理的方式,即一次发送多个命令,在接收完所有命令执行结果后再返回给上层业务。

使用Pipeline可通过降低网络往返时延(Round-trip time,简称RTT),减少read()write()的系统调用以及进程上下文切换次数,以提升程序的执行效率与性能。

Pipeline在某些场景下非常有效,例如有多个操作命令需要被迅速提交至服务器端,但用户并不依赖每个操作返回的响应结果,对结果响应也无需立即获得,那么Pipeline就可以用来作为优化性能的批处理工具。

问题

  • 由于服务端以及部分客户端存在缓存区限制,建议单次Pipeline中不要使用过多的命令。
    • 根据官方的解释,推荐是以 10k 每批 (注意:这个是一个参考值,请根据自身实际业务情况调整)。
  • 几乎所有的Redis客户端都支持Pipeline操作,因此实现起来非常容易。
    • Pipeline的本质为客户端与服务端的交互模式,与服务端的架构无关,因此集群架构代理模式、集群架构直连模式以及读写分离架构实例均支持Pipeline。
  • Pipeline不能保证原子性。 Pipeline模式只是将客户端发送命令的方式改为发送批量命令,而服务端在处理批量命令的数据流时,仍然是解析出多个单命令并按顺序执行,各个命令相互独立,即服务端仍有可能在该过程中执行其他客户端的命令。如需保证原子性,请使用事务或Lua脚本。
  • 若Pipeline执行过程中发生错误,不支持回滚。 Pipeline没有事务的特性,如待执行命令的前后存在依赖关系,请勿使用Pipeline。

Pipeline 批量执行的时候,是否对Redis进行了锁定,导致其他应用无法再进行读写?

Redis 采用多路I/O复用模型,非阻塞IO,所以Pipeline批量写入的时候,一定范围内不影响其他的读写操作。 虽然Redis本身支持并发操作,但它还是一个单线程模型,命令依然是顺序执行的。处理Pipeline的时候,从接收到Pipeline开始,到所有命令执行完毕,这期间的所有命令被看作一个整体,其他客户端提交的命令会排在这个整体后面等待执行。

使用

在Feed流时发现服务重启后,初始化数据保存数据到Redis时异常慢,原来之前使用的是单个set命令,由于Pipeline是redis批量操作的利器,因此这样初始化的大批量操作正好可以用Pipeline进行优化。

  • feed流初始化时,我们可以使用redis pipeline命令向每个用户feed流zset中批量添加对应的粉丝帖子id。
  • 在读取初始化好的zset集合时也可以使用redis pipeline进行批量读取操作。

使用pipeline函数

@Test void pipeline() { 
List<Object> result = redisTemplate.executePipelined((RedisCallback<String>) connection -> { for (int i = 0; i < 100; i++) { 
// 模拟多次传入批量命令
redisTemplate.opsForValue().set("pipel:" + i, i); } return null; }); }

RedisCallback 和SessionCallback 共同点: 让RedisTemplate进行回调,通过它们可以在同一条连接下执行多个Redis命令。 SessionCalback提供了良好的封装,优先使用它,RedisCallback稍微复杂一些。 区别:
SessionCallback 可以支持事务。

示例demo

以下代码不管set还是get 都需要return null。 另外批量读取时,返回的List,如果 connection.get 在Redis不存在,则List中会有null值存在。 需要自己去过滤null值。

    @Overrider
    private RedisTemplate strRedisTemplate;

 
    //单个新增操作
    @RequestMapping(value = "/set/single", method = RequestMethod.GET)
    public void setSingle() {
        for (int i = 0; i < 100; i++) {
            strRedisTemplate.opsForValue().set("single:" + i, "123");
        }
    }

    //批量新增操作
    @RequestMapping(value = "/set/pipeline", method = RequestMethod.GET)
    public void setPipeline() {
        strRedisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                for (int i = 0; i < 100; i++) {
                    connection.set(("pipel:" + i).getBytes(), "123".getBytes());
                }
                return null;
            }
        });
    }

    //单个读取操作
    @RequestMapping(value = "/get/single", method = RequestMethod.GET)
    public void addSingle() {
        for (int i = 0; i < 100; i++) {
            strRedisTemplate.opsForValue().get("single:" + i);
        }
    }


    //批量读取操作
    @RequestMapping(value = "/get/pipeline", method = RequestMethod.GET)
    public void getPipeline() {
        List<String> list = strRedisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                for (int i = 0; i < 100; i++) {
                    connection.get(("pipel:" + i).getBytes());
                }
                return null;
            }
        });
    }


性能对比

如下代码将演示使用Pipeline与不使用Pipeline的性能对比。

package pipeline.kvstore.aliyun.com;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
        static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
        static final int port = 6379;
        static final String password = "password";
        public static void main(String[] args) {
            Jedis jedis = new Jedis(host, port);
                //ApsaraDB for Redis的实例密码
                String authString = jedis.auth(password);// password
                if (!authString.equals("OK")) {
                    System.err.println("AUTH Failed: " + authString);
                    jedis.close();
                    return;
                }
                //连续执行多次命令操作
                final int COUNT=5000;
                String key = "KVStore-Tanghan";
                // 1 ---不使用pipeline操作---
                jedis.del(key);//初始化key
                Date ts1 = new Date();
                for (int i = 0; i < COUNT; i++) {
                    //发送一个请求,并接收一个响应(Send Request and  Receive Response)
                    jedis.incr(key);
                }
                Date ts2 = new Date();
                System.out.println("不用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts2.getTime() - ts1.getTime())+ "ms");
                //2 ----对比使用pipeline操作---
                jedis.del(key);//初始化key
                Pipeline p1 = jedis.pipelined();
                Date ts3 = new Date();
                for (int i = 0; i < COUNT; i++) {
                    //发出请求 Send Request 
                    p1.incr(key);
                }
                //接收响应 Receive Response
                p1.sync();
                Date ts4 = new Date();
                System.out.println("使用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts4.getTime() - ts3.getTime())+ "ms");
                jedis.close();
        }
    }

不用Pipeline > value为:5000 > 操作用时:5844ms
使用Pipeline > value为:5000 > 操作用时:78ms