Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(546)

Side by Side Diff: mojo/public/go/bindings/async_waiter.go

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/go/application/describer.go ('k') | mojo/public/go/bindings/connector.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bindings
6
7 import (
8 "fmt"
9 "runtime"
10 "sync"
11 "sync/atomic"
12
13 "mojo/public/go/system"
14 )
15
16 var defaultWaiter *asyncWaiterImpl
17 var once sync.Once
18
19 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface.
20 func GetAsyncWaiter() AsyncWaiter {
21 once.Do(func() {
22 defaultWaiter = newAsyncWaiter()
23 })
24 return defaultWaiter
25 }
26
27 // AsyncWaitId is an id returned by |AsyncWait()| used to cancel it.
28 type AsyncWaitId uint64
29
30 // WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to
31 // finish. It contains the same information as if |Wait()| was called on a
32 // handle.
33 type WaitResponse struct {
34 Result system.MojoResult
35 State system.MojoHandleSignalsState
36 }
37
38 // AsyncWaiter defines an interface for asynchronously waiting (and cancelling
39 // asynchronous waits) on a handle.
40 type AsyncWaiter interface {
41 // AsyncWait asynchronously waits on a given handle until a signal
42 // indicated by |signals| is satisfied or it becomes known that no
43 // signal indicated by |signals| will ever be satisified. The wait
44 // response will be sent to |responseChan|.
45 //
46 // |handle| must not be closed or transferred until the wait response
47 // is received from |responseChan|.
48 AsyncWait(handle system.Handle, signals system.MojoHandleSignals, respon seChan chan<- WaitResponse) AsyncWaitId
49
50 // CancelWait cancels an outstanding async wait (specified by |id|)
51 // initiated by |AsyncWait()|. A response with Mojo result
52 // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|.
53 CancelWait(id AsyncWaitId)
54 }
55
56 // waitRequest is a struct sent to asyncWaiterWorker to add another handle to
57 // the list of waiting handles.
58 type waitRequest struct {
59 handle system.Handle
60 signals system.MojoHandleSignals
61
62 // Used for |CancelWait()| calls. The worker should issue IDs so that
63 // you can't cancel the wait until the worker received the wait request.
64 idChan chan<- AsyncWaitId
65
66 // A channel end to send wait results.
67 responseChan chan<- WaitResponse
68 }
69
70 // asyncWaiterWorker does the actual work, in its own goroutine. It calls
71 // |WaitMany()| on all provided handles. New handles a added via |waitChan|
72 // and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl
73 // sends mojo messages to a dedicated message pipe, the other end of which has
74 // index 0 in all slices of the worker.
75 type asyncWaiterWorker struct {
76 // |handles| and |signals| are used to make |WaitMany()| calls directly.
77 // All these arrays should be operated simultaneously; i-th element
78 // of each refers to i-th handle.
79 handles []system.Handle
80 signals []system.MojoHandleSignals
81 asyncWaitIds []AsyncWaitId
82 responses []chan<- WaitResponse
83
84 // Flag shared between waiterImpl and worker that is 1 iff the worker is
85 // already notified by waiterImpl. The worker sets it to 0 as soon as
86 // |WaitMany()| succeeds.
87 isNotified *int32
88 waitChan <-chan waitRequest // should have a non-empty buffer
89 cancelChan <-chan AsyncWaitId // should have a non-empty buffer
90 ids uint64 // is incremented each |AsyncWait()| call
91 }
92
93 // removeHandle removes handle at provided index without sending response by
94 // swapping all information associated with index-th handle with the last one
95 // and removing the last one.
96 func (w *asyncWaiterWorker) removeHandle(index int) {
97 l := len(w.handles) - 1
98 // Swap with the last and remove last.
99 w.handles[index] = w.handles[l]
100 w.handles = w.handles[0:l]
101 w.signals[index] = w.signals[l]
102 w.signals = w.signals[0:l]
103
104 w.asyncWaitIds[index] = w.asyncWaitIds[l]
105 w.asyncWaitIds = w.asyncWaitIds[0:l]
106 w.responses[index] = w.responses[l]
107 w.responses = w.responses[0:l]
108 }
109
110 // sendWaitResponseAndRemove send response to corresponding channel and removes
111 // index-th waiting handle.
112 func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.M ojoResult, state system.MojoHandleSignalsState) {
113 w.responses[index] <- WaitResponse{
114 result,
115 state,
116 }
117 w.removeHandle(index)
118 }
119
120 // respondToSatisfiedWaits responds to all wait requests that have at least
121 // one satisfied signal and removes them.
122 func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSi gnalsState) {
123 // Don't touch handle at index 0 as it is the waking handle.
124 for i := 1; i < len(states); {
125 if (states[i].SatisfiedSignals & w.signals[i]) != 0 {
126 // Respond and swap i-th with last and remove last.
127 w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, st ates[i])
128 // Swap i-th with last and remove last.
129 states[i] = states[len(states)-1]
130 states = states[:len(states)-1]
131 } else {
132 i++
133 }
134 }
135 }
136
137 // processIncomingRequests processes all queued async wait or cancel requests
138 // sent by asyncWaiterImpl.
139 func (w *asyncWaiterWorker) processIncomingRequests() {
140 for {
141 select {
142 case request := <-w.waitChan:
143 w.handles = append(w.handles, request.handle)
144 w.signals = append(w.signals, request.signals)
145 w.responses = append(w.responses, request.responseChan)
146
147 w.ids++
148 id := AsyncWaitId(w.ids)
149 w.asyncWaitIds = append(w.asyncWaitIds, id)
150 request.idChan <- id
151 case AsyncWaitId := <-w.cancelChan:
152 // Zero index is reserved for the waking message pipe ha ndle.
153 index := 0
154 for i := 1; i < len(w.asyncWaitIds); i++ {
155 if w.asyncWaitIds[i] == AsyncWaitId {
156 index = i
157 break
158 }
159 }
160 // Do nothing if the id was not found as wait response m ay be
161 // already sent if the async wait was successful.
162 if index > 0 {
163 w.sendWaitResponseAndRemove(index, system.MOJO_R ESULT_ABORTED, system.MojoHandleSignalsState{})
164 }
165 default:
166 return
167 }
168 }
169 }
170
171 // runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the
172 // wait is interrupted by waking handle (index 0) then it means that the worker
173 // was woken by waiterImpl, so the worker processes incoming requests from
174 // waiterImpl; otherwise responses to corresponding wait request.
175 func (w *asyncWaiterWorker) runLoop() {
176 for {
177 result, index, states := system.GetCore().WaitMany(w.handles, w. signals, system.MOJO_DEADLINE_INDEFINITE)
178 // Set flag to 0, so that the next incoming request to
179 // waiterImpl would explicitly wake worker by sending a message
180 // to waking message pipe.
181 atomic.StoreInt32(w.isNotified, 0)
182 if index == -1 {
183 panic(fmt.Sprintf("error waiting on handles: %v", result ))
184 break
185 }
186 // Zero index means that the worker was signaled by asyncWaiterI mpl.
187 if index == 0 {
188 if result != system.MOJO_RESULT_OK {
189 panic(fmt.Sprintf("error waiting on waking handl e: %v", result))
190 }
191 w.handles[0].(system.MessagePipeHandle).ReadMessage(syst em.MOJO_READ_MESSAGE_FLAG_NONE)
192 w.processIncomingRequests()
193 } else if result != system.MOJO_RESULT_OK {
194 w.sendWaitResponseAndRemove(index, result, system.MojoHa ndleSignalsState{})
195 } else {
196 w.respondToSatisfiedWaits(states)
197 }
198 }
199 }
200
201 // asyncWaiterImpl is an implementation of |AsyncWaiter| interface.
202 // Runs a worker in a separate goroutine and comunicates with it by sending a
203 // message to |wakingHandle| to wake worker from |WaitMany()| call and
204 // sending request via |waitChan| and |cancelChan|.
205 type asyncWaiterImpl struct {
206 wakingHandle system.MessagePipeHandle
207
208 // Flag shared between waiterImpl and worker that is 1 iff the worker is
209 // already notified by waiterImpl. The worker sets it to 0 as soon as
210 // |WaitMany()| succeeds.
211 isWorkerNotified *int32
212 waitChan chan<- waitRequest // should have a non-empty buffer
213 cancelChan chan<- AsyncWaitId // should have a non-empty buffer
214 }
215
216 func finalizeWorker(worker *asyncWaiterWorker) {
217 // Close waking handle on worker side.
218 worker.handles[0].Close()
219 }
220
221 func finalizeAsyncWaiter(waiter *asyncWaiterImpl) {
222 waiter.wakingHandle.Close()
223 }
224
225 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine.
226 func newAsyncWaiter() *asyncWaiterImpl {
227 result, h0, h1 := system.GetCore().CreateMessagePipe(nil)
228 if result != system.MOJO_RESULT_OK {
229 panic(fmt.Sprintf("can't create message pipe %v", result))
230 }
231 waitChan := make(chan waitRequest, 10)
232 cancelChan := make(chan AsyncWaitId, 10)
233 isNotified := new(int32)
234 worker := &asyncWaiterWorker{
235 []system.Handle{h1},
236 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE},
237 []AsyncWaitId{0},
238 []chan<- WaitResponse{make(chan WaitResponse)},
239 isNotified,
240 waitChan,
241 cancelChan,
242 0,
243 }
244 runtime.SetFinalizer(worker, finalizeWorker)
245 go worker.runLoop()
246 waiter := &asyncWaiterImpl{
247 wakingHandle: h0,
248 isWorkerNotified: isNotified,
249 waitChan: waitChan,
250 cancelChan: cancelChan,
251 }
252 runtime.SetFinalizer(waiter, finalizeAsyncWaiter)
253 return waiter
254 }
255
256 // wakeWorker wakes the worker from |WaitMany()| call. This should be called
257 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock.
258 func (w *asyncWaiterImpl) wakeWorker() {
259 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) {
260 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ O_WRITE_MESSAGE_FLAG_NONE)
261 if result != system.MOJO_RESULT_OK {
262 panic("can't write to a message pipe")
263 }
264 }
265 }
266
267 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan dleSignals, responseChan chan<- WaitResponse) AsyncWaitId {
268 idChan := make(chan AsyncWaitId, 1)
269 w.waitChan <- waitRequest{
270 handle,
271 signals,
272 idChan,
273 responseChan,
274 }
275 w.wakeWorker()
276 return <-idChan
277 }
278
279 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) {
280 w.cancelChan <- id
281 w.wakeWorker()
282 }
OLDNEW
« no previous file with comments | « mojo/public/go/application/describer.go ('k') | mojo/public/go/bindings/connector.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698