| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package bindings | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "runtime" | |
| 10 "sync" | |
| 11 "sync/atomic" | |
| 12 | |
| 13 "mojo/public/go/system" | |
| 14 ) | |
| 15 | |
| 16 var defaultWaiter *asyncWaiterImpl | |
| 17 var once sync.Once | |
| 18 | |
| 19 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. | |
| 20 func GetAsyncWaiter() AsyncWaiter { | |
| 21 once.Do(func() { | |
| 22 defaultWaiter = newAsyncWaiter() | |
| 23 }) | |
| 24 return defaultWaiter | |
| 25 } | |
| 26 | |
| 27 // AsyncWaitId is an id returned by |AsyncWait()| used to cancel it. | |
| 28 type AsyncWaitId uint64 | |
| 29 | |
| 30 // WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to | |
| 31 // finish. It contains the same information as if |Wait()| was called on a | |
| 32 // handle. | |
| 33 type WaitResponse struct { | |
| 34 Result system.MojoResult | |
| 35 State system.MojoHandleSignalsState | |
| 36 } | |
| 37 | |
| 38 // AsyncWaiter defines an interface for asynchronously waiting (and cancelling | |
| 39 // asynchronous waits) on a handle. | |
| 40 type AsyncWaiter interface { | |
| 41 // AsyncWait asynchronously waits on a given handle until a signal | |
| 42 // indicated by |signals| is satisfied or it becomes known that no | |
| 43 // signal indicated by |signals| will ever be satisified. The wait | |
| 44 // response will be sent to |responseChan|. | |
| 45 // | |
| 46 // |handle| must not be closed or transferred until the wait response | |
| 47 // is received from |responseChan|. | |
| 48 AsyncWait(handle system.Handle, signals system.MojoHandleSignals, respon
seChan chan<- WaitResponse) AsyncWaitId | |
| 49 | |
| 50 // CancelWait cancels an outstanding async wait (specified by |id|) | |
| 51 // initiated by |AsyncWait()|. A response with Mojo result | |
| 52 // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|. | |
| 53 CancelWait(id AsyncWaitId) | |
| 54 } | |
| 55 | |
| 56 // waitRequest is a struct sent to asyncWaiterWorker to add another handle to | |
| 57 // the list of waiting handles. | |
| 58 type waitRequest struct { | |
| 59 handle system.Handle | |
| 60 signals system.MojoHandleSignals | |
| 61 | |
| 62 // Used for |CancelWait()| calls. The worker should issue IDs so that | |
| 63 // you can't cancel the wait until the worker received the wait request. | |
| 64 idChan chan<- AsyncWaitId | |
| 65 | |
| 66 // A channel end to send wait results. | |
| 67 responseChan chan<- WaitResponse | |
| 68 } | |
| 69 | |
| 70 // asyncWaiterWorker does the actual work, in its own goroutine. It calls | |
| 71 // |WaitMany()| on all provided handles. New handles a added via |waitChan| | |
| 72 // and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl | |
| 73 // sends mojo messages to a dedicated message pipe, the other end of which has | |
| 74 // index 0 in all slices of the worker. | |
| 75 type asyncWaiterWorker struct { | |
| 76 // |handles| and |signals| are used to make |WaitMany()| calls directly. | |
| 77 // All these arrays should be operated simultaneously; i-th element | |
| 78 // of each refers to i-th handle. | |
| 79 handles []system.Handle | |
| 80 signals []system.MojoHandleSignals | |
| 81 asyncWaitIds []AsyncWaitId | |
| 82 responses []chan<- WaitResponse | |
| 83 | |
| 84 // Flag shared between waiterImpl and worker that is 1 iff the worker is | |
| 85 // already notified by waiterImpl. The worker sets it to 0 as soon as | |
| 86 // |WaitMany()| succeeds. | |
| 87 isNotified *int32 | |
| 88 waitChan <-chan waitRequest // should have a non-empty buffer | |
| 89 cancelChan <-chan AsyncWaitId // should have a non-empty buffer | |
| 90 ids uint64 // is incremented each |AsyncWait()| call | |
| 91 } | |
| 92 | |
| 93 // removeHandle removes handle at provided index without sending response by | |
| 94 // swapping all information associated with index-th handle with the last one | |
| 95 // and removing the last one. | |
| 96 func (w *asyncWaiterWorker) removeHandle(index int) { | |
| 97 l := len(w.handles) - 1 | |
| 98 // Swap with the last and remove last. | |
| 99 w.handles[index] = w.handles[l] | |
| 100 w.handles = w.handles[0:l] | |
| 101 w.signals[index] = w.signals[l] | |
| 102 w.signals = w.signals[0:l] | |
| 103 | |
| 104 w.asyncWaitIds[index] = w.asyncWaitIds[l] | |
| 105 w.asyncWaitIds = w.asyncWaitIds[0:l] | |
| 106 w.responses[index] = w.responses[l] | |
| 107 w.responses = w.responses[0:l] | |
| 108 } | |
| 109 | |
| 110 // sendWaitResponseAndRemove send response to corresponding channel and removes | |
| 111 // index-th waiting handle. | |
| 112 func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.M
ojoResult, state system.MojoHandleSignalsState) { | |
| 113 w.responses[index] <- WaitResponse{ | |
| 114 result, | |
| 115 state, | |
| 116 } | |
| 117 w.removeHandle(index) | |
| 118 } | |
| 119 | |
| 120 // respondToSatisfiedWaits responds to all wait requests that have at least | |
| 121 // one satisfied signal and removes them. | |
| 122 func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSi
gnalsState) { | |
| 123 // Don't touch handle at index 0 as it is the waking handle. | |
| 124 for i := 1; i < len(states); { | |
| 125 if (states[i].SatisfiedSignals & w.signals[i]) != 0 { | |
| 126 // Respond and swap i-th with last and remove last. | |
| 127 w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, st
ates[i]) | |
| 128 // Swap i-th with last and remove last. | |
| 129 states[i] = states[len(states)-1] | |
| 130 states = states[:len(states)-1] | |
| 131 } else { | |
| 132 i++ | |
| 133 } | |
| 134 } | |
| 135 } | |
| 136 | |
| 137 // processIncomingRequests processes all queued async wait or cancel requests | |
| 138 // sent by asyncWaiterImpl. | |
| 139 func (w *asyncWaiterWorker) processIncomingRequests() { | |
| 140 for { | |
| 141 select { | |
| 142 case request := <-w.waitChan: | |
| 143 w.handles = append(w.handles, request.handle) | |
| 144 w.signals = append(w.signals, request.signals) | |
| 145 w.responses = append(w.responses, request.responseChan) | |
| 146 | |
| 147 w.ids++ | |
| 148 id := AsyncWaitId(w.ids) | |
| 149 w.asyncWaitIds = append(w.asyncWaitIds, id) | |
| 150 request.idChan <- id | |
| 151 case AsyncWaitId := <-w.cancelChan: | |
| 152 // Zero index is reserved for the waking message pipe ha
ndle. | |
| 153 index := 0 | |
| 154 for i := 1; i < len(w.asyncWaitIds); i++ { | |
| 155 if w.asyncWaitIds[i] == AsyncWaitId { | |
| 156 index = i | |
| 157 break | |
| 158 } | |
| 159 } | |
| 160 // Do nothing if the id was not found as wait response m
ay be | |
| 161 // already sent if the async wait was successful. | |
| 162 if index > 0 { | |
| 163 w.sendWaitResponseAndRemove(index, system.MOJO_R
ESULT_ABORTED, system.MojoHandleSignalsState{}) | |
| 164 } | |
| 165 default: | |
| 166 return | |
| 167 } | |
| 168 } | |
| 169 } | |
| 170 | |
| 171 // runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the | |
| 172 // wait is interrupted by waking handle (index 0) then it means that the worker | |
| 173 // was woken by waiterImpl, so the worker processes incoming requests from | |
| 174 // waiterImpl; otherwise responses to corresponding wait request. | |
| 175 func (w *asyncWaiterWorker) runLoop() { | |
| 176 for { | |
| 177 result, index, states := system.GetCore().WaitMany(w.handles, w.
signals, system.MOJO_DEADLINE_INDEFINITE) | |
| 178 // Set flag to 0, so that the next incoming request to | |
| 179 // waiterImpl would explicitly wake worker by sending a message | |
| 180 // to waking message pipe. | |
| 181 atomic.StoreInt32(w.isNotified, 0) | |
| 182 if index == -1 { | |
| 183 panic(fmt.Sprintf("error waiting on handles: %v", result
)) | |
| 184 break | |
| 185 } | |
| 186 // Zero index means that the worker was signaled by asyncWaiterI
mpl. | |
| 187 if index == 0 { | |
| 188 if result != system.MOJO_RESULT_OK { | |
| 189 panic(fmt.Sprintf("error waiting on waking handl
e: %v", result)) | |
| 190 } | |
| 191 w.handles[0].(system.MessagePipeHandle).ReadMessage(syst
em.MOJO_READ_MESSAGE_FLAG_NONE) | |
| 192 w.processIncomingRequests() | |
| 193 } else if result != system.MOJO_RESULT_OK { | |
| 194 w.sendWaitResponseAndRemove(index, result, system.MojoHa
ndleSignalsState{}) | |
| 195 } else { | |
| 196 w.respondToSatisfiedWaits(states) | |
| 197 } | |
| 198 } | |
| 199 } | |
| 200 | |
| 201 // asyncWaiterImpl is an implementation of |AsyncWaiter| interface. | |
| 202 // Runs a worker in a separate goroutine and comunicates with it by sending a | |
| 203 // message to |wakingHandle| to wake worker from |WaitMany()| call and | |
| 204 // sending request via |waitChan| and |cancelChan|. | |
| 205 type asyncWaiterImpl struct { | |
| 206 wakingHandle system.MessagePipeHandle | |
| 207 | |
| 208 // Flag shared between waiterImpl and worker that is 1 iff the worker is | |
| 209 // already notified by waiterImpl. The worker sets it to 0 as soon as | |
| 210 // |WaitMany()| succeeds. | |
| 211 isWorkerNotified *int32 | |
| 212 waitChan chan<- waitRequest // should have a non-empty buffer | |
| 213 cancelChan chan<- AsyncWaitId // should have a non-empty buffer | |
| 214 } | |
| 215 | |
| 216 func finalizeWorker(worker *asyncWaiterWorker) { | |
| 217 // Close waking handle on worker side. | |
| 218 worker.handles[0].Close() | |
| 219 } | |
| 220 | |
| 221 func finalizeAsyncWaiter(waiter *asyncWaiterImpl) { | |
| 222 waiter.wakingHandle.Close() | |
| 223 } | |
| 224 | |
| 225 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. | |
| 226 func newAsyncWaiter() *asyncWaiterImpl { | |
| 227 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) | |
| 228 if result != system.MOJO_RESULT_OK { | |
| 229 panic(fmt.Sprintf("can't create message pipe %v", result)) | |
| 230 } | |
| 231 waitChan := make(chan waitRequest, 10) | |
| 232 cancelChan := make(chan AsyncWaitId, 10) | |
| 233 isNotified := new(int32) | |
| 234 worker := &asyncWaiterWorker{ | |
| 235 []system.Handle{h1}, | |
| 236 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, | |
| 237 []AsyncWaitId{0}, | |
| 238 []chan<- WaitResponse{make(chan WaitResponse)}, | |
| 239 isNotified, | |
| 240 waitChan, | |
| 241 cancelChan, | |
| 242 0, | |
| 243 } | |
| 244 runtime.SetFinalizer(worker, finalizeWorker) | |
| 245 go worker.runLoop() | |
| 246 waiter := &asyncWaiterImpl{ | |
| 247 wakingHandle: h0, | |
| 248 isWorkerNotified: isNotified, | |
| 249 waitChan: waitChan, | |
| 250 cancelChan: cancelChan, | |
| 251 } | |
| 252 runtime.SetFinalizer(waiter, finalizeAsyncWaiter) | |
| 253 return waiter | |
| 254 } | |
| 255 | |
| 256 // wakeWorker wakes the worker from |WaitMany()| call. This should be called | |
| 257 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. | |
| 258 func (w *asyncWaiterImpl) wakeWorker() { | |
| 259 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { | |
| 260 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ
O_WRITE_MESSAGE_FLAG_NONE) | |
| 261 if result != system.MOJO_RESULT_OK { | |
| 262 panic("can't write to a message pipe") | |
| 263 } | |
| 264 } | |
| 265 } | |
| 266 | |
| 267 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan
dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { | |
| 268 idChan := make(chan AsyncWaitId, 1) | |
| 269 w.waitChan <- waitRequest{ | |
| 270 handle, | |
| 271 signals, | |
| 272 idChan, | |
| 273 responseChan, | |
| 274 } | |
| 275 w.wakeWorker() | |
| 276 return <-idChan | |
| 277 } | |
| 278 | |
| 279 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { | |
| 280 w.cancelChan <- id | |
| 281 w.wakeWorker() | |
| 282 } | |
| OLD | NEW |