- 作者:老汪软件技巧
- 发表时间:2024-10-03 00:02
- 浏览量:
singleflight 的作用: 能实现瞬时发生多次相同的并发请求,只有一次会走实际请求逻辑,其他请求会等待实际请求的结果,达到防止缓存击穿的效果。典型应用场景:缓存击穿
这是业务开发中典型的一个场景,为了减轻 DB 的压力,将 DB 的数据存到缓存中,缓存获取不到,再去 DB 查询。
但是这个场景存在一个问题,当一条数据从缓存中获取不到的时候,如果这时候有大量的请求,去获取这条数据,大部分请求都会请求到 DB ,给 DB 造成瞬间比较大的压力、甚至有可能把 DB 压垮。
这时候我们就可以使用 singleflight 去解决这个问题, 同时只会有一个请求,会请求到 DB 层,其他请求,会使用第一个去 DB 获取的数据作为返回值。
这里写一个简单的例子,模拟 10 个用户同时去 db 获取数据
根据输出的打印,我们可以看到10个用户获取到了一样的数据,但是只执行了一次 db 查询,大大地减轻了 db 的压力
源码分析结构体
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Group 表示一个工作类,并形成一个命名空间,在该命名空间中,可以使用重复抑制来执行工作单元。其中 mu 字段是为了保护 m 字段用,m 字段使用了懒加载的方式。
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val any
err error
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
}
Group 中的 m 字段的值类型是 call 结构体,call 结构体的参数的含义/作用如下:
wg: 对于指定的 key ,如果出现重复项,则重复调用方将等待原始调用方完成并收到相同的结果。
val: 调用指定的函数后,返回的值。
err: 执行指定的函数时,出现的错误。
dups: 表示重复项的数量,这些重复调用会接收到相同的结果。
chans:是一个切片,字段的类型表示只能接受 Result 类型的值,在 调用 Dochan(...) 函数时用到。
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val any
Err error
Shared bool
}
Result 保存 Do 的结果,通过通道传递。其中 Val 字段代表返回的值、Err 代表执行指定函数时,出现的错误,Shared代表是否共享该返回值。
函数
Do
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
// 执行并返回给定函数的结果,确保每次只对给定键执行一次。
// 如果出现重复调用,重复调用将等待原始调用者执行完毕,并获得相同的结果。
// shared 的返回值表示是否被多个调用者使用
func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) {
// 加锁,确保同时只有一个 Do 函数在执行
g.mu.Lock()
// 如果 Group 的 m 的值为 nil,进行初始化(懒加载)
if g.m == nil {
g.m = make(map[string]*call)
}
// 如果 Group 的 map 中存在这个 key,
// 递增 dups 的值,并释放锁,
// 通过 c.wg.Wait()等待原始调用者执行完毕,获取同样的结果
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
// 初始化 call
c := new(call)
// 增加一个等待任务
c.wg.Add(1)
// 将 c 作为当前指定 key 的 value
g.m[key] = c
// 解锁
g.mu.Unlock()
// 执行 doCall 函数,这里的传入值,分别是 call,指定的 key,调用方指定的执行函数
g.doCall(c, key, fn)
// 返回: 值、错误、多个调用方是否共享该返回值
return c.val, c.err, c.dups > 0
}
DoChan
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
// DoChan 和 Do 类型,但它返回一个通道,该通道将在结果准备就绪时接受结果
func (g *Group) DoChan(key string, fn func() (any, error)) <-chan Result {
// 初始化一个通道
ch := make(chan Result, 1)
// 加锁,确保同时只有一个 Do 函数在执行
g.mu.Lock()
// 如果 Group 的 m 的值为 nil,进行初始化(懒加载)
if g.m == nil {
g.m = make(map[string]*call)
}
// 如果 Group 的 map 中存在这个 key,
// 递增 dups 的值,添加初始化的 ch 到 chans 切片中,然后释放锁,
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
// 初始化 Call 结构体
c := &call{chans: []chan<- Result{ch}}
// 增加一个等待任务
c.wg.Add(1)
// 将 c 作为当前指定 key 的 value
g.m[key] = c
// 解锁
g.mu.Unlock()
// 并发调用 doCall 函数,这里的传入值,分别是 call,指定的 key,调用方指定的执行函数
// 这里之所以可以并发调用,这个 DoChan 函数的返回值是一个 通道
// 从这个通道获取值时,如果没有可以获取的值,会发生阻塞
go g.doCall(c, key, fn)
return ch
}
doCall
// doCall handles the single call for a key.
// doCall 处理对一个按键的单次调用
func (g *Group) doCall(c *call, key string, fn func() (any, error)) {
// 执行调用方指定的执行函数,将返回 val和 err 赋值给 Call 结构体的 val 和 err
c.val, c.err = fn()
// 加锁
g.mu.Lock()
// 完成等待的任务
c.wg.Done()
// 如果 c == 执行 key 的 val
// 删除这个键值对
if g.m[key] == c {
delete(g.m, key)
}
// 遍历 Call 结构体中的 chans 切片
for _, ch := range c.chans {
// 组装返回数据发送给 chans 切片中的每一个 通道
ch <- Result{c.val, c.err, c.dups > 0}
}
// 解锁
g.mu.Unlock()
}
ForgetUnshared
// ForgetUnshared tells the singleflight to forget about a key if it is not
// shared with any other goroutines. Future calls to Do for a forgotten key
// will call the function rather than waiting for an earlier call to complete.
// Returns whether the key was forgotten or unknown--that is, whether no
// other goroutines are waiting for the result.
// 如果某个 key 未与其他程序共享,则 ForgetUnshared 会告诉单个程序忘记该 key。
// 返回键是否被遗忘或未知,即是否没有其他程序在等待结果。
func (g *Group) ForgetUnshared(key string) bool {
// 解锁
g.mu.Lock()
// 最后解锁
defer g.mu.Unlock()
// 查询 Group 中的 m 是否有该 key 的键值对存在
c, ok := g.m[key]
// 如果不存在,返回true,即没有其他程序在等待结果
if !ok {
return true
}
// 如果 Call 中的 dups 值不为 0,证明没有其他程序在等待结果,删除这个 key 的键值对
// 返回 true,
if c.dups == 0 {
delete(g.m, key)
return true
}
// 返回 false,证明有其他程序在等待结果
return false
}
结尾
大家可以针对上面的分析,结合测试用例看看自己是否理解得没错。