OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bindings |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "sync" |
| 10 |
| 11 "mojo/public/go/system" |
| 12 ) |
| 13 |
| 14 // MessageReadResult contains information returned after reading and parsing |
| 15 // a message: a non-nil error of a valid message. |
| 16 type MessageReadResult struct { |
| 17 Message *Message |
| 18 Error error |
| 19 } |
| 20 |
| 21 // routeRequest is a request sent from Router to routerWorker. |
| 22 type routeRequest struct { |
| 23 // The outgoing message with non-zero request id. |
| 24 message *Message |
| 25 // The channel to send respond for the message. |
| 26 responseChan chan<- MessageReadResult |
| 27 } |
| 28 |
| 29 // routerWorker sends messages that require a response and and routes responses |
| 30 // to appropriate receivers. The work is done on a separate go routine. |
| 31 type routerWorker struct { |
| 32 // The message pipe handle to send requests and receive responses. |
| 33 handle system.MessagePipeHandle |
| 34 // Map from request id to response channel. |
| 35 responders map[uint64]chan<- MessageReadResult |
| 36 // The channel of incoming requests that require responses. |
| 37 requestChan <-chan routeRequest |
| 38 // The channel that indicates that the worker should terminate. |
| 39 done <-chan struct{} |
| 40 // Implementation of async waiter. |
| 41 waiter AsyncWaiter |
| 42 waitChan chan WaitResponse |
| 43 waitId AsyncWaitId |
| 44 } |
| 45 |
| 46 // readOutstandingMessages reads and dispatches available messages in the |
| 47 // message pipe until the messages is empty or there are no waiting responders. |
| 48 // If the worker is currently waiting on the message pipe, returns immediately |
| 49 // without an error. |
| 50 func (w *routerWorker) readAndDispatchOutstandingMessages() error { |
| 51 if w.waitId != 0 { |
| 52 // Still waiting for a new message in the message pipe. |
| 53 return nil |
| 54 } |
| 55 for len(w.responders) > 0 { |
| 56 result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_
MESSAGE_FLAG_NONE) |
| 57 if result == system.MOJO_RESULT_SHOULD_WAIT { |
| 58 w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HAND
LE_SIGNAL_READABLE, w.waitChan) |
| 59 return nil |
| 60 } |
| 61 if result != system.MOJO_RESULT_OK { |
| 62 return fmt.Errorf("error reading message: %v", result) |
| 63 } |
| 64 message, err := ParseMessage(bytes, handles) |
| 65 if err != nil { |
| 66 return err |
| 67 } |
| 68 id := message.Header.RequestId |
| 69 w.responders[id] <- MessageReadResult{message, nil} |
| 70 delete(w.responders, id) |
| 71 } |
| 72 return nil |
| 73 } |
| 74 |
| 75 func (w *routerWorker) cancelIfWaiting() { |
| 76 if w.waitId != 0 { |
| 77 w.waiter.CancelWait(w.waitId) |
| 78 w.waitId = 0 |
| 79 } |
| 80 } |
| 81 |
| 82 // runLoop is the main run loop of the worker. It processes incoming requests |
| 83 // from Router and waits on a message pipe for new messages. |
| 84 // Returns an error describing the cause of stopping. |
| 85 func (w *routerWorker) runLoop() error { |
| 86 for { |
| 87 select { |
| 88 case waitResponse := <-w.waitChan: |
| 89 w.waitId = 0 |
| 90 if waitResponse.Result != system.MOJO_RESULT_OK { |
| 91 return fmt.Errorf("error waiting for message: %v
", waitResponse.Result) |
| 92 } |
| 93 case request := <-w.requestChan: |
| 94 if err := WriteMessage(w.handle, request.message); err !
= nil { |
| 95 return err |
| 96 } |
| 97 if request.responseChan != nil { |
| 98 w.responders[request.message.Header.RequestId] =
request.responseChan |
| 99 } |
| 100 case <-w.done: |
| 101 return fmt.Errorf("message pipe is closed") |
| 102 } |
| 103 // Returns immediately without an error if still waiting for |
| 104 // a new message. |
| 105 if err := w.readAndDispatchOutstandingMessages(); err != nil { |
| 106 return err |
| 107 } |
| 108 } |
| 109 } |
| 110 |
| 111 // Router sends messages to a message pipe and routes responses back to senders |
| 112 // of messages with non-zero request ids. The caller should issue unique request |
| 113 // ids for each message given to the router. |
| 114 type Router struct { |
| 115 // Mutex protecting requestChan from new requests in case the router is |
| 116 // closed and the handle. |
| 117 mu sync.Mutex |
| 118 // The message pipe handle to send requests and receive responses. |
| 119 handle system.MessagePipeHandle |
| 120 // Channel to communicate with worker. |
| 121 requestChan chan<- routeRequest |
| 122 |
| 123 // Channel to stop the worker. |
| 124 done chan<- struct{} |
| 125 } |
| 126 |
| 127 // NewRouter returns a new Router instance that sends and receives messages |
| 128 // from a provided message pipe handle. |
| 129 func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router { |
| 130 requestChan := make(chan routeRequest, 10) |
| 131 doneChan := make(chan struct{}) |
| 132 router := &Router{ |
| 133 handle: handle, |
| 134 requestChan: requestChan, |
| 135 done: doneChan, |
| 136 } |
| 137 router.runWorker(&routerWorker{ |
| 138 handle, |
| 139 make(map[uint64]chan<- MessageReadResult), |
| 140 requestChan, |
| 141 doneChan, |
| 142 waiter, |
| 143 make(chan WaitResponse, 1), |
| 144 0, |
| 145 }) |
| 146 return router |
| 147 } |
| 148 |
| 149 // Close closes the router and the underlying message pipe. All new incoming |
| 150 // requests are returned with an error. Panics if you try to close the router |
| 151 // more than once. |
| 152 func (r *Router) Close() { |
| 153 close(r.done) |
| 154 } |
| 155 |
| 156 // Accept sends a message to the message pipe. The message should have a |
| 157 // zero request id in header. |
| 158 func (r *Router) Accept(message *Message) error { |
| 159 if message.Header.RequestId != 0 { |
| 160 return fmt.Errorf("message header should have a zero request ID"
) |
| 161 } |
| 162 r.mu.Lock() |
| 163 defer r.mu.Unlock() |
| 164 // This can also mean that the router is closed. |
| 165 if !r.handle.IsValid() { |
| 166 return fmt.Errorf("can't write a message to an invalid handle") |
| 167 } |
| 168 r.requestChan <- routeRequest{message, nil} |
| 169 return nil |
| 170 } |
| 171 |
| 172 func (r *Router) runWorker(worker *routerWorker) { |
| 173 // Run worker on a separate go routine. |
| 174 go func() { |
| 175 // Get the reason why the worker stopped. The error means that |
| 176 // either the router is closed or there was an error reading |
| 177 // or writing to a message pipe. In both cases it will be |
| 178 // the reason why we can't process any more requests. |
| 179 err := worker.runLoop() |
| 180 worker.cancelIfWaiting() |
| 181 // Respond to all pending requests. |
| 182 for _, responseChan := range worker.responders { |
| 183 responseChan <- MessageReadResult{nil, err} |
| 184 } |
| 185 // Respond to incoming requests until we make sure that all |
| 186 // new requests return with an error before sending request |
| 187 // to responseChan. |
| 188 go func() { |
| 189 for responder := range worker.requestChan { |
| 190 responder.responseChan <- MessageReadResult{nil,
err} |
| 191 } |
| 192 }() |
| 193 r.mu.Lock() |
| 194 r.handle.Close() |
| 195 // If we acquire the lock then no other go routine is waiting |
| 196 // to write to responseChan. All go routines that acquire the |
| 197 // lock after us will return before sending to responseChan as |
| 198 // the underlying handle is invalid (already closed). |
| 199 // We can safely close the requestChan. |
| 200 close(r.requestChan) |
| 201 r.mu.Unlock() |
| 202 }() |
| 203 } |
| 204 |
| 205 // AcceptWithResponse sends a message to the message pipe and returns a channel |
| 206 // that will stream the result of reading corresponding response. The message |
| 207 // should have a non-zero request id in header. It is responsibility of the |
| 208 // caller to issue unique request ids for all given messages. |
| 209 func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult { |
| 210 responseChan := make(chan MessageReadResult, 1) |
| 211 if message.Header.RequestId == 0 { |
| 212 responseChan <- MessageReadResult{nil, fmt.Errorf("message heade
r should have a request ID")} |
| 213 return responseChan |
| 214 } |
| 215 r.mu.Lock() |
| 216 defer r.mu.Unlock() |
| 217 // Return an error before sending a request to requestChan if the router |
| 218 // is closed so that we can safely close responseChan once we close the |
| 219 // router. |
| 220 if !r.handle.IsValid() { |
| 221 responseChan <- MessageReadResult{nil, fmt.Errorf("can't write a
message to an invalid handle")} |
| 222 return responseChan |
| 223 } |
| 224 r.requestChan <- routeRequest{message, responseChan} |
| 225 return responseChan |
| 226 } |
OLD | NEW |