• 作者:老汪软件技巧
  • 发表时间:2024-10-03 00:02
  • 浏览量:

singleflight 的作用: 能实现瞬时发生多次相同的并发请求,只有一次会走实际请求逻辑,其他请求会等待实际请求的结果,达到防止缓存击穿的效果。典型应用场景:缓存击穿

这是业务开发中典型的一个场景,为了减轻 DB 的压力,将 DB 的数据存到缓存中,缓存获取不到,再去 DB 查询。

但是这个场景存在一个问题,当一条数据从缓存中获取不到的时候,如果这时候有大量的请求,去获取这条数据,大部分请求都会请求到 DB ,给 DB 造成瞬间比较大的压力、甚至有可能把 DB 压垮。

这时候我们就可以使用 singleflight 去解决这个问题, 同时只会有一个请求,会请求到 DB 层,其他请求,会使用第一个去 DB 获取的数据作为返回值。

这里写一个简单的例子,模拟 10 个用户同时去 db 获取数据

根据输出的打印,我们可以看到10个用户获取到了一样的数据,但是只执行了一次 db 查询,大大地减轻了 db 的压力

源码分析结构体

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

Group 表示一个工作类,并形成一个命名空间,在该命名空间中,可以使用重复抑制来执行工作单元。其中 mu 字段是为了保护 m 字段用,m 字段使用了懒加载的方式。

// call is an in-flight or completed singleflight.Do call
type call struct {
	wg sync.WaitGroup
	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
	val any
	err error
	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int
	chans []chan<- Result
}

Group 中的 m 字段的值类型是 call 结构体,call 结构体的参数的含义/作用如下:

wg: 对于指定的 key ,如果出现重复项,则重复调用方将等待原始调用方完成并收到相同的结果。
val: 调用指定的函数后,返回的值。
err: 执行指定的函数时,出现的错误。
dups: 表示重复项的数量,这些重复调用会接收到相同的结果。
chans:是一个切片,字段的类型表示只能接受 Result 类型的值,在 调用 Dochan(...) 函数时用到。

_如何解决缓存击穿问题_应用缓存典型击穿场景怎么设置

// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    any
	Err    error
	Shared bool
}

Result 保存 Do 的结果,通过通道传递。其中 Val 字段代表返回的值、Err 代表执行指定函数时,出现的错误,Shared代表是否共享该返回值。

函数

Do

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
// 执行并返回给定函数的结果,确保每次只对给定键执行一次。
// 如果出现重复调用,重复调用将等待原始调用者执行完毕,并获得相同的结果。
// shared 的返回值表示是否被多个调用者使用
func (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool) {
	// 加锁,确保同时只有一个 Do 函数在执行
	g.mu.Lock()
	// 如果 Group 的 m 的值为 nil,进行初始化(懒加载)
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	// 如果 Group 的 map 中存在这个 key,
	// 递增 dups 的值,并释放锁,
	// 通过 c.wg.Wait()等待原始调用者执行完毕,获取同样的结果
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err, true
	}
	// 初始化 call
	c := new(call)
	// 增加一个等待任务
	c.wg.Add(1)
	// 将 c 作为当前指定 key 的 value
	g.m[key] = c
	// 解锁
	g.mu.Unlock()
	// 执行 doCall 函数,这里的传入值,分别是 call,指定的 key,调用方指定的执行函数
	g.doCall(c, key, fn)
	// 返回: 值、错误、多个调用方是否共享该返回值
	return c.val, c.err, c.dups > 0
}

DoChan

// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
// DoChan 和 Do 类型,但它返回一个通道,该通道将在结果准备就绪时接受结果
func (g *Group) DoChan(key string, fn func() (any, error)) <-chan Result {
	// 初始化一个通道
	ch := make(chan Result, 1)
	// 加锁,确保同时只有一个 Do 函数在执行
	g.mu.Lock()
	// 如果 Group 的 m 的值为 nil,进行初始化(懒加载)
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	// 如果 Group 的 map 中存在这个 key,
	// 递增 dups 的值,添加初始化的 ch 到 chans 切片中,然后释放锁,
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	// 初始化 Call 结构体
	c := &call{chans: []chan<- Result{ch}}
	// 增加一个等待任务
	c.wg.Add(1)
	// 将 c 作为当前指定 key 的 value
	g.m[key] = c
	// 解锁
	g.mu.Unlock()
	// 并发调用 doCall 函数,这里的传入值,分别是 call,指定的 key,调用方指定的执行函数
	// 这里之所以可以并发调用,这个 DoChan 函数的返回值是一个 通道
	// 从这个通道获取值时,如果没有可以获取的值,会发生阻塞
	go g.doCall(c, key, fn)
	return ch
}

doCall

// doCall handles the single call for a key.
// doCall 处理对一个按键的单次调用
func (g *Group) doCall(c *call, key string, fn func() (any, error)) {
	// 执行调用方指定的执行函数,将返回 val和 err 赋值给 Call 结构体的 val 和 err
	c.val, c.err = fn()
	// 加锁
	g.mu.Lock()
	// 完成等待的任务
	c.wg.Done()
	// 如果 c == 执行 key 的 val
	// 删除这个键值对
	if g.m[key] == c {
		delete(g.m, key)
	}
	// 遍历 Call 结构体中的 chans 切片
	for _, ch := range c.chans {
	  // 组装返回数据发送给 chans 切片中的每一个 通道
		ch <- Result{c.val, c.err, c.dups > 0}
	}
	// 解锁
	g.mu.Unlock()
}

ForgetUnshared

// ForgetUnshared tells the singleflight to forget about a key if it is not
// shared with any other goroutines. Future calls to Do for a forgotten key
// will call the function rather than waiting for an earlier call to complete.
// Returns whether the key was forgotten or unknown--that is, whether no
// other goroutines are waiting for the result.
// 如果某个 key 未与其他程序共享,则 ForgetUnshared 会告诉单个程序忘记该 key。
// 返回键是否被遗忘或未知,即是否没有其他程序在等待结果。
func (g *Group) ForgetUnshared(key string) bool {
	// 解锁
	g.mu.Lock()
	// 最后解锁
	defer g.mu.Unlock()
	// 查询 Group 中的 m 是否有该 key 的键值对存在
	c, ok := g.m[key]
	// 如果不存在,返回true,即没有其他程序在等待结果
	if !ok {
		return true
	}
	// 如果 Call 中的 dups 值不为 0,证明没有其他程序在等待结果,删除这个 key 的键值对
	// 返回 true,
	if c.dups == 0 {
		delete(g.m, key)
		return true
	}
	// 返回 false,证明有其他程序在等待结果
	return false
}

结尾

大家可以针对上面的分析,结合测试用例看看自己是否理解得没错。


上一条查看详情 +React 中状态管理的最佳实践
下一条 查看详情 +没有了