Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(44)

Unified Diff: third_party/mojo/src/mojo/public/go/bindings/router.go

Issue 975973002: Update mojo sdk to rev f68e697e389943cd9bf9652397312280e96b127a (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: shake fist at msvc Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/public/go/bindings/router.go
diff --git a/third_party/mojo/src/mojo/public/go/bindings/router.go b/third_party/mojo/src/mojo/public/go/bindings/router.go
new file mode 100644
index 0000000000000000000000000000000000000000..84a5d3c0dbd0f78d32327e2b57d2ed8588610d0b
--- /dev/null
+++ b/third_party/mojo/src/mojo/public/go/bindings/router.go
@@ -0,0 +1,226 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bindings
+
+import (
+ "fmt"
+ "sync"
+
+ "mojo/public/go/system"
+)
+
+// MessageReadResult contains information returned after reading and parsing
+// a message: a non-nil error of a valid message.
+type MessageReadResult struct {
+ Message *Message
+ Error error
+}
+
+// routeRequest is a request sent from Router to routerWorker.
+type routeRequest struct {
+ // The outgoing message with non-zero request id.
+ message *Message
+ // The channel to send respond for the message.
+ responseChan chan<- MessageReadResult
+}
+
+// routerWorker sends messages that require a response and and routes responses
+// to appropriate receivers. The work is done on a separate go routine.
+type routerWorker struct {
+ // The message pipe handle to send requests and receive responses.
+ handle system.MessagePipeHandle
+ // Map from request id to response channel.
+ responders map[uint64]chan<- MessageReadResult
+ // The channel of incoming requests that require responses.
+ requestChan <-chan routeRequest
+ // The channel that indicates that the worker should terminate.
+ done <-chan struct{}
+ // Implementation of async waiter.
+ waiter AsyncWaiter
+ waitChan chan WaitResponse
+ waitId AsyncWaitId
+}
+
+// readOutstandingMessages reads and dispatches available messages in the
+// message pipe until the messages is empty or there are no waiting responders.
+// If the worker is currently waiting on the message pipe, returns immediately
+// without an error.
+func (w *routerWorker) readAndDispatchOutstandingMessages() error {
+ if w.waitId != 0 {
+ // Still waiting for a new message in the message pipe.
+ return nil
+ }
+ for len(w.responders) > 0 {
+ result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
+ if result == system.MOJO_RESULT_SHOULD_WAIT {
+ w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HANDLE_SIGNAL_READABLE, w.waitChan)
+ return nil
+ }
+ if result != system.MOJO_RESULT_OK {
+ return fmt.Errorf("error reading message: %v", result)
+ }
+ message, err := ParseMessage(bytes, handles)
+ if err != nil {
+ return err
+ }
+ id := message.Header.RequestId
+ w.responders[id] <- MessageReadResult{message, nil}
+ delete(w.responders, id)
+ }
+ return nil
+}
+
+func (w *routerWorker) cancelIfWaiting() {
+ if w.waitId != 0 {
+ w.waiter.CancelWait(w.waitId)
+ w.waitId = 0
+ }
+}
+
+// runLoop is the main run loop of the worker. It processes incoming requests
+// from Router and waits on a message pipe for new messages.
+// Returns an error describing the cause of stopping.
+func (w *routerWorker) runLoop() error {
+ for {
+ select {
+ case waitResponse := <-w.waitChan:
+ w.waitId = 0
+ if waitResponse.Result != system.MOJO_RESULT_OK {
+ return fmt.Errorf("error waiting for message: %v", waitResponse.Result)
+ }
+ case request := <-w.requestChan:
+ if err := WriteMessage(w.handle, request.message); err != nil {
+ return err
+ }
+ if request.responseChan != nil {
+ w.responders[request.message.Header.RequestId] = request.responseChan
+ }
+ case <-w.done:
+ return fmt.Errorf("message pipe is closed")
+ }
+ // Returns immediately without an error if still waiting for
+ // a new message.
+ if err := w.readAndDispatchOutstandingMessages(); err != nil {
+ return err
+ }
+ }
+}
+
+// Router sends messages to a message pipe and routes responses back to senders
+// of messages with non-zero request ids. The caller should issue unique request
+// ids for each message given to the router.
+type Router struct {
+ // Mutex protecting requestChan from new requests in case the router is
+ // closed and the handle.
+ mu sync.Mutex
+ // The message pipe handle to send requests and receive responses.
+ handle system.MessagePipeHandle
+ // Channel to communicate with worker.
+ requestChan chan<- routeRequest
+
+ // Channel to stop the worker.
+ done chan<- struct{}
+}
+
+// NewRouter returns a new Router instance that sends and receives messages
+// from a provided message pipe handle.
+func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router {
+ requestChan := make(chan routeRequest, 10)
+ doneChan := make(chan struct{})
+ router := &Router{
+ handle: handle,
+ requestChan: requestChan,
+ done: doneChan,
+ }
+ router.runWorker(&routerWorker{
+ handle,
+ make(map[uint64]chan<- MessageReadResult),
+ requestChan,
+ doneChan,
+ waiter,
+ make(chan WaitResponse, 1),
+ 0,
+ })
+ return router
+}
+
+// Close closes the router and the underlying message pipe. All new incoming
+// requests are returned with an error. Panics if you try to close the router
+// more than once.
+func (r *Router) Close() {
+ close(r.done)
+}
+
+// Accept sends a message to the message pipe. The message should have a
+// zero request id in header.
+func (r *Router) Accept(message *Message) error {
+ if message.Header.RequestId != 0 {
+ return fmt.Errorf("message header should have a zero request ID")
+ }
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ // This can also mean that the router is closed.
+ if !r.handle.IsValid() {
+ return fmt.Errorf("can't write a message to an invalid handle")
+ }
+ r.requestChan <- routeRequest{message, nil}
+ return nil
+}
+
+func (r *Router) runWorker(worker *routerWorker) {
+ // Run worker on a separate go routine.
+ go func() {
+ // Get the reason why the worker stopped. The error means that
+ // either the router is closed or there was an error reading
+ // or writing to a message pipe. In both cases it will be
+ // the reason why we can't process any more requests.
+ err := worker.runLoop()
+ worker.cancelIfWaiting()
+ // Respond to all pending requests.
+ for _, responseChan := range worker.responders {
+ responseChan <- MessageReadResult{nil, err}
+ }
+ // Respond to incoming requests until we make sure that all
+ // new requests return with an error before sending request
+ // to responseChan.
+ go func() {
+ for responder := range worker.requestChan {
+ responder.responseChan <- MessageReadResult{nil, err}
+ }
+ }()
+ r.mu.Lock()
+ r.handle.Close()
+ // If we acquire the lock then no other go routine is waiting
+ // to write to responseChan. All go routines that acquire the
+ // lock after us will return before sending to responseChan as
+ // the underlying handle is invalid (already closed).
+ // We can safely close the requestChan.
+ close(r.requestChan)
+ r.mu.Unlock()
+ }()
+}
+
+// AcceptWithResponse sends a message to the message pipe and returns a channel
+// that will stream the result of reading corresponding response. The message
+// should have a non-zero request id in header. It is responsibility of the
+// caller to issue unique request ids for all given messages.
+func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult {
+ responseChan := make(chan MessageReadResult, 1)
+ if message.Header.RequestId == 0 {
+ responseChan <- MessageReadResult{nil, fmt.Errorf("message header should have a request ID")}
+ return responseChan
+ }
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ // Return an error before sending a request to requestChan if the router
+ // is closed so that we can safely close responseChan once we close the
+ // router.
+ if !r.handle.IsValid() {
+ responseChan <- MessageReadResult{nil, fmt.Errorf("can't write a message to an invalid handle")}
+ return responseChan
+ }
+ r.requestChan <- routeRequest{message, responseChan}
+ return responseChan
+}
« no previous file with comments | « third_party/mojo/src/mojo/public/go/bindings/interface.go ('k') | third_party/mojo/src/mojo/public/go/bindings/stub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698