Go patterns
This is a personal note on the Russ Cox guest lecture.
1. Concurrency vs Parallelism
- Concurrency: write a program to handle lots of things at once
- not necessarily faster
- Parallelism: the program itself can do a lot of computations at once
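The distinction above can be illustrated with a minimal sketch: the goroutines below are concurrent, yet `runtime.GOMAXPROCS(1)` prevents them from running in parallel. The function name `runConcurrently` is made up for this example.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// runConcurrently starts n goroutines while limiting Go code to one
// simultaneously executing OS thread, then reports how many finished.
func runConcurrently(n int) int {
	prev := runtime.GOMAXPROCS(1) // concurrency without parallelism
	defer runtime.GOMAXPROCS(prev)

	var wg sync.WaitGroup
	results := make(chan int, n) // buffered so no goroutine blocks
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results <- i
		}(i)
	}
	wg.Wait()
	return len(results)
}

func main() {
	fmt.Println(runConcurrently(8)) // 8
}
```

All eight goroutines complete even though at most one runs at any instant, which is the sense in which concurrency is about structure rather than speed.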
2. Use goroutines for states
2.1. Matching a regex
- return whether a given string matches a regex: it starts with ", contains arbitrary escape sequences, and ends with "
unclear logic: store states in the data
    state := 0
    for {
        c := read()
        switch state {
        case 0:
            // first char must be "
            if c != '"' {
                return false
            }
            state = 1 // match the next char
        case 1:
            // a closing " completes the match
            if c == '"' {
                return true
            }
            if c == '\\' {
                state = 2
            } else {
                // stay in state 1 to match the next char
                state = 1
            }
        case 2:
            // read the escaped char, discard it, and go back to state 1
            state = 1
        }
    }
clear logic: store states in the code
    // no variable to store state
    if read() != '"' {
        return false
    }
    var c rune // a Unicode code point; rune is an alias for int32
    for c != '"' {
        c = read()
        if c == '\\' {
            read() // skip the next char
        }
    }
    return true
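To try the "state in the code" version, here is a runnable sketch. The notes leave `read()` undefined, so the `strings.Reader`-based helper and the end-of-input handling are assumptions added for this example, as is the name `matchQuoted`.

```go
package main

import (
	"fmt"
	"strings"
)

// matchQuoted reports whether s starts with a double-quoted string.
// Assumption: read() is backed by a strings.Reader, and running out
// of input before the closing quote counts as "no match".
func matchQuoted(s string) bool {
	r := strings.NewReader(s)
	// read returns the next rune, and false at end of input.
	read := func() (rune, bool) {
		c, _, err := r.ReadRune()
		return c, err == nil
	}

	if c, ok := read(); !ok || c != '"' {
		return false
	}
	for {
		c, ok := read()
		if !ok {
			return false // input ended before the closing quote
		}
		switch c {
		case '"':
			return true
		case '\\':
			if _, ok := read(); !ok { // skip the escaped char
				return false
			}
		}
	}
}

func main() {
	fmt.Println(matchQuoted(`"Hello, \"World\""`)) // true
	fmt.Println(matchQuoted(`"unterminated`))      // false
}
```

The position in the function body still plays the role of the state variable; only the input source and the EOF checks are new.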
2.2. When the state variable cannot be avoided
the function needs to return the state
    // assumed definition; the notes leave Status undefined
    type Status int

    const (
        NeedMoreInput Status = iota
        BadInput
        Success
    )

    type quoter struct {
        state int
    }

    func (q *quoter) Init() {
        q.state = 0
    }

    // process each char based on the current state
    func (q *quoter) Write(c rune) Status {
        switch q.state {
        case 0:
            if c != '"' {
                return BadInput
            }
            q.state = 1
        case 1:
            if c == '"' {
                return Success
            }
            if c == '\\' {
                q.state = 2
            } else {
                q.state = 1
            }
        case 2:
            q.state = 1
        }
        return NeedMoreInput
    }
use additional goroutines to hold states
    type quoter struct {
        char   chan rune
        status chan Status
    }

    func (q *quoter) Init() {
        q.char = make(chan rune)
        q.status = make(chan Status)
        // be clear about why and when this goroutine will exit
        go q.parse()
        // block until parse() sends its initial status (NeedMoreInput),
        // so that parse() is ready before Write() is called
        <-q.status
    }

    // Write sends the next char to q.char, which is received by parse(),
    // and returns the resulting status to the caller
    func (q *quoter) Write(c rune) Status {
        q.char <- c
        // wait for the result
        return <-q.status
    }

    // SyntaxError and Done play the roles of BadInput and Success
    func (q *quoter) parse() {
        if q.read() != '"' {
            q.status <- SyntaxError
            return
        }
        var c rune
        for c != '"' {
            c = q.read()
            if c == '\\' {
                q.read()
            }
        }
        q.status <- Done
    }

    // read is a helper used by parse(): it asks for more input
    // and returns the next char from q.char
    func (q *quoter) read() rune {
        q.status <- NeedMoreInput
        return <-q.char
    }

    func main() {
        q := &quoter{}
        q.Init()

        input := `"Hello, \"World\""`
        for _, c := range input {
            // stop at the final status: parse() has exited by then,
            // so another Write() would block forever
            if status := q.Write(c); status != NeedMoreInput {
                fmt.Println(status)
                break
            }
        }
    }
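- check goroutine blockage: Ctrl-\ sends SIGQUIT, which makes the program dump all goroutine stacks and exit
- use the HTTP server's /debug/pprof/goroutine if importing net/http (the handler is registered by importing net/http/pprof)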
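When neither a terminal nor an HTTP server is at hand, the same goroutine profile can be read from inside the program. A minimal sketch using `runtime/pprof`; the helper name `goroutineDump` is made up for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
)

// goroutineDump returns the stacks of all live goroutines as text,
// read from the built-in "goroutine" profile.
func goroutineDump() string {
	var buf bytes.Buffer
	// debug=1 asks for a human-readable listing
	pprof.Lookup("goroutine").WriteTo(&buf, 1)
	return buf.String()
}

func main() {
	block := make(chan int)
	go func() { <-block }() // a goroutine stuck on a receive
	fmt.Print(goroutineDump())
}
```

A goroutine that is stuck shows up in the dump with the channel operation it is blocked on, which is usually enough to find the missing send or receive.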
3. Pattern 1: publish/subscribe server
- the information goes one way: server -> client
- close a channel to signal no new values will be sent
- prefer defer when unlocking the mutex

    type Server struct {
        mu  sync.Mutex            // protects sub
        sub map[chan<- Event]bool // true for each subscribed channel
    }

    func (s *Server) Init() {
        s.sub = make(map[chan<- Event]bool)
    }

    // Publish sends an event to all subscribed channels
    func (s *Server) Publish(e Event) {
        s.mu.Lock() // each method could be called by many clients
        defer s.mu.Unlock()
        // the mutex is needed here because the loop reads s.sub
        for c := range s.sub {
            // if a goroutine consumes events from its channel too slowly,
            // every later publish has to wait before it can send;
            // a channel buffer can mitigate this
            c <- e
        }
    }

    // Subscribe registers a channel to receive events
    func (s *Server) Subscribe(c chan<- Event) {
        s.mu.Lock()
        defer s.mu.Unlock()
        if s.sub[c] {
            // thanks to defer, the mutex is unlocked even on panic
            panic("pubsub: already subscribed")
        }
        s.sub[c] = true
    }

    // Cancel removes a subscription and closes the channel
    func (s *Server) Cancel(c chan<- Event) {
        s.mu.Lock()
        defer s.mu.Unlock()
        if !s.sub[c] {
            panic("pubsub: not subscribed")
        }
        close(c)
        delete(s.sub, c)
    }
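The close-to-signal convention from the list above can be seen in isolation in this small sketch; `produce` and `sum` are illustrative names, not part of the lecture code.

```go
package main

import "fmt"

// produce sends n values and then closes the channel to signal
// that no more values will follow.
func produce(n int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- i
		}
	}()
	return c
}

// sum drains c; the range loop ends when c is closed.
func sum(c <-chan int) int {
	total := 0
	for v := range c {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(produce(4))) // 0+1+2+3 = 6
}
```

A subscriber of the pub/sub server can range over its channel in the same way and will fall out of the loop once its subscription is cancelled.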
3.1. Options for slow goroutines
- slow down event generation
- drop events if they cannot be sent, e.g., os/signal, runtime/pprof
- queue events, e.g., add a helper between the server and each client, which also separates the concerns

    func helper(in <-chan Event, out chan<- Event) {
        var q []Event
        // once in is closed, flush the pending events in q
        // and then close out
        for in != nil || len(q) > 0 {
            // decide whether and what to send
            var sendOut chan<- Event
            var next Event
            if len(q) > 0 {
                sendOut = out
                next = q[0]
            }
            select {
            case e, ok := <-in: // never selected after in = nil
                // ok reports whether in is still open
                if !ok {
                    in = nil
                    break
                }
                q = append(q, e)
            case sendOut <- next: // never selected while sendOut == nil, i.e., len(q) == 0
                q = q[1:]
            }
        }
        close(out)
    }
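The "drop events" option from the list above is typically a non-blocking send. A minimal sketch, with `trySend` as a made-up helper name:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value
// was delivered; when c is not ready, the value is dropped.
func trySend(c chan<- int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1) // room for one pending event

	fmt.Println(trySend(c, 1)) // true: fits in the buffer
	fmt.Println(trySend(c, 2)) // false: buffer full, event dropped
	fmt.Println(<-c)           // 1
}
```

The trade-off is that the receiver must tolerate gaps, so this suits notifications where only "something happened" matters.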
convert mutexes into goroutines; this is not suitable for Raft, where state transitions are complex
    type Server struct {
        publish   chan Event
        subscribe chan subReq // queues subscription requests
        cancel    chan subReq
    }

    type subReq struct {
        c chan<- Event
        // reports whether the operation succeeded
        ok chan bool
    }

    func (s *Server) Init() {
        s.publish = make(chan Event)
        s.subscribe = make(chan subReq)
        s.cancel = make(chan subReq)
        go s.loop()
    }

    func (s *Server) Publish(e Event) {
        // no mutex is required here, as it does not read state
        s.publish <- e
    }

    func (s *Server) Subscribe(c chan<- Event) {
        r := subReq{c: c, ok: make(chan bool)}
        s.subscribe <- r
        if !<-r.ok { // wait for loop() to handle the request
            panic("pubsub: already subscribed")
        }
    }

    func (s *Server) Cancel(c chan<- Event) {
        r := subReq{c: c, ok: make(chan bool)}
        s.cancel <- r
        if !<-r.ok {
            panic("pubsub: not subscribed")
        }
    }

    func (s *Server) loop() {
        // sub is now a local variable, so no lock is needed;
        // it maps each subscribed channel to its helper channel
        sub := make(map[chan<- Event]chan<- Event)
        for {
            select {
            case e := <-s.publish:
                for _, h := range sub {
                    // the event is published to a helper channel
                    h <- e
                }
            case r := <-s.subscribe:
                // an existing helper channel means
                // this channel is already subscribed
                if sub[r.c] != nil {
                    r.ok <- false
                    break
                }
                h := make(chan Event)
                go helper(h, r.c)
                sub[r.c] = h
                r.ok <- true
            case r := <-s.cancel:
                if sub[r.c] == nil {
                    r.ok <- false
                    break
                }
                // close the helper channel; helper then flushes and closes r.c
                close(sub[r.c])
                delete(sub, r.c)
                r.ok <- true
            }
        }
    }
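The same mutex-to-goroutine conversion can be shown on a much smaller piece of state. The `counter` type below is a hypothetical example, not from the lecture: its value lives in a local variable of `loop()`, and every access goes through a channel.

```go
package main

import "fmt"

// counter is a hypothetical example type: its value is owned by the
// loop goroutine, so no mutex is needed.
type counter struct {
	add chan int
	get chan chan int // each request carries its own reply channel
}

func newCounter() *counter {
	c := &counter{add: make(chan int), get: make(chan chan int)}
	// sketch only: this goroutine never exits; real code needs a stop signal
	go c.loop()
	return c
}

func (c *counter) loop() {
	total := 0 // local state, only touched by this goroutine
	for {
		select {
		case n := <-c.add:
			total += n
		case reply := <-c.get:
			reply <- total
		}
	}
}

func (c *counter) Add(n int) { c.add <- n }

func (c *counter) Get() int {
	reply := make(chan int)
	c.get <- reply
	return <-reply
}

func main() {
	c := newCounter()
	c.Add(2)
	c.Add(3)
	fmt.Println(c.Get()) // 5
}
```

With one or two operations this stays readable; once every operation needs its own request type and reply channel, a mutex is often the simpler choice, which matches the remark about Raft.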
4. Pattern 2: work scheduler
\(M\) tasks are assigned to \(N\) servers/workers, with \(M \gg N\).
    func Schedule(servers []string, numTask int,
        call func(srv string, task int)) {

        idle := make(chan string, len(servers))
        // initialize a channel of idle servers
        for _, srv := range servers {
            idle <- srv
        }

        for task := 0; task < numTask; task++ {
            // if the goroutine captured the loop variable instead of a
            // per-iteration copy, there would be a race (before Go 1.22):
            // the loop moves on before the goroutine starts,
            // so some tasks are skipped
            task := task
            // if srv := <-idle were moved inside the goroutine,
            // many goroutines would be created at once and hang
            // waiting for an idle server;
            // keeping it outside means a goroutine is only created when
            // there is an idle server (but it slows down the main loop)
            srv := <-idle
            go func() {
                call(srv, task) // server does the task
                // the server finishes the task and becomes idle again
                idle <- srv
            }()
        }

        // wait until all servers are idle again, i.e., all tasks are done;
        // this prevents an early exit when all tasks have been assigned
        // but the last servers have not finished
        for i := 0; i < len(servers); i++ {
            <-idle
        }
    }
Optimization for the above code: the task loop creates \(M\) goroutines in total, but at most \(N\) of them are active at any time.
- Better to spin off a goroutine for each server.
- The number of servers can be dynamic.
    func Schedule(servers chan string, numTask int,
        call func(srv string, task int) bool) {

        work := make(chan int)  // a queue of all tasks yet to be done
        done := make(chan bool) // a queue of all done tasks
        exit := make(chan bool) // signals that no new servers should be pulled

        runTasks := func(srv string) {
            // keep pulling tasks until work is closed
            for task := range work {
                if call(srv, task) {
                    done <- true
                } else {
                    // re-push the task if it failed; with an unbuffered
                    // work channel this blocks until another worker
                    // receives it (fix 3 below avoids that)
                    work <- task
                }
            }
        }

        // use a goroutine to avoid hanging when
        // no server is available
        go func() {
            for {
                select {
                case srv := <-servers:
                    go runTasks(srv)
                case <-exit:
                    return
                }
            }
        }()

        // The following code has a deadlock!
        // In runTasks, a server pushes to the done channel when a task is done.
        // However, the done channel is only drained after the main routine has
        // pushed all tasks and closed the work channel.
        // Therefore every server hangs when it tries to push to done,
        // and the main routine then hangs pushing the next task.
        // for task := 0; task < numTask; task++ {
        //     work <- task
        // }
        // // signal no more tasks so that servers know
        // // when to terminate
        // close(work)
        //
        // // wait until all tasks are done
        // for i := 0; i < numTask; i++ {
        //     <-done
        // }

        // fix 1: alternate between the work and done channels
        i := 0
    WorkLoop:
        for task := 0; task < numTask; task++ {
            for {
                select {
                case work <- task:
                    continue WorkLoop
                case <-done:
                    i++
                }
            }
        }

        // wait for the last assigned tasks to be done
        for ; i < numTask; i++ {
            <-done
        }

        // only close the work channel at the end,
        // in case some tasks failed and need to be redone
        close(work)
        exit <- true // stop pulling new servers

        // fix 2: move the work assignment to a separate goroutine
        // go func() {
        //     for task := 0; task < numTask; task++ {
        //         work <- task
        //     }
        //     close(work)
        // }()

        // fix 3: increase the buffer of the work channel
        // work := make(chan int, numTask)
    }
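For experimenting with fix 3, here is a self-contained sketch of the one-goroutine-per-server scheduler with a fully buffered work channel. It simplifies the version above: `servers` is a fixed slice rather than a channel, and the lowercase name `schedule` marks it as an illustration rather than the lecture code.

```go
package main

import "fmt"

// schedule runs numTask tasks on the given servers, one goroutine per
// server, and retries a task whenever call reports failure.
func schedule(servers []string, numTask int, call func(srv string, task int) bool) {
	// fix 3: the buffer holds every task, so neither the initial
	// sends nor a re-push of a failed task can block
	work := make(chan int, numTask)
	done := make(chan bool)

	for _, srv := range servers {
		go func(srv string) {
			for task := range work {
				if call(srv, task) {
					done <- true
				} else {
					work <- task // retry later
				}
			}
		}(srv)
	}

	for task := 0; task < numTask; task++ {
		work <- task
	}
	for i := 0; i < numTask; i++ {
		<-done
	}
	close(work) // every task succeeded; let the workers exit
}

func main() {
	results := make(chan int, 6)
	schedule([]string{"a", "b"}, 6, func(srv string, task int) bool {
		results <- task
		return true
	})
	fmt.Println(len(results)) // 6
}
```

Closing `work` only after all `done` signals have arrived matters: a worker holding a failed task would otherwise panic when re-pushing to a closed channel.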
5. Pattern 3: replicated service client
A client replicates its requests to multiple servers, waits for the first reply and changes its preferred server.
    func (c *Client) Call(args Args) Reply {
        type result struct {
            serverID int
            reply    Reply
        }

        const timeout = 1 * time.Second
        t := time.NewTimer(timeout)
        defer t.Stop()

        // a buffered channel for all servers to send replies,
        // so that even after the client has received a reply,
        // later replies don't hang
        done := make(chan result, len(c.servers))

        c.mu.Lock()
        prefer := c.prefer
        c.mu.Unlock()

        var r result
        for off := 0; off < len(c.servers); off++ {
            // start from the preferred server
            id := (prefer + off) % len(c.servers)
            go func() {
                done <- result{id, c.callOne(c.servers[id], args)}
            }()

            // now wait for either a reply or a timeout:
            // on a reply, don't send to other servers;
            // otherwise, reset the timer and send to the next server
            select {
            case r = <-done:
                goto Done // use a goto if it makes the code clearer
            case <-t.C: // timeout
                t.Reset(timeout)
            }
        }

        r = <-done // wait for the first reply, even if it is an RPC timeout

    Done:
        c.mu.Lock()
        c.prefer = r.serverID // update the preference
        c.mu.Unlock()
        return r.reply
    }
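The timeout-then-fan-out behaviour can be tried without any RPC machinery. In this sketch the servers are plain functions, and `firstReply`, `slow`, `fast`, and `demo` are all hypothetical names introduced for the example; the preferred-server bookkeeping is left out.

```go
package main

import (
	"fmt"
	"time"
)

// firstReply is a simplified stand-in for Client.Call: it tries the
// servers in order, moving on to the next one after each timeout,
// and returns the index and reply of whichever server answers first.
func firstReply(servers []func() string, timeout time.Duration) (int, string) {
	type result struct {
		id    int
		reply string
	}
	// buffered so that late replies never block their goroutine
	done := make(chan result, len(servers))
	t := time.NewTimer(timeout)
	defer t.Stop()

	for id, call := range servers {
		go func(id int, call func() string) {
			done <- result{id, call()}
		}(id, call)
		select {
		case r := <-done:
			return r.id, r.reply
		case <-t.C:
			t.Reset(timeout) // give the next server its own window
		}
	}
	r := <-done // every server was tried; take whichever replies first
	return r.id, r.reply
}

func slow() string { time.Sleep(300 * time.Millisecond); return "slow" }
func fast() string { return "fast" }

// demo asks a slow server first and a fast one after a 20ms timeout.
func demo() (int, string) {
	return firstReply([]func() string{slow, fast}, 20*time.Millisecond)
}

func main() {
	id, reply := demo()
	fmt.Println(id, reply) // 1 fast
}
```

The slow server's goroutine still finishes later and deposits its reply in the buffered channel, which is exactly why the buffer is sized to the number of servers.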
6. Pattern 4: Protocol multiplexer
A multiplexer sits in front of a service and forwards messages between multiple clients and the service, e.g., an RPC.
    type ProtocolMux interface {
        // A mux is bound to a specific service
        Init(Service)
        // A client uses this method to send a message to the service
        // and wait for the service's reply
        Call(Msg) Msg
    }

    // Methods that a service exposes for a mux to use.
    // The underlying message processing lives in the implementation
    // of the actual service struct
    type Service interface {
        // A tag is a muxing identifier in the request or reply message,
        // e.g., which client channel to send the reply to
        ReadTag(Msg) int64
        // Send sends a request message to the service;
        // multiple Sends cannot be called concurrently,
        // probably because there is only a single channel between
        // the mux and the service (serialization)
        Send(Msg)
        // Recv waits for and returns a reply message;
        // multiple Recvs cannot be called concurrently
        Recv() Msg
    }
The mux maintains a channel to queue unsent requests and a map from each tag to the reply channel of the client still waiting for that reply.
    type Mux struct {
        srv Service
        // queues unsent requests
        send chan Msg
        mu   sync.Mutex // protects pending
        // maps a tag to the reply channel of the client
        // whose reply has not arrived yet
        pending map[int64]chan<- Msg
    }

    func (m *Mux) Init(srv Service) {
        m.srv = srv
        m.send = make(chan Msg)
        m.pending = make(map[int64]chan<- Msg)
        go m.sendLoop()
        go m.recvLoop()
    }

    // sendLoop sends out queued requests one at a time
    func (m *Mux) sendLoop() {
        for args := range m.send {
            m.srv.Send(args)
        }
    }

    func (m *Mux) recvLoop() {
        for {
            reply := m.srv.Recv()
            tag := m.srv.ReadTag(reply)
            m.mu.Lock()
            // get the reply channel
            done := m.pending[tag]
            // remove the entry since this request/reply exchange is complete
            delete(m.pending, tag)
            m.mu.Unlock()

            if done == nil {
                panic("unexpected reply")
            }
            done <- reply
        }
    }

    // Clients call this method concurrently
    func (m *Mux) Call(args Msg) (reply Msg) {
        tag := m.srv.ReadTag(args)
        // records which client a reply with this tag belongs to;
        // buffered so that recvLoop never blocks on the send
        done := make(chan Msg, 1)
        m.mu.Lock()
        if m.pending[tag] != nil {
            m.mu.Unlock()
            panic("duplicate request")
        }
        m.pending[tag] = done
        m.mu.Unlock()
        m.send <- args
        return <-done // hang until a reply is received
    }
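To exercise a mux like the one above, some implementation of `Service` is needed. The notes do not define `Msg` or any concrete service, so everything below (the `Msg` fields, `echoService`, the buffer size of 16) is an assumption made for this sketch.

```go
package main

import "fmt"

// Msg and its fields are assumptions; the notes leave Msg undefined.
type Msg struct {
	Tag  int64
	Body string
}

// Service repeats the interface from the notes.
type Service interface {
	ReadTag(Msg) int64
	Send(Msg)
	Recv() Msg
}

// echoService is a hypothetical in-memory service that answers each
// request with a reply carrying the same tag.
type echoService struct {
	replies chan Msg
}

func newEchoService() *echoService {
	// buffered so that Send does not wait for a matching Recv
	// (up to 16 outstanding replies in this sketch)
	return &echoService{replies: make(chan Msg, 16)}
}

func (s *echoService) ReadTag(m Msg) int64 { return m.Tag }

// Send handles the request immediately and queues the reply.
func (s *echoService) Send(m Msg) {
	s.replies <- Msg{Tag: m.Tag, Body: "echo: " + m.Body}
}

// Recv blocks until a reply is available.
func (s *echoService) Recv() Msg { return <-s.replies }

func main() {
	var srv Service = newEchoService()
	srv.Send(Msg{Tag: 7, Body: "ping"})
	reply := srv.Recv()
	fmt.Println(srv.ReadTag(reply), reply.Body) // 7 echo: ping
}
```

Passing such a service to `Mux.Init` and calling `Call` from several goroutines with distinct tags is a quick way to check that each caller gets the reply with its own tag.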