Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(319)

Side by Side Diff: mojo/public/go/bindings/router.go

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/go/bindings/message.go ('k') | mojo/public/go/bindings/stub.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bindings
6
7 import (
8 "fmt"
9 "sync"
10
11 "mojo/public/go/system"
12 )
13
14 // MessageReadResult contains information returned after reading and parsing
15 // a message: a non-nil error of a valid message.
16 type MessageReadResult struct {
17 Message *Message
18 Error error
19 }
20
21 // routeRequest is a request sent from Router to routerWorker.
22 type routeRequest struct {
23 // The outgoing message with non-zero request id.
24 message *Message
25 // The channel to send respond for the message.
26 responseChan chan<- MessageReadResult
27 }
28
29 // routerWorker sends messages that require a response and and routes responses
30 // to appropriate receivers. The work is done on a separate go routine.
31 type routerWorker struct {
32 // The message pipe handle to send requests and receive responses.
33 handle system.MessagePipeHandle
34 // Map from request id to response channel.
35 responders map[uint64]chan<- MessageReadResult
36 // The channel of incoming requests that require responses.
37 requestChan <-chan routeRequest
38 // The channel that indicates that the worker should terminate.
39 done <-chan struct{}
40 // Implementation of async waiter.
41 waiter AsyncWaiter
42 waitChan chan WaitResponse
43 waitId AsyncWaitId
44 }
45
46 // readOutstandingMessages reads and dispatches available messages in the
47 // message pipe until the messages is empty or there are no waiting responders.
48 // If the worker is currently waiting on the message pipe, returns immediately
49 // without an error.
50 func (w *routerWorker) readAndDispatchOutstandingMessages() error {
51 if w.waitId != 0 {
52 // Still waiting for a new message in the message pipe.
53 return nil
54 }
55 for len(w.responders) > 0 {
56 result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_ MESSAGE_FLAG_NONE)
57 if result == system.MOJO_RESULT_SHOULD_WAIT {
58 w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HAND LE_SIGNAL_READABLE, w.waitChan)
59 return nil
60 }
61 if result != system.MOJO_RESULT_OK {
62 return &ConnectionError{result}
63 }
64 message, err := ParseMessage(bytes, handles)
65 if err != nil {
66 return err
67 }
68 id := message.Header.RequestId
69 w.responders[id] <- MessageReadResult{message, nil}
70 delete(w.responders, id)
71 }
72 return nil
73 }
74
75 func (w *routerWorker) cancelIfWaiting() {
76 if w.waitId != 0 {
77 w.waiter.CancelWait(w.waitId)
78 w.waitId = 0
79 }
80 }
81
82 // runLoop is the main run loop of the worker. It processes incoming requests
83 // from Router and waits on a message pipe for new messages.
84 // Returns an error describing the cause of stopping.
85 func (w *routerWorker) runLoop() error {
86 for {
87 select {
88 case waitResponse := <-w.waitChan:
89 w.waitId = 0
90 if waitResponse.Result != system.MOJO_RESULT_OK {
91 return &ConnectionError{waitResponse.Result}
92 }
93 case request := <-w.requestChan:
94 if err := WriteMessage(w.handle, request.message); err ! = nil {
95 return err
96 }
97 if request.responseChan != nil {
98 w.responders[request.message.Header.RequestId] = request.responseChan
99 }
100 case <-w.done:
101 return errConnectionClosed
102 }
103 // Returns immediately without an error if still waiting for
104 // a new message.
105 if err := w.readAndDispatchOutstandingMessages(); err != nil {
106 return err
107 }
108 }
109 }
110
111 // Router sends messages to a message pipe and routes responses back to senders
112 // of messages with non-zero request ids. The caller should issue unique request
113 // ids for each message given to the router.
114 type Router struct {
115 // Mutex protecting requestChan from new requests in case the router is
116 // closed and the handle.
117 mu sync.Mutex
118 // The message pipe handle to send requests and receive responses.
119 handle system.MessagePipeHandle
120 // Channel to communicate with worker.
121 requestChan chan<- routeRequest
122
123 // Makes sure that the done channel is closed once.
124 closeOnce sync.Once
125 // Channel to stop the worker.
126 done chan<- struct{}
127 }
128
129 // NewRouter returns a new Router instance that sends and receives messages
130 // from a provided message pipe handle.
131 func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router {
132 requestChan := make(chan routeRequest, 10)
133 doneChan := make(chan struct{})
134 router := &Router{
135 handle: handle,
136 requestChan: requestChan,
137 done: doneChan,
138 }
139 router.runWorker(&routerWorker{
140 handle,
141 make(map[uint64]chan<- MessageReadResult),
142 requestChan,
143 doneChan,
144 waiter,
145 make(chan WaitResponse, 1),
146 0,
147 })
148 return router
149 }
150
151 // Close closes the router and the underlying message pipe. All new incoming
152 // requests are returned with an error.
153 func (r *Router) Close() {
154 r.closeOnce.Do(func() {
155 close(r.done)
156 })
157 }
158
159 // Accept sends a message to the message pipe. The message should have a
160 // zero request id in header.
161 func (r *Router) Accept(message *Message) error {
162 if message.Header.RequestId != 0 {
163 return fmt.Errorf("message header should have a zero request ID" )
164 }
165 r.mu.Lock()
166 defer r.mu.Unlock()
167 if !r.handle.IsValid() {
168 return errConnectionClosed
169 }
170 r.requestChan <- routeRequest{message, nil}
171 return nil
172 }
173
174 func (r *Router) runWorker(worker *routerWorker) {
175 // Run worker on a separate go routine.
176 go func() {
177 // Get the reason why the worker stopped. The error means that
178 // either the router is closed or there was an error reading
179 // or writing to a message pipe. In both cases it will be
180 // the reason why we can't process any more requests.
181 err := worker.runLoop()
182 worker.cancelIfWaiting()
183 // Respond to all pending requests.
184 for _, responseChan := range worker.responders {
185 responseChan <- MessageReadResult{nil, err}
186 }
187 // Respond to incoming requests until we make sure that all
188 // new requests return with an error before sending request
189 // to responseChan.
190 go func() {
191 for responder := range worker.requestChan {
192 responder.responseChan <- MessageReadResult{nil, err}
193 }
194 }()
195 r.mu.Lock()
196 r.handle.Close()
197 // If we acquire the lock then no other go routine is waiting
198 // to write to responseChan. All go routines that acquire the
199 // lock after us will return before sending to responseChan as
200 // the underlying handle is invalid (already closed).
201 // We can safely close the requestChan.
202 close(r.requestChan)
203 r.mu.Unlock()
204 }()
205 }
206
207 // AcceptWithResponse sends a message to the message pipe and returns a channel
208 // that will stream the result of reading corresponding response. The message
209 // should have a non-zero request id in header. It is responsibility of the
210 // caller to issue unique request ids for all given messages.
211 func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult {
212 responseChan := make(chan MessageReadResult, 1)
213 if message.Header.RequestId == 0 {
214 responseChan <- MessageReadResult{nil, fmt.Errorf("message heade r should have a request ID")}
215 return responseChan
216 }
217 r.mu.Lock()
218 defer r.mu.Unlock()
219 // Return an error before sending a request to requestChan if the router
220 // is closed so that we can safely close responseChan once we close the
221 // router.
222 if !r.handle.IsValid() {
223 responseChan <- MessageReadResult{nil, errConnectionClosed}
224 return responseChan
225 }
226 r.requestChan <- routeRequest{message, responseChan}
227 return responseChan
228 }
OLDNEW
« no previous file with comments | « mojo/public/go/bindings/message.go ('k') | mojo/public/go/bindings/stub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698