消息队列

消息队列是一种非常重要的数据结构,它既可以用于内部组件,也可以用于外部应用:

  • 内部组件在使用消息队列时,通常将其用于各个内部组件之间的信息交换,比如在系统的不同子系统中进行通信交流。

  • 外部应用在使用消息队列时,通常将其用作通信手段,比如直播弹幕、即时聊天软件和即时在线聊天室,这些应用的核心数据结构实际上就是消息队列。

总的来说,消息队列不仅可以用于计算机系统的信息交换,它还可以用于人类用户的信息交换,其用途非常广泛。

在消息队列中,消息的格式往往会随着应用的不同而变化,因此它们通常会以键值对或是JSON等方式存储,然后通过唯一的ID对每条消息进行标识和引用。

需求描述

你想要用Redis实现消息队列,并通过其提供发送消息、接收消息等服务。

解决方案

过去在较旧版本的Redis中,人们往往使用订阅与发布、列表或有序集合等方式实现消息队列,但这些实现基本上都有它们各自的缺点,比如消息安全性无法保证、性能不佳或者功能不足等。对于版本较新的Redis来说,使用流来实现消息队列才是最佳选择。

具体来说,对于每一个消息队列,可以使用一个流作为其底层实现,其中向队列中添加新消息的工作可以通过调用XADD命令来完成,从队列中获取消息的工作可以通过XRANGE命令或XREAD命令来完成,而获取队列长度的工作则可以通过XLEN命令来完成。

比方说,通过执行以下XADD命令,可以向键名为MessageQueue:10086的消息队列推入两条新消息:

redis> XADD MessageQueue:10086 * uid "Jack" msg "Hello!"
"1720785174751-0"
redis> XADD MessageQueue:10086 * uid "Tom" msg "Hi!"
"1720785196452-0"

接着可以使用以下XREAD命令来读取这两条新消息:

redis> XREAD STREAMS MessageQueue:10086 0
1) 1) "MessageQueue:10086"
   2) 1) 1) "1720785174751-0"
         2) 1) "uid"
            2) "Jack"
            3) "msg"
            4) "Hello!"
      2) 1) "1720785196452-0"
         2) 1) "uid"
            2) "Tom"
            3) "msg"
            4) "Hi!"

又或者使用以下XRANGE命令来读取带有指定ID的消息:

redis> XRANGE MessageQueue:10086 1720785174751-0 1720785174751-0
1) 1) "1720785174751-0"
   2) 1) "uid"
      2) "Jack"
      3) "msg"
      4) "Hello!"

最后,还使用以下XLEN命令可以获取整个消息队列包含的消息总数量:

redis> XLEN MessageQueue:10086
(integer) 2

如果有需要的话,还可以在上述命令的基础上进一步扩展,给消息队列加上基于流消费者组实现的各种增强功能。

实现代码

代码清单 CODE_MESSAGE_QUEUE 展示了基于上述原理实现的消息队列程序。


代码清单 CODE_MESSAGE_QUEUE 消息队列程序 message_queue.py

from xread_iterator import StreamIterator, START_OF_STREAM

NON_BLOCK = None
BLOCK_FOREVER = 0

DEFAULT_COUNT = 10

START_OF_MQ = START_OF_STREAM

class MessageQueue:

    def __init__(self, client, key, cursor=START_OF_MQ):
        """
        根据给定的键,创建与之对应的消息队列。
        消息队列的起始访问位置可以通过可选参数cursor指定。
        默认情况下,cursor指向消息队列的最开头。
        """
        self.client = client
        self.key = key
        self._iterator = StreamIterator(self.client, self.key, cursor)

    def send(self, message):
        """
        接受一个键值对形式的消息,并将其放入队列。
        完成之后返回消息在队列中的ID作为结果。
        """
        return self.client.xadd(self.key, message)

    def receive(self, count=DEFAULT_COUNT, timeout=NON_BLOCK):
        """
        根据消息的入队顺序,访问队列中的消息。
        可选的count参数用于指定每次访问最多能够获取多少条消息,默认为10。
        可选的timeout参数用于指定方法在未发现消息时是否阻塞,它的值可以是:
        - NON_BLOCK,不阻塞直接返回,这是默认值;
        - BLOCK_FOREVER,一直阻塞直到有消息可读为止;
        - 一个大于零的整数,用于代表阻塞的最大毫秒数
        """
        return self._iterator.next(count, timeout)

    def get(self, message_id):
        """
        获取指定ID对应的消息。
        """
        ret = self.client.xrange(self.key, message_id, message_id)
        if ret != []:
            return ret[0][1]

    def length(self):
        """
        返回整个消息队列目前包含的消息总数量。
        """
        return self.client.xlen(self.key)

这个程序复用了《流迭代器》一章中使用XREAD命令实现的流迭代器。通过这个迭代器,程序成功地将原本非常复杂的消息接收操作简化为一次针对StreamIterator.next()方法的调用,这也是这个消息队列程序能够保持简练的关键。

作为例子,以下代码展示了这个消息队列程序的具体使用方式:

>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, "MessageQueue:10086")  # 创建消息队列对象
>>> mq.send({"uid": "Jack", "msg": "Hello!"})  # 发送消息
'1720847899374-0'
>>> mq.send({"uid": "Tom", "msg": "Hi!"})
'1720847912318-0'
>>> mq.receive()  # 接收消息
[{'id': '1720847899374-0', 'msg': {'uid': 'Jack', 'msg': 'Hello!'}},
 {'id': '1720847912318-0', 'msg': {'uid': 'Tom', 'msg': 'Hi!'}}]
>>> mq.get("1720847899374-0")  # 获取特定消息
{'uid': 'Jack', 'msg': 'Hello!'}
>>> mq.length()  # 获取消息总数量
2

扩展实现:直播间弹幕系统

为了更好地展示消息队列的功能,代码清单 CODE_CHAT 展示了一个使用消息队列实现的直播弹幕程序。这个程序可以让客户端通过执行send()方法发送普通弹幕或是付费弹幕,还可以通过执行receive()方法接收弹幕,或是直接通过执行show()方法不断地接收并打印弹幕。


代码清单 CODE_CHAT 直播弹幕程序 chat.py

from message_queue import MessageQueue, DEFAULT_COUNT, BLOCK_FOREVER

def make_chat_key(broadcaster):
    """
    根据给定的主播ID,构建出相应的消息队列键,用于存储直播间发出的弹幕。
    例子:"Chat:Peter"、"Chat:10086"等。
    """
    return "Chat:{}".format(broadcaster)

class Chat:

    def __init__(self, client, broadcaster):
        """
        根据给定的主播ID,为其构建存储直播间弹幕的消息队列。
        """
        self.client = client
        self.broadcaster = broadcaster
        self.key = make_chat_key(broadcaster)
        # 因为消息队列默认从队列开头开始返回元素,所以这里需要
        # 传入特殊值"$"作为ID,让队列只返回阻塞之后出现的新消息
        self.mq = MessageQueue(self.client, self.key, "$")

    def send(self, uid, content, donate=0):
        """
        根据给定的用户ID和内容,向直播间发送一条弹幕。
        如果donate参数的值不为0,那么说明这是一条付费弹幕(super chat)。
        """
        if donate == 0:
            msg = {"uid": uid, "content": content}
        else:
            msg = {"uid": uid, "content": content, "donate": donate}
        # 将弹幕发送至消息队列中
        return self.mq.send(msg)

    def receive(self, count=DEFAULT_COUNT):
        """
        从直播间接收并返回最多count条弹幕,默认为10条。
        如果上次调用之后还有未接收的弹幕存在,那么先接收已有的弹幕,再接收新出现的弹幕。
        """
        # 尝试从消息队列中取出弹幕
        # 若队列为空就一直阻塞直到有弹幕可弹出为止
        return self.mq.receive(count, BLOCK_FOREVER)

    def show(self):
        """
        简易的弹幕打印器。
        """
        while True:
            # 一直接收弹幕
            for item in self.receive():
                # 丢弃消息ID,只获取消息本身
                msg = item["msg"]
                if "donate" not in msg:
                    # 打印普通弹幕
                    print("{0}: {1}".format(msg["uid"], msg["content"]))
                else:
                    # 打印付费弹幕
                    print("+"*10 + "SUPER CHAT" + "+"*10)
                    print("{0}: {1}".format(msg["uid"], msg["content"]))
                    print("Donate: {}".format(msg["donate"]))
                    print("-"*10 + "SUPER CHAT" + "-"*10)

实现这个弹幕程序的关键是要做到以下两点:

  1. 在调用Chat.__init__()方法创建Chat对象的时候,向底层的消息队列传入特殊ID $,从而表示只接收最新出现的消息/弹幕。

  2. 在调用Chat.receive()方法接收弹幕的时候,将底层消息队列的接收模式设置为BLOCK_FOREVER

这样一来,Chat.receive()方法在被调用的时候,就会一直阻塞直到有可供接收的新弹幕出现为止。这确保了用户只会看到最新发送的弹幕,而之前已经存在于消息队列的旧弹幕则会自动被忽略。

作为例子,以下代码展示了使用弹幕程序接收并打印弹幕的具体方法:

>>> from redis import Redis
>>> from chat import Chat
>>> client = Redis(decode_responses=True)
>>> chat = Chat(client, "Peter")
>>> chat.show()
Jack: Hello, Peter!
Tom: Hello, Peter and Jack!
++++++++++SUPER CHAT++++++++++
Mary: Peter, thanks for your help yesterday!
Donate: 50
----------SUPER CHAT----------

可以看到,在这个属于Peter的直播间里面,整个过程共出现了三条弹幕:其中两条是分别来自Jack和Tom的普通弹幕,而最后一条则是来自Mary的价值50元的付费弹幕。

以下是与上述代码对应的,负责发送弹幕的代码:

>>> from redis import Redis
>>> from chat import Chat
>>> client = Redis(decode_responses=True)
>>> chat = Chat(client, "Peter")
>>> chat.send("Jack", "Hello, Peter!")
'1720532188953-0'
>>> chat.send("Tom", "Hello, Peter and Jack!")
'1720532204379-0'
>>> chat.send("Mary", "Peter, thanks for your help yesterday!", 50)
'1720532284213-0'

为了让这个弹幕程序保持简单,send()方法只接收用户ID、消息正文和付费量三个参数。在实现真实的弹幕程序时,你可以根据自己的需求随时扩展这个方法的参数数量,又或者直接将所有相关参数都放进一个字典里,这样就可以在一条弹幕里面包含更多信息了。

最后,除了可以用于实现直播间弹幕系统之外,这个程序同样可以用于实现在线即时聊天室,两者的原理是完全一样的。

重点回顾

  • 消息队列不仅可以用于计算机系统的信息交换,它还可以用于人类用户的信息交换,其用途非常广泛。

  • 过去在较旧版本的Redis中,人们往往使用订阅与发布、列表或有序集合等方式实现消息队列,但这些实现基本上都有它们各自的缺点,比如消息安全性无法保证、性能不佳或者功能不足等。对于版本较新的Redis来说,使用流来实现消息队列才是最佳选择。

  • 对于每一个消息队列,可以使用一个流作为其底层实现,其中向队列中添加新消息的工作可以通过调用XADD命令来完成,从队列中获取消息的工作可以通过XRANGE命令或XREAD命令来完成,而获取队列长度的工作则可以通过XLEN命令来完成。

  • 消息队列除了可以用于实现直播间弹幕系统之外,同样可以用于实现在线聊天室,两者的原理是完全一样的。