- 作者:老汪软件技巧
- 发表时间:2024-11-04 15:08
- 浏览量:
大家好,我是韩数,最近也学习了不少东西,Rust 和 看了一些 Golang 的书籍,鉴于 GMP 并发模型已经有写的非常好的博客,大家可以看下面这篇文章。
Golang三关-典藏版 Golang 调度器 GMP 原理与调度全分析
上面这篇关于 GMP 模型的文章写的十分详细,因此怎么想我也没有写一篇重复博客的必要了。不过看本文之前还是建议先看看上面这篇关于 GMP 的好文,更加容易理解为什么在某些情况下我们可能会需要一个 可复用 GoRoutine 池。
首先声明一下, 本文的代码并不适用于生产环境,生产环境有 ants 这类更加完善的库,初衷则是聚焦于 可复用 这个概念。当我们了解一个只具备基础功能的可复用GoRoutine 池是怎么样工作的,对于 ants 这样一些优秀的开源框架才会有一些些思路,看他们的源码也会更加容易些。至少在用的时候也不会知其然而不知所以然了。
本文相关代码示例和文章内容已经开源
本文涉及到的代码示例以及笔记内容已经开源至 GitHub: 韩数的学习笔记-西西弗版本
感兴趣的朋友可以自行下载代码。
为什么在某些情况下我们可能会需要一个 可复用 GoRoutine 池?
在《深入理解 Go 语言》这本书中,有一个非常重要的结论,那就是:
即使 GoLang 提供了像 Go Routine 这样的轻量级运行时,我们仍然不能无限制的创建它。迅速地开辟Goroutine 且不控制并发 Goroutine 的数量,会在短时间内占据操作系统的资源(CPU、内存、文件描述符等。最终因为内存占用不断上升导致主进程崩溃。
这就告诉我们一个道理,不管有多少钱也经不住随便嚯嚯。
超大规模并发的场景下, 不加限制的大规模的 goroutine 可能造成内存暴涨,给机器带来极大的压力,吞吐量下降和处理速度变慢还是其次,更危险的是可能使得程序 crash。想象一下现在有一个 http 的 Web 服务, 突然拥入 100 万的请求,golang 的 web 一般都是基于 net/http 库实现的,net/http 的代码中每来一个新的请求都会起一个单独的 goRoutine 去执行它。具体代码在这里:
既然不受限制数量的 Go Routine 会导致内存占用不断增长引发灾难,那么解决问题的思路就会变得非常简单:
那就是减少系统中正在运行的 Go Routine 数量。
为了方便,接下来会统一使用 G 来指代 Go Routine
旧的方式,一个 Task 占用一个 G 去执行,类似于下面这种方式:
func task() {
fmt.println("111")
}
go task()
而新的方式的核心逻辑其实就是让一个 G 可以运行多个 Task,以达到复用的目的,假设我们有 100 固定数量的 Task, 比较蠢的办法就是每个 G 分配十个 task 去执行,类似于下面这样:
go func(){
for _, task := range tasks[0:10] {
task()
}
}
这样原本有需要 100 个 G 才能执行完这 100 个 Task,现在只需要 10 个, 当然你过分点, 1 个也是可以做到的。当然,你这么跑 100 个 Task 肯定没有直接起 100 个 G 运行的快。所以
Go Routine 池本身是时间换空间, 以牺牲一定的并发性能为代价,避免内存的无限制增长导致的灾难。
Go Routine 池执行流程图
现在可以开始编写我们的代码了。
在上文中,我们用了一个非常挫的方式实现了 G 的复用,但是现实世界中,task 的数量并不是一开始就决定好的,因此我们换上正规军装备,使用 go channel 实现对于 Task 的获取。
♀️ Worker
当然,在实现这一切之前,我们需要先定义一个 Worker:
worker 的逻辑异常简单,大概只需要不到 30 行代码:
package main
type Worker struct {
task chan fn
pool *Pool
}
func (w *Worker) Run() {
go func() {
for t := range w.task {
// 退出机制,本文不会涉及
if t == nil {
// 正在执行的数量 -1
w.pool.decRunning()
return
}
t()
// 执行完了将该 worker 重新放回池子里
w.pool.putWorker(w)
}
}()
}
Worker 主要有以下两部分组成:
Pool
你现在是一个养鱼的塘主,鱼塘对你而言最重要的东西是什么?
鱼啊,现在有多少鱼,已经我这个鱼塘最多可以养多少条鱼。
天下大池小池殊途同归,对于 Go Routine 池也是这样的。
type fn func()
type Pool struct {
// 正在运行的 worker 数
running int64
// worker 列表
workers []*Worker
// worker 数量
capacity int64
// 锁
lock sync.Mutex
}
实现一些最基本的公共方法:
// 正在执行的数量 + 1
func (p *Pool) incRunning() {
atomic.AddInt64(&p.running, 1)
}
// 正在执行的数量 - 1
func (p *Pool) decRunning() {
running := atomic.LoadInt64(&p.running)
// 防止缩容的时候被打穿
if running == 0 {
return
}
atomic.AddInt64(&p.running, -1)
}
// 正在执行的数量
func (p *Pool) Running() int64 {
return atomic.LoadInt64(&p.running)
}
// 创建一个池子
func NewPool(capacity int64) *Pool {
return &Pool{
capacity: capacity,
workers: make([]*Worker, 0),
running: 0,
}
}
func (p *Pool) putWorker(worker *Worker) {
// 加锁,防止同时有 worker 被回收
p.lock.Lock()
p.workers = append(p.workers, worker)
p.decRunning()
p.lock.Unlock()
}
func (p *Pool) SubmitTask(task fn) {
// 获取到一个 worker
worker := p.GetWorker()
// 正在执行的数量 + 1
p.incRunning()
// 发送任务
worker.task <- task
}
Pool 最核心的方法主要有一个:
func (p *Pool) getWorker() *Worker {
// 加锁,防止获取到同一个 worker
p.lock.Lock()
defer p.lock.Unlock()
// 如果当前 worker 数量大于 1, 说明有空闲的,直接使用
if len(p.workers) > 0 {
worker := p.workers[len(p.workers)-1]
p.workers = p.workers[:len(p.workers)-1]
return worker
}
// 走到这里说明当前没有空闲的 worker,所有的 worker 都在跑
// 如果正在运行的数量小于容量,说明 worker 的总数量 小于容量,直接开启一个新的 worker
if p.running < p.capacity {
worker := &Worker{
pool: p,
task: make(chan fn),
}
// 启动这个新 worker
worker.Run()
return worker
}
for {
// 说明 worker 数量达到最大限制, 陷入等待,当然也可以做成异步的
p.lock.Unlock()
time.Sleep(time.Millisecond)
p.lock.Lock()
// 等到了空闲的 worker,取出来执行
if len(p.workers) > 0 {
worker := p.workers[len(p.workers)-1]
p.workers = p.workers[:len(p.workers)-1]
return worker
}
}
}
没了,加起来去掉注释不超过 100 行代码,当然,这只是 可复用GoRoutine 最核心的部分,做到一个生产级别的 GoRoutine 池子需要非常多额外的工作,基于上述 的 Pool, 最少缺少了:
对于上述的特性很大一部分都可以在 ants 的库的代码中找到,本文不会很深入去介绍这部分,感兴趣的可以去看 ants 的 源码,传送门:
/panjf2000/a…
测试阶段
现在一个异常捡漏的 Go Routine 池子已经完成了,现在我们可以写一下基本的BenchMark。
性能测试
当 Pool 池子的大小为 1w 时,执行200w+次,每一个任务执行大概 10ms 的时间。
不使用 Pool 大概比 使用 Pool 的性能高三倍。
如果我们把池子的大小设置成 1k. 使用 池子的性能就会极速下降。本质上 池化是在抑制病发。
此时的性能差距来到了 23 倍。
内存测试
运行 100 w 个任务,计算有 pool 和 无 pool 执行前后的内存变化,这个测试不是十分严谨,我们取十次,取平均值。
部分测试截图:
任务 100w, pool 的容量为 1w:
任务 100w, pool 的容量为 1k:
多次执行取平均值大概结论:
去除误差,仅仅从逻辑上计算,由于 G 的数量指数级减小,内存的占用也会相应直接减下来。
总结
写到这里我们已经实现了一个非常基础的 Go Routine 池。当然实现并发控制的方式远不如这一种。比如也可以使用带缓冲区的 channel 完成并发控制:
func main() {
ch := make(chan struct{}, 3)
for i := 0; i < 100; i++ {
ch <- struct{}{}
go func(i int) {
time.Sleep(1 * time.Second)
fmt.Println("i: ", i)
<-ch
}(i)
}
}
因此在决定是否使用 Go Routine 池子之前,最好需要先评估是否并发规模已经达到了需要控制的程度?如果平常只有几万个并发,内存占用上升也不那么明显的情况下,使用 Go Routine 池子复用反而会降低程序的处理性能。