您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息
免费发信息
三六零分类信息网 > 通化分类信息网,免费分类信息发布

Golang分布式应用之Redis怎么使用

2024/4/7 3:34:28发布6次查看
正文redis作是一个高性能的内存数据库,常被应用于分布式系统中,除了作为分布式缓存或简单的内存数据库还有一些特殊的应用场景,本文结合golang来编写对应的中间件。
分布式锁单机系统中我们可以使用sync.mutex来保护临界资源,在分布式系统中同样有这样的需求,当多个主机抢占同一个资源,需要加对应的“分布式锁”。
在redis中我们可以通过setnx命令来实现
如果key不存在可以设置对应的值,设置成功则加锁成功,key不存在返回失败
释放锁可以通过del实现。
主要逻辑如下:
type redislock struct { client *redis.client key string expiration time.duration // 过期时间,防止宕机或者异常}func newlock(client *redis.client, key string, expiration time.duration) *redislock { return &redislock{ client: client, key: key, expiration: expiration, }}// 加锁将成功会将调用者id保存到redis中func (l *redislock) lock(id string) (bool, error) { return l.client.setnx(context.todo(), l.key, id, l.expiration).result()}const unlockscript = `if (redis.call("get", keys[1]) == keys[2]) then redis.call("del", keys[1]) return trueendreturn false`// 解锁通过lua脚本来保证原子性,只能解锁当前调用者加的锁func (l *redislock) unlock(id string) error { _, err := l.client.eval(context.todo(), unlockscript, []string{l.key, id}).result() if err != nil && err != redis.nil { return err } return nil}
为了防止系统宕机或异常请求导致的死锁,需要添加一个额外的超时时间,该超时时间应设为最大估计运行时间的两倍。
解锁时通过lua脚本来保证原子性,调用者只会解自己加的锁。避免由于超时造成的混乱,例如:进程a在时间t1获取了锁,但由于执行缓慢,在时间t2锁超时失效,进程b在t3获取了锁,这是如果进程a执行完去解锁会取消进程b的锁。
运行测试func main() { client := redis.newclient(&redis.options{ addr: "localhost:6379", password: "123456", db: 0, // use default db }) lock := newlock(client, "counter", 30*time.second) counter := 0 worker := func(i int) { for { id := fmt.sprintf("worker%d", i) ok, err := lock.lock(id) log.printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err) if !ok { time.sleep(100 * time.millisecond) continue } defer lock.unlock(id) counter++ log.printf("worker %d, add counter %d", i, counter) break } } wg := sync.waitgroup{} for i := 1; i <= 5; i++ { wg.add(1) id := i go func() { defer wg.done() worker(id) }() } wg.wait()}
运行结果,可以看到与sync.mutex使用效果类似
2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5
特别注意的是,在分布式redis集群中,如果发生异常时(主节点宕机),可能会降低分布式锁的可用性,可以通过强一致性的组件etcd、zookeeper等实现。
分布式过滤器假设要开发一个爬虫服务,爬取百万级的网页,怎么判断某一个网页是否爬取过,除了借助数据库和hashmap,我们可以借助布隆过滤器来做。相对于其他方法,布隆过滤器占用空间非常少,且插入和查询时间非常快。
布隆过滤器用来判断某个元素是否在集合中,利用bitset
插入数据时将值进行多次hash,将bitset对应位置1
查询时同样进行多次hash对比所有位上是否为1,如是则存在。
布隆过滤器有一定的误判率,不适合精确查询的场景。另外也不支持删除元素。通常适用于url去重、垃圾邮件过滤、防止缓存击穿等场景中。
在redis中,我们可以使用自带的bitset实现,同样也借助lua脚本的原子性来避免多次查询数据不一致。
const ( // 插入数据,调用setbit设置对应位 setscript = `for _, offset in ipairs(argv) do redis.call("setbit", keys[1], offset, 1)end` // 查询数据,如果所有位都为1返回true getscript = `for _, offset in ipairs(argv) do if tonumber(redis.call("getbit", keys[1], offset)) == 0 then return false endendreturn true`)type bloomfilter struct { client *redis.client key string // 存在redis中的key bits uint // bitset的大小 maps uint // hash的次数}func newbloomfilter(client *redis.client, key string, bits, maps uint) *bloomfilter { client.del(context.todo(), key) if maps == 0 { maps = 14 } return &bloomfilter{ key: key, client: client, bits: bits, maps: maps, }}// 进行多次hash, 得到位置列表func (f *bloomfilter) getlocations(data []byte) []uint { locations := make([]uint, f.maps) for i := 0; i < int(f.maps); i++ { val := murmur3.sum64(append(data, byte(i))) locations[i] = uint(val) % f.bits } return locations}func (f *bloomfilter) add(data []byte) error { args := getargs(f.getlocations(data)) _, err := f.client.eval(context.todo(), setscript, []string{f.key}, args).result() if err != nil && err != redis.nil { return err } return nil}func (f *bloomfilter) exists(data []byte) (bool, error) { args := getargs(f.getlocations(data)) resp, err := f.client.eval(context.todo(), getscript, []string{f.key}, args).result() if err != nil { if err == redis.nil { return false, nil } return false, err } exists, ok := resp.(int64) if !ok { return false, nil } return exists == 1, nil}func getargs(locations []uint) []string { args := make([]string, 0) for _, l := range locations { args = append(args, strconv.formatuint(uint64(l), 10)) } return args}
运行测试func main() { bf := newbloomfilter(client,"bf-test", 2^16, 14) exists, err := bf.exists([]byte("test1")) log.printf("exist %t, err %v", exists, err) if err := bf.add([]byte("test1")); err != nil { log.printf("add err: %v", err) } exists, err = bf.exists([]byte("test1")) log.printf("exist %t, err %v", exists, err) exists, err = bf.exists([]byte("test2")) log.printf("exist %t, err %v", exists, err)// output// 2022/07/22 10:05:58 exist false, err <nil>// 2022/07/22 10:05:58 exist true, err <nil>// 2022/07/22 10:05:58 exist false, err <nil>}
分布式限流器在golang.org/x/time/rate包中提供了基于令牌桶的限流器,如果要实现分布式环境的限流可以基于redis lua脚本实现。
令牌桶的主要原理如下:
假设一个令牌桶容量为burst,每秒按照qps的速率往里面放置令牌
初始时放满令牌,令牌溢出则直接丢弃,请求令牌时,如果桶中有足够令牌则允许,否则拒绝
当burst==qps时,严格按照qps限流;当burst>qps时,可以允许一定的突增流量
这里主要参考了官方rate包的实现,将核心逻辑改为lua实现。
--- 相关key--- limit rate key值,对应value为当前令牌数local limit_key = keys[1]--- 输入参数--[[qps: 每秒请求数;burst: 令牌桶容量;now: 当前timestamp;cost: 请求令牌数;max_wait: 最大等待时间--]]local qps = tonumber(argv[1])local burst = tonumber(argv[2])local now = argv[3]local cost = tonumber(argv[4])local max_wait = tonumber(argv[5])--- 获取redis中的令牌数local tokens = redis.call("hget", limit_key, "token")if not tokens then tokens = burstend--- 上次修改时间local last_time = redis.call("hget", limit_key, "last_time")if not last_time then last_time = 0end--- 最新等待时间local last_event = redis.call("hget", limit_key, "last_event")if not last_event then last_event = 0end--- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数local delta = math.max(0, now-last_time)local new_tokens = math.min(burst, delta * qps + tokens)new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌--- 如果最新令牌数小于0,计算需要等待的时间local wait_period = 0if new_tokens < 0 and qps > 0 then wait_period = wait_period - new_tokens / qpsendwait_period = math.ceil(wait_period)local time_act = now + wait_period --- 满足等待间隔的时间戳--- 允许请求有两种情况--- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求--- qps为0时,只要最新令牌数不小于0即可local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)--- 设置对应值if ok then redis.call("set", limit_key, new_tokens) redis.call("set", last_time_key, now) redis.call("set", last_event_key, time_act)end--- 返回列表,{是否允许, 等待时间}return {ok, wait_period}
在golang中的相关接口allow、allown、wait等都是通过调用reserven实现
// 调用lua脚本func (lim *redislimiter) reserven(now time.time, n int, maxfuturereservesecond int) (*reservation, error) { // ... res, err := lim.rdb.eval(context.todo(), reservenscript, []string{lim.limitkey}, lim.qps, lim.burst, now.unix(), n, maxfuturereservesecond).result() if err != nil && err != redis.nil { return nil, err } //... return &reservation{ ok: allow == 1, lim: lim, tokens: n, timetoact: now.add(time.duration(wait) * time.second), }, nil}
运行测试func main() { rdb := redis.newclient(&redis.options{ addr: "localhost:6379", password: "123456", db: 0, // use default db }) r, err := newredislimiter(rdb, 1, 2, "testrate") if err != nil { log.fatal(err) } r.reset() for i := 0; i < 5; i++ { err := r.wait(context.todo()) log.printf("worker %d allowed: %v", i, err) }}// output// 2022/07/22 12:50:31 worker 0 allowed: <nil>// 2022/07/22 12:50:31 worker 1 allowed: <nil>// 2022/07/22 12:50:32 worker 2 allowed: <nil>// 2022/07/22 12:50:33 worker 3 allowed: <nil>// 2022/07/22 12:50:34 worker 4 allowed: <nil>
前两个请求在burst内,直接可以获得,后面的请求按照qps的速率生成。
其他redis还可用于全局计数、去重以及发布订阅等不同情境。参考redis官方提供的模块,可以通过加载这些模块实现过滤、限流等特性。
以上就是golang分布式应用之redis怎么使用的详细内容。
通化分类信息网,免费分类信息发布

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录