优先队列

优先队列是一种常见的数据结构,这种队列中的每个元素都有一个优先级,而优先级则决定了元素在队列中的出队顺序。

优先队列又可以分为最小优先队列和最大优先队列,前者最先弹出优先级最低的元素,而后者则最先弹出优先级最高的元素。

优先队列经常被用作基础数据结构,又或者用于实现调度系统和事件模拟器:比如调度系统通常就使用优先队列来优先处理高优先级的任务,而事件模拟器则使用优先队列来优先处理最近到达的事件。

需求描述

你想要使用Redis实现优先队列,并通过它提供推入元素、弹出最高优先级元素和弹出最低优先级元素等操作。

解决方案

在Redis中实现优先队列最直接的方法就是使用有序集合——通过将优先队列元素映射为有序集合成员,并将元素的优先级映射为有序集合成员的分值,程序就可以通过操纵有序集合来操纵优先队列:

  • 通过使用ZADD命令,程序可以把指定的优先队列元素储存在有序集合里面,让它成为一个有序集合成员。

  • 当用户想要移除指定的优先队列元素时,程序只需要使用ZREM命令移除其对应的有序集合成员即可。

  • 在把优先队列元素有序地储存在有序集合里面之后,程序可以使用相应的ZPOPMIN命令弹出优先级最低的元素,又或者使用ZPOPMAX命令弹出优先级最高的元素。

  • 此外,程序还可以使用ZRANGE命令获取具有指定优先级排名的元素,比如优先级最低的元素和优先级最高的元素,它们分别位于有序集合排名的第一位和最后一位。


表 TABLE_JOB_PRIORITY 一系列任务以及它们的优先级

排名

任务

优先级

1

"Job A"

100

2

"Job C"

200

3

"Job B"

250

4

"Job E"

280

5

"Job D"

330


以表 TABLE_JOB_PRIORITY 所示的任务为例,可以通过执行以下ZADD命令来构建与之对应的优先队列:

redis> ZADD Jobs 100 "Job A" 200 "Job C" 250 "Job B" 280 "Job E" 330 "Job D"
(integer) 5

然后通过ZRANGE命令获取这个优先队列(或者它的某个元素、某个部分等):

redis> ZRANGE Jobs 0 -1 WITHSCORES
 1) "Job A"
 2) "100"
 3) "Job C"
 4) "200"
 5) "Job B"
 6) "250"
 7) "Job E"
 8) "280"
 9) "Job D"
10) "330"

又或者通过ZPOPMIN命令弹出队列中优先级最低的元素"Job A",或是通过ZPOPMAX命令弹出队列中优先级最高的元素"Job D"

redis> ZPOPMIN Jobs
1) "Job A"
2) "100"
redis> ZPOPMAX Jobs
1) "Job D"
2) "330"

代码实现

代码清单 CODE_PRIORITY_QUEUE 展示了基于上一节所述原理实现的优先队列程序。


代码清单 CODE_PRIORITY_QUEUE 优先队列 priority_queue.py

def none_or_single_queue_item(result):
    """
    以{item: priority}形式返回result列表中包含的单个优先队列元素。
    若result为空列表则直接返回None。
    """
    if result == []:
        return None
    else:
        item, priority = result[0]
        return {item: priority}

class PriorityQueue:

    def __init__(self, client, key):
        self.client = client
        self.key = key

    def insert(self, item, priority):
        """
        将带有指定优先级的元素添加至队列,如果元素已存在那么更新它的优先级。
        """
        self.client.zadd(self.key, {item: priority})

    def remove(self, item):
        """
        尝试从队列中移除指定的元素。
        移除成功时返回True,返回False则表示由于元素不存在而导致移除失败。
        """
        return self.client.zrem(self.key, item) == 1

    def min(self):
        """
        获取队列中优先级最低的元素及其优先级,如果队列为空则返回None。
        """
        result = self.client.zrange(self.key, 0, 0, withscores=True)
        return none_or_single_queue_item(result)

    def max(self):
        """
        获取队列中优先级最高的元素及其优先级,如果队列为空则返回None。
        """
        result = self.client.zrange(self.key, -1, -1, withscores=True)
        return none_or_single_queue_item(result)

    def pop_min(self):
        """
        弹出并返回队列中优先级最低的元素及其优先级,如果队列为空则返回None。
        """
        result = self.client.zpopmin(self.key)
        return none_or_single_queue_item(result)

    def pop_max(self):
        """
        弹出并返回队列中优先级最高的元素及其优先级,如果队列为空则返回None。
        """
        result = self.client.zpopmax(self.key)
        return none_or_single_queue_item(result)

    def length(self):
        """
        返回队列的长度,也即是队列包含的元素数量。
        """
        return self.client.zcard(self.key)

这个优先队列程序提供了实现最大优先队列和最小优先队列所需的操作:

  • 用户既可以使用min()方法和max()方法查看优先级最低和最高的元素,也可以在有需要时使用pop_min()方法和pop_max()方法弹出它们。

  • 一个程序如果只使用max()pop_max(),那么它就是一个最大优先队列;反之,程序如果只使用min()pop_min(),那么它就是一个最小优先队列。

另外需要说明的一点是,因为用户在使用优先队列的时候通常只关心位于队列两端具有最高和最低优先级的两个元素,所以这个程序只实现了访问队列两端元素的方法,而不是能够访问队列中任意元素的方法,但如果有需要的话,也可以随时通过ZRANGE命令来实现它。

作为例子,以下代码展示了上述优先队列程序的具体使用方式:

>>> from redis import Redis
>>> from priority_queue import PriorityQueue
>>> client = Redis(decode_responses=True)
>>> queue = PriorityQueue(client, "PriorityQueue")
>>> queue.insert("Job A", 100)  # 元素入队
>>> queue.insert("Job C", 200)
>>> queue.insert("Job B", 250)
>>> queue.insert("Job E", 280)
>>> queue.insert("Job D", 330)
>>> queue.length()  # 查看队列长度
5
>>> queue.max()  # 查看优先级最高的元素
{'Job D': 330.0}
>>> queue.min()  # 查看优先级最低的元素
{'Job A': 100.0}
>>> queue.pop_max()  # 弹出优先级最高的元素
{'Job D': 330.0}
>>> queue.pop_min()  # 弹出优先级最低的元素
{'Job A': 100.0}

扩展实现:为优先队列加上阻塞操作

代码清单 CODE_PRIORITY_QUEUE 展示的优先队列实现作为底层数据结构使用已经足够,但如果要实际地将其应用到调度系统或事件模拟器中,那么为了避免程序重复调用pop_max()pop_min()造成系统空转,我们还必须实现相应的阻塞弹出操作——这样程序就可以在循环里面高效地通过阻塞操作来等待元素出现,然后只在有元素需要处理的情况下进行响应:

while True:
    job = queue.blocking_pop_max()  # 阻塞直到元素出现
    do_something(job)

而不是写出以下这种蹩脚又低效的代码:

while True:
    job = queue.pop_max()
    if job is None:
        sleep(SOMETIME)  # 队列为空,休眠一段时间
    else:
        do_something(job)

代码清单 CODE_BLOCKING_PRIORITY_QUEUE 展示了优先队列程序的阻塞操作实现,它在之前实现的基础上添加了使用BZPOPMIN命令实现的blocking_pop_min()方法以及使用BZPOPMAX命令实现的blocking_pop_max()方法。


代码清单 CODE_BLOCKING_PRIORITY_QUEUE 优先队列的阻塞操作实现 priority_queue.py

BLOCK_FOREVER = 0

class PriorityQueue:

    def blocking_pop_min(self, timeout=BLOCK_FOREVER):
        """
        尝试从队列中弹出优先级最低的元素及其优先级,若队列为空则阻塞。
        可选参数timeout用于指定最大阻塞秒数,默认将一直阻塞到有元素可弹出为止。
        """
        result = self.client.bzpopmin(self.key, timeout)
        if result is not None:
            zset_name, item, priority = result
            return {item: priority}

    def blocking_pop_max(self, timeout=BLOCK_FOREVER):
        """
        尝试从队列中弹出优先级最高的元素及其优先级,若队列为空则阻塞。
        可选参数timeout用于指定最大阻塞秒数,默认将一直阻塞到有元素可弹出为止。
        """
        result = self.client.bzpopmax(self.key, timeout)
        if result is not None:
            zset_name, item, priority = result
            return {item: priority}

作为例子,以下代码展示了上述阻塞操作的具体使用方法:

>>> from redis import Redis
>>> from priority_queue import PriorityQueue
>>> client = Redis(decode_responses=True)
>>> queue2 = PriorityQueue(client, "PriorityQueue2")
>>> queue2.insert("Job F", 400)  # 入队一个元素
True
>>> queue2.blocking_pop_max(5)  # 队列非空,立即返回队中元素
{'Job F': 400.0}
>>> queue2.blocking_pop_max(5)  # 队列为空,阻塞5秒钟后返回None
>>>

重点回顾

  • 优先队列中的每个元素都有一个优先级,优先级决定了元素在队列中的出队顺序。

  • 优先队列又可以分为最小优先队列和最大优先队列,前者最先弹出优先级最低的元素,而后者则最先弹出优先级最高的元素。

  • 在Redis中实现优先队列最直接的方法就是使用有序集合——通过将优先队列元素映射为有序集合成员,并将元素的优先级映射为有序集合成员的分值,程序就可以通过操纵有序集合来操纵优先队列。

  • 使用有序集合的阻塞弹出命令可以实现相应的阻塞式优先队列弹出操作,这些操作可以让程序只在有元素可供处理的情况下进行响应,从而保证程序尽可能高效地运行。