是什么?
SingleFlight 作用是将并发请求合并成一个请求,以减少对下层服务的压力
应用场景
- 大量的请求同时过来,查询一个已经失效的缓存,导致大量请求打到数据库(缓存击穿)
如何使用?
现在有一个查询接口,从 Redis 中读取数据,如果没有数据,就会从数据库读并设置到 Redis 中。假如现在有大量请求过来,同时发现 Redis 中没有数据,导致请求都打到下层的数据库,对数据库造成压力。
实际上这些请求可以共享一个结果。
这个时候就需要使用 singleFlight 合并这些请求。
设置一个读取缓存的接口
package main
import (
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
"golang.org/x/sync/singleflight"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"math/rand"
"net/http"
"time"
)
type Demo struct {
Id int
Name string
UpdatedAt int
CreatedAt int
IsDeleted int
}
var demo Demo
var db *gorm.DB
var singleFlight singleflight.Group
// 获取并设置数据到缓存
func getAndSetCache(requestId int64, cacheKey string) (Demo, error) {
log.Printf("request %v start to get and set cache...", requestId)
// do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
value, _, _ := singleFlight.Do(cacheKey, func() (ret interface{}, err error) {
db.Table("demo").First(&demo)
log.Println("只有一个协程获取到了数据库值:", demo)
return demo, nil
})
return value.(Demo), nil
}
func init() {
dsn := "root:liufutian@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local"
db1, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
panic(err)
}
db = db1
sqlDB, err := db.DB()
sqlDB.SetMaxIdleConns(10)
sqlDB.SetMaxOpenConns(100)
sqlDB.SetConnMaxLifetime(time.Hour)
sqlDB.SetConnMaxIdleTime(time.Hour)
}
func main() {
router := gin.New()
router.Use(gin.Recovery())
router.GET("/demo/list", func(c *gin.Context) {
cache, _ := getAndSetCache(rand.Int63(), "cacheKey")
c.JSON(http.StatusOK, cache)
})
router.Run(":9501")
}
开启服务
go run single_flight.go
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
- using env: export GIN_MODE=release
- using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] GET /demo/list --> main.main.func1 (2 handlers)
[GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value.
Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details.
[GIN-debug] Listening and serving HTTP on :9501
并发测试
ab -c 10 -n 20 http://localhost:9501/demo/list
输出结果
2022/08/12 22:54:57 request 5577006791947779410 start to get and set cache...
2022/08/12 22:54:57 只有一个协程获取到了数据库值: {1 liu 33 1 0}
2022/08/12 22:54:57 request 8674665223082153551 start to get and set cache...
2022/08/12 22:54:57 request 6129484611666145821 start to get and set cache...
2022/08/12 22:54:57 request 4037200794235010051 start to get and set cache...
2022/08/12 22:54:57 request 3916589616287113937 start to get and set cache...
2022/08/12 22:54:57 request 6334824724549167320 start to get and set cache...
2022/08/12 22:54:57 request 1443635317331776148 start to get and set cache...
2022/08/12 22:54:57 request 605394647632969758 start to get and set cache...
2022/08/12 22:54:57 request 2775422040480279449 start to get and set cache...
2022/08/12 22:54:57 request 894385949183117216 start to get and set cache...
2022/08/12 22:54:57 request 4751997750760398084 start to get and set cache...
2022/08/12 22:54:57 只有一个协程获取到了数据库值: {1 liu 33 1 0}
2022/08/12 22:54:57 request 7504504064263669287 start to get and set cache...
2022/08/12 22:54:57 request 1976235410884491574 start to get and set cache...
2022/08/12 22:54:57 request 3510942875414458836 start to get and set cache...
2022/08/12 22:54:57 request 2933568871211445515 start to get and set cache...
2022/08/12 22:54:57 request 4324745483838182873 start to get and set cache...
2022/08/12 22:54:57 request 2703387474910584091 start to get and set cache...
2022/08/12 22:54:57 request 2610529275472644968 start to get and set cache...
2022/08/12 22:54:57 request 6263450610539110790 start to get and set cache...
2022/08/12 22:54:57 request 2015796113853353331 start to get and set cache...
2022/08/12 22:54:57 只有一个协程获取到了数据库值: {1 liu 33 1 0}
可以看到同时开了 10 个协程,共 20 个并发请求,但是真正读数据库的只有 3 个。达到了我们的预期效果。
源码解读
上面的使用中可以看到,我们用到了 singleFlight
的 Do
方法
var singleFlight singleflight.Group
// 获取并设置数据到缓存
func getAndSetCache(requestId int64, cacheKey string) (Demo, error) {
log.Printf("request %v start to get and set cache...", requestId)
// do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
value, _, _ := singleFlight.Do(cacheKey, func() (ret interface{}, err error) {
第一行初始化了一个 Group 结构体
type Group struct {
// 互斥锁,用来保护m的
mu sync.Mutex
// 一个懒加载的map,保存的是call的引用
m map[string]*call
}
我们再看这个 call
里面都有什么
// 正在flight,或者已经flight完成的结构体
type call struct {
// 并发控制,等待一组goroutine完成,然后返回
wg sync.WaitGroup
// 回调函数的返回参数
// 这些字段在 WaitGroup 完成之前写入一次
// 并且仅在 WaitGroup 完成后读取。
val interface{}
err error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
forgotten bool
// 重复调用次数的计数器
dups int
// 针对DoChan函数的channel数组
chans []chan<- Result
}
// 保存了DoChan的结果
type Result struct {
Val interface{}
Err error
Shared bool
}
Do
方法
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 并发进来先上锁
g.mu.Lock()
// 如果没有初始化,在这里初始化m
if g.m == nil {
g.m = make(map[string]*call)
}
// 如果m中已经有值
if c, ok := g.m[key]; ok {
// 数量+1,释放锁并等待
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
// 唤醒后返回
return c.val, c.err, true
}
// 初始化call
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 同一批gorutine中只会执行一次
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
doCall
方法
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}
// 执行完成
c.wg.Done()
g.mu.Lock()
defer g.mu.Unlock()
if !c.forgotten {
// 安全删m中的key
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
...
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
...
// 执行回调函数
c.val, c.err = fn()
// 正常返回设置为true
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
当执行 Do
方法的时候:
- 因为要对 map 操作所以先上锁,检查是否初始化 m,没有就初始化 m。
- 检查 m 中是否有对应的 key,如果没有,初始化
call
,wg+1(因为这一批请求进来只会有一个真正执行,所以 wg 设置为 1),并存到 m 中,注意此时锁会释放掉 - 锁释放对应上面 27 行,假如有 10 个请求同时进来,有 1 个请求执行了前 2 步,并释放锁。剩下的 9 个请求会进入 if 分支,对应上面的 15 行。所以从这里开始,会有两条分支出去。第一条分支对应真正执行
doCall
的 goroutine,第二条分支是剩下的 9 个 goroutine 等待第一条分支执行c.wg.Done()
。下面我们具体说下这两条分支: - 第二条分支在第一条分支执行
doCall
的同时,剩下的 9 个 goroutine 获取锁,重复调用次数c.dups
每次加 1,释放锁,等待第一条分支执行c.wg.Done()
- 第一条分支会执行
doCall
方法,进去之后执行回调函数,返回值保存到call
中的val
和err
中,设置normalReturn
为true
。然后进入到 defer,执行c.wg.Done()
,并删除 m 中对应的 key - 当第一条分支执行
c.wg.Done()
,第二条分支唤醒并返回已经被赋值的 c.val, c.err - 此时整个流程执行结束,只请求了一次数据库操作,剩下的 goroutine 复用返回值
do 方法核心示例
package main
import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
type Group struct {
m map[string]string
mu sync.Mutex
}
var wg sync.WaitGroup
var i int
var k = "key"
var ch = make(chan struct{})
func (g *Group) do() {
for i = 0; i < 10; i++ {
go func(i int) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]string)
}
if _, ok := g.m[k]; ok {
g.mu.Unlock()
wg.Wait()
fmt.Println("完成了烙铁", i)
} else {
g.m[k] = ""
g.mu.Unlock()
wg.Add(1)
fmt.Println("控制并发流程+1")
// 处理redis或者mysql,耗时操作
time.Sleep(time.Second * 3)
wg.Done()
}
}(i)
}
}
func main() {
g := &Group{}
g.do()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGUSR2)
<-sig
}
返回结果:
go run waitgroup.go
控制并发流程+1
完成了烙铁 1
完成了烙铁 0
完成了烙铁 8
完成了烙铁 7
完成了烙铁 9
完成了烙铁 5
完成了烙铁 6
完成了烙铁 2
完成了烙铁 4
讨论区
登录评论