| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package bindings | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "sync" | |
| 10 | |
| 11 "mojo/public/go/system" | |
| 12 ) | |
| 13 | |
| 14 // MessageReadResult contains information returned after reading and parsing | |
| 15 // a message: a non-nil error of a valid message. | |
| 16 type MessageReadResult struct { | |
| 17 Message *Message | |
| 18 Error error | |
| 19 } | |
| 20 | |
| 21 // routeRequest is a request sent from Router to routerWorker. | |
| 22 type routeRequest struct { | |
| 23 // The outgoing message with non-zero request id. | |
| 24 message *Message | |
| 25 // The channel to send respond for the message. | |
| 26 responseChan chan<- MessageReadResult | |
| 27 } | |
| 28 | |
| 29 // routerWorker sends messages that require a response and and routes responses | |
| 30 // to appropriate receivers. The work is done on a separate go routine. | |
| 31 type routerWorker struct { | |
| 32 // The message pipe handle to send requests and receive responses. | |
| 33 handle system.MessagePipeHandle | |
| 34 // Map from request id to response channel. | |
| 35 responders map[uint64]chan<- MessageReadResult | |
| 36 // The channel of incoming requests that require responses. | |
| 37 requestChan <-chan routeRequest | |
| 38 // The channel that indicates that the worker should terminate. | |
| 39 done <-chan struct{} | |
| 40 // Implementation of async waiter. | |
| 41 waiter AsyncWaiter | |
| 42 waitChan chan WaitResponse | |
| 43 waitId AsyncWaitId | |
| 44 } | |
| 45 | |
| 46 // readOutstandingMessages reads and dispatches available messages in the | |
| 47 // message pipe until the messages is empty or there are no waiting responders. | |
| 48 // If the worker is currently waiting on the message pipe, returns immediately | |
| 49 // without an error. | |
| 50 func (w *routerWorker) readAndDispatchOutstandingMessages() error { | |
| 51 if w.waitId != 0 { | |
| 52 // Still waiting for a new message in the message pipe. | |
| 53 return nil | |
| 54 } | |
| 55 for len(w.responders) > 0 { | |
| 56 result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_
MESSAGE_FLAG_NONE) | |
| 57 if result == system.MOJO_RESULT_SHOULD_WAIT { | |
| 58 w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HAND
LE_SIGNAL_READABLE, w.waitChan) | |
| 59 return nil | |
| 60 } | |
| 61 if result != system.MOJO_RESULT_OK { | |
| 62 return &ConnectionError{result} | |
| 63 } | |
| 64 message, err := ParseMessage(bytes, handles) | |
| 65 if err != nil { | |
| 66 return err | |
| 67 } | |
| 68 id := message.Header.RequestId | |
| 69 w.responders[id] <- MessageReadResult{message, nil} | |
| 70 delete(w.responders, id) | |
| 71 } | |
| 72 return nil | |
| 73 } | |
| 74 | |
| 75 func (w *routerWorker) cancelIfWaiting() { | |
| 76 if w.waitId != 0 { | |
| 77 w.waiter.CancelWait(w.waitId) | |
| 78 w.waitId = 0 | |
| 79 } | |
| 80 } | |
| 81 | |
| 82 // runLoop is the main run loop of the worker. It processes incoming requests | |
| 83 // from Router and waits on a message pipe for new messages. | |
| 84 // Returns an error describing the cause of stopping. | |
| 85 func (w *routerWorker) runLoop() error { | |
| 86 for { | |
| 87 select { | |
| 88 case waitResponse := <-w.waitChan: | |
| 89 w.waitId = 0 | |
| 90 if waitResponse.Result != system.MOJO_RESULT_OK { | |
| 91 return &ConnectionError{waitResponse.Result} | |
| 92 } | |
| 93 case request := <-w.requestChan: | |
| 94 if err := WriteMessage(w.handle, request.message); err !
= nil { | |
| 95 return err | |
| 96 } | |
| 97 if request.responseChan != nil { | |
| 98 w.responders[request.message.Header.RequestId] =
request.responseChan | |
| 99 } | |
| 100 case <-w.done: | |
| 101 return errConnectionClosed | |
| 102 } | |
| 103 // Returns immediately without an error if still waiting for | |
| 104 // a new message. | |
| 105 if err := w.readAndDispatchOutstandingMessages(); err != nil { | |
| 106 return err | |
| 107 } | |
| 108 } | |
| 109 } | |
| 110 | |
| 111 // Router sends messages to a message pipe and routes responses back to senders | |
| 112 // of messages with non-zero request ids. The caller should issue unique request | |
| 113 // ids for each message given to the router. | |
| 114 type Router struct { | |
| 115 // Mutex protecting requestChan from new requests in case the router is | |
| 116 // closed and the handle. | |
| 117 mu sync.Mutex | |
| 118 // The message pipe handle to send requests and receive responses. | |
| 119 handle system.MessagePipeHandle | |
| 120 // Channel to communicate with worker. | |
| 121 requestChan chan<- routeRequest | |
| 122 | |
| 123 // Makes sure that the done channel is closed once. | |
| 124 closeOnce sync.Once | |
| 125 // Channel to stop the worker. | |
| 126 done chan<- struct{} | |
| 127 } | |
| 128 | |
| 129 // NewRouter returns a new Router instance that sends and receives messages | |
| 130 // from a provided message pipe handle. | |
| 131 func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router { | |
| 132 requestChan := make(chan routeRequest, 10) | |
| 133 doneChan := make(chan struct{}) | |
| 134 router := &Router{ | |
| 135 handle: handle, | |
| 136 requestChan: requestChan, | |
| 137 done: doneChan, | |
| 138 } | |
| 139 router.runWorker(&routerWorker{ | |
| 140 handle, | |
| 141 make(map[uint64]chan<- MessageReadResult), | |
| 142 requestChan, | |
| 143 doneChan, | |
| 144 waiter, | |
| 145 make(chan WaitResponse, 1), | |
| 146 0, | |
| 147 }) | |
| 148 return router | |
| 149 } | |
| 150 | |
| 151 // Close closes the router and the underlying message pipe. All new incoming | |
| 152 // requests are returned with an error. | |
| 153 func (r *Router) Close() { | |
| 154 r.closeOnce.Do(func() { | |
| 155 close(r.done) | |
| 156 }) | |
| 157 } | |
| 158 | |
| 159 // Accept sends a message to the message pipe. The message should have a | |
| 160 // zero request id in header. | |
| 161 func (r *Router) Accept(message *Message) error { | |
| 162 if message.Header.RequestId != 0 { | |
| 163 return fmt.Errorf("message header should have a zero request ID"
) | |
| 164 } | |
| 165 r.mu.Lock() | |
| 166 defer r.mu.Unlock() | |
| 167 if !r.handle.IsValid() { | |
| 168 return errConnectionClosed | |
| 169 } | |
| 170 r.requestChan <- routeRequest{message, nil} | |
| 171 return nil | |
| 172 } | |
| 173 | |
| 174 func (r *Router) runWorker(worker *routerWorker) { | |
| 175 // Run worker on a separate go routine. | |
| 176 go func() { | |
| 177 // Get the reason why the worker stopped. The error means that | |
| 178 // either the router is closed or there was an error reading | |
| 179 // or writing to a message pipe. In both cases it will be | |
| 180 // the reason why we can't process any more requests. | |
| 181 err := worker.runLoop() | |
| 182 worker.cancelIfWaiting() | |
| 183 // Respond to all pending requests. | |
| 184 for _, responseChan := range worker.responders { | |
| 185 responseChan <- MessageReadResult{nil, err} | |
| 186 } | |
| 187 // Respond to incoming requests until we make sure that all | |
| 188 // new requests return with an error before sending request | |
| 189 // to responseChan. | |
| 190 go func() { | |
| 191 for responder := range worker.requestChan { | |
| 192 responder.responseChan <- MessageReadResult{nil,
err} | |
| 193 } | |
| 194 }() | |
| 195 r.mu.Lock() | |
| 196 r.handle.Close() | |
| 197 // If we acquire the lock then no other go routine is waiting | |
| 198 // to write to responseChan. All go routines that acquire the | |
| 199 // lock after us will return before sending to responseChan as | |
| 200 // the underlying handle is invalid (already closed). | |
| 201 // We can safely close the requestChan. | |
| 202 close(r.requestChan) | |
| 203 r.mu.Unlock() | |
| 204 }() | |
| 205 } | |
| 206 | |
| 207 // AcceptWithResponse sends a message to the message pipe and returns a channel | |
| 208 // that will stream the result of reading corresponding response. The message | |
| 209 // should have a non-zero request id in header. It is responsibility of the | |
| 210 // caller to issue unique request ids for all given messages. | |
| 211 func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult { | |
| 212 responseChan := make(chan MessageReadResult, 1) | |
| 213 if message.Header.RequestId == 0 { | |
| 214 responseChan <- MessageReadResult{nil, fmt.Errorf("message heade
r should have a request ID")} | |
| 215 return responseChan | |
| 216 } | |
| 217 r.mu.Lock() | |
| 218 defer r.mu.Unlock() | |
| 219 // Return an error before sending a request to requestChan if the router | |
| 220 // is closed so that we can safely close responseChan once we close the | |
| 221 // router. | |
| 222 if !r.handle.IsValid() { | |
| 223 responseChan <- MessageReadResult{nil, errConnectionClosed} | |
| 224 return responseChan | |
| 225 } | |
| 226 r.requestChan <- routeRequest{message, responseChan} | |
| 227 return responseChan | |
| 228 } | |
| OLD | NEW |