【分布式webscoket】未读消息如何设计?解决缓存与数据库数据一致性!推送未读消息流程【第17期】
前言
IM系统关系消息未读消息是个需要频繁访问的功能,
如果直接从聊天消息表里面根据状态拉取未读数,那么每次都是一次sql的查询性能开销。对于系统来说查询太过频繁。所以采用新建立一张表来管理未读消息数量。
使用一张表避免拉取消息表内容,但是频繁的查询表对于性能来说也非常的受影响。所以引入Redis缓存。以及如何给前端完成推送。推送报文设计等。
简而言之,通过建表以及缓存来优化聊天中未读消息。引入缓存又会有新的问题。需要思考一致性问题。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期(废弃))】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】
IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】
微信发送一条消息经历哪些过程。企业微信以及钉钉的IM架构对比【第13期】
微信群为什么上限是500人,IM设计系统中的群聊的设计难点【第14期】
【分布式websocket】RocketMQ发送消息保证消息最终一致性需要做哪些处理?【第15期】
未读消息条数
虽然聊天消息表中有状态去区分这个已读未读。但是已读未读相当频繁。建议使用缓存、
- 用户-会话关系表:yan_im_read用于记录用户与每个聊天会话(无论是单聊还是群聊)的关系,以及用户在每个会话中的未读消息数。
1用户ID
2会话ID
3未读消息数
4最后阅读时间或最后阅读的消息ID
- 消息表:存储所有聊天消息,每条消息都有一个唯一的ID和时间戳,以及所属的会话ID。
消息ID
会话ID
发送者ID
发送时间
消息内容
因为群聊id可以作为会话id,所以未读消息群聊和单聊采用一张表yan_im_read的设计。
CREATE TABLE yan_im_read( user_id VARCHAR(255) NOT NULL, group_id VARCHAR(255) NOT NULL, last_read_message_id BIGINT, count int );
功能实现
- 发送消息:
当用户发送消息时,将消息存入消息表,并为每个接收者在用户-会话关系表中的未读消息数加一。
对于群聊,为群内每个成员(除了发送者)的未读消息数加一。
- 阅读消息:
当用户打开一个聊天窗口时,系统将该会话的未读消息数重置为0,并更新最后阅读时间或最后阅读的消息ID。
同时,前端展示未读消息数,并将其清零。
- 查询未读消息数:
用户登录或在主界面时,系统查询用户-会话关系表,获取每个会话的未读消息数,以及总的未读消息数。
这些信息用于在用户的聊天列表中显示每个聊天窗口的未读消息数,以及应用图标上显示的总未读数。
websocket推送未读消息
{ "type": "unreadMessage", "data": { "sessionId": "123456", "unreadCount": 5, "messageId": "654321", "senderId": "user123", "content": "你好,这是最后一条未读消息的内容。", "timestamp": "2023-04-01T12:00:00Z" } } type: 消息类型,这里是unreadMessage,表示这是一个未读消息的推送。 data: 包含未读消息具体信息的对象。 sessionId: 会话ID,标识这些未读消息属于哪个聊天会话。 unreadCount: 未读消息数量,告诉用户这个会话中有多少条消息未读。 lastMessage: 最后一条未读消息的详细信息,有助于用户预览。 messageId: 消息ID,唯一标识这条消息。 senderId: 发送者ID,标识谁发送了这条消息。 content: 消息内容,最后一条未读消息的文本内容。 timestamp: 消息发送时间,ISO 8601格式。
用户a给用户b发送消息,websocket推送b未读消息。是每次a发消息就推送给b一条消息用于更新未读消息数量。阅读后则取消。
前端判断逻辑:b收到未读消息后则判断在不在与a的聊天窗口中,在的话则向后台发送取消未读数去更新数据库和缓存。
其他情况点进去用户聊天界面然后更新未读数量。
未读消息查询结合缓存
- 缓存设计
缓存结构:对于每个用户,使用一个以用户ID为键(Key)的缓存结构,存储该用户所有会话的未读消息数。这个结构可以是一个哈希表,其中每个条目的键是会话ID,值是对应的未读消息数。
Key: userId Value: { sessionId1: unreadCount1, sessionId2: unreadCount2, ... }
缓存更新:
发送消息时:当一条消息被发送到会话中时,对会话中的每个用户(除了发送者),在缓存中对应的未读消息数加一。
阅读消息时:当用户打开一个会话阅读消息时,将该会话的未读消息数重置为0,并更新缓存。
缓存查询:当需要获取用户的未读消息数时,首先查询缓存。如果缓存中有数据,则直接返回;如果没有,从数据库查询,然后更新缓存。
2. WebSocket推送
简而言之:用户未读消息发生变化。服务器推送。用户自己改变了未读数。比如已阅。则用户发送消息给服务器更改数据库。
实时更新未读消息数:当用户的未读消息数发生变化时(无论是增加还是清零),通过WebSocket实时推送更新给用户。这样,用户界面上的未读消息数可以即时更新,无需刷新页面。
推送消息体设计:可以使用之前设计的消息体格式,根据实际情况调整。例如,当未读消息数增加时,推送包含最新消息的简要信息;当用户阅读了某个会话的消息时,推送该会话未读消息数清零的通知。
3. 性能和一致性考虑
缓存失效和同步:需要处理缓存失效和数据同步的情况,确保缓存中的未读消息数与数据库保持一致。可以使用缓存过期策略,或在更新数据库时同步更新缓存。
缓存预热:对于频繁访问的热点数据,可以在系统启动时进行缓存预热,减少冷启动时的数据库访问压力。
分布式缓存:在分布式系统中,考虑使用分布式缓存解决方案(如Redis集群),以支持高并发访问和高可用性。
通过上述设计,可以有效地利用缓存提高未读消息功能的性能,同时通过WebSocket实时推送,确保用户界面上的未读消息数能够即时更新,提升用户体验。
将未读消息性能优化
读取性能:频繁读取的操作,如查询未读消息数,可以通过缓存来提高性能。
写入性能:对于高频率的更新操作,可以先写入缓存,并通过异步批量写入的方式更新数据库,减少数据库的压力。
未读消息存储的优势
- 持久性和可靠性
持久性:数据库提供了数据持久化的能力,确保即使在系统重启或发生故障时,未读消息的状态也不会丢失。
可靠性:数据库管理系统通常提供了事务支持,可以保证数据的一致性和完整性,这对于维护用户的未读消息状态尤为重要。
- 用户体验
多设备同步:用户可能会在多个设备上使用IM系统。将未读消息状态存储在数据库中,可以确保用户在任何设备上查看时,未读消息的状态都是同步的。
历史记录查询:用户可能希望查看历史未读消息。存储在数据库中的未读消息状态可以方便地进行查询和管理。
- 系统设计和扩展性
数据分析:未读消息数据可以为系统提供重要的用户行为分析信息,比如哪些类型的消息更容易被忽略,用户的活跃时间段等。
扩展性:随着系统用户量的增加,如果未读消息状态仅存储在内存中,将会对系统的扩展性和稳定性构成挑战。数据库可以通过增加资源、读写分离等方式来扩展。
未读消息全流程
- 消息发送与存储
发送者 发送消息到服务器。
服务器将消息存储到mysql中,并标记为未读。服务器根据消息的接收者,
redis里面同步更新以用户ID为键(Key)的缓存结构,存储该用户所有会话的未读消息数。
mysql里面异步 更新用户-会话关系表中的未读消息计数。
- 消息推送
服务器通过WebSocket或其他实时通信技术,向接收者推送消息。
步骤发生mq消费消息,消息落库那步。
推送的消息体包含消息内容、发送者信息、消息ID等,以及从redis里面读取出来的更新的未读消息计数。
- 接收者在线:实时接收
如果接收者在线,客户端通过WebSocket接收到消息,并实时展示。
客户端收到消息后,发送消息接收确认给服务器,服务器据此更新消息状态为已送达(但仍未读)。
- 接收者离线:离线推送与同步
如果接收者离线,服务器将消息标记为待推送状态,并可通过移动推送服务(如APNs或FCM)发送离线通知。
当接收者上线时,客户端向服务器请求同步离线消息,服务器响应未读消息内容及计数。
- 阅读消息
接收者打开聊天窗口,客户端向服务器发送阅读确认,包括已阅读的最后一条消息ID或时间戳。
服务器根据确认信息,更新用户-会话关系表中的未读消息计数为0,更新redis并将相关消息标记为已读。
- 未读消息计数更新与推送
每当未读消息计数发生变化时,服务器通过WebSocket向相关用户推送最新的未读消息计数。
客户端接收到未读消息计数推送后,更新用户界面上的未读标记。
- 缓存与性能优化
使用缓存存储频繁访问的数据,如未读消息计数,以减少数据库访问。
对于高频更新操作,如更新未读消息计数,采用异步批量处理策略,减轻数据库压力。
- 多设备同步
确保所有操作在用户的所有设备上保持同步,包括消息的接收、阅读状态和未读消息计数。
设备上线时,同步服务器上的最新状态,包括未读消息和会话列表。
注意 用户在线但是不在和另一个用户的聊天页面的场景
-
客户端判断逻辑
状态跟踪:客户端可以跟踪用户当前是否处于某个聊天页面。当用户进入或离开聊天页面时,客户端更新这个状态。
接收消息处理:当客户端接收到新消息时,根据当前的状态(是否在聊天页面)决定如何处理消息。如果用户正在查看聊天页面,客户端可以直接显示消息;如果用户不在聊天页面,客户端可以显示未读消息提示。
-
服务器推送策略
推送完整报文消息:服务器不需要知道用户是否处于聊天页面。当有新消息时,服务器根据用户的在线状态推送完整的报文消息。这样做的好处是服务器逻辑简单,易于维护。
未读消息计数:服务器还应负责维护每个用户的未读消息计数,并在适当的时候(如用户上线、新消息到达等)更新未读消息计数。这个计数可以随消息一起推送给客户端,或者由客户端在需要时主动查询。
-
优点
灵活性:客户端根据当前状态处理消息提供了更大的灵活性,可以根据具体的应用场景和用户需求定制消息处理逻辑。
减轻服务器负担:服务器只负责推送消息和维护未读消息计数,不需要处理复杂的状态判断逻辑,这有助于减轻服务器的负担,提高系统的可扩展性。
-
注意事项
实时性:客户端需要及时向服务器报告状态变化,以确保服务器推送的未读消息计数准确无误。
数据同步:在多设备场景下,需要确保用户状态和消息的同步,避免在一个设备上阅读了消息而其他设备上仍显示未读。
解决缓存与数据库一致性;
在设计未读消息功能时,确保缓存与数据库之间的一致性是一个关键挑战,尤其是在以缓存为主、频繁更新且采用异步操作更新数据库的场景下。以下是一些策略和方法来解决这一挑战:
- 缓存更新策略
先写缓存:当有新消息到达时,首先更新缓存中的未读消息计数,然后异步更新数据库。这样可以确保用户立即看到最新的未读消息计数。
延迟数据库同步:采用批处理或定时任务来异步更新数据库中的未读消息计数。这可以减少对数据库的写操作,但需要合理设计同步频率,以减少缓存与数据库之间的不一致时间窗口。
- 一致性保障机制
使用事务:在更新数据库时,使用事务来保证操作的原子性,确保缓存与数据库状态的一致性。
乐观锁:对于并发更新操作,可以在数据库中使用乐观锁(如版本号),以避免更新冲突。
- 缓存失效与回填策略
主动失效:当数据库成功更新后,主动使缓存中对应的未读消息计数失效。这要求异步更新操作完成后,有一个回调机制来处理缓存失效。
被动更新:在缓存失效或未命中时,从数据库加载最新的未读消息计数,并回填到缓存中。这种策略保证了即使缓存数据丢失或过期,也能保持与数据库的一致性。
- 缓存一致性框架
考虑使用成熟的缓存一致性框架或解决方案,如阿里巴巴的Canal,它可以监控数据库变更并同步到缓存系统,帮助维护缓存与数据库之间的一致性。
- 容错与降级策略
双写一致性保障:在极端情况下,如果缓存与数据库之间出现不一致,可以设计一套容错机制,如通过定期校验或用户请求触发的校验逻辑,来修正不一致的数据。
降级处理:在系统压力极大时,可以临时降级处理,如直接从数据库读取未读消息计数,虽然这会增加数据库压力,但保证了数据的准确性。
- 监控与告警
实时监控:监控缓存与数据库之间的同步延迟和失败率,一旦发现异常,立即触发告警。
日志记录:详细记录所有缓存更新和数据库异步写入的操作日志,便于问题追踪和后期分析。
我们计划使用读写锁来实现缓存一致性!
- 缓存更新策略