title: Go Concurrency Programming
date: 2021-08-20 13:12:00
categories: Study Notes
Concepts#
- Concurrency is not parallelism.
- Channels have different types indicated by the <- and -> arrows, which specify the direction of the channel.
- The buffer size of a channel indicates whether it is a synchronous or asynchronous channel.
- CAS (compare-and-swap) is a technique used in designing concurrent algorithms to improve the safety of multi-threaded execution.
CAS#
CAS Principle: CAS has three operands: the memory value V, the old expected value A, and the new value to be modified B. CAS modifies the memory value to B and returns true only if the expected value A is equal to the memory value V. Otherwise, it does nothing and returns false.
- CAS operations are provided by the operating system and supported at the hardware level.
if *addr == old {
*addr = new
return true
}
return false
- Implementing locks in Go using CAS
package main
import (
"sync/atomic"
)
type Mutex struct {
state int32
}
func (m *Mutex) Lock() {
for(!atomic.CompareAndSwapInt32(&m.state, 0, 1)) {
return
}
}
func (m *Mutex) Unlock() {
atomic.CompareAndSwapInt32(&m.state, 1, 0)
}
- Implementing sync.Map based on CAS: goroutine thread safety
Beginner Level#
Generator#
In Go, channels are first-class types, just like basic types such as int and string. A generator is a function that returns a channel and adds values to the channel internally.
The generator pattern is commonly used to implement the simplest producer-consumer pattern or publish-subscribe pattern.
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
Reusing Channels (Select)#
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second)
timer := time.NewTimer(3 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timer.C:
// Do something on a timer
timer.Reset(3 * time.Second)
case <-timeout: // Listen for timeout signal
fmt.Println("You talk too much.")
return
case <-quit: // Listen for quit signal
return
}
}
}
Controlling Concurrency (WaitGroup->chan->context)#
WaitGroup#
WaitGroup is a basic class for controlling concurrency.
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(2*time.Second)
fmt.Println("Worker 1 completed")
wg.Done()
}()
go func() {
time.Sleep(2*time.Second)
fmt.Println("Worker 2 completed")
wg.Done()
}()
wg.Wait()
fmt.Println("All workers have completed. End of work.")
}
Context#
Context has the following four functions for deriving new contexts from a parent context.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
- WithCancel: Returns the context and a cancel function.
- WithDeadline: Returns the context and a cancel function. It automatically cancels when the deadline is exceeded.
- WithTimeout: Similar to WithDeadline, but with a timeout duration.
- WithValue: Passes metadata. Data can be obtained using ctx.Value(key).
Context is used to handle each goroutine of a request in request processing.
func main() {
ctx, cancel := context.WithCancel(context.Background())
go watch(ctx,"[Monitor 1]")
go watch(ctx,"[Monitor 2]")
go watch(ctx,"[Monitor 3]")
time.Sleep(10 * time.Second)
fmt.Println("Okay, stop monitoring.")
cancel()
// To check if the monitors have stopped, if there is no monitor output, it means they have stopped.
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println(name,"Monitoring stopped...")
return
default:
fmt.Println(name,"Goroutine monitoring...")
time.Sleep(2 * time.Second)
}
}
}
Worker Scheduling#
Basic worker queue scheduling method
idleWorker<-workerQueue
func(){
idleWorker.DoTask()
defer workerQueue<-idleWorker
}