OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package bindings | |
6 | |
7 import ( | |
8 "fmt" | |
9 "runtime" | |
10 "sync" | |
11 "sync/atomic" | |
12 | |
13 "mojo/public/go/system" | |
14 ) | |
15 | |
16 var defaultWaiter *asyncWaiterImpl | |
17 var once sync.Once | |
18 | |
19 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. | |
20 func GetAsyncWaiter() AsyncWaiter { | |
21 once.Do(func() { | |
22 defaultWaiter = newAsyncWaiter() | |
23 }) | |
24 return defaultWaiter | |
25 } | |
26 | |
27 // AsyncWaitId is an id returned by |AsyncWait()| used to cancel it. | |
28 type AsyncWaitId uint64 | |
29 | |
30 // WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to | |
31 // finish. It contains the same information as if |Wait()| was called on a | |
32 // handle. | |
33 type WaitResponse struct { | |
34 Result system.MojoResult | |
35 State system.MojoHandleSignalsState | |
36 } | |
37 | |
38 // AsyncWaiter defines an interface for asynchronously waiting (and cancelling | |
39 // asynchronous waits) on a handle. | |
40 type AsyncWaiter interface { | |
41 // AsyncWait asynchronously waits on a given handle until a signal | |
42 // indicated by |signals| is satisfied or it becomes known that no | |
43 // signal indicated by |signals| will ever be satisified. The wait | |
44 // response will be sent to |responseChan|. | |
45 // | |
46 // |handle| must not be closed or transferred until the wait response | |
47 // is received from |responseChan|. | |
48 AsyncWait(handle system.Handle, signals system.MojoHandleSignals, respon
seChan chan<- WaitResponse) AsyncWaitId | |
49 | |
50 // CancelWait cancels an outstanding async wait (specified by |id|) | |
51 // initiated by |AsyncWait()|. A response with Mojo result | |
52 // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|. | |
53 CancelWait(id AsyncWaitId) | |
54 } | |
55 | |
56 // waitRequest is a struct sent to asyncWaiterWorker to add another handle to | |
57 // the list of waiting handles. | |
58 type waitRequest struct { | |
59 handle system.Handle | |
60 signals system.MojoHandleSignals | |
61 | |
62 // Used for |CancelWait()| calls. The worker should issue IDs so that | |
63 // you can't cancel the wait until the worker received the wait request. | |
64 idChan chan<- AsyncWaitId | |
65 | |
66 // A channel end to send wait results. | |
67 responseChan chan<- WaitResponse | |
68 } | |
69 | |
70 // asyncWaiterWorker does the actual work, in its own goroutine. It calls | |
71 // |WaitMany()| on all provided handles. New handles a added via |waitChan| | |
72 // and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl | |
73 // sends mojo messages to a dedicated message pipe, the other end of which has | |
74 // index 0 in all slices of the worker. | |
75 type asyncWaiterWorker struct { | |
76 // |handles| and |signals| are used to make |WaitMany()| calls directly. | |
77 // All these arrays should be operated simultaneously; i-th element | |
78 // of each refers to i-th handle. | |
79 handles []system.Handle | |
80 signals []system.MojoHandleSignals | |
81 asyncWaitIds []AsyncWaitId | |
82 responses []chan<- WaitResponse | |
83 | |
84 // Flag shared between waiterImpl and worker that is 1 iff the worker is | |
85 // already notified by waiterImpl. The worker sets it to 0 as soon as | |
86 // |WaitMany()| succeeds. | |
87 isNotified *int32 | |
88 waitChan <-chan waitRequest // should have a non-empty buffer | |
89 cancelChan <-chan AsyncWaitId // should have a non-empty buffer | |
90 ids uint64 // is incremented each |AsyncWait()| call | |
91 } | |
92 | |
93 // removeHandle removes handle at provided index without sending response by | |
94 // swapping all information associated with index-th handle with the last one | |
95 // and removing the last one. | |
96 func (w *asyncWaiterWorker) removeHandle(index int) { | |
97 l := len(w.handles) - 1 | |
98 // Swap with the last and remove last. | |
99 w.handles[index] = w.handles[l] | |
100 w.handles = w.handles[0:l] | |
101 w.signals[index] = w.signals[l] | |
102 w.signals = w.signals[0:l] | |
103 | |
104 w.asyncWaitIds[index] = w.asyncWaitIds[l] | |
105 w.asyncWaitIds = w.asyncWaitIds[0:l] | |
106 w.responses[index] = w.responses[l] | |
107 w.responses = w.responses[0:l] | |
108 } | |
109 | |
110 // sendWaitResponseAndRemove send response to corresponding channel and removes | |
111 // index-th waiting handle. | |
112 func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.M
ojoResult, state system.MojoHandleSignalsState) { | |
113 w.responses[index] <- WaitResponse{ | |
114 result, | |
115 state, | |
116 } | |
117 w.removeHandle(index) | |
118 } | |
119 | |
120 // respondToSatisfiedWaits responds to all wait requests that have at least | |
121 // one satisfied signal and removes them. | |
122 func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSi
gnalsState) { | |
123 // Don't touch handle at index 0 as it is the waking handle. | |
124 for i := 1; i < len(states); { | |
125 if (states[i].SatisfiedSignals & w.signals[i]) != 0 { | |
126 // Respond and swap i-th with last and remove last. | |
127 w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, st
ates[i]) | |
128 // Swap i-th with last and remove last. | |
129 states[i] = states[len(states)-1] | |
130 states = states[:len(states)-1] | |
131 } else { | |
132 i++ | |
133 } | |
134 } | |
135 } | |
136 | |
137 // processIncomingRequests processes all queued async wait or cancel requests | |
138 // sent by asyncWaiterImpl. | |
139 func (w *asyncWaiterWorker) processIncomingRequests() { | |
140 for { | |
141 select { | |
142 case request := <-w.waitChan: | |
143 w.handles = append(w.handles, request.handle) | |
144 w.signals = append(w.signals, request.signals) | |
145 w.responses = append(w.responses, request.responseChan) | |
146 | |
147 w.ids++ | |
148 id := AsyncWaitId(w.ids) | |
149 w.asyncWaitIds = append(w.asyncWaitIds, id) | |
150 request.idChan <- id | |
151 case AsyncWaitId := <-w.cancelChan: | |
152 // Zero index is reserved for the waking message pipe ha
ndle. | |
153 index := 0 | |
154 for i := 1; i < len(w.asyncWaitIds); i++ { | |
155 if w.asyncWaitIds[i] == AsyncWaitId { | |
156 index = i | |
157 break | |
158 } | |
159 } | |
160 // Do nothing if the id was not found as wait response m
ay be | |
161 // already sent if the async wait was successful. | |
162 if index > 0 { | |
163 w.sendWaitResponseAndRemove(index, system.MOJO_R
ESULT_ABORTED, system.MojoHandleSignalsState{}) | |
164 } | |
165 default: | |
166 return | |
167 } | |
168 } | |
169 } | |
170 | |
171 // runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the | |
172 // wait is interrupted by waking handle (index 0) then it means that the worker | |
173 // was woken by waiterImpl, so the worker processes incoming requests from | |
174 // waiterImpl; otherwise responses to corresponding wait request. | |
175 func (w *asyncWaiterWorker) runLoop() { | |
176 for { | |
177 result, index, states := system.GetCore().WaitMany(w.handles, w.
signals, system.MOJO_DEADLINE_INDEFINITE) | |
178 // Set flag to 0, so that the next incoming request to | |
179 // waiterImpl would explicitly wake worker by sending a message | |
180 // to waking message pipe. | |
181 atomic.StoreInt32(w.isNotified, 0) | |
182 if index == -1 { | |
183 panic(fmt.Sprintf("error waiting on handles: %v", result
)) | |
184 break | |
185 } | |
186 // Zero index means that the worker was signaled by asyncWaiterI
mpl. | |
187 if index == 0 { | |
188 if result != system.MOJO_RESULT_OK { | |
189 panic(fmt.Sprintf("error waiting on waking handl
e: %v", result)) | |
190 } | |
191 w.handles[0].(system.MessagePipeHandle).ReadMessage(syst
em.MOJO_READ_MESSAGE_FLAG_NONE) | |
192 w.processIncomingRequests() | |
193 } else if result != system.MOJO_RESULT_OK { | |
194 w.sendWaitResponseAndRemove(index, result, system.MojoHa
ndleSignalsState{}) | |
195 } else { | |
196 w.respondToSatisfiedWaits(states) | |
197 } | |
198 } | |
199 } | |
200 | |
201 // asyncWaiterImpl is an implementation of |AsyncWaiter| interface. | |
202 // Runs a worker in a separate goroutine and comunicates with it by sending a | |
203 // message to |wakingHandle| to wake worker from |WaitMany()| call and | |
204 // sending request via |waitChan| and |cancelChan|. | |
205 type asyncWaiterImpl struct { | |
206 wakingHandle system.MessagePipeHandle | |
207 | |
208 // Flag shared between waiterImpl and worker that is 1 iff the worker is | |
209 // already notified by waiterImpl. The worker sets it to 0 as soon as | |
210 // |WaitMany()| succeeds. | |
211 isWorkerNotified *int32 | |
212 waitChan chan<- waitRequest // should have a non-empty buffer | |
213 cancelChan chan<- AsyncWaitId // should have a non-empty buffer | |
214 } | |
215 | |
216 func finalizeWorker(worker *asyncWaiterWorker) { | |
217 // Close waking handle on worker side. | |
218 worker.handles[0].Close() | |
219 } | |
220 | |
221 func finalizeAsyncWaiter(waiter *asyncWaiterImpl) { | |
222 waiter.wakingHandle.Close() | |
223 } | |
224 | |
225 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. | |
226 func newAsyncWaiter() *asyncWaiterImpl { | |
227 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) | |
228 if result != system.MOJO_RESULT_OK { | |
229 panic(fmt.Sprintf("can't create message pipe %v", result)) | |
230 } | |
231 waitChan := make(chan waitRequest, 10) | |
232 cancelChan := make(chan AsyncWaitId, 10) | |
233 isNotified := new(int32) | |
234 worker := &asyncWaiterWorker{ | |
235 []system.Handle{h1}, | |
236 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, | |
237 []AsyncWaitId{0}, | |
238 []chan<- WaitResponse{make(chan WaitResponse)}, | |
239 isNotified, | |
240 waitChan, | |
241 cancelChan, | |
242 0, | |
243 } | |
244 runtime.SetFinalizer(worker, finalizeWorker) | |
245 go worker.runLoop() | |
246 waiter := &asyncWaiterImpl{ | |
247 wakingHandle: h0, | |
248 isWorkerNotified: isNotified, | |
249 waitChan: waitChan, | |
250 cancelChan: cancelChan, | |
251 } | |
252 runtime.SetFinalizer(waiter, finalizeAsyncWaiter) | |
253 return waiter | |
254 } | |
255 | |
256 // wakeWorker wakes the worker from |WaitMany()| call. This should be called | |
257 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. | |
258 func (w *asyncWaiterImpl) wakeWorker() { | |
259 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { | |
260 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ
O_WRITE_MESSAGE_FLAG_NONE) | |
261 if result != system.MOJO_RESULT_OK { | |
262 panic("can't write to a message pipe") | |
263 } | |
264 } | |
265 } | |
266 | |
267 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan
dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { | |
268 idChan := make(chan AsyncWaitId, 1) | |
269 w.waitChan <- waitRequest{ | |
270 handle, | |
271 signals, | |
272 idChan, | |
273 responseChan, | |
274 } | |
275 w.wakeWorker() | |
276 return <-idChan | |
277 } | |
278 | |
279 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { | |
280 w.cancelChan <- id | |
281 w.wakeWorker() | |
282 } | |
OLD | NEW |