Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(217)

Side by Side Diff: third_party/mojo/src/mojo/public/go/bindings/router.go

Issue 975973002: Update mojo sdk to rev f68e697e389943cd9bf9652397312280e96b127a (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: shake fist at msvc Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bindings
6
7 import (
8 "fmt"
9 "sync"
10
11 "mojo/public/go/system"
12 )
13
14 // MessageReadResult contains information returned after reading and parsing
15 // a message: a non-nil error of a valid message.
16 type MessageReadResult struct {
17 Message *Message
18 Error error
19 }
20
21 // routeRequest is a request sent from Router to routerWorker.
22 type routeRequest struct {
23 // The outgoing message with non-zero request id.
24 message *Message
25 // The channel to send respond for the message.
26 responseChan chan<- MessageReadResult
27 }
28
29 // routerWorker sends messages that require a response and and routes responses
30 // to appropriate receivers. The work is done on a separate go routine.
31 type routerWorker struct {
32 // The message pipe handle to send requests and receive responses.
33 handle system.MessagePipeHandle
34 // Map from request id to response channel.
35 responders map[uint64]chan<- MessageReadResult
36 // The channel of incoming requests that require responses.
37 requestChan <-chan routeRequest
38 // The channel that indicates that the worker should terminate.
39 done <-chan struct{}
40 // Implementation of async waiter.
41 waiter AsyncWaiter
42 waitChan chan WaitResponse
43 waitId AsyncWaitId
44 }
45
46 // readOutstandingMessages reads and dispatches available messages in the
47 // message pipe until the messages is empty or there are no waiting responders.
48 // If the worker is currently waiting on the message pipe, returns immediately
49 // without an error.
50 func (w *routerWorker) readAndDispatchOutstandingMessages() error {
51 if w.waitId != 0 {
52 // Still waiting for a new message in the message pipe.
53 return nil
54 }
55 for len(w.responders) > 0 {
56 result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_ MESSAGE_FLAG_NONE)
57 if result == system.MOJO_RESULT_SHOULD_WAIT {
58 w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HAND LE_SIGNAL_READABLE, w.waitChan)
59 return nil
60 }
61 if result != system.MOJO_RESULT_OK {
62 return fmt.Errorf("error reading message: %v", result)
63 }
64 message, err := ParseMessage(bytes, handles)
65 if err != nil {
66 return err
67 }
68 id := message.Header.RequestId
69 w.responders[id] <- MessageReadResult{message, nil}
70 delete(w.responders, id)
71 }
72 return nil
73 }
74
75 func (w *routerWorker) cancelIfWaiting() {
76 if w.waitId != 0 {
77 w.waiter.CancelWait(w.waitId)
78 w.waitId = 0
79 }
80 }
81
82 // runLoop is the main run loop of the worker. It processes incoming requests
83 // from Router and waits on a message pipe for new messages.
84 // Returns an error describing the cause of stopping.
85 func (w *routerWorker) runLoop() error {
86 for {
87 select {
88 case waitResponse := <-w.waitChan:
89 w.waitId = 0
90 if waitResponse.Result != system.MOJO_RESULT_OK {
91 return fmt.Errorf("error waiting for message: %v ", waitResponse.Result)
92 }
93 case request := <-w.requestChan:
94 if err := WriteMessage(w.handle, request.message); err ! = nil {
95 return err
96 }
97 if request.responseChan != nil {
98 w.responders[request.message.Header.RequestId] = request.responseChan
99 }
100 case <-w.done:
101 return fmt.Errorf("message pipe is closed")
102 }
103 // Returns immediately without an error if still waiting for
104 // a new message.
105 if err := w.readAndDispatchOutstandingMessages(); err != nil {
106 return err
107 }
108 }
109 }
110
111 // Router sends messages to a message pipe and routes responses back to senders
112 // of messages with non-zero request ids. The caller should issue unique request
113 // ids for each message given to the router.
114 type Router struct {
115 // Mutex protecting requestChan from new requests in case the router is
116 // closed and the handle.
117 mu sync.Mutex
118 // The message pipe handle to send requests and receive responses.
119 handle system.MessagePipeHandle
120 // Channel to communicate with worker.
121 requestChan chan<- routeRequest
122
123 // Channel to stop the worker.
124 done chan<- struct{}
125 }
126
127 // NewRouter returns a new Router instance that sends and receives messages
128 // from a provided message pipe handle.
129 func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router {
130 requestChan := make(chan routeRequest, 10)
131 doneChan := make(chan struct{})
132 router := &Router{
133 handle: handle,
134 requestChan: requestChan,
135 done: doneChan,
136 }
137 router.runWorker(&routerWorker{
138 handle,
139 make(map[uint64]chan<- MessageReadResult),
140 requestChan,
141 doneChan,
142 waiter,
143 make(chan WaitResponse, 1),
144 0,
145 })
146 return router
147 }
148
149 // Close closes the router and the underlying message pipe. All new incoming
150 // requests are returned with an error. Panics if you try to close the router
151 // more than once.
152 func (r *Router) Close() {
153 close(r.done)
154 }
155
156 // Accept sends a message to the message pipe. The message should have a
157 // zero request id in header.
158 func (r *Router) Accept(message *Message) error {
159 if message.Header.RequestId != 0 {
160 return fmt.Errorf("message header should have a zero request ID" )
161 }
162 r.mu.Lock()
163 defer r.mu.Unlock()
164 // This can also mean that the router is closed.
165 if !r.handle.IsValid() {
166 return fmt.Errorf("can't write a message to an invalid handle")
167 }
168 r.requestChan <- routeRequest{message, nil}
169 return nil
170 }
171
172 func (r *Router) runWorker(worker *routerWorker) {
173 // Run worker on a separate go routine.
174 go func() {
175 // Get the reason why the worker stopped. The error means that
176 // either the router is closed or there was an error reading
177 // or writing to a message pipe. In both cases it will be
178 // the reason why we can't process any more requests.
179 err := worker.runLoop()
180 worker.cancelIfWaiting()
181 // Respond to all pending requests.
182 for _, responseChan := range worker.responders {
183 responseChan <- MessageReadResult{nil, err}
184 }
185 // Respond to incoming requests until we make sure that all
186 // new requests return with an error before sending request
187 // to responseChan.
188 go func() {
189 for responder := range worker.requestChan {
190 responder.responseChan <- MessageReadResult{nil, err}
191 }
192 }()
193 r.mu.Lock()
194 r.handle.Close()
195 // If we acquire the lock then no other go routine is waiting
196 // to write to responseChan. All go routines that acquire the
197 // lock after us will return before sending to responseChan as
198 // the underlying handle is invalid (already closed).
199 // We can safely close the requestChan.
200 close(r.requestChan)
201 r.mu.Unlock()
202 }()
203 }
204
205 // AcceptWithResponse sends a message to the message pipe and returns a channel
206 // that will stream the result of reading corresponding response. The message
207 // should have a non-zero request id in header. It is responsibility of the
208 // caller to issue unique request ids for all given messages.
209 func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult {
210 responseChan := make(chan MessageReadResult, 1)
211 if message.Header.RequestId == 0 {
212 responseChan <- MessageReadResult{nil, fmt.Errorf("message heade r should have a request ID")}
213 return responseChan
214 }
215 r.mu.Lock()
216 defer r.mu.Unlock()
217 // Return an error before sending a request to requestChan if the router
218 // is closed so that we can safely close responseChan once we close the
219 // router.
220 if !r.handle.IsValid() {
221 responseChan <- MessageReadResult{nil, fmt.Errorf("can't write a message to an invalid handle")}
222 return responseChan
223 }
224 r.requestChan <- routeRequest{message, responseChan}
225 return responseChan
226 }
OLDNEW
« no previous file with comments | « third_party/mojo/src/mojo/public/go/bindings/interface.go ('k') | third_party/mojo/src/mojo/public/go/bindings/stub.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698