| OLD | NEW | 
|---|
|  | (Empty) | 
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |  | 
| 2 // Use of this source code is governed by a BSD-style license that can be |  | 
| 3 // found in the LICENSE file. |  | 
| 4 |  | 
| 5 package bindings |  | 
| 6 |  | 
| 7 import ( |  | 
| 8         "fmt" |  | 
| 9         "sync" |  | 
| 10 |  | 
| 11         "mojo/public/go/system" |  | 
| 12 ) |  | 
| 13 |  | 
| 14 // MessageReadResult contains information returned after reading and parsing |  | 
| 15 // a message: a non-nil error of a valid message. |  | 
| 16 type MessageReadResult struct { |  | 
| 17         Message *Message |  | 
| 18         Error   error |  | 
| 19 } |  | 
| 20 |  | 
| 21 // routeRequest is a request sent from Router to routerWorker. |  | 
| 22 type routeRequest struct { |  | 
| 23         // The outgoing message with non-zero request id. |  | 
| 24         message *Message |  | 
| 25         // The channel to send respond for the message. |  | 
| 26         responseChan chan<- MessageReadResult |  | 
| 27 } |  | 
| 28 |  | 
| 29 // routerWorker sends messages that require a response and and routes responses |  | 
| 30 // to appropriate receivers. The work is done on a separate go routine. |  | 
| 31 type routerWorker struct { |  | 
| 32         // The message pipe handle to send requests and receive responses. |  | 
| 33         handle system.MessagePipeHandle |  | 
| 34         // Map from request id to response channel. |  | 
| 35         responders map[uint64]chan<- MessageReadResult |  | 
| 36         // The channel of incoming requests that require responses. |  | 
| 37         requestChan <-chan routeRequest |  | 
| 38         // The channel that indicates that the worker should terminate. |  | 
| 39         done <-chan struct{} |  | 
| 40         // Implementation of async waiter. |  | 
| 41         waiter   AsyncWaiter |  | 
| 42         waitChan chan WaitResponse |  | 
| 43         waitId   AsyncWaitId |  | 
| 44 } |  | 
| 45 |  | 
| 46 // readOutstandingMessages reads and dispatches available messages in the |  | 
| 47 // message pipe until the messages is empty or there are no waiting responders. |  | 
| 48 // If the worker is currently waiting on the message pipe, returns immediately |  | 
| 49 // without an error. |  | 
| 50 func (w *routerWorker) readAndDispatchOutstandingMessages() error { |  | 
| 51         if w.waitId != 0 { |  | 
| 52                 // Still waiting for a new message in the message pipe. |  | 
| 53                 return nil |  | 
| 54         } |  | 
| 55         for len(w.responders) > 0 { |  | 
| 56                 result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_
     MESSAGE_FLAG_NONE) |  | 
| 57                 if result == system.MOJO_RESULT_SHOULD_WAIT { |  | 
| 58                         w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HAND
     LE_SIGNAL_READABLE, w.waitChan) |  | 
| 59                         return nil |  | 
| 60                 } |  | 
| 61                 if result != system.MOJO_RESULT_OK { |  | 
| 62                         return &ConnectionError{result} |  | 
| 63                 } |  | 
| 64                 message, err := ParseMessage(bytes, handles) |  | 
| 65                 if err != nil { |  | 
| 66                         return err |  | 
| 67                 } |  | 
| 68                 id := message.Header.RequestId |  | 
| 69                 w.responders[id] <- MessageReadResult{message, nil} |  | 
| 70                 delete(w.responders, id) |  | 
| 71         } |  | 
| 72         return nil |  | 
| 73 } |  | 
| 74 |  | 
| 75 func (w *routerWorker) cancelIfWaiting() { |  | 
| 76         if w.waitId != 0 { |  | 
| 77                 w.waiter.CancelWait(w.waitId) |  | 
| 78                 w.waitId = 0 |  | 
| 79         } |  | 
| 80 } |  | 
| 81 |  | 
| 82 // runLoop is the main run loop of the worker. It processes incoming requests |  | 
| 83 // from Router and waits on a message pipe for new messages. |  | 
| 84 // Returns an error describing the cause of stopping. |  | 
| 85 func (w *routerWorker) runLoop() error { |  | 
| 86         for { |  | 
| 87                 select { |  | 
| 88                 case waitResponse := <-w.waitChan: |  | 
| 89                         w.waitId = 0 |  | 
| 90                         if waitResponse.Result != system.MOJO_RESULT_OK { |  | 
| 91                                 return &ConnectionError{waitResponse.Result} |  | 
| 92                         } |  | 
| 93                 case request := <-w.requestChan: |  | 
| 94                         if err := WriteMessage(w.handle, request.message); err !
     = nil { |  | 
| 95                                 return err |  | 
| 96                         } |  | 
| 97                         if request.responseChan != nil { |  | 
| 98                                 w.responders[request.message.Header.RequestId] =
      request.responseChan |  | 
| 99                         } |  | 
| 100                 case <-w.done: |  | 
| 101                         return errConnectionClosed |  | 
| 102                 } |  | 
| 103                 // Returns immediately without an error if still waiting for |  | 
| 104                 // a new message. |  | 
| 105                 if err := w.readAndDispatchOutstandingMessages(); err != nil { |  | 
| 106                         return err |  | 
| 107                 } |  | 
| 108         } |  | 
| 109 } |  | 
| 110 |  | 
| 111 // Router sends messages to a message pipe and routes responses back to senders |  | 
| 112 // of messages with non-zero request ids. The caller should issue unique request |  | 
| 113 // ids for each message given to the router. |  | 
| 114 type Router struct { |  | 
| 115         // Mutex protecting requestChan from new requests in case the router is |  | 
| 116         // closed and the handle. |  | 
| 117         mu sync.Mutex |  | 
| 118         // The message pipe handle to send requests and receive responses. |  | 
| 119         handle system.MessagePipeHandle |  | 
| 120         // Channel to communicate with worker. |  | 
| 121         requestChan chan<- routeRequest |  | 
| 122 |  | 
| 123         // Makes sure that the done channel is closed once. |  | 
| 124         closeOnce sync.Once |  | 
| 125         // Channel to stop the worker. |  | 
| 126         done chan<- struct{} |  | 
| 127 } |  | 
| 128 |  | 
| 129 // NewRouter returns a new Router instance that sends and receives messages |  | 
| 130 // from a provided message pipe handle. |  | 
| 131 func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router { |  | 
| 132         requestChan := make(chan routeRequest, 10) |  | 
| 133         doneChan := make(chan struct{}) |  | 
| 134         router := &Router{ |  | 
| 135                 handle:      handle, |  | 
| 136                 requestChan: requestChan, |  | 
| 137                 done:        doneChan, |  | 
| 138         } |  | 
| 139         router.runWorker(&routerWorker{ |  | 
| 140                 handle, |  | 
| 141                 make(map[uint64]chan<- MessageReadResult), |  | 
| 142                 requestChan, |  | 
| 143                 doneChan, |  | 
| 144                 waiter, |  | 
| 145                 make(chan WaitResponse, 1), |  | 
| 146                 0, |  | 
| 147         }) |  | 
| 148         return router |  | 
| 149 } |  | 
| 150 |  | 
| 151 // Close closes the router and the underlying message pipe. All new incoming |  | 
| 152 // requests are returned with an error. |  | 
| 153 func (r *Router) Close() { |  | 
| 154         r.closeOnce.Do(func() { |  | 
| 155                 close(r.done) |  | 
| 156         }) |  | 
| 157 } |  | 
| 158 |  | 
| 159 // Accept sends a message to the message pipe. The message should have a |  | 
| 160 // zero request id in header. |  | 
| 161 func (r *Router) Accept(message *Message) error { |  | 
| 162         if message.Header.RequestId != 0 { |  | 
| 163                 return fmt.Errorf("message header should have a zero request ID"
     ) |  | 
| 164         } |  | 
| 165         r.mu.Lock() |  | 
| 166         defer r.mu.Unlock() |  | 
| 167         if !r.handle.IsValid() { |  | 
| 168                 return errConnectionClosed |  | 
| 169         } |  | 
| 170         r.requestChan <- routeRequest{message, nil} |  | 
| 171         return nil |  | 
| 172 } |  | 
| 173 |  | 
| 174 func (r *Router) runWorker(worker *routerWorker) { |  | 
| 175         // Run worker on a separate go routine. |  | 
| 176         go func() { |  | 
| 177                 // Get the reason why the worker stopped. The error means that |  | 
| 178                 // either the router is closed or there was an error reading |  | 
| 179                 // or writing to a message pipe. In both cases it will be |  | 
| 180                 // the reason why we can't process any more requests. |  | 
| 181                 err := worker.runLoop() |  | 
| 182                 worker.cancelIfWaiting() |  | 
| 183                 // Respond to all pending requests. |  | 
| 184                 for _, responseChan := range worker.responders { |  | 
| 185                         responseChan <- MessageReadResult{nil, err} |  | 
| 186                 } |  | 
| 187                 // Respond to incoming requests until we make sure that all |  | 
| 188                 // new requests return with an error before sending request |  | 
| 189                 // to responseChan. |  | 
| 190                 go func() { |  | 
| 191                         for responder := range worker.requestChan { |  | 
| 192                                 responder.responseChan <- MessageReadResult{nil,
      err} |  | 
| 193                         } |  | 
| 194                 }() |  | 
| 195                 r.mu.Lock() |  | 
| 196                 r.handle.Close() |  | 
| 197                 // If we acquire the lock then no other go routine is waiting |  | 
| 198                 // to write to responseChan. All go routines that acquire the |  | 
| 199                 // lock after us will return before sending to responseChan as |  | 
| 200                 // the underlying handle is invalid (already closed). |  | 
| 201                 // We can safely close the requestChan. |  | 
| 202                 close(r.requestChan) |  | 
| 203                 r.mu.Unlock() |  | 
| 204         }() |  | 
| 205 } |  | 
| 206 |  | 
| 207 // AcceptWithResponse sends a message to the message pipe and returns a channel |  | 
| 208 // that will stream the result of reading corresponding response. The message |  | 
| 209 // should have a non-zero request id in header. It is responsibility of the |  | 
| 210 // caller to issue unique request ids for all given messages. |  | 
| 211 func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult { |  | 
| 212         responseChan := make(chan MessageReadResult, 1) |  | 
| 213         if message.Header.RequestId == 0 { |  | 
| 214                 responseChan <- MessageReadResult{nil, fmt.Errorf("message heade
     r should have a request ID")} |  | 
| 215                 return responseChan |  | 
| 216         } |  | 
| 217         r.mu.Lock() |  | 
| 218         defer r.mu.Unlock() |  | 
| 219         // Return an error before sending a request to requestChan if the router |  | 
| 220         // is closed so that we can safely close responseChan once we close the |  | 
| 221         // router. |  | 
| 222         if !r.handle.IsValid() { |  | 
| 223                 responseChan <- MessageReadResult{nil, errConnectionClosed} |  | 
| 224                 return responseChan |  | 
| 225         } |  | 
| 226         r.requestChan <- routeRequest{message, responseChan} |  | 
| 227         return responseChan |  | 
| 228 } |  | 
| OLD | NEW | 
|---|