先进先出队列¶
先进先出队列是一种非常常见的数据结构,它允许用户有序地将多个元素推入至队列,然后在有需要的时候再以相反的顺序依次从队列中弹出相应的元素。
先进先出队列经常用于实现排队系统或是抢购/秒杀系统,这些系统的一个特点是它们在短时间内接收到的请求数量远远超过它们能够正常处理的请求数量。 因此系统就需要将短时间内接收到的大量请求按顺序存放在队列里面,然后以“先到先处理,先到先服务”的方式有序地处理它们。
举个例子,对于网店应用来说,一件热门的限量打折商品可能会引来大量用户抢购,其购买订单的数量可能会远远超过该网店瞬间能够处理的数量,并且被抢购的商品也随时会售罄。 这时系统就可以考虑不立即执行购买操作,这是先将用户发送的购买请求全部放入先进先出队列里面,然后再按照这些请求入队的先后顺序依次处理它们。 这样既能保证订单系统不会被阻塞,也保证了系统对每个抢购用户都是公平的。
需求描述¶
你想要使用Redis实现先进先出队列,这个队列应该能够提供基本的推入和弹出等操作。
解决方案¶
在Redis中实现先进先出队列最简单的方法就是使用列表:程序只需要向列表的一端推入元素,并从列表的另一端弹出元素,那么列表自然就会形成一个先进先出队列。
举个例子,如果我们使用以下RPUSH
命令,依次向列表FifoQueue
的右端推入"a"
、"b"
、"c"
三个元素:
redis> RPUSH FifoQueue "a"
(integer) 1
redis> RPUSH FifoQueue "b"
(integer) 2
redis> RPUSH FifoQueue "c"
(integer) 3
那么之后只需要使用RPUSH
命令的反操作LPOP
,就可以以先进先出的方式依次从列表的左端弹出"a"
、"b"
、"c"
三个元素:
redis> LPOP FifoQueue
"a"
redis> LPOP FifoQueue
"b"
redis> LPOP FifoQueue
"c"
在有需要的时候,还可以使用LLEN
命令来获知队列目前包含的元素数量:
redis> LLEN FifoQueue
(integer) 0
实现代码¶
代码清单 CODE_FIFO 展示了使用上述原理实现的先进先出队列。
代码清单 CODE_FIFO 先进先出队列 fifo_queue.py
NON_BLOCK = -1
class FifoQueue:
def __init__(self, client, key):
self.client = client
self.key = key
def enqueue(self, *items):
"""
以先进先出顺序将给定的一个或多个元素推入队列。
返回推入操作执行之后,队列当前的长度作为结果。
"""
return self.client.rpush(self.key, *items)
def dequeue(self, timeout=NON_BLOCK):
"""
以先进先出顺序弹出队列中的一个元素。
如果队列为空则返回None。
可选的timeout参数用于启用/关闭阻塞功能,它的值可以是:
1) NON_BLOCK ,默认值,不启用阻塞功能;
2)0 ,一直等待直到有值可弹出为止;
3)N ,大于0的秒数N,表示等待的最长秒数。
"""
if timeout == NON_BLOCK:
return self.client.lpop(self.key)
else:
ret = self.client.blpop(self.key, timeout)
if ret is not None:
return ret[1] # 从(list, item)元组中获取被弹出元素
def length(self):
"""
返回队列的长度。
"""
return self.client.llen(self.key)
这个程序通过RPUSH
命令将元素推入到列表的右端,并通过LPOP
或BLPOP
命令将元素从列表的左端弹出,从而形成先进先出队列。
为了能够灵活地执行弹出操作,dequeue()
方法同时支持阻塞和不阻塞两种形式。对于简单的程序和较短的队列,可以通过多次执行该方法来弹出元素:
item = queue.dequeue() # 弹出一个元素
do_something(item) # 处理元素
item = queue.dequeue() # 弹出另一个元素
do_something(item) # 处理元素
但是对于持续运行的程序或是包含大量元素的队列,则可以通过阻塞模式让程序避免空转、保持高效:
while True:
item = queue.dequeue(0) # 阻塞直到有元素出现
do_something(item) # 处理元素
作为例子,以下代码展示了上述先进先出队列程序的具体用法:
>>> from redis import Redis
>>> from fifo_queue import FifoQueue
>>> client = Redis(decode_responses=True)
>>> queue = FifoQueue(client, "FifoQueue")
>>> queue.enqueue("a") # 元素入队
1
>>> queue.enqueue("b")
2
>>> queue.enqueue("c")
3
>>> queue.dequeue() # 元素出队
'a'
>>> queue.dequeue()
'b'
>>> queue.dequeue()
'c'
>>> queue.dequeue() # 队列已空
>>>
扩展实现:反方向的队列¶
正如代码清单 CODE_REVERSED_FIFO 所示,除了“右端入队、左端出队”的方式之外,先进先出队列还可以通过“左端入队、右端出队”的方式实现。
跟之前FifoQueue
类的定义相比,这个新实现的FifoQueueR
类将原来的RPUSH
替换成了LPUSH
,而LPOP
和BLPOP
则分别替换成了RPOP
和BRPOP
。
代码清单 CODE_REVERSED_FIFO 先进先出队列的另一种实现 fifo_queue_r.py
NON_BLOCK = -1
class FifoQueueR:
def __init__(self, client, key):
self.client = client
self.key = key
def enqueue(self, *items):
"""
以先进先出顺序将给定的一个或多个元素推入队列。
返回推入操作执行之后,队列当前的长度作为结果。
"""
return self.client.lpush(self.key, *items)
def dequeue(self, timeout=NON_BLOCK):
"""
以先进先出顺序弹出队列中的一个元素。
如果队列为空则返回None。
可选的timeout参数用于启用/关闭阻塞功能,它的值可以是:
1) NON_BLOCK ,默认值,不启用阻塞功能;
2)0 ,一直等待直到有值可弹出为止;
3)N ,大于0的秒数N,表示等待的最长秒数。
"""
if timeout == NON_BLOCK:
return self.client.rpop(self.key)
else:
ret = self.client.brpop(self.key, timeout)
if ret is not None:
return ret[1] # 从(list, item)元组中获取被弹出元素
def length(self):
"""
返回队列的长度。
"""
return self.client.llen(self.key)
除了底层实现入队和出队的方向不一样之外,FifoQueueR
和FifoQueue
的行为应该是完全一致的,它们对于相同的输入应该产生相同的输出:
>>> from redis import Redis
>>> from fifo_queue_r import FifoQueueR
>>> client = Redis(decode_responses=True)
>>> queue = FifoQueueR(client, "FifoQueueR")
>>> queue.enqueue("a", "b", "c")
3
>>> queue.dequeue()
'a'
>>> queue.dequeue()
'b'
>>> queue.dequeue()
'c'
重点回顾¶
先进先出队列能够有序地将多个元素推入至队列,并在有需要的时候以相反的顺序弹出元素。
先进先出队列通常用于实现排队系统或是抢购/秒杀系统,它们能够将多个请求按顺序存放在队列里面,然后以“先到先处理,先到先服务”的方式有序地处理它们。
在Redis中实现先进先出队列最简单的方式就是使用列表,程序只需要向列表的一端推入元素,并从列表的另一端弹出元素,那么列表自然就会形成一个先进先出队列。
列表在实现先进先出队列时既可以选择“右端入队、左端出队”的方式,也可以选择“左端入队、右端出队”的方式:前者需要用到
RPUSH
、LPOP
和BLPOP
三个命令,而后者则会用到LPUSH
、RPOP
和BRPOP
三个命令。