延迟队列
如果对消息可靠性没有极高要求时可以使用Redis
List 数据结构通常作为异步消息队列使用,rpush/lpush
,lpop/rpop
阻塞读
blpop
/brpop
阻塞读在没有数据时会进入休眠状态,数据到来时会立刻醒过来,几乎没有延迟
空闲连接问题
长时间阻塞会变为闲置连接,太久服务器会主动断开连接,blpop
/brpop
会抛出异常
解决方法是捕获异常时重试
锁冲突
加锁失败
- 直接抛出和异常给用户让用户自己重试,也可以前端重试
- 等待一会儿重试(不建议),如果是死锁会导致线程堵死
- 将请求转移至延迟队列,以避开冲突
延迟队列
延迟队列可以使用有序列表(ZSet)来实现,到期处理时间是score
,使用多线程轮询的方式对到期的任务进行处理
# 代码示例(执行无效) def loop(): while True: values = r.zrangebyscore('delay-queue', 0, time.time(), start=0, num=1) if not values: time.sleep(1) continue value = value[0] # 移除 result = r.zrem('delay-queue', value) # 成功则为 True 代表当前线程抢到了执行权 if result: msg = json.loads(value) handle_msg(msg)
上面的代码可以用 lua scripting
进行优化,把 zrangebyscore
和 zrem
一起放到服务器端进行原子化操作,可以减少一些浪费
位图
用来存储 bool
类型的数据,例如用户的签到记录,签了是 1 没签是 0,需要记录 365 天
位图不是特殊的数据结构,其实就是一个 byte
数组,使用 get
/set
或者 getbit
/setbit
操作都可以
bin(ord('h')) # 获取字符的 ASCII 的二进制值
结果:'0b1101000'
bin(ord('e'))
结果:'0b1100101'
# 以二进制值的方式存入两个字符 r.setbit('bit', 1, 1) r.setbit('bit', 2, 1) r.setbit('bit', 4, 1) r.setbit('bit', 9, 1) r.setbit('bit', 10, 1) r.setbit('bit', 13, 1) r.setbit('bit', 15, 1)
结果:0
r.get('bit') # 取出(如果是不可打印字符,则会显示该字符的16进制形式)
结果:'he'
r.bitcount('bit') # 统计个数
结果:7
r.bitcount('bit', 0, 0) # 第一个字符中 1 的个数
结果:3
r.bitcount('bit', 0, 2) # 前三个字符中 1 的个数(其实这里只有两个字符)
结果:7
r.bitpos('bit', 0) # 第一个 0 的位置
结果:0
r.bitpos('bit', 1) # 第一个 1 的位置
结果:1
r.bitpos('bit', 1, 2, 2) # 第三个字符里第一个 1 的位置
结果:-1
r.bitpos('bit', 1, 1, 1) # 第二个字符里第一个 1 的位置
结果:9
r.set('bit', 'hello')
结果:True
r.delete('bit')
结果:1
魔术指令 bitfield (python 的这个包好像暂时不支持,略)
HyperLogLog
HyperLogLog
主要提供不精确的去重计数,标准误差是 0.81%
r.pfadd('HyperLogLogCounter', 'user_id_1') # 添加
结果:1
r.pfcount('HyperLogLogCounter') # 计数
结果:1
r.pfadd('HyperLogLogCounter2', 'user_id_2') # 添加
结果:1
r.pfmerge('HyperLogLogCounter', 'HyperLogLogCounter2') # 合并
结果:True
r.pfcount('HyperLogLogCounter') # 计数
结果:2
r.delete('HyperLogLogCounter') r.delete('HyperLogLogCounter2')
结果:1
# 尝试打印出第一个出现误差的时间点 for i in range(10000): r.pfadd('HyperLogLogCounter', "user_id_%d" % i) total = r.pfcount('HyperLogLogCounter') if total != i + 1: print(str(total) + " " + str(i + 1)) break
结果:129 128
r.delete('HyperLogLogCounter')
结果:1
这个网站演示了 HyperLogLog 是如何执行的
http://content.research.neustar.biz/blog/hll.html
内存占用
Redis 使用了 16384(2¹⁴) 个桶,每个桶的 maxbits 需要6个bit来存储,(2¹⁴) x 6 / 8 = 12 KB (除以 8 是为了转换成 KB)
布隆过滤器
估算是否存在
bf.add
之前可以用 bf.reserve
指令显式创建,支持三个参数 key
(可以在其中找到过滤器的键)、error_rate
(错误率,默认0.01,越接近0消耗资源越大)、initial_size
(打算添加到过滤器中的条目数,默认100)
相关文档:https://oss.redislabs.com/redisbloom/Bloom_Commands/
r.execute_command('bf.add', 'bloom', 'mark')
结果: ------------------------------------------- ResponseErrorTraceback (most recent call last) <ipython-input-87-f39b11b2b1b1> in <module> ----> 1 r.execute_command('bf.add', 'bloom', 'mark') ~/.local/lib/python3.8/site-packages/redis/client.py in execute_command(self, *args, **options) 899 try: 900 conn.send_command(*args) --> 901 return self.parse_response(conn, command_name, **options) 902 except (ConnectionError, TimeoutError) as e: 903 conn.disconnect() ~/.local/lib/python3.8/site-packages/redis/client.py in parse_response(self, connection, command_name, **options) 913 "Parses a response from the Redis server" 914 try: --> 915 response = connection.read_response() 916 except ResponseError: 917 if EMPTY_RESPONSE in options: ~/.local/lib/python3.8/site-packages/redis/connection.py in read_response(self) 754 755 if isinstance(response, ResponseError): --> 756 raise response 757 return response 758 ResponseError: unknown command `bf.add`, with args beginning with: `bloom`, `mark`,
# r.execute_command('BF.MADD', 'bloom', *[i for i in range(int(1e5))]) # r.execute_command('BF.MEXISTS', 'bloom', *[1, 27, 1000001])
Redis 没有自带这个功能,需要使用推荐的三方库加载
https://github.com/RedisBloom/RedisBloom
在爬虫系统中可以使用它,标记已爬过的网页
布隆过滤器可以显著降低数据库的IO请求,可以通过内存中的过滤器滤掉大量不存在的请求
垃圾邮件过滤功能也普遍用到了它,所以有时会有误判进垃圾邮件中
简单限流
滑动窗口
import time
def is_action_is_allowed(user_id, action_key, period, max_count): key = 'hist:%s:%s' % (user_id, action_key) now_ts = int(time.time() * 1000) # 毫秒时间戳 with r.pipeline() as pipe: # value:score 都是毫秒时间戳 pipe.zadd(key, {now_ts: now_ts}) # 移除时间窗口之前的操作记录 pipe.zremrangebyscore(key, 0, now_ts - period * 1000) # 获取窗口内的行为数量 pipe.zcard(key) # 设置 zet 过期时间,避免冷用户持续占用内存 pipe.expire(key, period + 1) # 过期时间再宽限一秒 _, _, current_count, _ = pipe.execute() return current_count <= max_count
for i in range(8): print(is_action_is_allowed("tabll6", "replay", 60, 5)) # 60秒内限制5次操作
结果: True True True True True False False False