Index: third_party/mojo/src/mojo/public/go/bindings/router.go |
diff --git a/third_party/mojo/src/mojo/public/go/bindings/router.go b/third_party/mojo/src/mojo/public/go/bindings/router.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..84a5d3c0dbd0f78d32327e2b57d2ed8588610d0b |
--- /dev/null |
+++ b/third_party/mojo/src/mojo/public/go/bindings/router.go |
@@ -0,0 +1,226 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package bindings |
+ |
+import ( |
+ "fmt" |
+ "sync" |
+ |
+ "mojo/public/go/system" |
+) |
+ |
+// MessageReadResult contains information returned after reading and parsing |
+// a message: a non-nil error of a valid message. |
+type MessageReadResult struct { |
+ Message *Message |
+ Error error |
+} |
+ |
+// routeRequest is a request sent from Router to routerWorker. |
+type routeRequest struct { |
+ // The outgoing message with non-zero request id. |
+ message *Message |
+ // The channel to send respond for the message. |
+ responseChan chan<- MessageReadResult |
+} |
+ |
+// routerWorker sends messages that require a response and and routes responses |
+// to appropriate receivers. The work is done on a separate go routine. |
+type routerWorker struct { |
+ // The message pipe handle to send requests and receive responses. |
+ handle system.MessagePipeHandle |
+ // Map from request id to response channel. |
+ responders map[uint64]chan<- MessageReadResult |
+ // The channel of incoming requests that require responses. |
+ requestChan <-chan routeRequest |
+ // The channel that indicates that the worker should terminate. |
+ done <-chan struct{} |
+ // Implementation of async waiter. |
+ waiter AsyncWaiter |
+ waitChan chan WaitResponse |
+ waitId AsyncWaitId |
+} |
+ |
+// readOutstandingMessages reads and dispatches available messages in the |
+// message pipe until the messages is empty or there are no waiting responders. |
+// If the worker is currently waiting on the message pipe, returns immediately |
+// without an error. |
+func (w *routerWorker) readAndDispatchOutstandingMessages() error { |
+ if w.waitId != 0 { |
+ // Still waiting for a new message in the message pipe. |
+ return nil |
+ } |
+ for len(w.responders) > 0 { |
+ result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE) |
+ if result == system.MOJO_RESULT_SHOULD_WAIT { |
+ w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HANDLE_SIGNAL_READABLE, w.waitChan) |
+ return nil |
+ } |
+ if result != system.MOJO_RESULT_OK { |
+ return fmt.Errorf("error reading message: %v", result) |
+ } |
+ message, err := ParseMessage(bytes, handles) |
+ if err != nil { |
+ return err |
+ } |
+ id := message.Header.RequestId |
+ w.responders[id] <- MessageReadResult{message, nil} |
+ delete(w.responders, id) |
+ } |
+ return nil |
+} |
+ |
+func (w *routerWorker) cancelIfWaiting() { |
+ if w.waitId != 0 { |
+ w.waiter.CancelWait(w.waitId) |
+ w.waitId = 0 |
+ } |
+} |
+ |
+// runLoop is the main run loop of the worker. It processes incoming requests |
+// from Router and waits on a message pipe for new messages. |
+// Returns an error describing the cause of stopping. |
+func (w *routerWorker) runLoop() error { |
+ for { |
+ select { |
+ case waitResponse := <-w.waitChan: |
+ w.waitId = 0 |
+ if waitResponse.Result != system.MOJO_RESULT_OK { |
+ return fmt.Errorf("error waiting for message: %v", waitResponse.Result) |
+ } |
+ case request := <-w.requestChan: |
+ if err := WriteMessage(w.handle, request.message); err != nil { |
+ return err |
+ } |
+ if request.responseChan != nil { |
+ w.responders[request.message.Header.RequestId] = request.responseChan |
+ } |
+ case <-w.done: |
+ return fmt.Errorf("message pipe is closed") |
+ } |
+ // Returns immediately without an error if still waiting for |
+ // a new message. |
+ if err := w.readAndDispatchOutstandingMessages(); err != nil { |
+ return err |
+ } |
+ } |
+} |
+ |
+// Router sends messages to a message pipe and routes responses back to senders |
+// of messages with non-zero request ids. The caller should issue unique request |
+// ids for each message given to the router. |
+type Router struct { |
+ // Mutex protecting requestChan from new requests in case the router is |
+ // closed and the handle. |
+ mu sync.Mutex |
+ // The message pipe handle to send requests and receive responses. |
+ handle system.MessagePipeHandle |
+ // Channel to communicate with worker. |
+ requestChan chan<- routeRequest |
+ |
+ // Channel to stop the worker. |
+ done chan<- struct{} |
+} |
+ |
+// NewRouter returns a new Router instance that sends and receives messages |
+// from a provided message pipe handle. |
+func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router { |
+ requestChan := make(chan routeRequest, 10) |
+ doneChan := make(chan struct{}) |
+ router := &Router{ |
+ handle: handle, |
+ requestChan: requestChan, |
+ done: doneChan, |
+ } |
+ router.runWorker(&routerWorker{ |
+ handle, |
+ make(map[uint64]chan<- MessageReadResult), |
+ requestChan, |
+ doneChan, |
+ waiter, |
+ make(chan WaitResponse, 1), |
+ 0, |
+ }) |
+ return router |
+} |
+ |
+// Close closes the router and the underlying message pipe. All new incoming |
+// requests are returned with an error. Panics if you try to close the router |
+// more than once. |
+func (r *Router) Close() { |
+ close(r.done) |
+} |
+ |
+// Accept sends a message to the message pipe. The message should have a |
+// zero request id in header. |
+func (r *Router) Accept(message *Message) error { |
+ if message.Header.RequestId != 0 { |
+ return fmt.Errorf("message header should have a zero request ID") |
+ } |
+ r.mu.Lock() |
+ defer r.mu.Unlock() |
+ // This can also mean that the router is closed. |
+ if !r.handle.IsValid() { |
+ return fmt.Errorf("can't write a message to an invalid handle") |
+ } |
+ r.requestChan <- routeRequest{message, nil} |
+ return nil |
+} |
+ |
+func (r *Router) runWorker(worker *routerWorker) { |
+ // Run worker on a separate go routine. |
+ go func() { |
+ // Get the reason why the worker stopped. The error means that |
+ // either the router is closed or there was an error reading |
+ // or writing to a message pipe. In both cases it will be |
+ // the reason why we can't process any more requests. |
+ err := worker.runLoop() |
+ worker.cancelIfWaiting() |
+ // Respond to all pending requests. |
+ for _, responseChan := range worker.responders { |
+ responseChan <- MessageReadResult{nil, err} |
+ } |
+ // Respond to incoming requests until we make sure that all |
+ // new requests return with an error before sending request |
+ // to responseChan. |
+ go func() { |
+ for responder := range worker.requestChan { |
+ responder.responseChan <- MessageReadResult{nil, err} |
+ } |
+ }() |
+ r.mu.Lock() |
+ r.handle.Close() |
+ // If we acquire the lock then no other go routine is waiting |
+ // to write to responseChan. All go routines that acquire the |
+ // lock after us will return before sending to responseChan as |
+ // the underlying handle is invalid (already closed). |
+ // We can safely close the requestChan. |
+ close(r.requestChan) |
+ r.mu.Unlock() |
+ }() |
+} |
+ |
+// AcceptWithResponse sends a message to the message pipe and returns a channel |
+// that will stream the result of reading corresponding response. The message |
+// should have a non-zero request id in header. It is responsibility of the |
+// caller to issue unique request ids for all given messages. |
+func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult { |
+ responseChan := make(chan MessageReadResult, 1) |
+ if message.Header.RequestId == 0 { |
+ responseChan <- MessageReadResult{nil, fmt.Errorf("message header should have a request ID")} |
+ return responseChan |
+ } |
+ r.mu.Lock() |
+ defer r.mu.Unlock() |
+ // Return an error before sending a request to requestChan if the router |
+ // is closed so that we can safely close responseChan once we close the |
+ // router. |
+ if !r.handle.IsValid() { |
+ responseChan <- MessageReadResult{nil, fmt.Errorf("can't write a message to an invalid handle")} |
+ return responseChan |
+ } |
+ r.requestChan <- routeRequest{message, responseChan} |
+ return responseChan |
+} |