你还不知道?这些Redis的基本操作一定要掌握(2)

Redis

延迟队列

如果对消息可靠性没有极高要求时可以使用Redis

List 数据结构通常作为异步消息队列使用,rpush/lpushlpop/rpop

阻塞读

blpop/brpop

阻塞读在没有数据时会进入休眠状态,数据到来时会立刻醒过来,几乎没有延迟

空闲连接问题

长时间阻塞会变为闲置连接,太久服务器会主动断开连接,blpop/brpop 会抛出异常

解决方法是捕获异常时重试

锁冲突

加锁失败
  1. 直接抛出和异常给用户让用户自己重试,也可以前端重试
  2. 等待一会儿重试(不建议),如果是死锁会导致线程堵死
  3. 将请求转移至延迟队列,以避开冲突

延迟队列

延迟队列可以使用有序列表(ZSet)来实现,到期处理时间是score,使用多线程轮询的方式对到期的任务进行处理

# 代码示例(执行无效)
def loop():
    while True:
        values = r.zrangebyscore('delay-queue', 0, time.time(), start=0, num=1)
        if not values:
            time.sleep(1)
            continue
        value = value[0]
        # 移除
        result = r.zrem('delay-queue', value)  # 成功则为 True 代表当前线程抢到了执行权
        if result:
            msg = json.loads(value)
            handle_msg(msg)

上面的代码可以用 lua scripting 进行优化,把 zrangebyscorezrem 一起放到服务器端进行原子化操作,可以减少一些浪费

位图

用来存储 bool 类型的数据,例如用户的签到记录,签了是 1 没签是 0,需要记录 365 天

位图不是特殊的数据结构,其实就是一个 byte 数组,使用 get/set 或者 getbit/setbit 操作都可以

bin(ord('h'))  # 获取字符的 ASCII 的二进制值
结果:'0b1101000'
bin(ord('e'))
结果:'0b1100101'
# 以二进制值的方式存入两个字符
r.setbit('bit', 1, 1)
r.setbit('bit', 2, 1)
r.setbit('bit', 4, 1)
r.setbit('bit', 9, 1)
r.setbit('bit', 10, 1)
r.setbit('bit', 13, 1)
r.setbit('bit', 15, 1)
结果:0
r.get('bit')  # 取出(如果是不可打印字符,则会显示该字符的16进制形式)
结果:'he'
r.bitcount('bit')  # 统计个数
结果:7
r.bitcount('bit', 0, 0)  # 第一个字符中 1 的个数
结果:3
r.bitcount('bit', 0, 2)  # 前三个字符中 1 的个数(其实这里只有两个字符)
结果:7
r.bitpos('bit', 0)  # 第一个 0 的位置
结果:0
r.bitpos('bit', 1)  # 第一个 1 的位置
结果:1
r.bitpos('bit', 1, 2, 2)  # 第三个字符里第一个 1 的位置
结果:-1
r.bitpos('bit', 1, 1, 1)  # 第二个字符里第一个 1 的位置
结果:9
r.set('bit', 'hello')
结果:True
r.delete('bit')
结果:1

魔术指令 bitfield (python 的这个包好像暂时不支持,略)

HyperLogLog

HyperLogLog 主要提供不精确的去重计数,标准误差是 0.81%

r.pfadd('HyperLogLogCounter', 'user_id_1')  # 添加
结果:1
r.pfcount('HyperLogLogCounter')  # 计数
结果:1
r.pfadd('HyperLogLogCounter2', 'user_id_2')  # 添加
结果:1
r.pfmerge('HyperLogLogCounter', 'HyperLogLogCounter2')  # 合并
结果:True
r.pfcount('HyperLogLogCounter')  # 计数
结果:2
r.delete('HyperLogLogCounter')
r.delete('HyperLogLogCounter2')
结果:1
# 尝试打印出第一个出现误差的时间点
for i in range(10000):
    r.pfadd('HyperLogLogCounter', "user_id_%d" % i)
    total = r.pfcount('HyperLogLogCounter')
    if total != i + 1:
        print(str(total) + " " + str(i + 1))
        break
结果:129 128
r.delete('HyperLogLogCounter')
结果:1

这个网站演示了 HyperLogLog 是如何执行的
http://content.research.neustar.biz/blog/hll.html

内存占用

Redis 使用了 16384(2¹⁴) 个桶,每个桶的 maxbits 需要6个bit来存储,(2¹⁴) x 6 / 8 = 12 KB (除以 8 是为了转换成 KB)

布隆过滤器

估算是否存在

bf.add 之前可以用 bf.reserve 指令显式创建,支持三个参数 key(可以在其中找到过滤器的键)、error_rate(错误率,默认0.01,越接近0消耗资源越大)、initial_size(打算添加到过滤器中的条目数,默认100)
相关文档:https://oss.redislabs.com/redisbloom/Bloom_Commands/

r.execute_command('bf.add', 'bloom', 'mark')
结果:
-------------------------------------------

ResponseErrorTraceback (most recent call last)

<ipython-input-87-f39b11b2b1b1> in <module>
----> 1 r.execute_command('bf.add', 'bloom', 'mark')


~/.local/lib/python3.8/site-packages/redis/client.py in execute_command(self, *args, **options)
    899         try:
    900             conn.send_command(*args)
--> 901             return self.parse_response(conn, command_name, **options)
    902         except (ConnectionError, TimeoutError) as e:
    903             conn.disconnect()


~/.local/lib/python3.8/site-packages/redis/client.py in parse_response(self, connection, command_name, **options)
    913         "Parses a response from the Redis server"
    914         try:
--> 915             response = connection.read_response()
    916         except ResponseError:
    917             if EMPTY_RESPONSE in options:


~/.local/lib/python3.8/site-packages/redis/connection.py in read_response(self)
    754 
    755         if isinstance(response, ResponseError):
--> 756             raise response
    757         return response
    758 


ResponseError: unknown command `bf.add`, with args beginning with: `bloom`, `mark`, 
# r.execute_command('BF.MADD', 'bloom', *[i for i in range(int(1e5))])
# r.execute_command('BF.MEXISTS', 'bloom', *[1, 27, 1000001])

Redis 没有自带这个功能,需要使用推荐的三方库加载
https://github.com/RedisBloom/RedisBloom

在爬虫系统中可以使用它,标记已爬过的网页

布隆过滤器可以显著降低数据库的IO请求,可以通过内存中的过滤器滤掉大量不存在的请求

垃圾邮件过滤功能也普遍用到了它,所以有时会有误判进垃圾邮件中

简单限流

滑动窗口

import time
def is_action_is_allowed(user_id, action_key, period, max_count):
    key = 'hist:%s:%s' % (user_id, action_key)
    now_ts = int(time.time() * 1000)  # 毫秒时间戳
    with r.pipeline() as pipe:
        #  value:score 都是毫秒时间戳
        pipe.zadd(key, {now_ts: now_ts})
        # 移除时间窗口之前的操作记录
        pipe.zremrangebyscore(key, 0, now_ts - period * 1000)
        # 获取窗口内的行为数量
        pipe.zcard(key)
        # 设置 zet 过期时间,避免冷用户持续占用内存
        pipe.expire(key, period + 1)  # 过期时间再宽限一秒
        _, _, current_count, _ = pipe.execute()
    return current_count <= max_count
for i in range(8):
    print(is_action_is_allowed("tabll6", "replay", 60, 5))  # 60秒内限制5次操作
结果:
True
True
True
True
True
False
False
False