OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package bindings | |
6 | |
7 import ( | |
8 "fmt" | |
9 "sync" | |
10 | |
11 "mojo/public/go/system" | |
12 ) | |
13 | |
14 // MessageReadResult contains information returned after reading and parsing | |
15 // a message: a non-nil error of a valid message. | |
16 type MessageReadResult struct { | |
17 Message *Message | |
18 Error error | |
19 } | |
20 | |
21 // routeRequest is a request sent from Router to routerWorker. | |
22 type routeRequest struct { | |
23 // The outgoing message with non-zero request id. | |
24 message *Message | |
25 // The channel to send respond for the message. | |
26 responseChan chan<- MessageReadResult | |
27 } | |
28 | |
29 // routerWorker sends messages that require a response and and routes responses | |
30 // to appropriate receivers. The work is done on a separate go routine. | |
31 type routerWorker struct { | |
32 // The message pipe handle to send requests and receive responses. | |
33 handle system.MessagePipeHandle | |
34 // Map from request id to response channel. | |
35 responders map[uint64]chan<- MessageReadResult | |
36 // The channel of incoming requests that require responses. | |
37 requestChan <-chan routeRequest | |
38 // The channel that indicates that the worker should terminate. | |
39 done <-chan struct{} | |
40 // Implementation of async waiter. | |
41 waiter AsyncWaiter | |
42 waitChan chan WaitResponse | |
43 waitId AsyncWaitId | |
44 } | |
45 | |
46 // readOutstandingMessages reads and dispatches available messages in the | |
47 // message pipe until the messages is empty or there are no waiting responders. | |
48 // If the worker is currently waiting on the message pipe, returns immediately | |
49 // without an error. | |
50 func (w *routerWorker) readAndDispatchOutstandingMessages() error { | |
51 if w.waitId != 0 { | |
52 // Still waiting for a new message in the message pipe. | |
53 return nil | |
54 } | |
55 for len(w.responders) > 0 { | |
56 result, bytes, handles := w.handle.ReadMessage(system.MOJO_READ_
MESSAGE_FLAG_NONE) | |
57 if result == system.MOJO_RESULT_SHOULD_WAIT { | |
58 w.waitId = w.waiter.AsyncWait(w.handle, system.MOJO_HAND
LE_SIGNAL_READABLE, w.waitChan) | |
59 return nil | |
60 } | |
61 if result != system.MOJO_RESULT_OK { | |
62 return &ConnectionError{result} | |
63 } | |
64 message, err := ParseMessage(bytes, handles) | |
65 if err != nil { | |
66 return err | |
67 } | |
68 id := message.Header.RequestId | |
69 w.responders[id] <- MessageReadResult{message, nil} | |
70 delete(w.responders, id) | |
71 } | |
72 return nil | |
73 } | |
74 | |
75 func (w *routerWorker) cancelIfWaiting() { | |
76 if w.waitId != 0 { | |
77 w.waiter.CancelWait(w.waitId) | |
78 w.waitId = 0 | |
79 } | |
80 } | |
81 | |
82 // runLoop is the main run loop of the worker. It processes incoming requests | |
83 // from Router and waits on a message pipe for new messages. | |
84 // Returns an error describing the cause of stopping. | |
85 func (w *routerWorker) runLoop() error { | |
86 for { | |
87 select { | |
88 case waitResponse := <-w.waitChan: | |
89 w.waitId = 0 | |
90 if waitResponse.Result != system.MOJO_RESULT_OK { | |
91 return &ConnectionError{waitResponse.Result} | |
92 } | |
93 case request := <-w.requestChan: | |
94 if err := WriteMessage(w.handle, request.message); err !
= nil { | |
95 return err | |
96 } | |
97 if request.responseChan != nil { | |
98 w.responders[request.message.Header.RequestId] =
request.responseChan | |
99 } | |
100 case <-w.done: | |
101 return errConnectionClosed | |
102 } | |
103 // Returns immediately without an error if still waiting for | |
104 // a new message. | |
105 if err := w.readAndDispatchOutstandingMessages(); err != nil { | |
106 return err | |
107 } | |
108 } | |
109 } | |
110 | |
111 // Router sends messages to a message pipe and routes responses back to senders | |
112 // of messages with non-zero request ids. The caller should issue unique request | |
113 // ids for each message given to the router. | |
114 type Router struct { | |
115 // Mutex protecting requestChan from new requests in case the router is | |
116 // closed and the handle. | |
117 mu sync.Mutex | |
118 // The message pipe handle to send requests and receive responses. | |
119 handle system.MessagePipeHandle | |
120 // Channel to communicate with worker. | |
121 requestChan chan<- routeRequest | |
122 | |
123 // Makes sure that the done channel is closed once. | |
124 closeOnce sync.Once | |
125 // Channel to stop the worker. | |
126 done chan<- struct{} | |
127 } | |
128 | |
129 // NewRouter returns a new Router instance that sends and receives messages | |
130 // from a provided message pipe handle. | |
131 func NewRouter(handle system.MessagePipeHandle, waiter AsyncWaiter) *Router { | |
132 requestChan := make(chan routeRequest, 10) | |
133 doneChan := make(chan struct{}) | |
134 router := &Router{ | |
135 handle: handle, | |
136 requestChan: requestChan, | |
137 done: doneChan, | |
138 } | |
139 router.runWorker(&routerWorker{ | |
140 handle, | |
141 make(map[uint64]chan<- MessageReadResult), | |
142 requestChan, | |
143 doneChan, | |
144 waiter, | |
145 make(chan WaitResponse, 1), | |
146 0, | |
147 }) | |
148 return router | |
149 } | |
150 | |
151 // Close closes the router and the underlying message pipe. All new incoming | |
152 // requests are returned with an error. | |
153 func (r *Router) Close() { | |
154 r.closeOnce.Do(func() { | |
155 close(r.done) | |
156 }) | |
157 } | |
158 | |
159 // Accept sends a message to the message pipe. The message should have a | |
160 // zero request id in header. | |
161 func (r *Router) Accept(message *Message) error { | |
162 if message.Header.RequestId != 0 { | |
163 return fmt.Errorf("message header should have a zero request ID"
) | |
164 } | |
165 r.mu.Lock() | |
166 defer r.mu.Unlock() | |
167 if !r.handle.IsValid() { | |
168 return errConnectionClosed | |
169 } | |
170 r.requestChan <- routeRequest{message, nil} | |
171 return nil | |
172 } | |
173 | |
174 func (r *Router) runWorker(worker *routerWorker) { | |
175 // Run worker on a separate go routine. | |
176 go func() { | |
177 // Get the reason why the worker stopped. The error means that | |
178 // either the router is closed or there was an error reading | |
179 // or writing to a message pipe. In both cases it will be | |
180 // the reason why we can't process any more requests. | |
181 err := worker.runLoop() | |
182 worker.cancelIfWaiting() | |
183 // Respond to all pending requests. | |
184 for _, responseChan := range worker.responders { | |
185 responseChan <- MessageReadResult{nil, err} | |
186 } | |
187 // Respond to incoming requests until we make sure that all | |
188 // new requests return with an error before sending request | |
189 // to responseChan. | |
190 go func() { | |
191 for responder := range worker.requestChan { | |
192 responder.responseChan <- MessageReadResult{nil,
err} | |
193 } | |
194 }() | |
195 r.mu.Lock() | |
196 r.handle.Close() | |
197 // If we acquire the lock then no other go routine is waiting | |
198 // to write to responseChan. All go routines that acquire the | |
199 // lock after us will return before sending to responseChan as | |
200 // the underlying handle is invalid (already closed). | |
201 // We can safely close the requestChan. | |
202 close(r.requestChan) | |
203 r.mu.Unlock() | |
204 }() | |
205 } | |
206 | |
207 // AcceptWithResponse sends a message to the message pipe and returns a channel | |
208 // that will stream the result of reading corresponding response. The message | |
209 // should have a non-zero request id in header. It is responsibility of the | |
210 // caller to issue unique request ids for all given messages. | |
211 func (r *Router) AcceptWithResponse(message *Message) <-chan MessageReadResult { | |
212 responseChan := make(chan MessageReadResult, 1) | |
213 if message.Header.RequestId == 0 { | |
214 responseChan <- MessageReadResult{nil, fmt.Errorf("message heade
r should have a request ID")} | |
215 return responseChan | |
216 } | |
217 r.mu.Lock() | |
218 defer r.mu.Unlock() | |
219 // Return an error before sending a request to requestChan if the router | |
220 // is closed so that we can safely close responseChan once we close the | |
221 // router. | |
222 if !r.handle.IsValid() { | |
223 responseChan <- MessageReadResult{nil, errConnectionClosed} | |
224 return responseChan | |
225 } | |
226 r.requestChan <- routeRequest{message, responseChan} | |
227 return responseChan | |
228 } | |
OLD | NEW |