Golang concurrent programming

title: Go Concurrency Programming
date: 2021-08-20 13:12:00
categories: Study Notes

golang

Concepts#

  • Concurrency is not parallelism.
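  • Channel types can carry a direction, expressed by where the <- arrow sits relative to chan: chan<- T is send-only, <-chan T is receive-only, and chan T is bidirectional. Go has no -> operator.
  • The buffer size of a channel determines whether it is synchronous or asynchronous: an unbuffered channel (size 0) blocks each send until a receiver is ready, while a buffered channel accepts sends without blocking until the buffer is full.
  • CAS (compare-and-swap) is an atomic operation used to build lock-free concurrent algorithms; it lets multiple threads update shared data safely without holding a lock.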
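
The channel points above can be illustrated with a small sketch. The function names produce and sum are made up for this example; the directional parameter types let the compiler reject a send on a receive-only channel (and vice versa), and the two make calls contrast an unbuffered channel with a buffered one.

```go
package main

import "fmt"

// produce only sends on out; the chan<- type makes receiving a compile error.
func produce(out chan<- int, n int) {
	for i := 0; i < n; i++ {
		out <- i
	}
	close(out)
}

// sum only receives from in; the <-chan type makes sending a compile error.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	// Unbuffered: each send blocks until a receiver is ready (synchronous),
	// so the producer must run in its own goroutine.
	unbuffered := make(chan int)
	go produce(unbuffered, 5)
	fmt.Println(sum(unbuffered)) // 10

	// Buffered: sends succeed without a receiver until the buffer is full,
	// so all five values fit and no extra goroutine is needed.
	buffered := make(chan int, 5)
	produce(buffered, 5)
	fmt.Println(len(buffered), cap(buffered)) // 5 5
	fmt.Println(sum(buffered))                // 10
}
```

Calling produce(unbuffered, 5) without the go keyword would deadlock, because the first send would wait forever for a receiver.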

CAS#

CAS Principle: CAS takes three operands: the current memory value V, the expected old value A, and the new value B. It atomically writes B to memory and returns true only if V equals A. Otherwise, it leaves memory unchanged and returns false.
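
As a rough illustration of the principle, the sketch below increments a shared counter with a CAS retry loop. The helper names addWithCAS and countTo are invented for this example; only the sync/atomic calls are from the standard library.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// addWithCAS adds delta to *addr using a compare-and-swap retry loop.
func addWithCAS(addr *int64, delta int64) {
	for {
		old := atomic.LoadInt64(addr) // expected value A
		// Try to install the new value B = old + delta.
		if atomic.CompareAndSwapInt64(addr, old, old+delta) {
			return
		}
		// Another goroutine changed *addr in between; reload and retry.
	}
}

// countTo runs n goroutines that each increment a shared counter once.
func countTo(n int) int64 {
	var counter int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			addWithCAS(&counter, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countTo(1000)) // 1000
}
```

In real code atomic.AddInt64 does the same job in one call; the explicit loop is shown only to make the compare, swap, and retry steps visible.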

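  • CAS is implemented by CPU instructions (for example, CMPXCHG on x86) rather than by the operating system, so it is supported at the hardware level. The following logic executes as a single atomic step: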
if *addr == old {
  *addr = new
  return true
}
return false
  • Implementing locks in Go using CAS
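  package main

  import (
    "runtime"
    "sync/atomic"
  )

  // Mutex is a minimal spinlock built on CAS: state 0 means unlocked, 1 means locked.
  type Mutex struct {
    state int32
  }

  // Lock spins until it manages to swap state from 0 to 1.
  func (m *Mutex) Lock() {
    for !atomic.CompareAndSwapInt32(&m.state, 0, 1) {
      runtime.Gosched() // yield so the current holder can make progress
    }
  }

  // Unlock releases the lock by resetting state to 0.
  func (m *Mutex) Unlock() {
    atomic.StoreInt32(&m.state, 0)
  }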
  • sync.Map also relies on atomic operations such as CAS internally, which makes it safe for concurrent use by multiple goroutines without an external lock.
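
A short usage sketch of sync.Map follows. It does not show the internals, only that many goroutines can write to the map concurrently without extra locking. The function countDistinct is a hypothetical helper written for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// countDistinct stores keys from many goroutines and counts the distinct ones.
func countDistinct(keys []string) int {
	var m sync.Map
	var wg sync.WaitGroup
	for _, k := range keys {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			m.LoadOrStore(key, true) // stores only if the key is absent
		}(k)
	}
	wg.Wait()

	// Range visits every entry; returning true continues the iteration.
	n := 0
	m.Range(func(_, _ interface{}) bool {
		n++
		return true
	})
	return n
}

func main() {
	fmt.Println(countDistinct([]string{"a", "b", "a", "c", "b"})) // 3
}
```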

Beginner Level#

Generator#

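In Go, channels are first-class values, just like values of basic types such as int and string: they can be assigned, passed as arguments, and returned from functions. A generator is a function that returns a channel and sends values to it from a goroutine it starts internally.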

The generator pattern is commonly used to implement the simplest producer-consumer pattern or publish-subscribe pattern.

func boring(msg string) <-chan string { // Returns receive-only channel of strings.
    c := make(chan string)
    go func() { // We launch the goroutine from inside the function.
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // Return the channel to the caller.
}
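
The generator above never stops producing. As a complementary sketch, here is a finite generator together with a consumer; count and collect are hypothetical names chosen for this example. Closing the channel is what lets the consumer's range loop terminate.

```go
package main

import "fmt"

// count is a finite generator: it sends 0..n-1 and then closes the channel.
func count(n int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c) // closing lets the consumer's range loop end
		for i := 0; i < n; i++ {
			c <- i
		}
	}()
	return c
}

// collect drains a channel into a slice.
func collect(c <-chan int) []int {
	var out []int
	for v := range c {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(collect(count(3))) // [0 1 2]
}
```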

Multiplexing Channels (select)#

func main() {
  c := boring("Joe")
  quit := make(chan struct{}) // closed elsewhere to request shutdown (declaration missing in the original)
  timeout := time.After(5 * time.Second)
  timer := time.NewTimer(3 * time.Second)
  defer timer.Stop() // release the timer when main returns
  for {
    select {
    case s := <-c:
      fmt.Println(s)
    case <-timer.C:
      // Do something on a timer, then re-arm it
      timer.Reset(3 * time.Second)
    case <-timeout: // Listen for timeout signal
      fmt.Println("You talk too much.")
      return
    case <-quit: // Listen for quit signal
      return
    }
  }
}
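
Another common use of select is merging several channels into one (often called fan-in). The sketch below is an illustration under assumed names: emit and fanIn are not from the original post. It relies on the fact that a nil channel is never ready in a select, so setting a drained input to nil disables its case.

```go
package main

import "fmt"

// emit is a small helper generator: it sends vals and then closes the channel.
func emit(vals ...int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

// fanIn merges two input channels into one output channel using select.
func fanIn(a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil // a nil channel is never selected again
					continue
				}
				out <- v
			case v, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				out <- v
			}
		}
	}()
	return out
}

func main() {
	total := 0
	for v := range fanIn(emit(1, 2, 3), emit(10, 20)) {
		total += v // arrival order across inputs is not deterministic
	}
	fmt.Println(total) // 36
}
```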

Controlling Concurrency (WaitGroup->chan->context)#

WaitGroup#

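sync.WaitGroup is a basic type for controlling concurrency: it waits for a group of goroutines to finish. Add sets the number of goroutines to wait for, each goroutine calls Done when it finishes, and Wait blocks until the counter reaches zero.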

func main() {
  var wg sync.WaitGroup

  wg.Add(2)
  go func() {
      time.Sleep(2*time.Second)
      fmt.Println("Worker 1 completed")
      wg.Done()
  }()
  go func() {
      time.Sleep(2*time.Second)
      fmt.Println("Worker 2 completed")
      wg.Done()
  }()
  wg.Wait()
  fmt.Println("All workers have completed. End of work.")
}
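
The section heading lists chan as the step between WaitGroup and context, but the post gives no example for it. A minimal sketch of that step, assuming the usual idiom of closing a stop channel to signal a goroutine, might look like this; worker and runFor are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// worker counts loop iterations until stop is closed, then reports the count on done.
func worker(stop <-chan struct{}, done chan<- int) {
	ticks := 0
	for {
		select {
		case <-stop: // a closed channel is always ready to receive
			done <- ticks
			return
		default:
			ticks++
			time.Sleep(time.Millisecond)
		}
	}
}

// runFor starts a worker, lets it run for ms milliseconds, stops it,
// and returns how many iterations it completed.
func runFor(ms int) int {
	stop := make(chan struct{})
	done := make(chan int)
	go worker(stop, done)
	time.Sleep(time.Duration(ms) * time.Millisecond)
	close(stop) // broadcast the stop signal
	return <-done
}

func main() {
	fmt.Println("ticks:", runFor(20))
}
```

WaitGroup can only wait for goroutines to finish, whereas a stop channel lets the caller ask them to finish. Context generalizes this signal to whole trees of goroutines.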

Context#

The context package provides the following four core functions for deriving a new context from a parent context.

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
  • WithCancel: Returns a derived context and a cancel function. Calling cancel closes the context's Done channel.
  • WithDeadline: Returns a derived context and a cancel function. The context is cancelled automatically once the deadline passes.
  • WithTimeout: Like WithDeadline, but takes a duration relative to the current time instead of an absolute deadline.
  • WithValue: Attaches request-scoped metadata to the context. The data can be read back with ctx.Value(key).
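
WithTimeout and WithValue can be sketched as follows. The names ctxKey, requestIDKey, slowOp, timedOut, and requestID are illustrative assumptions; only the context, errors, and time calls are standard library APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ctxKey is a private key type, which avoids collisions with keys from other packages.
type ctxKey string

const requestIDKey ctxKey = "requestID"

// slowOp waits for d or until ctx is cancelled, whichever comes first.
func slowOp(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether an operation lasting opMs milliseconds
// exceeds a context timeout of limitMs milliseconds.
func timedOut(limitMs, opMs int) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(limitMs)*time.Millisecond)
	defer cancel() // always release the context's resources
	err := slowOp(ctx, time.Duration(opMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

// requestID reads the value stored under requestIDKey, or "" if none is set.
func requestID(ctx context.Context) string {
	id, _ := ctx.Value(requestIDKey).(string)
	return id
}

func main() {
	fmt.Println(timedOut(10, 200)) // true: the deadline fires first
	fmt.Println(timedOut(200, 10)) // false: the operation finishes first

	ctx := context.WithValue(context.Background(), requestIDKey, "req-42")
	fmt.Println(requestID(ctx)) // req-42
}
```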

Context is typically used in request processing to propagate cancellation, deadlines, and request-scoped values to every goroutine started on behalf of a request.

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  go watch(ctx,"[Monitor 1]")
  go watch(ctx,"[Monitor 2]")
  go watch(ctx,"[Monitor 3]")

  time.Sleep(10 * time.Second)
  fmt.Println("Okay, stop monitoring.")
  cancel()
  // Wait a little to confirm the monitors have stopped: no further monitor output means they exited.
  time.Sleep(5 * time.Second)
}

func watch(ctx context.Context, name string) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println(name,"Monitoring stopped...")
      return
    default:
      fmt.Println(name,"Goroutine monitoring...")
      time.Sleep(2 * time.Second)
    }
  }
}

Worker Scheduling#

Basic worker queue scheduling method

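idleWorker := <-workerQueue // take an idle worker; blocks until one is available

go func() {
  // Return the worker to the queue when the task finishes.
  // defer needs a function call, so the send is wrapped in a closure.
  defer func() { workerQueue <- idleWorker }()
  idleWorker.DoTask()
}()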
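
A self-contained version of this scheduling idea might look like the sketch below. The Worker type, its DoTask method (here it just squares a number), and runTasks are assumptions made for illustration; the original post does not define them.

```go
package main

import (
	"fmt"
	"sync"
)

// Worker is a hypothetical worker type used only for this sketch.
type Worker struct{ ID int }

// DoTask stands in for real work; here it squares its input.
func (w *Worker) DoTask(n int) int { return n * n }

// runTasks processes tasks with at most poolSize of them running concurrently.
func runTasks(poolSize int, tasks []int) []int {
	// The queue holds idle workers; its capacity bounds the concurrency.
	workerQueue := make(chan *Worker, poolSize)
	for i := 0; i < poolSize; i++ {
		workerQueue <- &Worker{ID: i}
	}

	results := make([]int, len(tasks))
	var wg sync.WaitGroup
	for i, t := range tasks {
		idleWorker := <-workerQueue // blocks until a worker is idle
		wg.Add(1)
		go func(i, t int) {
			defer wg.Done()
			defer func() { workerQueue <- idleWorker }() // hand the worker back
			results[i] = idleWorker.DoTask(t)            // each goroutine writes its own index
		}(i, t)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(runTasks(2, []int{1, 2, 3, 4})) // [1 4 9 16]
}
```

Because the queue is buffered with exactly poolSize slots, returning a worker never blocks, and taking one blocks only while all workers are busy.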

