Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Side by Side Diff: third_party/mojo/src/mojo/public/go/bindings/async_waiter.go

Issue 954643002: Update mojo sdk to rev 3d23dae011859a2aae49f1d1adde705c8e85d819 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: resolve more msvc linkage woes Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package bindings
6
7 import (
8 "fmt"
9 "sync"
10 "sync/atomic"
11
12 "mojo/public/go/system"
13 )
14
15 var waiter *asyncWaiterImpl
16 var once sync.Once
17
18 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface.
19 func GetAsyncWaiter() AsyncWaiter {
20 once.Do(func() {
21 waiter = newAsyncWaiter()
22 })
23 return waiter
24 }
25
26 // AsyncWaitId is an id returned by |AsyncWait()| used to cancel it.
27 type AsyncWaitId uint64
28
29 // WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to
30 // finish. It contains the same information as if |Wait()| was called on a
31 // handle.
32 type WaitResponse struct {
33 Result system.MojoResult
34 State system.MojoHandleSignalsState
35 }
36
37 // AsyncWaiter defines an interface for asynchronously waiting (and cancelling
38 // asynchronous waits) on a handle.
39 type AsyncWaiter interface {
40 // AsyncWait asynchronously waits on a given handle until a signal
41 // indicated by |signals| is satisfied or it becomes known that no
42 // signal indicated by |signals| will ever be satisified. The wait
43 // response will be sent to |responseChan|.
44 //
45 // |handle| must not be closed or transferred until the wait response
46 // is received from |responseChan|.
47 AsyncWait(handle system.Handle, signals system.MojoHandleSignals, respon seChan chan<- WaitResponse) AsyncWaitId
48
49 // CancelWait cancels an outstanding async wait (specified by |id|)
50 // initiated by |AsyncWait()|. A response with Mojo result
51 // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|.
52 CancelWait(id AsyncWaitId)
53 }
54
55 // waitRequest is a struct sent to asyncWaiterWorker to add another handle to
56 // the list of waiting handles.
57 type waitRequest struct {
58 handle system.Handle
59 signals system.MojoHandleSignals
60
61 // Used for |CancelWait()| calls. The worker should issue IDs so that
62 // you can't cancel the wait until the worker received the wait request.
63 idChan chan<- AsyncWaitId
64
65 // A channel end to send wait results.
66 responseChan chan<- WaitResponse
67 }
68
69 // asyncWaiterWorker does the actual work, in its own goroutine. It calls
70 // |WaitMany()| on all provided handles. New handles a added via |waitChan|
71 // and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl
72 // sends mojo messages to a dedicated message pipe, the other end of which has
73 // index 0 in all slices of the worker.
74 type asyncWaiterWorker struct {
75 // |handles| and |signals| are used to make |WaitMany()| calls directly.
76 // All these arrays should be operated simultaneously; i-th element
77 // of each refers to i-th handle.
78 handles []system.Handle
79 signals []system.MojoHandleSignals
80 asyncWaitIds []AsyncWaitId
81 responses []chan<- WaitResponse
82
83 // Flag shared between waiterImpl and worker that is 1 iff the worker is
84 // already notified by waiterImpl. The worker sets it to 0 as soon as
85 // |WaitMany()| succeeds.
86 isNotified *int32
87 waitChan <-chan waitRequest // should have a non-empty buffer
88 cancelChan <-chan AsyncWaitId // should have a non-empty buffer
89 lastUsedId AsyncWaitId // is incremented each |AsyncWait()| call
90 }
91
92 // removeHandle removes handle at provided index without sending response by
93 // swapping all information associated with index-th handle with the last one
94 // and removing the last one.
95 func (w *asyncWaiterWorker) removeHandle(index int) {
96 l := len(w.handles) - 1
97 // Swap with the last and remove last.
98 w.handles[index] = w.handles[l]
99 w.handles = w.handles[0:l]
100 w.signals[index] = w.signals[l]
101 w.signals = w.signals[0:l]
102
103 w.asyncWaitIds[index] = w.asyncWaitIds[l]
104 w.asyncWaitIds = w.asyncWaitIds[0:l]
105 w.responses[index] = w.responses[l]
106 w.responses = w.responses[0:l]
107 }
108
109 // sendWaitResponseAndRemove send response to corresponding channel and removes
110 // index-th waiting handle.
111 func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.M ojoResult, state system.MojoHandleSignalsState) {
112 w.responses[index] <- WaitResponse{
113 result,
114 state,
115 }
116 w.removeHandle(index)
117 }
118
119 // respondToSatisfiedWaits responds to all wait requests that have at least
120 // one satisfied signal and removes them.
121 func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSi gnalsState) {
122 // Don't touch handle at index 0 as it is the waking handle.
123 for i := 1; i < len(states); {
124 if (states[i].SatisfiedSignals & w.signals[i]) != 0 {
125 // Respond and swap i-th with last and remove last.
126 w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, st ates[i])
127 // Swap i-th with last and remove last.
128 states[i] = states[len(states)-1]
129 states = states[:len(states)-1]
130 } else {
131 i++
132 }
133 }
134 }
135
136 // processIncomingRequests processes all queued async wait or cancel requests
137 // sent by asyncWaiterImpl.
138 func (w *asyncWaiterWorker) processIncomingRequests() {
139 for {
140 select {
141 case request := <-w.waitChan:
142 w.handles = append(w.handles, request.handle)
143 w.signals = append(w.signals, request.signals)
144 w.responses = append(w.responses, request.responseChan)
145
146 w.lastUsedId++
147 id := w.lastUsedId
148 w.asyncWaitIds = append(w.asyncWaitIds, id)
149 request.idChan <- id
150 case AsyncWaitId := <-w.cancelChan:
151 // Zero index is reserved for the waking message pipe ha ndle.
152 index := 0
153 for i := 1; i < len(w.asyncWaitIds); i++ {
154 if w.asyncWaitIds[i] == AsyncWaitId {
155 index = i
156 break
157 }
158 }
159 // Do nothing if the id was not found as wait response m ay be
160 // already sent if the async wait was successful.
161 if index > 0 {
162 w.sendWaitResponseAndRemove(index, system.MOJO_R ESULT_ABORTED, system.MojoHandleSignalsState{})
163 }
164 default:
165 return
166 }
167 }
168 }
169
170 // runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the
171 // wait is interrupted by waking handle (index 0) then it means that the worker
172 // was woken by waiterImpl, so the worker processes incoming requests from
173 // waiterImpl; otherwise responses to corresponding wait request.
174 func (w *asyncWaiterWorker) runLoop() {
175 for {
176 result, index, states := system.GetCore().WaitMany(w.handles, w. signals, system.MOJO_DEADLINE_INDEFINITE)
177 // Set flag to 0, so that the next incoming request to
178 // waiterImpl would explicitly wake worker by sending a message
179 // to waking message pipe.
180 atomic.StoreInt32(w.isNotified, 0)
181 if index == -1 {
182 panic(fmt.Sprintf("error waiting on handles: %v", result ))
183 break
184 }
185 // Zero index means that the worker was signaled by asyncWaiterI mpl.
186 if index == 0 {
187 if result != system.MOJO_RESULT_OK {
188 panic(fmt.Sprintf("error waiting on waking handl e: %v", result))
189 }
190 w.handles[0].(system.MessagePipeHandle).ReadMessage(syst em.MOJO_READ_MESSAGE_FLAG_NONE)
191 w.processIncomingRequests()
192 } else if result != system.MOJO_RESULT_OK {
193 w.sendWaitResponseAndRemove(index, result, system.MojoHa ndleSignalsState{})
194 } else {
195 w.respondToSatisfiedWaits(states)
196 }
197 }
198 }
199
200 // asyncWaiterImpl is an implementation of |AsyncWaiter| interface.
201 // Runs a worker in a separate goroutine and comunicates with it by sending a
202 // message to |wakingHandle| to wake worker from |WaitMany()| call and
203 // sending request via |waitChan| and |cancelChan|.
204 type asyncWaiterImpl struct {
205 wakingHandle system.MessagePipeHandle
206
207 // Flag shared between waiterImpl and worker that is 1 iff the worker is
208 // already notified by waiterImpl. The worker sets it to 0 as soon as
209 // |WaitMany()| succeeds.
210 isWorkerNotified *int32
211 waitChan chan<- waitRequest // should have a non-empty buffer
212 cancelChan chan<- AsyncWaitId // should have a non-empty buffer
213 }
214
215 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine.
216 func newAsyncWaiter() *asyncWaiterImpl {
217 result, h0, h1 := system.GetCore().CreateMessagePipe(nil)
218 if result != system.MOJO_RESULT_OK {
219 panic(fmt.Sprintf("can't create message pipe %v", result))
220 }
221 waitChan := make(chan waitRequest, 10)
222 cancelChan := make(chan AsyncWaitId, 10)
223 isNotified := new(int32)
224 worker := asyncWaiterWorker{
225 []system.Handle{h1},
226 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE},
227 []AsyncWaitId{0},
228 []chan<- WaitResponse{make(chan WaitResponse)},
229 isNotified,
230 waitChan,
231 cancelChan,
232 0,
233 }
234 go worker.runLoop()
235 return &asyncWaiterImpl{
236 wakingHandle: h0,
237 isWorkerNotified: isNotified,
238 waitChan: waitChan,
239 cancelChan: cancelChan,
240 }
241 }
242
243 // wakeWorker wakes the worker from |WaitMany()| call. This should be called
244 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock.
245 func (w *asyncWaiterImpl) wakeWorker() {
246 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) {
247 w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJO_WRITE_ME SSAGE_FLAG_NONE)
248 }
249 }
250
251 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan dleSignals, responseChan chan<- WaitResponse) AsyncWaitId {
252 idChan := make(chan AsyncWaitId, 1)
253 w.waitChan <- waitRequest{
254 handle,
255 signals,
256 idChan,
257 responseChan,
258 }
259 w.wakeWorker()
260 return <-idChan
261 }
262
263 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) {
264 w.cancelChan <- id
265 w.wakeWorker()
266 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698