消息队列¶
消息队列是一种非常重要的数据结构,它既可以用于内部组件,也可以用于外部应用:
内部组件在使用消息队列时,通常将其用于各个内部组件之间的信息交换,比如在系统的不同子系统中进行通信交流。
外部应用在使用消息队列时,通常将其用作通信手段,比如直播弹幕、即时聊天软件和即时在线聊天室,这些应用的核心数据结构实际上就是消息队列。
总的来说,消息队列不仅可以用于计算机系统的信息交换,它还可以用于人类用户的信息交换,其用途非常广泛。
在消息队列中,消息的格式往往会随着应用的不同而变化,因此它们通常会以键值对或是JSON等方式存储,然后通过唯一的ID对每条消息进行标识和引用。
需求描述¶
你想要用Redis实现消息队列,并通过其提供发送消息、接收消息等服务。
解决方案¶
过去在较旧版本的Redis中,人们往往使用订阅与发布、列表或有序集合等方式实现消息队列,但这些实现基本上都有它们各自的缺点,比如消息安全性无法保证、性能不佳或者功能不足等。对于版本较新的Redis来说,使用流来实现消息队列才是最佳选择。
具体来说,对于每一个消息队列,可以使用一个流作为其底层实现,其中向队列中添加新消息的工作可以通过调用XADD
命令来完成,从队列中获取消息的工作可以通过XRANGE
命令或XREAD
命令来完成,而获取队列长度的工作则可以通过XLEN
命令来完成。
比方说,通过执行以下XADD
命令,可以向键名为MessageQueue:10086
的消息队列推入两条新消息:
redis> XADD MessageQueue:10086 * uid "Jack" msg "Hello!"
"1720785174751-0"
redis> XADD MessageQueue:10086 * uid "Tom" msg "Hi!"
"1720785196452-0"
接着可以使用以下XREAD
命令来读取这两条新消息:
redis> XREAD STREAMS MessageQueue:10086 0
1) 1) "MessageQueue:10086"
2) 1) 1) "1720785174751-0"
2) 1) "uid"
2) "Jack"
3) "msg"
4) "Hello!"
2) 1) "1720785196452-0"
2) 1) "uid"
2) "Tom"
3) "msg"
4) "Hi!"
又或者使用以下XRANGE
命令来读取带有指定ID的消息:
redis> XRANGE MessageQueue:10086 1720785174751-0 1720785174751-0
1) 1) "1720785174751-0"
2) 1) "uid"
2) "Jack"
3) "msg"
4) "Hello!"
最后,还使用以下XLEN
命令可以获取整个消息队列包含的消息总数量:
redis> XLEN MessageQueue:10086
(integer) 2
如果有需要的话,还可以在上述命令的基础上进一步扩展,给消息队列加上基于流消费者组实现的各种增强功能。
实现代码¶
代码清单 CODE_MESSAGE_QUEUE 展示了基于上述原理实现的消息队列程序。
代码清单 CODE_MESSAGE_QUEUE 消息队列程序 message_queue.py
from xread_iterator import StreamIterator, START_OF_STREAM
NON_BLOCK = None
BLOCK_FOREVER = 0
DEFAULT_COUNT = 10
START_OF_MQ = START_OF_STREAM
class MessageQueue:
def __init__(self, client, key, cursor=START_OF_MQ):
"""
根据给定的键,创建与之对应的消息队列。
消息队列的起始访问位置可以通过可选参数cursor指定。
默认情况下,cursor指向消息队列的最开头。
"""
self.client = client
self.key = key
self._iterator = StreamIterator(self.client, self.key, cursor)
def send(self, message):
"""
接受一个键值对形式的消息,并将其放入队列。
完成之后返回消息在队列中的ID作为结果。
"""
return self.client.xadd(self.key, message)
def receive(self, count=DEFAULT_COUNT, timeout=NON_BLOCK):
"""
根据消息的入队顺序,访问队列中的消息。
可选的count参数用于指定每次访问最多能够获取多少条消息,默认为10。
可选的timeout参数用于指定方法在未发现消息时是否阻塞,它的值可以是:
- NON_BLOCK,不阻塞直接返回,这是默认值;
- BLOCK_FOREVER,一直阻塞直到有消息可读为止;
- 一个大于零的整数,用于代表阻塞的最大毫秒数
"""
return self._iterator.next(count, timeout)
def get(self, message_id):
"""
获取指定ID对应的消息。
"""
ret = self.client.xrange(self.key, message_id, message_id)
if ret != []:
return ret[0][1]
def length(self):
"""
返回整个消息队列目前包含的消息总数量。
"""
return self.client.xlen(self.key)
这个程序复用了《流迭代器》一章中使用XREAD
命令实现的流迭代器。通过这个迭代器,程序成功地将原本非常复杂的消息接收操作简化为一次针对StreamIterator.next()
方法的调用,这也是这个消息队列程序能够保持简练的关键。
作为例子,以下代码展示了这个消息队列程序的具体使用方式:
>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, "MessageQueue:10086") # 创建消息队列对象
>>> mq.send({"uid": "Jack", "msg": "Hello!"}) # 发送消息
'1720847899374-0'
>>> mq.send({"uid": "Tom", "msg": "Hi!"})
'1720847912318-0'
>>> mq.receive() # 接收消息
[{'id': '1720847899374-0', 'msg': {'uid': 'Jack', 'msg': 'Hello!'}},
{'id': '1720847912318-0', 'msg': {'uid': 'Tom', 'msg': 'Hi!'}}]
>>> mq.get("1720847899374-0") # 获取特定消息
{'uid': 'Jack', 'msg': 'Hello!'}
>>> mq.length() # 获取消息总数量
2
扩展实现:直播间弹幕系统¶
为了更好地展示消息队列的功能,代码清单 CODE_CHAT 展示了一个使用消息队列实现的直播弹幕程序。这个程序可以让客户端通过执行send()
方法发送普通弹幕或是付费弹幕,还可以通过执行receive()
方法接收弹幕,或是直接通过执行show()
方法不断地接收并打印弹幕。
代码清单 CODE_CHAT 直播弹幕程序 chat.py
from message_queue import MessageQueue, DEFAULT_COUNT, BLOCK_FOREVER
def make_chat_key(broadcaster):
"""
根据给定的主播ID,构建出相应的消息队列键,用于存储直播间发出的弹幕。
例子:"Chat:Peter"、"Chat:10086"等。
"""
return "Chat:{}".format(broadcaster)
class Chat:
def __init__(self, client, broadcaster):
"""
根据给定的主播ID,为其构建存储直播间弹幕的消息队列。
"""
self.client = client
self.broadcaster = broadcaster
self.key = make_chat_key(broadcaster)
# 因为消息队列默认从队列开头开始返回元素,所以这里需要
# 传入特殊值"$"作为ID,让队列只返回阻塞之后出现的新消息
self.mq = MessageQueue(self.client, self.key, "$")
def send(self, uid, content, donate=0):
"""
根据给定的用户ID和内容,向直播间发送一条弹幕。
如果donate参数的值不为0,那么说明这是一条付费弹幕(super chat)。
"""
if donate == 0:
msg = {"uid": uid, "content": content}
else:
msg = {"uid": uid, "content": content, "donate": donate}
# 将弹幕发送至消息队列中
return self.mq.send(msg)
def receive(self, count=DEFAULT_COUNT):
"""
从直播间接收并返回最多count条弹幕,默认为10条。
如果上次调用之后还有未接收的弹幕存在,那么先接收已有的弹幕,再接收新出现的弹幕。
"""
# 尝试从消息队列中取出弹幕
# 若队列为空就一直阻塞直到有弹幕可弹出为止
return self.mq.receive(count, BLOCK_FOREVER)
def show(self):
"""
简易的弹幕打印器。
"""
while True:
# 一直接收弹幕
for item in self.receive():
# 丢弃消息ID,只获取消息本身
msg = item["msg"]
if "donate" not in msg:
# 打印普通弹幕
print("{0}: {1}".format(msg["uid"], msg["content"]))
else:
# 打印付费弹幕
print("+"*10 + "SUPER CHAT" + "+"*10)
print("{0}: {1}".format(msg["uid"], msg["content"]))
print("Donate: {}".format(msg["donate"]))
print("-"*10 + "SUPER CHAT" + "-"*10)
实现这个弹幕程序的关键是要做到以下两点:
在调用
Chat.__init__()
方法创建Chat
对象的时候,向底层的消息队列传入特殊ID$
,从而表示只接收最新出现的消息/弹幕。在调用
Chat.receive()
方法接收弹幕的时候,将底层消息队列的接收模式设置为BLOCK_FOREVER
。
这样一来,Chat.receive()
方法在被调用的时候,就会一直阻塞直到有可供接收的新弹幕出现为止。这确保了用户只会看到最新发送的弹幕,而之前已经存在于消息队列的旧弹幕则会自动被忽略。
作为例子,以下代码展示了使用弹幕程序接收并打印弹幕的具体方法:
>>> from redis import Redis
>>> from chat import Chat
>>> client = Redis(decode_responses=True)
>>> chat = Chat(client, "Peter")
>>> chat.show()
Jack: Hello, Peter!
Tom: Hello, Peter and Jack!
++++++++++SUPER CHAT++++++++++
Mary: Peter, thanks for your help yesterday!
Donate: 50
----------SUPER CHAT----------
可以看到,在这个属于Peter的直播间里面,整个过程共出现了三条弹幕:其中两条是分别来自Jack和Tom的普通弹幕,而最后一条则是来自Mary的价值50元的付费弹幕。
以下是与上述代码对应的,负责发送弹幕的代码:
>>> from redis import Redis
>>> from chat import Chat
>>> client = Redis(decode_responses=True)
>>> chat = Chat(client, "Peter")
>>> chat.send("Jack", "Hello, Peter!")
'1720532188953-0'
>>> chat.send("Tom", "Hello, Peter and Jack!")
'1720532204379-0'
>>> chat.send("Mary", "Peter, thanks for your help yesterday!", 50)
'1720532284213-0'
为了让这个弹幕程序保持简单,send()
方法只接收用户ID、消息正文和付费量三个参数。在实现真实的弹幕程序时,你可以根据自己的需求随时扩展这个方法的参数数量,又或者直接将所有相关参数都放进一个字典里,这样就可以在一条弹幕里面包含更多信息了。
最后,除了可以用于实现直播间弹幕系统之外,这个程序同样可以用于实现在线即时聊天室,两者的原理是完全一样的。
重点回顾¶
消息队列不仅可以用于计算机系统的信息交换,它还可以用于人类用户的信息交换,其用途非常广泛。
过去在较旧版本的Redis中,人们往往使用订阅与发布、列表或有序集合等方式实现消息队列,但这些实现基本上都有它们各自的缺点,比如消息安全性无法保证、性能不佳或者功能不足等。对于版本较新的Redis来说,使用流来实现消息队列才是最佳选择。
对于每一个消息队列,可以使用一个流作为其底层实现,其中向队列中添加新消息的工作可以通过调用
XADD
命令来完成,从队列中获取消息的工作可以通过XRANGE
命令或XREAD
命令来完成,而获取队列长度的工作则可以通过XLEN
命令来完成。消息队列除了可以用于实现直播间弹幕系统之外,同样可以用于实现在线聊天室,两者的原理是完全一样的。