为用户聚合来自不同社交平台的消息流


本篇示例教程的主题是聚合来自不同社交媒体的消息并将它们导入单一的客户支持平台。

每个集成的消息平台通过调用 Webhook 给我们发送消息,如果为每条接收到的消息分发一个队列任务,就不能保证最终处理后的消息顺序与接收时一致(处理耗时、处理时是否出现问题等多重原因所致)。

要保证消息投递顺序,只能在一条消息发送成功后发送另一条消息。

为了简化演示,我们只从 Telegram 接收消息然后将其发送到 Intercom。

二级队列

要实现一条消息在另一条消息发送成功后发送,需要创建一个二级队列。这个队列基于数据表驱动,其中存储着所有接收到的消息,我们可以通过用户对其进行分组,也可以通过时间对其进行排序:

要在队列中查找特定用户的消息,可以这么做:

分发消息任务

前面我们已经介绍过可以通过 Laravel 提供的 API 方法分发任务链,其中的任务会一个接一个执行,显然,这也适合我们这个场景:

不过我们需要找到一种方式在 messages 表中为每个用户查询消息,然后将其推送到该用户对应的任务链进行处理。要实现这个功能,我们可以在用户账户激活时推送一个 Messenger 任务,该任务会监控 messages 表,并在其控制下推送任务到任务链:

这个任务类的 handle 方法实现代码如下所示:

待到将所有消息推送到任务链后,会将 Messenger 本身推送到队列。以便所有消息处理完成后,可以继续通过 Messenger 处理消息推送。

如果暂时没有消息,则延迟 5 秒后继续执行这个任务。

为了让 Messenger 可以无限循环执行,需要将其 tries 属性值设置为 0,否则的话,会在重试若干次后失败,退出该任务的执行。

整个流程如下所示:

每个用户都有自己的 Messenger 任务处理消息接收与发送。

发送消息

ProcessMessage 任务类中,handle() 方法如下所示:

这个任务必须是容错的,否则如果任务被标记为失败,则下一个任务不会被执行,用户就不能接收到消息了,为此,我们需要让任务支持无限重试并自己处理失败:

tries 属性值设置为 0 后,Laravel 永远也不会将队列任务标记为失败,与此同时,我们会统计任务运行次数,如果超过 5 次,则报告错误,然后跳过这个任务运行下一个任务。

控制有效载荷大小

Laravel 会将任务链中所有任务的载荷数据存储到任务链中第一个任务的载荷中,这意味着如果任务链中的任务过多,第一个任务会有着非常巨大的载荷数据,这对于我们所依赖的队列驱动和系统资源将是一个隐患。

要解决这个问题,需要限制每次从 messages 数据表中获取的结果集大小:

撤销用户账户

现在我们已经有了一个一直为每个用户处理消息收发的 Messager 队列任务,如果我们想要撤销指定用户账户,需要通过某种方式从队列中移除这个任务。

要实现这个功能,可以在用户模型上设置一个标记 is_active = true,然后在每次运行 Messager 任务时检查这个标记:

如果要撤销某个用户,将该标记值设置为 false,然后将这个任务标记为运行失败,终止任务链的继续执行。

如果用户账户再次被激活,则会分发一个新的 Messager 任务继续开始周而复始地为该用户执行消息收发工作。


Vote Vote Cancel Collect Collect Cancel

<< 上一篇: 基于任务链和批处理生成复杂报告

>> 下一篇: 通过事件监听器异步发放优惠券