Skip to content

Commit

Permalink
feat(utils): add log in goroutine locks
Browse files Browse the repository at this point in the history
  • Loading branch information
sainnhe committed Jan 11, 2025
1 parent bc0bf3c commit b86317b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 7 deletions.
46 changes: 43 additions & 3 deletions internal/utils/goroutine_lock/goroutine_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,47 @@
// Package goroutinelock implements goroutine locks.
package goroutinelock

import "sync"
import (
"sync"
"sync/atomic"

// GoroutineWg is used to implement goroutine locks.
var GoroutineWg = &sync.WaitGroup{}
"github.com/cloudwego/kitex/pkg/klog"
)

// Wg is used to implement goroutine locks.
var Wg = &WaitGroup{}

// WaitGroup defines a wait group with counter and status.
type WaitGroup struct {
sync.WaitGroup
count int64
shutdownStarted uint32
}

// Add adds delta and bumps counter.
func (wg *WaitGroup) Add(delta int) {
atomic.AddInt64(&wg.count, int64(delta))
wg.WaitGroup.Add(delta)
if atomic.LoadUint32(&wg.shutdownStarted) > 0 {
klog.Warn("shutdown started but a new goroutine lock is added")
}
}

// Done decrease wait group counter by 1.
func (wg *WaitGroup) Done() {
atomic.AddInt64(&wg.count, -1)
wg.WaitGroup.Done()
if atomic.LoadUint32(&wg.shutdownStarted) > 0 {
klog.Infof("waiting for goroutine locks to be released, remaining %d...", wg.GetCount())
}
}

// GetCount gets wait group counter.
func (wg *WaitGroup) GetCount() int {
return int(atomic.LoadInt64(&wg.count))
}

// StartShutdown sets the shutdown status to true.
func (wg *WaitGroup) StartShutdown() {
atomic.AddUint32(&wg.shutdownStarted, 1)
}
4 changes: 2 additions & 2 deletions pkg/utils/goroutine_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import goroutinelock "github.com/cloudwego/kitex/internal/utils/goroutine_lock"

// GoroutineLock locks the goroutine so that graceful shutdown will wait until the lock is released
func GoroutineLock() {
goroutinelock.GoroutineWg.Add(1)
goroutinelock.Wg.Add(1)
}

// GoroutineUnlock unlocks the goroutine to allow graceful shutdown to continue.
// NOTE: This function should be executed using defer to avoid panic in the middle and causing the lock to not be
// released.
func GoroutineUnlock() {
goroutinelock.GoroutineWg.Done()
goroutinelock.Wg.Done()
}
2 changes: 1 addition & 1 deletion pkg/utils/goroutine_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestGoroutineLockAndUnlock(t *testing.T) {
time.Sleep(time.Second)
utils.GoroutineUnlock()
}()
goroutinelock.GoroutineWg.Wait()
goroutinelock.Wg.Wait()
diff := time.Since(startTime)
if diff < time.Second {
t.Errorf("Expect diff >= 1s, get %v", diff)
Expand Down
6 changes: 5 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,11 @@ func (s *server) Stop() (err error) {
}
// Goroutine Locks must wait after all connections are closed, otherwise new connections might be created while
// waiting for goroutine locks.
goroutinelock.GoroutineWg.Wait()
goroutinelock.Wg.StartShutdown()
if count := goroutinelock.Wg.GetCount(); count > 0 {
klog.Infof("waiting for goroutine locks to be released, remaining %d...", count)
goroutinelock.Wg.Wait()
}
})
return
}
Expand Down

0 comments on commit b86317b

Please sign in to comment.