Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: mojo/public/go/bindings/router.go

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/public/go/bindings/message.go ('k') | mojo/public/go/bindings/stub.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/public/go/bindings/router.go
diff --git a/mojo/public/go/bindings/router.go b/mojo/public/go/bindings/router.go
deleted file mode 100644
index 1e8daba6e36eef78ca4fbcd226854b02499d1b21..0000000000000000000000000000000000000000
--- a/mojo/public/go/bindings/router.go
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package bindings
-
-import (
- "fmt"
- "sync"
-
- "mojo/public/go/system"
-)
-
-// MessageReadResult contains information returned after reading and parsing
-// a message: a non-nil error of a valid message.
-type MessageReadResult struct {
- Message *Message
- Error error
-}
-
-// routeRequest is a request sent from Router to routerWorker.
-type routeRequest struct {
- // The outgoing message with non-zero request id.
- message *Message
- // The channel to send respond for the message.
- responseChan chan<- MessageReadResult
-}
-
-// routerWorker sends messages that require a response and and routes responses
-// to appropriate receivers. The work is done on a separate go routine.
-type routerWorker struct {
- // The message pipe handle to send requests and receive responses.
- handle system.MessagePipeHandle
- // Map from request id to response channel.
- responders map[uint64]chan<- MessageReadResult
- // The channel of incoming requests that require responses.
- requestChan <-chan routeRequest
- // The channel that indicates that the worker should terminate.
- done <-chan struct{}
- // Implementation of async waiter.
- waiter AsyncWaiter
- waitChan chan WaitResponse
- waitId AsyncWaitId
-}
-
-// readOutstandingMessages reads and dispatches available messages in the
-// message pipe until the messages is empty or there are no waiting responders.
-// If the worker is currently waiting on the message pipe, returns immediately
-// without an error.
-func (w *routerWorker) readAndDispatchOutstandingMessages() error {
- if w.waitId != 0 {
- // Still waiting for a new message in the message pipe.
- return nil
- }
- for len(w.responders) > 0 {
- result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
- if result == system.MOJO_RESULT_SHOULD_WAIT {
- w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HANDLE_SIGNAL_READABLE, w.waitChan)
- return nil
- }
- if result != system.MOJO_RESULT_OK {
- return &ConnectionError{result}
- }
- message, err := ParseMessage(bytes, handles)
- if err != nil {
- return err
- }
- id := message.Header.RequestId
- w.responders[id] <- MessageReadResult{message, nil}
- delete(w.responders, id)
- }
- return nil
-}
-
-func (w *routerWorker) cancelIfWaiting() {
- if w.waitId != 0 {
- w.waiter.CancelWait(w.waitId)
- w.waitId = 0
- }
-}
-
-// runLoop is the main run loop of the worker. It processes incoming requests
-// from Router and waits on a message pipe for new messages.
-// Returns an error describing the cause of stopping.
-func (w *routerWorker) runLoop() error {
- for {
- select {
- case waitResponse := <-w.waitChan:
- w.waitId = 0
- if waitResponse.Result != system.MOJO_RESULT_OK {
- return &ConnectionError{waitResponse.Result}
- }
- case request := <-w.requestChan:
- if err := WriteMessage(w.handle, request.message); err != nil {
- return err
- }
- if request.responseChan != nil {
- w.responders[request.message.Header.RequestId] = request.responseChan
- }
- case <-w.done:
- return errConnectionClosed
- }
- // Returns immediately without an error if still waiting for
- // a new message.
- if err := w.readAndDispatchOutstandingMessages(); err != nil {
- return err
- }
- }
-}
-
-// Router sends messages to a message pipe and routes responses back to senders
-// of messages with non-zero request ids. The caller should issue unique request
-// ids for each message given to the router.
-type Router struct {
- // Mutex protecting requestChan from new requests in case the router is
- // closed and the handle.
- mu sync.Mutex
- // The message pipe handle to send requests and receive responses.
- handle system.MessagePipeHandle
- // Channel to communicate with worker.
- requestChan chan<- routeRequest
-
- // Makes sure that the done channel is closed once.
- closeOnce sync.Once
- // Channel to stop the worker.
- done chan<- struct{}
-}
-
-// NewRouter returns a new Router instance that sends and receives messages
-// from a provided message pipe handle.
-func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router {
- requestChan := make(chan routeRequest, 10)
- doneChan := make(chan struct{})
- router := &Router{
- handle: handle,
- requestChan: requestChan,
- done: doneChan,
- }
- router.runWorker(&routerWorker{
- handle,
- make(map[uint64]chan<- MessageReadResult),
- requestChan,
- doneChan,
- waiter,
- make(chan WaitResponse, 1),
- 0,
- })
- return router
-}
-
-// Close closes the router and the underlying message pipe. All new incoming
-// requests are returned with an error.
-func (r *Router) Close() {
- r.closeOnce.Do(func() {
- close(r.done)
- })
-}
-
-// Accept sends a message to the message pipe. The message should have a
-// zero request id in header.
-func (r *Router) Accept(message *Message) error {
- if message.Header.RequestId != 0 {
- return fmt.Errorf("message header should have a zero request ID")
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- if !r.handle.IsValid() {
- return errConnectionClosed
- }
- r.requestChan <- routeRequest{message, nil}
- return nil
-}
-
-func (r *Router) runWorker(worker *routerWorker) {
- // Run worker on a separate go routine.
- go func() {
- // Get the reason why the worker stopped. The error means that
- // either the router is closed or there was an error reading
- // or writing to a message pipe. In both cases it will be
- // the reason why we can't process any more requests.
- err := worker.runLoop()
- worker.cancelIfWaiting()
- // Respond to all pending requests.
- for _, responseChan := range worker.responders {
- responseChan <- MessageReadResult{nil, err}
- }
- // Respond to incoming requests until we make sure that all
- // new requests return with an error before sending request
- // to responseChan.
- go func() {
- for responder := range worker.requestChan {
- responder.responseChan <- MessageReadResult{nil, err}
- }
- }()
- r.mu.Lock()
- r.handle.Close()
- // If we acquire the lock then no other go routine is waiting
- // to write to responseChan. All go routines that acquire the
- // lock after us will return before sending to responseChan as
- // the underlying handle is invalid (already closed).
- // We can safely close the requestChan.
- close(r.requestChan)
- r.mu.Unlock()
- }()
-}
-
-// AcceptWithResponse sends a message to the message pipe and returns a channel
-// that will stream the result of reading corresponding response. The message
-// should have a non-zero request id in header. It is responsibility of the
-// caller to issue unique request ids for all given messages.
-func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult {
- responseChan := make(chan MessageReadResult, 1)
- if message.Header.RequestId == 0 {
- responseChan <- MessageReadResult{nil, fmt.Errorf("message header should have a request ID")}
- return responseChan
- }
- r.mu.Lock()
- defer r.mu.Unlock()
- // Return an error before sending a request to requestChan if the router
- // is closed so that we can safely close responseChan once we close the
- // router.
- if !r.handle.IsValid() {
- responseChan <- MessageReadResult{nil, errConnectionClosed}
- return responseChan
- }
- r.requestChan <- routeRequest{message, responseChan}
- return responseChan
-}
« no previous file with comments | « mojo/public/go/bindings/message.go ('k') | mojo/public/go/bindings/stub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698