Chenyo's org-static-blog

25 Jun 2024

Go patterns

This is a personal note on the Russ Cox guest lecture.

1. Concurrency vs Parallelism

  • Concurrency: write a program to handle a lot of things at once
    • not necessarily faster
  • Parallelism: the program itself can do a lot of computations at once

2. Use goroutines for states

2.1. Matching a regex

  • return whether a given string matches a regex: starts with ", contains arbitrary escape sequences, and ends with "
  • unclear logic: store states in the data

     1: state := 0 // current position in the match, kept as data
     2: for {
     3:     c := read()
     4:     switch state {
     5:     case 0:
     6:         // state 0: the first char must be the opening "
     7:         if c != '"' {
     8:             return false
     9:         }
    10:         state = 1 // opening quote seen; match the next char
    11:     case 1:
    12:         // state 1: inside the string; a " here ends the match
    13:         if c == '"' {
    14:             return true
    15:         }
    16:         if c == '\\' {
    17:             state = 2
    18:         } else {
    19:             // stay in state 1 to match the next char
    20:             state = 1
    21:         }
    22:     case 2:
    23:         // state 2: c is the escaped char; discard it and go back to state 1
    24:         state = 1
    25:     }
    26: }
    
  • clear logic: store states in the code

     1: // no variable to store state: the position in the code is the state
     2: if read() != '"' {
     3:     return false
     4: }
     5: var c rune // c is a Unicode code point; rune is an alias for int32
     6: for c != '"' {
     7:     c = read()
     8:     if c == '\\' {
     9:         read()  // skip the escaped char
    10:     }
    11: }
    12: return true
    

2.2. When the state variable cannot be avoided

  • the function needs to return the state

     1: // quoter matches a double-quoted string one char at a time,
     2: // keeping its position in the match in an explicit state variable.
     3: type quoter struct {
     4:     state int // 0: expect opening ", 1: inside the string, 2: after a backslash
     5: }
     6: 
     7: // Init resets the matcher to its initial state.
     8: func (q *quoter) Init() {
     9:     q.state = 0
    10: }
    11: 
    12: // Write processes one char based on the current state. It returns
    13: // BadInput if the first char is not ", Success when the closing " is
    14: // seen, and NeedMoreInput otherwise.
    15: func (q *quoter) Write(c rune) Status {
    16:     switch q.state {
    17:     case 0:
    18:         if c != '"' {
    19:             return BadInput
    20:         }
    21:         q.state = 1
    22:     case 1:
    23:         if c == '"' {
    24:             return Success
    25:         }
    26:         if c == '\\' {
    27:             q.state = 2
    28:         }
    29:         // any other char keeps the matcher in state 1
    30:     case 2:
    31:         // c is the escaped char: discard it
    32:         q.state = 1
    33:     }
    34:     return NeedMoreInput
    35: }
    
  • use additional goroutines to hold states

     1: // quoter matches a double-quoted string. The matching state lives in
     2: // the control flow of the parse goroutine rather than in a field.
     3: type quoter struct {
     4:     char   chan rune   // chars handed from Write to parse
     5:     status chan Status // statuses handed from parse back to the caller
     6: }
     7: 
     8: // Init creates the channels and starts the parse goroutine.
     9: func (q *quoter) Init() {
    10:     q.char = make(chan rune)
    11:     q.status = make(chan Status)
    12:     // always know why and when a goroutine exits: parse returns after
    13:     // reporting SyntaxError or Done; if the caller stops writing before
    14:     // that, parse stays blocked in read
    15:     go q.parse()
    16:     // block until parse sends its first status (NeedMoreInput, sent
    17:     // by read), so that parse is ready for a char before Write is called
    18:     <-q.status
    19: }
    20: 
    21: // Write sends the next char to parse and returns the status that
    22: // parse reports for it.
    23: func (q *quoter) Write(r rune) Status {
    24:     q.char <- r
    25:     // wait for the result
    26:     return <-q.status
    27: }
    28: 
    29: // parse runs in its own goroutine and keeps the matching state in code.
    30: func (q *quoter) parse() {
    31:     if q.read() != '"' {
    32:         q.status <- SyntaxError
    33:         return
    34:     }
    35:     var c rune
    36:     for c != '"' {
    37:         c = q.read()
    38:         if c == '\\' {
    39:             q.read() // skip the escaped char
    40:         }
    41:     }
    42:     q.status <- Done
    43: }
    44: 
    45: // read is a helper for parse: it reports NeedMoreInput and then
    46: // returns the next char sent on q.char.
    47: func (q *quoter) read() rune {
    48:     q.status <- NeedMoreInput
    49:     return <-q.char
    50: }
    51: 
    52: func main() {
    53:     q := &quoter{}
    54:     q.Init()
    55: 
    56:     input := `"Hello, \"World\""`
    57:     for _, c := range input {
    58:         // stop once parse has finished: its goroutine is gone, so
    59:         // another Write would block forever
    60:         if status := q.Write(c); status != NeedMoreInput {
    61:             break
    62:         }
    63:     }
    64: }
    
  • check goroutine blockage
    • Ctrl-\ sends SIGQUIT
    • use the HTTP server’s /debug/pprof/goroutine if importing net/http

3. Pattern 1: publish/subscribe server

  • the information goes one way: server -> client
  • close a channel to signal no new values will be sent
  • prefer defer when unlocking the mutex

     1: // Server is a publish/subscribe server whose subscriber set is
     2: // guarded by a mutex.
     3: type Server struct {
     4:     mu  sync.Mutex            // protects sub
     5:     sub map[chan<- Event]bool // set of subscribed channels
     6: }
     7: 
     8: // Init allocates the subscriber set; call it before any other method.
     9: func (s *Server) Init() {
    10:     s.sub = make(map[chan<- Event]bool)
    11: }
    12: 
    13: // Publish sends e to every subscribed channel.
    14: func (s *Server) Publish(e Event) {
    15:     s.mu.Lock() // each method could be called by many clients
    16:     defer s.mu.Unlock()
    17:     // the mutex is held because the loop reads s.sub
    18:     for c := range s.sub {
    19:         // if a subscriber consumes events too slowly, this send
    20:         // blocks while the mutex is held, so later calls have to wait;
    21:         // a buffered channel mitigates this
    22:         c <- e
    23:     }
    24: }
    25: 
    26: // Subscribe registers c to receive events.
    27: // It panics if c is already subscribed.
    28: func (s *Server) Subscribe(c chan<- Event) {
    29:     s.mu.Lock()
    30:     defer s.mu.Unlock()
    31:     if s.sub[c] {
    32:         // the deferred Unlock still runs when panicking
    33:         panic("pubsub: already subscribed")
    34:     }
    35:     s.sub[c] = true
    36: }
    37: 
    38: // Cancel closes c and removes it from the subscriber set.
    39: // It panics if c is not subscribed.
    40: func (s *Server) Cancel(c chan<- Event) {
    41:     s.mu.Lock()
    42:     defer s.mu.Unlock()
    43:     if !s.sub[c] {
    44:         panic("pubsub: not subscribed")
    45:     }
    46:     close(c)
    47:     delete(s.sub, c)
    48: }
    

3.1. Options for slow goroutines

  • slow down event generation
  • drop events if it cannot be sent, e.g., os/signal, runtime/pprof
  • queue events, e.g., add a helper between the server and each client, which also separates the concerns

     1: func helper(in <-chan Event, out chan<- Event) { // queues events between in and out
     2:     var q []Event // events received from in but not yet sent to out
     3:     // once in is closed, flush the pending events in q
     4:     // and then close out
     5:     for in != nil || len(q) > 0 {
     6:         // decide whether and what to send
     7:         var sendOut chan<- Event
     8:         var next Event
     9:         if len(q) > 0 {
    10:             sendOut = out
    11:             next = q[0]
    12:         }
    13:         select {
    14:         case e, ok := <-in: // never selected after in = nil
    15:             // ok is false once in is closed
    16:             if !ok {
    17:                 in = nil
    18:                 break // leaves the select, not the for loop
    19:             }
    20:             q = append(q, e)
    21:         case sendOut <- next: // never selected while sendOut is nil (len(q) == 0)
    22:             q = q[1:]
    23:         }
    24:     }
    25:     close(out)
    26: }
    
  • convert mutexes into goroutines, not suitable for Raft where state transition is complex

     1: // Server is a publish/subscribe server whose subscriber state is owned
     2: // by a single goroutine (loop); the methods talk to it over channels.
     3: type Server struct {
     4:     publish   chan Event
     5:     subscribe chan subReq // subscription requests waiting for loop
     6:     cancel    chan subReq // cancellation requests waiting for loop
     7: }
     8: 
     9: // subReq is a subscribe or cancel request for channel c.
    10: type subReq struct {
    11:     c chan<- Event
    12:     // ok carries whether the operation succeeded
    13:     ok chan bool
    14: }
    15: 
    16: // Init creates the request channels and starts loop.
    17: func (s *Server) Init() {
    18:     s.publish = make(chan Event)
    19:     s.subscribe = make(chan subReq)
    20:     s.cancel = make(chan subReq)
    21:     go s.loop()
    22: }
    23: 
    24: // Publish hands e to loop, which forwards it to every subscriber.
    25: func (s *Server) Publish(e Event) {
    26:     // no mutex is required here
    27:     // as it does not read state
    28:     s.publish <- e
    29: }
    30: 
    31: // Subscribe registers c to receive events.
    32: // It panics if c is already subscribed.
    33: func (s *Server) Subscribe(c chan<- Event) {
    34:     r := subReq{c: c, ok: make(chan bool)}
    35:     s.subscribe <- r
    36:     if !<-r.ok { // wait for loop to report the result
    37:         panic("pubsub: already subscribed")
    38:     }
    39: }
    40: 
    41: // Cancel ends the subscription of c.
    42: // It panics if c is not subscribed.
    43: func (s *Server) Cancel(c chan<- Event) {
    44:     r := subReq{c: c, ok: make(chan bool)}
    45:     s.cancel <- r
    46:     if !<-r.ok {
    47:         panic("pubsub: not subscribed")
    48:     }
    49: }
    50: 
    51: // loop owns the subscriber map and handles one request at a time.
    52: func (s *Server) loop() {
    53:     // sub is a local variable, so no lock is needed;
    54:     // it maps a subscribed channel to its helper channel
    55:     sub := make(map[chan<- Event]chan<- Event)
    56:     for {
    57:         select {
    58:         case e := <-s.publish:
    59:             for _, h := range sub {
    60:                 // the event is published to a helper channel
    61:                 h <- e
    62:             }
    63:         case r := <-s.subscribe:
    64:             // the helper channel exists,
    65:             // meaning the subscriber has been handled before
    66:             if sub[r.c] != nil {
    67:                 r.ok <- false
    68:                 break
    69:             }
    70:             h := make(chan Event)
    71:             go helper(h, r.c)
    72:             sub[r.c] = h
    73:             r.ok <- true
    74:         case r := <-s.cancel:
    75:             // no helper channel means r.c was never subscribed
    76:             if sub[r.c] == nil {
    77:                 r.ok <- false
    78:                 break
    79:             }
    80:             // close the helper channel
    81:             close(sub[r.c])
    82:             delete(sub, r.c)
    83:             r.ok <- true
    84:         }
    85:     }
    86: }
    

4. Pattern 2: work scheduler

  • \(M\) tasks are assigned to \(N\) servers/workers, \(M \gg N\).

     1: // Schedule runs tasks 0..numTask-1 on the given servers, at most one
     2: // task per server at a time, and returns once every task has finished.
     3: // call performs one task on one server. servers must be non-empty when
     4: // numTask > 0, otherwise Schedule blocks forever waiting for an idle server.
     5: func Schedule(servers []string, numTask int,
     6:     call func(srv string, task int)) {
     7: 
     8:     idle := make(chan string, len(servers))
     9:     // initialize a channel of idle servers
    10:     for _, srv := range servers {
    11:         idle <- srv
    12:     }
    13: 
    14:     for task := 0; task < numTask; task++ {
    15:         // if the goroutine used the loop variable rather than a local copy,
    16:         // there is a race (before Go 1.22): the loop goes on before the
    17:         // goroutine starts, so that some tasks are skipped.
    18:         task := task
    19:         // if srv := <-idle were moved inside the goroutine, a lot of
    20:         // goroutines would be created simultaneously and hang waiting
    21:         // for an idle server; leaving it outside means a goroutine is only
    22:         // created when there is an idle server (but it slows down the main loop)
    23:         srv := <-idle
    24:         go func() {
    25:             call(srv, task) // server does the task
    26:             // server finishes the task and becomes idle again
    27:             idle <- srv
    28:         }()
    29:     }
    30: 
    31:     // wait until all servers are idle again, i.e., all tasks are done;
    32:     // this prevents an early exit when all tasks have been assigned
    33:     // but the last servers have not finished
    34:     for i := 0; i < len(servers); i++ {
    35:         <-idle
    36:     }
    37: }
    
  • Optimization for the above code: while the task loop creates goroutines \(M\) times, actually there are only at most \(N\) active goroutines at any time.

    • Better to spin off a goroutine for each server.
    • The number of servers can be dynamic.
     1: // Schedule runs tasks 0..numTask-1 on the servers received from the
     2: // servers channel, using one goroutine per server. call performs one
     3: // task on one server and reports whether it succeeded; a failed task
     4: // is pushed back to the work queue.
     5: func Schedule(servers chan string, numTask int,
     6:     call func(srv string, task int) bool) {
     7: 
     8:     work := make(chan int)  // a queue of all work yet to be done
     9:     done := make(chan bool) // a queue of all done tasks
    10:     exit := make(chan bool) // signals that no new servers should be pulled
    11: 
    12:     runTasks := func(srv string) {
    13:         // keep polling until work is closed
    14:         for task := range work {
    15:             if call(srv, task) {
    16:                 done <- true
    17:             } else {
    18:                 // re-push the task if it failed; work is unbuffered,
    19:                 // so this blocks until another server receives the task
    20:                 work <- task
    21:             }
    22:         }
    23:     }
    24: 
    25:     // use a goroutine to avoid hanging when
    26:     // no server is available
    27:     go func() {
    28:         for {
    29:             select {
    30:             case srv := <-servers:
    31:                 go runTasks(srv)
    32:             case <-exit:
    33:                 return
    34:             }
    35:         }
    36:     }()
    37: 
    38:     // The following code has a deadlock!
    39:     // In runTasks, the server pushes to the done channel when a task is done.
    40:     // However, the done channel is only pulled after the main routine has
    41:     // pushed all tasks and closed the work channel.
    42:     // Therefore servers hang when pushing to done while the main routine
    43:     // is still blocked pushing work.
    44:     // for task := 0; task < numTask; task++ {
    45:     //  work <- task
    46:     // }
    47:     // // signal no more tasks so that servers know
    48:     // // when to terminate
    49:     // close(work)
    50: 
    51:     // // wait until all tasks are done
    52:     // for i := 0; i < numTask; i++ {
    53:     //  <-done
    54:     // }
    55: 
    56:     // fix 1: switch between the work and done channels
    57:     i := 0 // number of tasks reported done so far
    58: WorkLoop:
    59:     for task := 0; task < numTask; task++ {
    60:         for {
    61:             select {
    62:             case work <- task:
    63:                 continue WorkLoop
    64:             case <-done:
    65:                 i++
    66:             }
    67:         }
    68:     }
    69: 
    70:     // wait for the last assigned tasks to be done
    71:     for ; i < numTask; i++ {
    72:         <-done
    73:     }
    74: 
    75:     // only close the work channel in the end,
    76:     // in case some tasks failed and need to be redone
    77:     close(work)
    78:     exit <- true // stop pulling new servers
    79: 
    80:     // fix 2: move the work assignment to a separate goroutine
    81:     // go func() {
    82:     //  for task := 0; task < numTask; task++ {
    83:     //      work <- task
    84:     //  }
    85:     //  close(work)
    86:     // }()
    87: 
    88:     // fix 3: increase the buffer of the work channel
    89:     // work := make(chan int, numTask)
    90: }
    

5. Pattern 3: replicated service client

  • A client replicates its requests to multiple servers, waits for the first reply and changes its preferred server.

    // Call sends args to the servers one after another, starting from the
    // preferred one and moving on to the next server each time the timeout
    // expires without a reply. It returns the first reply received and
    // records the server that sent it as the new preferred server.
    func (c *Client) Call(args Args) Reply {
        type result struct {
            serverID int
            reply    Reply
        }

        const timeout = 1 * time.Second
        t := time.NewTimer(timeout)
        defer t.Stop()

        // a channel for all servers to send their reply to; it is buffered
        // with one slot per server so that, even after the client has
        // received a reply, the later replies don't hang
        done := make(chan result, len(c.servers))

        c.mu.Lock()
        prefer := c.prefer
        c.mu.Unlock()

        var r result
        for off := 0; off < len(c.servers); off++ {
            // start from the preferred server
            id := (prefer + off) % len(c.servers)
            go func() {
                done <- result{id, c.callOne(c.servers[id], args)}
            }()

            // now wait for either a done signal or a timeout:
            // if it is done, don't send to other servers;
            // otherwise, reset the timer and send to the next server
            select {
            case r = <-done:
                goto Done // use a goto if it makes the code clear
            case <-t.C:
                // timeout
                t.Reset(timeout)
            }
        }

        r = <-done // every server has been tried: wait for the first reply

    Done:
        c.mu.Lock()
        c.prefer = r.serverID // update preference
        c.mu.Unlock()
        return r.reply
    }
    
    

6. Pattern 4: Protocol multiplexer

  • A multiplexer sits in front of a service and forwards messages between multiple clients and the service, e.g., an RPC.

     1: type ProtocolMux interface {
     2:     // A mux is bound to a specific service
     3:     Init(Service)
     4:     // A client uses this method to send a message to the service
     5:     // and wait for the service's reply
     6:     Call(Msg) Msg
     7: }
     8: 
     9: // Methods that a service exposes for a mux to use.
    10: // The underlying message processing is in the implementation
    11: // of the actual service struct
    12: type Service interface {
    13:     // A tag is a muxing identifier in the request or reply message,
    14:     // e.g., which client channel to send the reply to
    15:     ReadTag(Msg) int64
    16:     // Send a request message to the service;
    17:     // multiple sends cannot be called concurrently,
    18:     // probably due to only a single channel between
    19:     // mux and the service (serialization)
    20:     Send(Msg)
    21:     // Wait for and return the reply message;
    22:     // multiple recvs cannot be called concurrently
    23:     Recv() Msg
    24: }
    
  • The mux maintains a channel to queue unsent requests and a channel to queue unsent replies.

     1: // Mux forwards the calls of many concurrent clients to a single
     2: // Service and routes each reply back to the call it belongs to.
     3: type Mux struct {
     4:     srv Service
     5:     // stores unsent requests
     6:     send chan Msg
     7:     mu   sync.Mutex // protects pending
     8:     // maps a tag to the reply channel of the call
     9:     // that has not received its reply yet
    10:     pending map[int64]chan<- Msg
    11: }
    12: 
    13: // Init binds the mux to srv and starts the send and receive loops.
    14: func (m *Mux) Init(srv Service) {
    15:     m.srv = srv
    16:     // without this, send stays nil and both Call and sendLoop
    17:     // would block forever
    18:     m.send = make(chan Msg)
    19:     m.pending = make(map[int64]chan<- Msg)
    20:     go m.sendLoop()
    21:     go m.recvLoop()
    22: }
    23: 
    24: // sendLoop sends out the queued requests one at a time, so that
    25: // Send is never called concurrently.
    26: func (m *Mux) sendLoop() {
    27:     for args := range m.send {
    28:         m.srv.Send(args)
    29:     }
    30: }
    31: 
    32: // recvLoop receives replies one at a time and delivers each one to
    33: // the call registered under the reply's tag.
    34: func (m *Mux) recvLoop() {
    35:     for {
    36:         reply := m.srv.Recv()
    37:         tag := m.srv.ReadTag(reply)
    38:         m.mu.Lock()
    39:         // get the reply channel
    40:         done := m.pending[tag]
    41:         // clear the entry since the message loop
    42:         // is complete
    43:         delete(m.pending, tag)
    44:         m.mu.Unlock()
    45: 
    46:         if done == nil {
    47:             panic("unexpected reply")
    48:         }
    49:         done <- reply
    50:     }
    51: }
    52: 
    53: // Call sends args to the service and waits for the reply carrying the
    54: // same tag. Clients call this method concurrently. It panics if a call
    55: // with the same tag is still pending.
    56: func (m *Mux) Call(args Msg) (reply Msg) {
    57:     tag := m.srv.ReadTag(args)
    58:     // to record which reply should go
    59:     // to which client; the buffer of 1 lets recvLoop
    60:     // deliver the reply without waiting for this call
    61:     done := make(chan Msg, 1)
    62:     m.mu.Lock()
    63:     if m.pending[tag] != nil {
    64:         m.mu.Unlock()
    65:         panic("duplicate request")
    66:     }
    67:     m.pending[tag] = done
    68:     m.mu.Unlock()
    69:     m.send <- args
    70:     return <-done // hang until a reply is received
    71: }
    
Tags: go design-pattern study