七猫消息推送系统演进之路
七猫推送系统经过近两年的开发迭代,已逐渐稳定完善,作为成本较低的拉活手段,已成为七猫越来越不可缺少的基础能力。本文将带大家一起回顾七猫推送系统从诞生到成熟稳定的演进。
背景说明
随着七猫的逐步发展壮大,七猫免费小说等应用业务的拉新成本逐渐攀升,用户拉活逐步成为了我们工作目标之一,我们需要寻找一种成本相对低廉,效果相对明显的方式,来使我们沉默的用户重新活跃起来。而推送消息有着:
- 成本相对低廉
- 可以实现精准投放
- 增强用户黏性
等优点,非常契合我们当前发展阶段的需求。除此之外,产研团队中存在多个业务系统独立对接三方推送服务,有着大量的重复开发工作且无法统一进行管理。
基于上述原因,七猫消息推送系统的搭建便提上了我们的日程。
消息推送定义
消息推送,就是指通过公司 App(运营后台)或第三方工具对用户移动设备进行的主动消息推送。用户可以在移动设备锁定屏幕和通知栏看到推送消息通知,通知栏点击可唤起 App 或去往相应页面。对于我们普通用户而言,每天在手机通知栏收到的图文信息就是各个 App 的消息推送的消息。
下图展示的就是我们七猫在双端的典型推送:
演进时间轴
接下来,我们将依据图中所示的几个大版本,详细介绍七猫消息推送系统的演进之路。
推送系统 v0.1
原始系统设计
推送系统最初的流程大致设计如下:
- 首先由算法侧计算目标人群的可能喜欢的书籍,并将结果写入数据表中;
- 大数据系统圈选人群,从表中获取该用户需要推送的数据,组装推送消息,发送给推送系统;
- 推送系统对消息做一些校验及格式处理等操作,随后调用第三方推送服务 API;
- 第三方推送服务将消息最终发送到客户端,展示给用户;
- 用户点击消息后,客户端会上报对应埋点事件,产品人员可在后台查看相关报表。
完整架构图示
v0.1 整体各系统交互的实际架构图示如下:
我们选择 Kafka 作为消息系统与各业务或者大数据之间的消息队列。
- 各个业务方(包括但不限于大数据)将需要推送的数据按照推送系统约定的格式传入 Kafka 集群;
- 消息系统中的 jobconsumer 进程消费消息数据,调用我们封装的消息系统 SDK;
- SDK 负责校验消息字段、转换消息格式(转换成第三方推送服务需要的消息格式),并将消息发送给第三方,由第三方服务下发至客户端。
存在的问题
这个 v0.1 的推送架构看起来非常简单,从现在的角度回过头看,甚至可以简单的使用“API 调用者”来概括它。经过一段时间的使用,它存在的问题被充分暴露出来:
- 推送结果未知。消息发送出去之后,我们无法准确知道推送的消息是否真实触达到了用户,只能通过 App 中埋点反推推送效果,无法进行直接通过数据进行验证;
- 推送时存在一定比例的超时错误。经查证,我们没有考虑第三方的数据接收能力,发送 QPS 大于第三方的限定值,导致推送超时,消息丢失;
- 推送方式单一。想要使用推送,只能通过组装消息以及连接消息系统的 Kafka 集群才能完成,接入方式过于单一且代码相对繁琐。
推送系统 v0.x
针对以上不足,我们在 v0.1 的基础上进行了多次优化迭代,逐步进行解决。
针对推送结果未知的问题
为了了解实际推送的送达情况,我们接入了第三方的主动消息回执功能。所谓主动消息回执,是第三方通过 HTTP 协议调用我们的服务,将消息的送达、点击等事件回传过来。于是在消息系统中新增了回执服务(receipt),用以接收回执消息。其逻辑流图如下:
可以看到整体就是一个线性的操作,最终将收到的回执,通过服务端日志上报,直接上报到大数据侧,由大数据侧落表,以提供后续计算的数据支撑。
针对推送存在超时的问题
为了解决推送超时问题,我们在推送系统中引入了限流器,直接使用了 Go 标准库的限流器:golang.org/x/time/rate。
官方的限流器是基于令牌桶实现的。简单来说,令牌桶就是一个固定大小的桶,系统会以恒定的速率向桶中放置令牌(token),桶满则暂时不放。而使用者从桶中取出令牌,如果有剩余,则可以一直取。如果桶中没有剩余令牌,则需要等到桶中被放置了令牌才可以继续执行。其他关于限流器的相关背景知识,此处不再赘述。
我们的使用方式如下:比如我们想要限制 QPS 近似在 1000 左右,那么代码中我们构造一个初始大小为 1000,每秒生成 1 个令牌的令牌桶,每条消息获取一次令牌,并等待获取令牌所需的时间即完成限流。示例代码如下:
// handle kafka msg first
l := rate.NewLimiter(1000, 1)
r := l.Reverse()
time.Sleep(r.Delay())
// push msg to thir-party services
针对推送方式单一的问题
原先业务方是直接接入 Kafka,由于一些部署环境的问题,连接 Kafka 可能存在诸多不便,会给业务方带来不好的体验。因此我们提供了 HTTP 的方式进行推送,至于推送系统中,我们新增了 Producer 的角色,提供 HTTP API 。引入 Producer 后推送的整体流程图如下:
完整架构图示
至此,推送系统 v0.x 已经建立完毕了,v0.x 的总体交互架构图如下:(其中红色部分为新增内容,后同)
存在的问题
当前架构依然存在以下两个较大的问题:
- 第三方对厂商设备标识的映射记录不全,送达率偏低;
- 沉默 90 天以上用户第三方推送服务触达不到。
基于上述问题,我们决定在保留现有第三方推送服务的基础上,增加接入各手机厂商的推送服务,使用第三方推送+厂商推送结合的模式,并以此为契机我们开启了推送系统 v1.x 架构时代。
推送系统 v1.x
在 v1.x 架构设计之初,我们考虑到为了方便运营同学使用,同时引入了两个新的业务后台,即推送运营后台和人群圈选后台:
- 在人群圈选后台,可以圈选出想要推送的目标人群,供各业务方使用(此处推送系统作为业务接入方);
- 在推送运营后台,可以创建文案库,创建推送任务,测试推送等等。
对于推送系统,我们主要做了如下几件事情:
- 统一推送消息定义
- 统一厂商回执码
- 实现预处理服务
- 实时结果统计
- 增加报警监控
下文我们将逐一对其进行介绍。
统一推送消息定义SDK
完成消息对特定厂商的转换。我们根据各厂商以及第三方原先的消息格式,定义了七猫推送消息的格式。使用方只要将消息按我们统一的格式填充完成,对于切换厂商,只需要修改少数几个字段即可。对于原先接入第三方的,可在他们无感知的前提下完成对多厂商的支持。
统一厂商回执码
由于各厂商的回执码都有自己的定义,支持的回执类型也大相径庭。为了方便大数据侧的推送报表的生成,我们统一了七猫的消息回执码,以厂商回执为基准,设置了我们比较关心的回执码和回执情况的映射。
实现预处理服务
想要实现部分厂商的批量推送,或者实现厂商的图片推送,需要先将素材或者消息体上传。为了减少业务方的使用负担,我们提供了预处理服务,目前的使用流程为:推送运营后台创建推送任务,在推送之前后台请求预处理服务,提前上传素材,将素材 ID 填充到消息体的预处理字段部分中,最后由推送系统推送。
实时结果统计
系统运行一段后,当推送出现问题时,我们排查问题的效率仍旧很低,甚至很多时候,推送出现问题我们都感知不到。经过调研之后发现,还有一个很重要的信息我们没有利用起来:调用厂商 API 之后的返回结果,这一直被我们忽略了。其实这个信息正好可以用来监控或者排查推送的问题,当厂商推送 API 返回 200 时,我们认为推送发送成功了(至于终端设备是否能收到,那就是厂商的问题了,我们无法左右),其他情况,则推送到厂商这一流程本身就出现问题,需要我们及时排查。
我们设计了整体的实时统计数据功能,架构如下:
消息经由 Consumer 消费后,调用 Message SDK 将数据发送到厂商,并立即接收厂商的返回结果,通过 Upload SDK(用于日志收集)发送到大数据的实时计算链路,若出现异常,则会触发报警;正常的回执数据则会进入大数据的离线计算链路,用于推送运营后台的推送报表展示。我们还整合了所有厂商可能返回的 HTTP 和业务错误码情况,整合了七猫自己的统计错误码,主要包括以下几类:
- 请求厂商 API 超时
- 推送标识失效或非法
- 请求频控问题
- 消息内容或格式错误
- 厂商推送服务错误
让我们来看下这些到实际中的应用,下面两张图分别是推送运营后台中展示的实时发送量以及成功率趋势图。
增加报警监控
有了推送实时结果的统计,我们可以基于业务需求制定报警规则,在推送结果不符合预期时,将报警信息发送到业务群里面,提醒我们及时排查跟进。报警的群通知格式如下:
完整架构图示
至此,v1.x 的推送系统算是基本成型了,以下是整体的架构图示:
为了防止各个 App 推送之间的相互影响,我们以 App 为粒度,生成特定的 Topic 以及特定的消费者。v1.x 架构部署图如下:
存在的问题
v1.x 的推送系统看起来功能似乎比较完善了,但是仔细思考,当前的架构还是会有如下的问题:
- 限流粒度不足。由于消费者共用一组容器,无法精确调整某一厂商的发送频率;
- 限流器不够灵活。限流器参数目前在代码中硬编码,需要发版才能调整限流;
- 业务相互影响。当同一个推送 Topic 存在大量堆积时,其他推送任务会被阻塞;
- 可扩展性弱。暂时无法提供一些扩展性的功能,比如无法做一些设备 id 转换工作等。
推送系统 v2.x
为了解决 v1.x 架构的痛点,我们重新设计了系统架构,终于来到了 v2.x 架构,这个版本我们优化了消息数据流转流程,并着眼于未来,提高了系统的可扩展性。
消息数据流转优化
针对 v1.x 中存在的问题,我们做了下图所示的消息数据流转流程优化:
总结一下调整的几个要点:
- 简化接入方式
- 扩展 Kafka Topic
- 引入 Broker 服务
- 细化厂商消费者
简化接入方式
我们对外仅保留了使用 SDK 和直接写入 Kafka 两种推送方式:
- 使用 SDK 推送,原先业务方调用 API 或者直接写入 Kafka 的推送方式被整合到使用 SDK,使用方不需要关心底层是如何传递消息的,传入需要推送的消息,即可完成推送。
- 直接写入 Kafka,该种方式仅提供给使用人群包(相对大型的推送任务)进行推送的业务。
扩展 Kafka Topic
为了提高消息吞吐率和可扩展性,我们将 Kafka 分成了业务 Topic 和厂商消费 Topic 两层:
- 业务 Topic 层,为了保证兼容性,我们将 v1.x 架构使用的 Topic,在 v2.x 架构作为业务层的 Kafka,之前的使用方式不变,业务方或者任务方,仍旧将想要推送的消息传达至该 Topic 即可。
- 厂商消费 Topic 层,该层将直接与特定 App 的特定厂商消费者相绑定(为什么我们这么做呢?因为相同的 App 注册在不同厂商时,会有不同的密钥对等信息,厂商也会根据该 App 在各自厂商的订阅数等数据提供给我们不同的 QPS )。
引入 Broker 服务
我们在两层 Kafka Topic 中,借鉴 Kafka 的设计,引入了同名中间层 Broker,它的功能如下:
- 监听与转发:它会负责监听业务层的 Kafka,将收到的消息,根据消息体中的字段标识,找到对应的厂商消费者,将消息写入该 Topic。
- 业务性消息处理:它还会负责处理一些基本的消息预处理(消息清洗)等工作,也可以根据消息中的项目,推送任务信息,从推送运营后台获取推送配置,对消息做一些个性化的处理之后(比如 ID 转换之类)再将消息转发给厂商消费者。
通过引入 Broker 服务,我们可以将业务层的 Kafka 消息快速消费,并将消息精确分发,减少整体业务消息的积压,也增强了系统的扩展能力。
细化厂商消费者
如之前所说,每个厂商每个 App 有一个专属的进程,监听特定 Topic,它们要做的事情非常纯粹,只要负责将消息调用厂商 API,送达至厂商即可。在这个粒度之下,我们可以根据特定厂商 QPS 大小配额,为对应消费者配置最精细的设置限流器以及熔断器等辅助功能。
通过引入厂商消费者的概念,我们可以精细化灵活的控制限流器,减少了业务之间的相互影响。
完整架构图示
宏观看下整个流程,v2.x 整体交互架构图示如下:
在完整的架构图示中,可以看到我们还引入了两个重要组件:分布式限流器和熔断器。
增加分布式限流器
为了提高限流器的灵活性以及控制范围,我们基于单机限流器设计了分布式限流器。使用分布式限流器之后,所有 pod 共享同一个限流器,应用到厂商消费者,就可以达成:无论多少个 pod,整体消息推送频率都会按照设置的 QPS 发送,解决了 v1.x 架构调整单机限流器的时候要考虑 pod 数量的痛点。
与单机限流器的不同在于多 pod 需要共享限流器,因此需要考虑使用统一存储。从性能角度考虑,我们选择 Redis 作为存储。同样我们使用令牌桶限流,核心就是要知道当前剩余令牌是否足够。与单机限流器原理相同,同样可以通过计算时间差,算出令牌数从而代替生产令牌的操作,这样便可以在减少 Redis 压力和减少代码复杂度的前提下,实现分布式限流器。整体的逻辑流图如下:
由于一次限流判断需要对 Redis key 进行多次操作(EXISTS,HMGET,HMSET,EXPIRE)等,因此我们最终使用 Lua 脚本,将分布式限流器的操作在 Redis 中实现原子化。
增加熔断器
由于部分厂商对于每日的推送总量有限制,超过之后便不能再发送推送,会直接返回错误,我们可以接收这个错误,并这种的预期内错误做一道过滤和处理。比如对于某厂商总量超限的问题,当我们发现这个错误之后,便不再请求它来推送,不做无意义的尝试,提升单批实际推送的成功率。为此我们设计了熔断器,并根据业务,支持两类熔断器:
- 数值型熔断器:单位时间内出现某种错误超过阈值之后,熔断;
- 比例型熔断器:单位时间内百分之 x 的请求出现某种错误后,熔断。核心的实现还是基于 Redis,当生成熔断标志 Key 时,表示该厂商消费者已经需要熔断,不再请求厂商 API。
接入到推送系统中,整个推送的流程图如下:
为了减少 Redis IO,我们将主要计算的逻辑放在内存中,只有当时间窗口到了,才会对数据进行聚合计算。
- 对于数值型熔断而言,需要累和才能判断,因此,我们额外借助了临时的 Redis Key,使用 IncrBy 以确保原子性,当累和结果超过阈值,则设置熔断标志 Key。
- 对于比例性熔断而言,由于 Kafka 分区与 pod 数一一对应,宏观来说分配相对均衡,因此只需要考虑一台 pod 内到达比例的阈值,即可直接设置熔断标志。
升级前后数据对比
我们来看一下系统从 v1.x 重构到 v2.x 后,推送耗时、消息积压情况及消息送达率等关键数据指标的提升效果。
推送耗时
由于 v1.x 架构中,都是同一个 Topic,消费者组无法区分开,只有一个总体数据,当时完成推送任务平均需要花费 75m。而在 2.x 架构中,每个 App 的厂商消费者为独立 Topic,因此区分了各自的消费组,观察下来最快的小米只需要 2m 即可完成推送任务,而 QPS 最最严格的 vivo 平均也只要花费 40m 即可完成推送任务,相较以前有明显提升。
消息积压情况
我们可以通过具体消费组消息堆积量的折线图直观的看到这一点,一下是 v1.x 架构下消息积压图示:
上图为某日中午的推送任务,可以看到数据积压最高达到了 200KB 左右,整体积压从 12:15 左右开始,一直到 13:55 左右才消费完成,耗时接近 1h40m。对于用户来说,就很有可能运营打算让用户在 12:30 收到的推送,用户最终在 13:30 收到。这也超出了我们报警设置的阈值,我们规划的是 1h 内消费完成,再慢确实会影响推送效果。而在 v2.x 中使用分布式限流器控制后,Kafka 监控图如下:
查看上线稳定后的推送任务,以 v2.x 之后中午的推送为例,可以看到较之前而言,现在每个厂商都有一个消费组,看上方图例中,可以看到业务方 Topic 的消费组最高积压仅 4.3KB,转发至厂商 Topic,最高的积压也只有 30KB,观察各厂商消费组明细曲线可以发现,消费速度显著提升。
消息送达率
由于 v2.x 主要是架构上的变化,功能上与 v1.x 保持一致。我们对比接入厂商前后成果。下表以及下图显示了接入厂商推送前后的送达率变化:
可以看到各厂商的送达率均有很明显的提升,整体符合预期。
不足与展望
虽然 v2.x 架构的推送系统相对完善,较前代有了很大的改善,但是仍有一些已知的缺陷和功能上的缺失。比如:
- 缺少推送环境标识,无法从消息系统层面避免测试推送消息发送至生产的设备。
- 缺少黑名单过滤,对于不希望收到推送的用户,或者业务上需要屏蔽的用户无法做到精确过滤。
- 不支持 AB 实验,当前推送暂时不支持 AB 实验(比如选择最优推送时间段,最优推送文案等等),需要使用方离线查看报表自行对比。
- 更多的厂商支持,厂商通道仍可以扩展,比如支持荣耀,魅族等厂商,使得推送可以触达更多用户。
其中的黑名单过滤以及支持 AB 实验,对我们来说更偏向业务层面,不便在推送层面的上做支持,为了使得推送效果更好,我们后续又开发了独立的推送服务,用于对接运营后台和算法的推送等业务,在推送服务中我们实现了对 AB 实验以及黑名单过滤的支持,请大家期待我们后续的关于推送服务的文章分享。