| Index: third_party/mojo/src/mojo/public/go/bindings/router.go
|
| diff --git a/third_party/mojo/src/mojo/public/go/bindings/router.go b/third_party/mojo/src/mojo/public/go/bindings/router.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..84a5d3c0dbd0f78d32327e2b57d2ed8588610d0b
|
| --- /dev/null
|
| +++ b/third_party/mojo/src/mojo/public/go/bindings/router.go
|
| @@ -0,0 +1,226 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package bindings
|
| +
|
| +import (
|
| + "fmt"
|
| + "sync"
|
| +
|
| + "mojo/public/go/system"
|
| +)
|
| +
|
| +// MessageReadResult contains information returned after reading and parsing
|
| +// a message: a non-nil error of a valid message.
|
| +type MessageReadResult struct {
|
| + Message *Message
|
| + Error error
|
| +}
|
| +
|
| +// routeRequest is a request sent from Router to routerWorker.
|
| +type routeRequest struct {
|
| + // The outgoing message with non-zero request id.
|
| + message *Message
|
| + // The channel to send respond for the message.
|
| + responseChan chan<- MessageReadResult
|
| +}
|
| +
|
| +// routerWorker sends messages that require a response and and routes responses
|
| +// to appropriate receivers. The work is done on a separate go routine.
|
| +type routerWorker struct {
|
| + // The message pipe handle to send requests and receive responses.
|
| + handle system.MessagePipeHandle
|
| + // Map from request id to response channel.
|
| + responders map[uint64]chan<- MessageReadResult
|
| + // The channel of incoming requests that require responses.
|
| + requestChan <-chan routeRequest
|
| + // The channel that indicates that the worker should terminate.
|
| + done <-chan struct{}
|
| + // Implementation of async waiter.
|
| + waiter AsyncWaiter
|
| + waitChan chan WaitResponse
|
| + waitId AsyncWaitId
|
| +}
|
| +
|
| +// readOutstandingMessages reads and dispatches available messages in the
|
| +// message pipe until the messages is empty or there are no waiting responders.
|
| +// If the worker is currently waiting on the message pipe, returns immediately
|
| +// without an error.
|
| +func (w *routerWorker) readAndDispatchOutstandingMessages() error {
|
| + if w.waitId != 0 {
|
| + // Still waiting for a new message in the message pipe.
|
| + return nil
|
| + }
|
| + for len(w.responders) > 0 {
|
| + result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
|
| + if result == system.MOJO_RESULT_SHOULD_WAIT {
|
| + w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HANDLE_SIGNAL_READABLE, w.waitChan)
|
| + return nil
|
| + }
|
| + if result != system.MOJO_RESULT_OK {
|
| + return fmt.Errorf("error reading message: %v", result)
|
| + }
|
| + message, err := ParseMessage(bytes, handles)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + id := message.Header.RequestId
|
| + w.responders[id] <- MessageReadResult{message, nil}
|
| + delete(w.responders, id)
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +func (w *routerWorker) cancelIfWaiting() {
|
| + if w.waitId != 0 {
|
| + w.waiter.CancelWait(w.waitId)
|
| + w.waitId = 0
|
| + }
|
| +}
|
| +
|
| +// runLoop is the main run loop of the worker. It processes incoming requests
|
| +// from Router and waits on a message pipe for new messages.
|
| +// Returns an error describing the cause of stopping.
|
| +func (w *routerWorker) runLoop() error {
|
| + for {
|
| + select {
|
| + case waitResponse := <-w.waitChan:
|
| + w.waitId = 0
|
| + if waitResponse.Result != system.MOJO_RESULT_OK {
|
| + return fmt.Errorf("error waiting for message: %v", waitResponse.Result)
|
| + }
|
| + case request := <-w.requestChan:
|
| + if err := WriteMessage(w.handle, request.message); err != nil {
|
| + return err
|
| + }
|
| + if request.responseChan != nil {
|
| + w.responders[request.message.Header.RequestId] = request.responseChan
|
| + }
|
| + case <-w.done:
|
| + return fmt.Errorf("message pipe is closed")
|
| + }
|
| + // Returns immediately without an error if still waiting for
|
| + // a new message.
|
| + if err := w.readAndDispatchOutstandingMessages(); err != nil {
|
| + return err
|
| + }
|
| + }
|
| +}
|
| +
|
| +// Router sends messages to a message pipe and routes responses back to senders
|
| +// of messages with non-zero request ids. The caller should issue unique request
|
| +// ids for each message given to the router.
|
| +type Router struct {
|
| + // Mutex protecting requestChan from new requests in case the router is
|
| + // closed and the handle.
|
| + mu sync.Mutex
|
| + // The message pipe handle to send requests and receive responses.
|
| + handle system.MessagePipeHandle
|
| + // Channel to communicate with worker.
|
| + requestChan chan<- routeRequest
|
| +
|
| + // Channel to stop the worker.
|
| + done chan<- struct{}
|
| +}
|
| +
|
| +// NewRouter returns a new Router instance that sends and receives messages
|
| +// from a provided message pipe handle.
|
| +func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router {
|
| + requestChan := make(chan routeRequest, 10)
|
| + doneChan := make(chan struct{})
|
| + router := &Router{
|
| + handle: handle,
|
| + requestChan: requestChan,
|
| + done: doneChan,
|
| + }
|
| + router.runWorker(&routerWorker{
|
| + handle,
|
| + make(map[uint64]chan<- MessageReadResult),
|
| + requestChan,
|
| + doneChan,
|
| + waiter,
|
| + make(chan WaitResponse, 1),
|
| + 0,
|
| + })
|
| + return router
|
| +}
|
| +
|
| +// Close closes the router and the underlying message pipe. All new incoming
|
| +// requests are returned with an error. Panics if you try to close the router
|
| +// more than once.
|
| +func (r *Router) Close() {
|
| + close(r.done)
|
| +}
|
| +
|
| +// Accept sends a message to the message pipe. The message should have a
|
| +// zero request id in header.
|
| +func (r *Router) Accept(message *Message) error {
|
| + if message.Header.RequestId != 0 {
|
| + return fmt.Errorf("message header should have a zero request ID")
|
| + }
|
| + r.mu.Lock()
|
| + defer r.mu.Unlock()
|
| + // This can also mean that the router is closed.
|
| + if !r.handle.IsValid() {
|
| + return fmt.Errorf("can't write a message to an invalid handle")
|
| + }
|
| + r.requestChan <- routeRequest{message, nil}
|
| + return nil
|
| +}
|
| +
|
| +func (r *Router) runWorker(worker *routerWorker) {
|
| + // Run worker on a separate go routine.
|
| + go func() {
|
| + // Get the reason why the worker stopped. The error means that
|
| + // either the router is closed or there was an error reading
|
| + // or writing to a message pipe. In both cases it will be
|
| + // the reason why we can't process any more requests.
|
| + err := worker.runLoop()
|
| + worker.cancelIfWaiting()
|
| + // Respond to all pending requests.
|
| + for _, responseChan := range worker.responders {
|
| + responseChan <- MessageReadResult{nil, err}
|
| + }
|
| + // Respond to incoming requests until we make sure that all
|
| + // new requests return with an error before sending request
|
| + // to responseChan.
|
| + go func() {
|
| + for responder := range worker.requestChan {
|
| + responder.responseChan <- MessageReadResult{nil, err}
|
| + }
|
| + }()
|
| + r.mu.Lock()
|
| + r.handle.Close()
|
| + // If we acquire the lock then no other go routine is waiting
|
| + // to write to responseChan. All go routines that acquire the
|
| + // lock after us will return before sending to responseChan as
|
| + // the underlying handle is invalid (already closed).
|
| + // We can safely close the requestChan.
|
| + close(r.requestChan)
|
| + r.mu.Unlock()
|
| + }()
|
| +}
|
| +
|
| +// AcceptWithResponse sends a message to the message pipe and returns a channel
|
| +// that will stream the result of reading corresponding response. The message
|
| +// should have a non-zero request id in header. It is responsibility of the
|
| +// caller to issue unique request ids for all given messages.
|
| +func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult {
|
| + responseChan := make(chan MessageReadResult, 1)
|
| + if message.Header.RequestId == 0 {
|
| + responseChan <- MessageReadResult{nil, fmt.Errorf("message header should have a request ID")}
|
| + return responseChan
|
| + }
|
| + r.mu.Lock()
|
| + defer r.mu.Unlock()
|
| + // Return an error before sending a request to requestChan if the router
|
| + // is closed so that we can safely close responseChan once we close the
|
| + // router.
|
| + if !r.handle.IsValid() {
|
| + responseChan <- MessageReadResult{nil, fmt.Errorf("can't write a message to an invalid handle")}
|
| + return responseChan
|
| + }
|
| + r.requestChan <- routeRequest{message, responseChan}
|
| + return responseChan
|
| +}
|
|
|