本文基于Feed流架构参考,记录从学习到实现一个可以承载小流量的仿微博Feed流系统的过程。
Feed流
参考文章:
- 掘金: 使用redis实现feed流
- 即刻:自主实现了一个feed流系统
- 微博feed流系统架构
- Feed流系统架构理论
- IM开发技术学习:揭秘微信朋友圈这种信息推流背后的系统设计
什么是 feed 流
信息推流(以下简称“Feed流”)这种功能在我们手机APP中几乎无处不在(尤其是社交/社群产品中),最常用的就是微信朋友圈、新浪微博等。
Feed
:Feed 流中的每一条状态或者消息都是 Feed,比如微博中的一条微博就是一个 Feed。Feed流
:持续更新并呈现给用户内容的信息流。每个人的朋友圈,微博关注页等等都是一个 Feed 流。- 对Feed流的释义,可以简单理解为只要大拇指不停地往下划手机屏幕,就有一条条的信息不断涌现出来。就像给牲畜喂饲料一样,只要它吃光了就要不断再往里加,故此得名Feed(饲养)。
feed 流分类
按帖子类型分类:
Timeline
:按发布的时间顺序排序,产品如果选择 Timeline 类型,那么就是认为 Feed 流中的 Feed 不多,但是每个 Feed 都很重要,都需要用户看到。类似于微信朋友圈,微博等。Rank
:按某个非时间的因子排序,一般是按照用户的喜好度排序,一般用于新闻推荐类、商品推荐等。
按Feed流效果分类:
- 基于算法:即个人主页feed流面依赖动态算法推荐,比如今日头条、抖音短视频;
- 基于关注:即社交/好友关系,比如微信、知乎。
Feed流推送行为
关于feed流推送,一般有两种 推/拉。
拉模式
每一个内容发布者都有一个自己的发件箱(“我发布的内容”),每当我们发出一个新帖子,都存入自己的发件箱中。当我们的粉丝来阅读时,系统首先需要拿到粉丝关注的所有人,然后遍历所有发布者的发件箱,取出他们所发布的帖子,然后依据发布时间排序,展示给阅读者(聚合到个人主页广场)。
这种设计: 阅读者读一次Feed流,后台会扩散为N次读操作(N等于关注的人数)以及一次聚合操作,因此称为读扩散。每次读Feed流相当于去关注者的收件箱主动拉取帖子,因此也得名——拉模式。
优缺点:
- 好处是:底层存储简单,没有空间浪费。
- 坏处是:每次读操作会非常重,读操作多。
设想一下:如果我关注的人数非常多,遍历一遍我所关注的所有人,并且再聚合一下,这个系统开销会非常大,时延上可能达到无法忍受的地步。
拉模式还有一个比较大的缺点:就是分页不方便,我们刷微博或朋友圈,肯定是随着大拇指在屏幕不断划动,内容一页一页的从后台拉取。如果不做其他优化,只采用实时聚合的方式,下滑到比较靠后的页码时会非常麻烦。 因此:读扩散主要适用系统中阅读者关注的人没那么多,并且刷Feed流并不频繁的场景。
推模式
系统中每个用户除了有发件箱,也会有自己的收件箱。当发布者发表一篇帖子的时候,除了往自己发件箱记录一下之外,还会遍历发布者的所有粉丝,往这些粉丝的收件箱也投放一份相同内容。这样阅读者来读Feed流时,直接从自己的收件箱读取即可。
这种设计:每次发表帖子,都会扩散为M次写操作(M等于自己的粉丝数),因此成为写扩散。每篇帖子都会主动推送到所有粉丝的收件箱,因此也得名推模式。
特点:
- 可优化为异步化处理:发一篇帖子,背后会涉及到很多次的写操作。通常为了发帖人的用户体验,当发布的帖子写到自己发件箱时,就可以返回发布成功。后台另外起一个异步任务,不慌不忙地往粉丝收件箱投递帖子即可。
- 但由于写扩散是异步操作,写的太慢会导致帖子发出去半天,有些粉丝依然没能看见,这种体验也不太好。
- 以空间换时间:写扩散的好处在于通过数据冗余(一篇帖子会被存储M份副本),减少了拉模式获取消息的时间,提升了阅读者的用户体验。通常适当的数据冗余不是什么问题,但是到了微博明星这里,完全行不通。比如目前微博粉丝量Top2的谢娜与何炅,两个人微博粉丝过亿。
推拉结合模式(重点)
【在本项目中,将用户账号区分为普通用户账号和官方账号(类似于微博中的大V账号,例如学生会官号、学院消息官号【学校消息会推送每日是否需要早点到/晨读的消息】都为大V账号,特定学生群体都会关注),对于不同类型的账号分别使用推/拉结合模式】
当用户发布一条新的 Feed 时,处理流程如下:
- 先从关注列表中读取到自己的粉丝列表,以及判断自己是否是大V。
- 将自己的Feed消息写入个人页Timeline。
- 如果是大V,写入流程到此就结束了。(对于大V用户发帖,普通用户读取时采用拉模式)
- 如果是普通用户,还需要将自己的Feed消息写给自己的粉丝,如果有100个粉丝,那么就要写给100个用户。(普通用户发帖采用推模式)
当刷新自己的Feed流的时候,处理流程如下:
- 先去读取自己关注的大V列表,如果有关注的大V,则需要按截止的Timeline并发读取每一个大V的发件箱,如果关注了10个大V,那么则需要10次访问。
- 去读取自己的收件箱。
- 合并前2步的结果,然后按时间排序,返回给用户。
至此,使用推拉结合方式的发布、读取Feed流的流程都结束了。
设计一个 Feed 流系统
设计一个 Feed 流系统,两个关键步骤,一个是 Feed 流的 初始化
,一个是 推送
。关于 Feed 流的存储其实也是一个核心的点,普通企业级别feed流系统持久化可以使用 MySQL,后续可以考虑优化。
Feed 流初始化
Feed 流的初始化指的是,当用户的 Feed 流还不存在的时候,为该用户创建一个属于他自己的关注页 Feed 流。
维护活跃用户列表
- 我们使用redis维护一个在线用户列表,当用户打开APP后三个小时内均算在线(三个小时外的用户被移除列表,用zset实现)。
- 当用户打开app后查看自己是不是在在线列表里,如果是的话直接返回feed流中的内容(已经初始化过)。如果判断为离线,这时需要进行feed流初始化。
redis初始化
- 初始化数据:在线用户的关注用户id列表需要从数据库中加载出来。
- key 值:sortSet 的 key 值需要使用当前用户的 id 做标识。
- score 值:如果是 Timeline 类型,直接取每条帖子创建的时间戳即可。(如果是 rank 类型,则把你的业务对应的权重值设进去。)(项目中直接使用timeline类型)
- 如何初始化呢?首先先遍历一遍个人关注列表,分页取出所有关注用户的帖子发件箱中,一定数量的最新帖子 ,将每条帖子的 feedId 存放到 redis 的
sortSet
中,这样有个好处,如果用户在线就不用遍历关注者列表获取动态了(尤其是当关注者很多的时候,这是个很费时间的过程)。
组装帖子
- 接下来,再根据 Feed ID 列表获取对应的 Feed 内容,如帖子的文字、视频、发表时间、源帖子 ID 等。
- 再进一步获取所有微博的发表者 user 详细信息、源帖子内容等信息,并进行内容组装。
- 再根据帖子ID获取用户对这些帖子的收藏、赞等状态,并设置到对应帖子中。最后,获取这些 Feed 的转发数、评论数、赞数等,并进行计数组装。至此,Feed 流获取处理完毕,Feed 列表以 JSON 形式返回给前端,用户刷新个人首页成功完成。
这里面有几个关键点:
- feed流不是无限长的(主要是穷,其次是没必要),例如类似朋友圈功能的feed系统可以保存500个,500个也不少了,一次返回10个要往下刷新50次才能到。超过500的也不存数据库了直接舍弃。【5000用户每人一个长度500的zset作为feed流,key 30B,score 13B,value 18B ,占用大小为958.93MB】
- 需要每隔一段时间主动维护在线用户zset。
推送
经过上面的初始化,已经把 feed 流放在了 redis 缓存中了,当用户点进个人feed流主页时就可以通过。接下来就是需要更新维护 feed 流了,在下面四种情况需要维护redis 中每个用户收件箱的 sortSet
列表:
- 关注的用户发布新的 feed。
- 关注的用户删除 feed。
- 用户新增关注。
- 用户取消关注。
仿微博示例demo
我们在初始化feed流结束之后,还会根据用户关注列表以及自己新增/删除帖子的情况动态维护每个在线用户的feed流zset。
初始化 Feed 流
当用户第一进来刷新Feed 流,且 Feed 流还不存在时,我们需要进行初始化,初始化的具体代码如下:核心思想就是从数据库中load出 feed 信息,塞到 zSet 中,然后分页返回。
/**
* 获取关注的人的信息流
*/
public List<FeedDto> listFocusFeed(Long userId, Integer page, Integer size) {
String focusFeedKey = "focusFeedKey" + userId;
// 如果 zset 为空,先初始化
if (!zSetRedisTemplate.exists(focusFeedKey)) {
initFocusIdeaSet(userId);
}
// 如果 zset 存在,但是存在 0 值
Double zscore = zSetRedisTemplate.zscore(focusFeedKey, "0");
if (zscore != null && zscore > 0) {
return null;
}
//分页
int offset = (page - 1) * size;
long score = System.currentTimeMillis();
// 按 score 值从大到小从 zSet 中取出 FeedId 集合
List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey, score, 0, offset, size);
List<FeedDto> result = new ArrayList<>();
if (QlchatUtil.isNotEmpty(list)) {
for (String s : list) {
// 根据 feedId 从缓存中 load 出 feed
FeedDto feedDto = this.loadCache(Long.valueOf(s));
if (feedDto != null) {
result.add(feedDto);
}
}
}
return result;
}
/**
* 初始化关注的人的信息流 zSet
*/
private void initFocusFeedSet( Long userId) {
String focusFeedKey = "focusFeedKey" + userId;
zSetRedisTemplate.del(focusIdeaKey);
// 从数据库中加载当前用户关注的人发布过的 Feed
List<Feed> list = this.feedMapper.listFocusFeed(userId);
if (QlchatUtil.isEmpty(list)) {
//保存0,避免空数据频繁查库
zSetRedisTemplate.zadd(focusFeedKey, 1, "0");
zSetRedisTemplate.expire(focusFeedKey, RedisKeyConstants.ONE_MINUTE * 5);
return;
}
// 遍历 FeedList,把 FeedId 存到 zSet 中
for (Feed feed : list) {
zSetRedisTemplate.zadd(focusFeedKey, feed.getCreateTime().getTime(), feed.getId().toString());
}
zSetRedisTemplate.expire(focusFeedKey, 60 * 60 * 60);
}
关注的用户发布/删除新的 feed
每当用户发布/删除新的 feed,我们需要更新该用户所有的粉丝的feed流zset列表,该步骤一般比较耗时,所以建议异步处理,逻辑删除后就返回成功,为了避免一次性load出太多的粉丝数据,这里采用循环分页查询。为了避免粉丝的 Feed流过大,我们会限制 Feed 流的长度为500,当Feed流长度超过500时,会移除最旧的 Feed。
/**
* 新增/删除 feed时,处理粉丝 feed 流
*
* @param userId 新增/删除 feed的用户id
* @param feedId 新增/删除 的feedId
* @param type feed_add = 新增feed feed_sub = 删除feed
*/
public void handleFeed(Long userId, Long feedId, String type) {
Integer currentPage = 1;
Integer size = 1000;
List<FansDto> fansDtos;
while (true) {
Page page = new Page();
page.setSize(size);
page.setPage(currentPage);
fansDtos = this.fansService.listFans(userId, page);
for (FansDto fansDto : fansDtos) {
String focusFeedKey = "focusFeedKey" + userId;
// 如果粉丝 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
continue;
}
// 新增Feed
if ("feed_add".equals(type)) {
this.removeOldestZset(focusFeedKey);
zSetRedisTemplate.zadd(focusFeedKey, System.currentTimeMillis(), feedId);
}
// 删除Feed
else if ("feed_sub".equals(type)) {
zSetRedisTemplate.zrem(focusFeedKey, feedId);
}
}
if (fansDtos.size() < size) {
break;
}
currentPage++;
}
}
/**
* 删除 zSet 中最旧的数据
*/
private void removeOldestZset(String focusFeedKey){
// 如果当前 zSet 大于1000,删除最旧的数据
if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {
// 获取当前 zSet 中 score 值最小的
List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey, -1, -1, String.class);
if (QlchatUtil.isNotEmpty(zrevrange)) {
this.zSetRedisTemplate.zrem(focusFeedKey, zrevrange.get(0));
}
}
}
用户新增关注/取消关注
这里比较简单,新增/取消关注,把新关注的 Feed 往自己的 Feed流中增加/删除 对应帖子id即可,但是同样需要异步处理。
/**
* 关注/取关 时,处理followerId的zSet
*
* @param followId 被关注的人
* @param followerId 当前用户
* @param type focus = 关注 unfocus = 取关
*/
public void handleFocus( Long followId, Long followerId, String type) {
String focusFeedKey = "focusFeedKey" + userId;
// 如果粉丝 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
return;
}
List<FeedDto> FeedDtos = this.listFeedByFollowId(source, followId);
for (FeedDto feedDto : FeedDtos) {
// 关注
if ("focus".equals(type)) {
this.removeOldestZset(focusFeedKey);
this.zSetRedisTemplate.zadd(focusFeedKey, feedDto.getCreateTime().getTime(), feedDto.getId());
}
// 取关
else if ("unfocus".equals(type)) {
this.zSetRedisTemplate.zrem(focusFeedKey, feedDto.getId());
}
}
}
redis pipeline
参考文章:
通常情况下,客户端与Redis通信时采用的是Ping-pong网络交互模式,Ping-pong模式是指客户端(Client)发送一个命令后会等待命令的执行结果,在客户端收到服务器端(Server)返回的结果后,再发送下一个命令,以此类推。
Redis也支持Pipeline模式,不同于Ping-pong模式,Pipeline模式类似流水线的工作模式:客户端发送一个命令后无需等待执行结果,会继续发送其他命令;在全部请求发送完毕后,客户端关闭请求,开始接收响应,收到执行结果后再与之前发送的命令按顺序进行一一匹配。在Pipeline模式的具体实现中,大部分Redis客户端采用批处理的方式,即一次发送多个命令,在接收完所有命令执行结果后再返回给上层业务。
使用Pipeline可通过降低网络往返时延(Round-trip time,简称RTT),减少read()
和write()
的系统调用以及进程上下文切换次数,以提升程序的执行效率与性能。
Pipeline在某些场景下非常有效,例如有多个操作命令需要被迅速提交至服务器端,但用户并不依赖每个操作返回的响应结果,对结果响应也无需立即获得,那么Pipeline就可以用来作为优化性能的批处理工具。
问题
- 由于服务端以及部分客户端存在缓存区限制,建议单次Pipeline中不要使用过多的命令。
- 根据官方的解释,推荐是以 10k 每批 (注意:这个是一个参考值,请根据自身实际业务情况调整)。
- 几乎所有的Redis客户端都支持Pipeline操作,因此实现起来非常容易。
- Pipeline的本质为客户端与服务端的交互模式,与服务端的架构无关,因此集群架构代理模式、集群架构直连模式以及读写分离架构实例均支持Pipeline。
- Pipeline不能保证原子性。 Pipeline模式只是将客户端发送命令的方式改为发送批量命令,而服务端在处理批量命令的数据流时,仍然是解析出多个单命令并按顺序执行,各个命令相互独立,即服务端仍有可能在该过程中执行其他客户端的命令。如需保证原子性,请使用事务或Lua脚本。
- 若Pipeline执行过程中发生错误,不支持回滚。 Pipeline没有事务的特性,如待执行命令的前后存在依赖关系,请勿使用Pipeline。
Pipeline 批量执行的时候,是否对Redis进行了锁定,导致其他应用无法再进行读写?
Redis 采用多路I/O复用模型,非阻塞IO,所以Pipeline批量写入的时候,一定范围内不影响其他的读写操作。 虽然Redis本身支持并发操作,但它还是一个单线程模型,命令依然是顺序执行的。处理Pipeline的时候,从接收到Pipeline开始,到所有命令执行完毕,这期间的所有命令被看作一个整体,其他客户端提交的命令会排在这个整体后面等待执行。
使用
在Feed流时发现服务重启后,初始化数据保存数据到Redis时异常慢,原来之前使用的是单个set命令,由于Pipeline是redis批量操作的利器,因此这样初始化的大批量操作正好可以用Pipeline进行优化。
- feed流初始化时,我们可以使用redis pipeline命令向每个用户feed流zset中批量添加对应的粉丝帖子id。
- 在读取初始化好的zset集合时也可以使用redis pipeline进行批量读取操作。
使用pipeline函数
@Test void pipeline() {
List<Object> result = redisTemplate.executePipelined((RedisCallback<String>) connection -> { for (int i = 0; i < 100; i++) {
// 模拟多次传入批量命令
redisTemplate.opsForValue().set("pipel:" + i, i); } return null; }); }
RedisCallback 和SessionCallback
共同点:
让RedisTemplate进行回调,通过它们可以在同一条连接下执行多个Redis命令。
SessionCalback提供了良好的封装,优先使用它,RedisCallback稍微复杂一些。
区别:
SessionCallback 可以支持事务。
示例demo
以下代码不管set还是get 都需要return null。 另外批量读取时,返回的List,如果 connection.get 在Redis不存在,则List中会有null值存在。 需要自己去过滤null值。
@Overrider
private RedisTemplate strRedisTemplate;
//单个新增操作
@RequestMapping(value = "/set/single", method = RequestMethod.GET)
public void setSingle() {
for (int i = 0; i < 100; i++) {
strRedisTemplate.opsForValue().set("single:" + i, "123");
}
}
//批量新增操作
@RequestMapping(value = "/set/pipeline", method = RequestMethod.GET)
public void setPipeline() {
strRedisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
for (int i = 0; i < 100; i++) {
connection.set(("pipel:" + i).getBytes(), "123".getBytes());
}
return null;
}
});
}
//单个读取操作
@RequestMapping(value = "/get/single", method = RequestMethod.GET)
public void addSingle() {
for (int i = 0; i < 100; i++) {
strRedisTemplate.opsForValue().get("single:" + i);
}
}
//批量读取操作
@RequestMapping(value = "/get/pipeline", method = RequestMethod.GET)
public void getPipeline() {
List<String> list = strRedisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
for (int i = 0; i < 100; i++) {
connection.get(("pipel:" + i).getBytes());
}
return null;
}
});
}
性能对比
如下代码将演示使用Pipeline与不使用Pipeline的性能对比。
package pipeline.kvstore.aliyun.com;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
//ApsaraDB for Redis的实例密码
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
//连续执行多次命令操作
final int COUNT=5000;
String key = "KVStore-Tanghan";
// 1 ---不使用pipeline操作---
jedis.del(key);//初始化key
Date ts1 = new Date();
for (int i = 0; i < COUNT; i++) {
//发送一个请求,并接收一个响应(Send Request and Receive Response)
jedis.incr(key);
}
Date ts2 = new Date();
System.out.println("不用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts2.getTime() - ts1.getTime())+ "ms");
//2 ----对比使用pipeline操作---
jedis.del(key);//初始化key
Pipeline p1 = jedis.pipelined();
Date ts3 = new Date();
for (int i = 0; i < COUNT; i++) {
//发出请求 Send Request
p1.incr(key);
}
//接收响应 Receive Response
p1.sync();
Date ts4 = new Date();
System.out.println("使用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts4.getTime() - ts3.getTime())+ "ms");
jedis.close();
}
}
不用Pipeline > value为:5000 > 操作用时:5844ms
使用Pipeline > value为:5000 > 操作用时:78ms