thumbnail

goroutine调度

liuyuede liuyuede | 1 分钟阅读
2年前

0.1、索引

https://blog.waterflow.link/articles/1662974432717

1、进程

一个进程包含可以由任何进程分配的公共资源。这些资源包括但不限于内存地址空间、文件句柄、设备和线程。

一个进程会包含下面一些属性:

  • Process ID:进程 ID
  • Process State:进程状态
  • Process Priority:进程优先级
  • Program Counter:程序计数器
  • General purpose register:通用寄存器
  • List of open files:打开的文件列表
  • List of open devices:打开的设备列表
  • Protection information:保护信息
  • List of the child process:子进程列表
  • Pending alarms:待定警告
  • Signals and signal handlers:信号和信号处理程序
  • Accounting information:记账信息

2、线程

线程是轻量级的进程,一个线程将在进程内的所有线程之间共享进程的资源,如代码、数据、全局变量、文件和内存地址空间。但是栈和寄存器不会共享,每个线程都有自己的栈和寄存器

线程的优点:

  • 提高系统的吞吐量
  • 提高响应能力
  • 由于属性更少,上下文切换更快
  • 多核 CPU 的有效利用
  • 资源共享(代码、数据、地址空间、文件、全局变量)

3、用户级线程

用户级线程也称为绿色线程,如:C 中的 coroutine、Go 中的 goroutine 和 Ruby 中的 Fiber
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974591.png

该进程维护一个内存地址空间,处理文件,以及正在运行的应用程序的设备和线程。操作系统调度程序决定哪些线程将在任何给定的 CPU 上接收时间

因此,与耗时和资源密集型的进程创建相比,在一个进程中创建多个用户线程(goroutine)效率更高。

4、goroutine

在 Go 中用户级线程被称作 Goroutine,在创建 goroutine 时需要做到:

  • 易于创建
  • 轻量级
  • 并发执行
  • 可扩展
  • 无限堆栈(最大堆栈大小在 64 位上为 1 GB,在 32 位上为 250 MB。)
  • 处理阻塞调用
  • 高效 (work stealing)

其中阻塞调用可能是下面一些原因:

  • 在 channel 中收发数据
  • 网络 IO 调用
  • 阻塞的系统调用
  • 计时器
  • 互斥操作(Mutex)

为什么 go 需要调度 goroutine?

Go 使用称为 goroutine 的用户级线程,它比内核级线程更轻且更便宜。 例如,创建一个初始 goroutine 将占用 2KB 的堆栈大小,而内核级线程将占用 8KB 的堆栈大小。 还有,goroutine 比内核线程有更快的创建、销毁和上下文切换,所以 go 调度器 需要退出来调度 goroutine。OS 不能调度用户级线程,OS 只知道内核级线程。 Go 调度器 将 goroutine 多路复用到内核级线程,这些线程将在不同的 CPU 内核上运行

什么时候会调度 goroutine?

如果有任何操作应该或将会影响 goroutine 的执行,比如 goroutine 的启动、等待执行和阻塞调用等……

go 调度 如何将 goroutine 多路复用到内核线程中?

1、1:1 调度(1 个线程对应一个 goroutine)

  • 并行执行(每个线程可以在不同的内核上运行)
  • 可以工作但是代价太高
  • 内存至少〜32k(用户堆栈和内核堆栈的内存)
  • 性能问题(系统调用)
  • 没有无限堆栈

2、N:1 调度(在单个内核线程上多路复用所有 goroutine)

  • 没有并行性(即使有更多 CPU 内核可用,也只能使用单个 CPU 内核)

我们看下下面的例子,只为 go 分配了 1 个 processer 去处理 2 个 goroutine:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	// 分配 1 个逻辑处理器供调度程序使用
	runtime.GOMAXPROCS(1)
	var wg sync.WaitGroup
	wg.Add(2)

	fmt.Println("Starting Goroutines")

	// 开一个go协程打印字母
	go func() {
		defer wg.Done()
		time.Sleep(time.Second)
		// 打印3次字母
		for count := 0; count < 3; count++ {
			for ch := 'a'; ch < 'a'+26; ch++ {
				fmt.Printf("%c ", ch)
			}
			fmt.Println()
		}
	}()

	// 开一个go协程打印数字
	go func() {
		defer wg.Done()
		// 打印3次数字
		for count := 0; count < 3; count++ {
			for n := 1; n <= 26; n++ {
				fmt.Printf("%d ", n)
			}
			fmt.Println()
		}
	}()

	// 等待返回
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("\nTerminating Program")
}

看下结果:

go run main.go
Starting Goroutines
Waiting To Finish
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
a b c d e f g h i j k l m n o p q r s t u v w x y z 
a b c d e f g h i j k l m n o p q r s t u v w x y z 
a b c d e f g h i j k l m n o p q r s t u v w x y z 

Terminating Program

可以看到这俩个 goroutine 是串行执行的,要么先完成第一个 goroutine,要么先完成第二个 goroutine,并不是并发执行的。

那如何去实现并发执行呢?

我们同样设置 runtime.GOMAXPROCS 为 1,但是在 goroutine 中我们在不同的时机加入阻塞 goroutine 的时间函数 time.Sleep,我们看下会有什么不同的结果。

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func main() {
	// 分配 1 个逻辑处理器供调度程序使用
	runtime.GOMAXPROCS(1)
	var wg sync.WaitGroup
	wg.Add(2)

	fmt.Println("Starting Goroutines")

	// 开一个go协程打印字母
	go func() {
		defer wg.Done()
		time.Sleep(time.Second)
		// 打印3次字母
		for count := 0; count < 3; count++ {
			for ch := 'a'; ch < 'a'+26; ch++ {
				if count == 0 {
					time.Sleep(10 * time.Millisecond)
				}
				if count == 1 {
					time.Sleep(30 * time.Millisecond)
				}
				if count == 2 {
					time.Sleep(50 * time.Millisecond)
				}
				fmt.Printf("%c ", ch)
			}
			fmt.Println()
		}
	}()

	// 开一个go协程打印数字
	go func() {
		defer wg.Done()
		// 打印3次数字
		for count := 0; count < 3; count++ {
			for n := 1; n <= 26; n++ {
				if count == 0 {
					time.Sleep(20 * time.Millisecond)
				}
				if count == 1 {
					time.Sleep(40 * time.Millisecond)
				}
				if count == 2 {
					time.Sleep(60 * time.Millisecond)
				}
				fmt.Printf("%d ", n)
			}
			fmt.Println()
		}
	}()

	// 等待返回
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("\nTerminating Program")
}

看下结果:

go run main.go
Starting Goroutines
Waiting To Finish
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
1 2 3 4 5 6 7 8 9 10 11 a 12 b c d e 13 f g h i 14 j k l m 15 n o p 16 q r s t 17 u v w x 18 y z 
19 a b 20 c 21 d 22 e f 23 g 24 h 25 i j 26 
k l 1 m n 2 o p 3 q r 4 s t 5 u v 6 w x 7 y z 
8 a 9 b 10 c 11 d 12 e f 13 g 14 h 15 i 16 j 17 k l 18 m 19 n 20 o 21 p 22 q r 23 s 24 t 25 u 26 
v w x y z 

Terminating Program

通过上面的结果我们可以看到,当 goroutine1 阻塞时,go 调度器会调度 goroutine2 执行。

我们可以得出:

  • 即使我们将 runtime.GOMAXPROCS(1) 设置为 1,程序也在并发运行
  • Running 状态的 Goroutine 数量最大为 1,Block Goroutine 可以多于一个,其他所有 Goroutine 都处于 Runnable 状态

3、线程池

  • 在需要时创建一个线程,这意味着如果有 goroutine 要运行但所有其他线程都忙,则创建一个线程
  • 一旦线程完成其执行而不是销毁重用它
  • 这可以更快的创建 goroutine,因为我们可以重用线程
  • 但是还有更多的内存消耗,性能问题,并且没有无限堆栈。

4、M: N 线程共享运行队列调度(GMP)

  • M 代表系统线程的数量
  • N 代表 goroutine 的数量
  • goroutine 的创建成本很低,我们可以完全控制 goroutine 的整个生命周期,因为它是在用户空间中创建的
  • 创建一个操作系统线程很昂贵,我们无法控制它,但是使用多个线程我们可以实现并行
  • 在这个模型中,多个 goroutine 被多路复用到内核线程中

我们上面提到过导致 goroutine 阻塞调用可能是下面一些原因:

  • 在 channel 中收发数据
  • 网络 IO 调用
  • 阻塞的系统调用
  • 计时器
  • 互斥操作(Mutex)

下面看一些 goroutine 阻塞的例子:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"sync"
	"time"
)

// 全局变量
var worker int

func writeToFile(wg *sync.WaitGroup) {
	defer wg.Done()

	file, _ := os.OpenFile("file.txt", os.O_RDWR|os.O_CREATE, 0755)           // 系统调用阻塞
	resp, _ := http.Get("https://blog.waterflow.link/articles/1662706601117") // 网络IO阻塞
	body, _ := ioutil.ReadAll(resp.Body)                                      // 系统调用阻塞

	file.WriteString(string(body))
}

func workerCount(wg *sync.WaitGroup, m *sync.Mutex, ch chan string) {
	// Lock() 给共享资源上锁
	// 独占访问状态,
	// 增加worker的值,
	// Unlock() 释放锁
	m.Lock() // Mutex阻塞
	worker = worker + 1
	ch <- fmt.Sprintf("Worker %d is ready", worker)
	m.Unlock()

	// 返回, 通知WaitGroup完成
	wg.Done()
}

func printWorker(wg *sync.WaitGroup, done chan bool, ch chan string) {

	for i := 0; i < 100; i++ {
		fmt.Println(<-ch) // Channel阻塞
	}
	wg.Done()
	done <- true
}

func main() {

	ch := make(chan string)
	done := make(chan bool)

	var mu sync.Mutex

	var wg sync.WaitGroup

	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go workerCount(&wg, &mu, ch)
	}

	wg.Add(2)
	go writeToFile(&wg)
	go printWorker(&wg, done, ch)

	wg.Wait()

	<-done // Channel阻塞

	<-time.After(1 * time.Second) // Timer阻塞
	close(ch)
	close(done)
}

下面我们看看 go 调度器在上面这些例子中是如何工作的:

  • 如果一个 goroutine 在通道上被阻塞,则通道有等待队列,所有阻塞的 goroutine 都列在等待队列中,并且很容易跟踪。 在阻塞调用之后,它们将被放入 schedular 的全局运行队列中,OS Thread 将再次按照 FIFO 的顺序选择 goroutine。
    http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974630.gif
  1. M1,M2,M3 尝试从全局 G 队列中获取 G
  2. M1 获取锁并拿到 G1,然后释放锁
  3. M3 获取锁拿到 G2,然后释放锁
  4. M2 获取锁拿到 G3,然后释放锁
  5. G1 在 ch1 的 channel 中阻塞,然后添加到 ch1 的等待队列。导致 M1 空闲
  6. M1 不能闲着,从全局队列获取锁拿到 G4,然后释放锁
  7. G3 阻塞在 ch2 的 channel 中,然后被放到 ch2 的等待队列。导致 M2 空闲
  8. M2 获取锁拿到 G5,然后释放锁
  9. 此时 G3 在 ch2 结束阻塞,被放到全局队列尾部等待执行
  10. G1 在 ch1 结束阻塞,被放到全局队列尾部等待执行
  11. G4,G5,G2 执行完成
  12. M1,M2,M3 重复步骤 1-4
  • 互斥锁、定时器和网络 IO 使用相同的机制

  • 如果一个 goroutine 在系统调用中被阻塞,那么情况就不同了,因为我们不知道内核空间发生了什么。 通道是在用户空间中创建的,因此我们可以完全控制它们,但在系统调用的情况下,我们没法控制它们。

  • 阻塞系统调用不仅会阻塞 goroutine 还会阻塞内核线程。

  • 假设一个 goroutine 被安排在一个内核线程上的系统调用,当一个内核线程完成执行时,它将唤醒另一个内核线程(线程重用),该线程将拾取另一个 goroutine 并开始执行它。 这是一个理想的场景,但在实际情况下,我们不知道系统调用将花费多少时间,因此我们不能依赖内核线程来唤醒另一个线程,我们需要一些代码级逻辑来决定何时 在系统调用的情况下唤醒另一个线程。 这个逻辑在 golang 中实现为 runtime·entersyscall()和 runtime·exitsyscall()。 这意味着内核线程的数量可以超过核心的数量。

  • 当对内核进行系统调用时,它有两个关键点,一个是进入时机,另一个是退出时机。

    http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974658.gif

    1. M1,M2 试着从全局队列拿 G
    2. M1 获取锁并拿到 G1,然后释放锁
    3. M2 获取锁并拿到 G2,然后释放锁
    4. M2 阻塞在系统调用,没有可用的内核线程,所以 go 调度器创建一个新的线程 M3
    5. M3 获取锁并拿到 G3,然后释放锁
    6. 此时 M2 结束阻塞状态,重新把 G2 放到全局队列(G2 由阻塞变为可执行状态)。M2 虽然是空闲状态,但是 go 调度器不会销毁它,而是自旋发现新的可执行的 goroutine。
    7. G1,G3 执行结束
    8. M1,M3 重复步骤 1-3

操作系统可以支持多少内核线程?

在 Linux 内核中,此参数在文件 /proc/sys/kernel/threads-max 中定义,该文件用于特定内核。

sh:~$ cat /proc/sys/kernel/threads-max 94751
这里输出 94751 表示内核最多可以执行 94751 个线程

每个 Go 程序可以支持多少个 goroutine?

调度中没有内置对 goroutine 数量的限制。

每个 GO 程序 可以支持多少个内核线程?

默认情况下,运行时将每个程序限制为最多 10,000 个线程。可以通过调用 runtime/debug 包中的 SetMaxThreads 函数来更改此值。

总结:

  1. 内核线程数可以多于内核数
  2. 轻量级 goroutine
  3. 处理 IO 和系统调用
  4. goroutine 并行执行
  5. 不可扩展(所有内核级线程都尝试使用互斥锁访问全局运行队列。因此,由于竞争,这不容易扩展)

5、M:N 线程分布式运行队列调度器

为了解决每个线程同时尝试访问互斥锁的可扩展问题,维护每个线程的本地运行队列

  • 每个线程状态(本地运行队列)
  • 仍然有一个全局运行队列
    http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974686.gif
  1. M1,M2,M3,M4 扫描本地可运行队列
  2. M1,M2,M3,M4 从各自的本地队列取出 G4,G6,G1,G3

从上面的动图可以看到:

  • 从本地队列拿 G 是不需要加锁的
  • 可运行 goroutine 的全局队列需要锁

结论:

  1. 轻量级 goroutine
  2. 处理 IO 和 SystemCalls
  3. goroutine 并行执行
  4. 可扩展
  5. 高效

如果线程数大于内核数,那么会有什么问题呢?

在分布式运行队列调度中,我们知道每个线程都有自己的本地运行队列,其中包含有关接下来将执行哪个 goroutine 的信息。 同样由于系统调用,线程数会增加,并且大多数时候它们的本地运行队列是空的。 因此,如果线程数大于核心数,则每个线程必须扫描所有线程本地运行队列,并且大部分时间它们是空的,所以如果线程过多,这个过程是耗时的并且解决方案 效率不高,因此我们需要将线程扫描限制为使用 M:P:N 线程模型求解的常数。

6、M:P: N 线程

如何检查逻辑处理器的数量?

package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println(runtime.NumCPU())
}

分布式 M:P:N 调度例子
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974709.gif

  1. M1,M2 各自扫描 P1,P2 的队列
  2. M1,M2 从各自的 P1,P2 中取出 G3,G1 执行

在系统调用期间执行 P 的切换
http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974724.gif

  1. M1,M2 各自扫描 P1,P2 的队列
  2. M1,M2 从各自的 P1,P2 中取出 G3,G1 执行
  3. G1 即将进入系统调用,所以在这之前 G1 会唤醒另一个线程 M3,并将 P2 切换到 M3
  4. M3 扫描 P2 并取出 G2 运行
  5. 一旦 G1 变为非阻塞,它将被推送到全局队列等待运行

在 work-stealing 期间,只需要扫描固定数量的队列,因为逻辑处理器的数量是有限的。

如何选择下一个要运行的 goroutine ?

Go 调度器 将按以下顺序检查以选择下一个要执行的 goroutine

  • 本地运行队列

    http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974749.gif

  • 全局运行队列

    http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974771.gif

    1. M1,M2,M3 各自扫描本地队列 P1,P2,P3
    2. M1,M2,M3 各自从 P1,P2,P3 取出 G3,G1,G5
    3. G5 完成,M3 扫描本地队列 P3 发现空,然后扫描全局队列
    4. M3 将从全局队列获取一定数量的 G(G6,G7),保存到本地队列 P3
    5. 现在 M3 从本地队列 P3 取出 G6 执行
  • Network poller

    http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974790.gif

    1. M1,M2,M3 各自扫描本地队列 P1,P2,P3
    2. M1,M2,M3 各自从 P1,P2,P3 取出 G3,G1,G6
    3. G6 执行完成,M3 扫描 P3 发现是空的,然后扫描全局队列
    4. 但是全局队列也是空的,然后就检查网络轮询中已就绪的 G
    5. 网络轮询中有一个已就绪的 G2,所以 M3 取出 G2 并执行
  • Work Stealing

    http://image-1313007945.cos.ap-nanjing.myqcloud.com/image/1662974803.gif

    1. M1,M2,M3 各自扫描本地队列 P1,P2,P3
    2. M1,M2,M3 各自从 P1,P2,P3 取出 G3,G1,G6
    3. G6 执行完成,M3 扫描 P3 发现是空的,然后扫描全局队列
    4. 但是全局队列也是空的,然后就检查网络轮询中已就绪的 G
    5. 但是网络轮询中没有已就绪的 G,所以 M3 随机的从其他 P 中窃取一半的 G 到 P3
    6. 如果随机选中的 P 中没有要执行的 G,就会重试 4 次,从其他 P 获取

总结:

  • 轻量级 goroutine
  • 处理 IO 和系统调用
  • goroutine 的并行执行
  • 可扩展
  • 高效/工作窃取

Go 调度的局限性

  • FIFO 对局部性原则不利
  • 没有 goroutine 优先级的概念(不像 Linux 内核)
  • 没有强抢占 -> 没有强公平或延迟保证
  • 它没有意识到系统拓扑 -> 没有真实的位置。有一个旧的 NUMA 感知调度程序提案。此外,建议使用 LIFO 队列,这样 CPU 内核缓存中更有可能有数据。

翻译自:

https://mukeshpilaniya.github.io/posts/Go-Schedular/

讨论区

登录评论
暂无评论