| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package bindings |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "sync" |
| 10 "sync/atomic" |
| 11 |
| 12 "mojo/public/go/system" |
| 13 ) |
| 14 |
| 15 var waiter *asyncWaiterImpl |
| 16 var once sync.Once |
| 17 |
| 18 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. |
| 19 func GetAsyncWaiter() AsyncWaiter { |
| 20 once.Do(func() { |
| 21 waiter = newAsyncWaiter() |
| 22 }) |
| 23 return waiter |
| 24 } |
| 25 |
| 26 // AsyncWaitId is an id returned by |AsyncWait()| used to cancel it. |
| 27 type AsyncWaitId uint64 |
| 28 |
| 29 // WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to |
| 30 // finish. It contains the same information as if |Wait()| was called on a |
| 31 // handle. |
| 32 type WaitResponse struct { |
| 33 Result system.MojoResult |
| 34 State system.MojoHandleSignalsState |
| 35 } |
| 36 |
| 37 // AsyncWaiter defines an interface for asynchronously waiting (and cancelling |
| 38 // asynchronous waits) on a handle. |
| 39 type AsyncWaiter interface { |
| 40 // AsyncWait asynchronously waits on a given handle until a signal |
| 41 // indicated by |signals| is satisfied or it becomes known that no |
| 42 // signal indicated by |signals| will ever be satisified. The wait |
| 43 // response will be sent to |responseChan|. |
| 44 // |
| 45 // |handle| must not be closed or transferred until the wait response |
| 46 // is received from |responseChan|. |
| 47 AsyncWait(handle system.Handle, signals system.MojoHandleSignals, respon
seChan chan<- WaitResponse) AsyncWaitId |
| 48 |
| 49 // CancelWait cancels an outstanding async wait (specified by |id|) |
| 50 // initiated by |AsyncWait()|. A response with Mojo result |
| 51 // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|. |
| 52 CancelWait(id AsyncWaitId) |
| 53 } |
| 54 |
| 55 // waitRequest is a struct sent to asyncWaiterWorker to add another handle to |
| 56 // the list of waiting handles. |
| 57 type waitRequest struct { |
| 58 handle system.Handle |
| 59 signals system.MojoHandleSignals |
| 60 |
| 61 // Used for |CancelWait()| calls. The worker should issue IDs so that |
| 62 // you can't cancel the wait until the worker received the wait request. |
| 63 idChan chan<- AsyncWaitId |
| 64 |
| 65 // A channel end to send wait results. |
| 66 responseChan chan<- WaitResponse |
| 67 } |
| 68 |
| 69 // asyncWaiterWorker does the actual work, in its own goroutine. It calls |
| 70 // |WaitMany()| on all provided handles. New handles a added via |waitChan| |
| 71 // and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl |
| 72 // sends mojo messages to a dedicated message pipe, the other end of which has |
| 73 // index 0 in all slices of the worker. |
| 74 type asyncWaiterWorker struct { |
| 75 // |handles| and |signals| are used to make |WaitMany()| calls directly. |
| 76 // All these arrays should be operated simultaneously; i-th element |
| 77 // of each refers to i-th handle. |
| 78 handles []system.Handle |
| 79 signals []system.MojoHandleSignals |
| 80 asyncWaitIds []AsyncWaitId |
| 81 responses []chan<- WaitResponse |
| 82 |
| 83 // Flag shared between waiterImpl and worker that is 1 iff the worker is |
| 84 // already notified by waiterImpl. The worker sets it to 0 as soon as |
| 85 // |WaitMany()| succeeds. |
| 86 isNotified *int32 |
| 87 waitChan <-chan waitRequest // should have a non-empty buffer |
| 88 cancelChan <-chan AsyncWaitId // should have a non-empty buffer |
| 89 lastUsedId AsyncWaitId // is incremented each |AsyncWait()| call |
| 90 } |
| 91 |
| 92 // removeHandle removes handle at provided index without sending response by |
| 93 // swapping all information associated with index-th handle with the last one |
| 94 // and removing the last one. |
| 95 func (w *asyncWaiterWorker) removeHandle(index int) { |
| 96 l := len(w.handles) - 1 |
| 97 // Swap with the last and remove last. |
| 98 w.handles[index] = w.handles[l] |
| 99 w.handles = w.handles[0:l] |
| 100 w.signals[index] = w.signals[l] |
| 101 w.signals = w.signals[0:l] |
| 102 |
| 103 w.asyncWaitIds[index] = w.asyncWaitIds[l] |
| 104 w.asyncWaitIds = w.asyncWaitIds[0:l] |
| 105 w.responses[index] = w.responses[l] |
| 106 w.responses = w.responses[0:l] |
| 107 } |
| 108 |
| 109 // sendWaitResponseAndRemove send response to corresponding channel and removes |
| 110 // index-th waiting handle. |
| 111 func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.M
ojoResult, state system.MojoHandleSignalsState) { |
| 112 w.responses[index] <- WaitResponse{ |
| 113 result, |
| 114 state, |
| 115 } |
| 116 w.removeHandle(index) |
| 117 } |
| 118 |
| 119 // respondToSatisfiedWaits responds to all wait requests that have at least |
| 120 // one satisfied signal and removes them. |
| 121 func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSi
gnalsState) { |
| 122 // Don't touch handle at index 0 as it is the waking handle. |
| 123 for i := 1; i < len(states); { |
| 124 if (states[i].SatisfiedSignals & w.signals[i]) != 0 { |
| 125 // Respond and swap i-th with last and remove last. |
| 126 w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, st
ates[i]) |
| 127 // Swap i-th with last and remove last. |
| 128 states[i] = states[len(states)-1] |
| 129 states = states[:len(states)-1] |
| 130 } else { |
| 131 i++ |
| 132 } |
| 133 } |
| 134 } |
| 135 |
| 136 // processIncomingRequests processes all queued async wait or cancel requests |
| 137 // sent by asyncWaiterImpl. |
| 138 func (w *asyncWaiterWorker) processIncomingRequests() { |
| 139 for { |
| 140 select { |
| 141 case request := <-w.waitChan: |
| 142 w.handles = append(w.handles, request.handle) |
| 143 w.signals = append(w.signals, request.signals) |
| 144 w.responses = append(w.responses, request.responseChan) |
| 145 |
| 146 w.lastUsedId++ |
| 147 id := w.lastUsedId |
| 148 w.asyncWaitIds = append(w.asyncWaitIds, id) |
| 149 request.idChan <- id |
| 150 case AsyncWaitId := <-w.cancelChan: |
| 151 // Zero index is reserved for the waking message pipe ha
ndle. |
| 152 index := 0 |
| 153 for i := 1; i < len(w.asyncWaitIds); i++ { |
| 154 if w.asyncWaitIds[i] == AsyncWaitId { |
| 155 index = i |
| 156 break |
| 157 } |
| 158 } |
| 159 // Do nothing if the id was not found as wait response m
ay be |
| 160 // already sent if the async wait was successful. |
| 161 if index > 0 { |
| 162 w.sendWaitResponseAndRemove(index, system.MOJO_R
ESULT_ABORTED, system.MojoHandleSignalsState{}) |
| 163 } |
| 164 default: |
| 165 return |
| 166 } |
| 167 } |
| 168 } |
| 169 |
| 170 // runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the |
| 171 // wait is interrupted by waking handle (index 0) then it means that the worker |
| 172 // was woken by waiterImpl, so the worker processes incoming requests from |
| 173 // waiterImpl; otherwise responses to corresponding wait request. |
| 174 func (w *asyncWaiterWorker) runLoop() { |
| 175 for { |
| 176 result, index, states := system.GetCore().WaitMany(w.handles, w.
signals, system.MOJO_DEADLINE_INDEFINITE) |
| 177 // Set flag to 0, so that the next incoming request to |
| 178 // waiterImpl would explicitly wake worker by sending a message |
| 179 // to waking message pipe. |
| 180 atomic.StoreInt32(w.isNotified, 0) |
| 181 if index == -1 { |
| 182 panic(fmt.Sprintf("error waiting on handles: %v", result
)) |
| 183 break |
| 184 } |
| 185 // Zero index means that the worker was signaled by asyncWaiterI
mpl. |
| 186 if index == 0 { |
| 187 if result != system.MOJO_RESULT_OK { |
| 188 panic(fmt.Sprintf("error waiting on waking handl
e: %v", result)) |
| 189 } |
| 190 w.handles[0].(system.MessagePipeHandle).ReadMessage(syst
em.MOJO_READ_MESSAGE_FLAG_NONE) |
| 191 w.processIncomingRequests() |
| 192 } else if result != system.MOJO_RESULT_OK { |
| 193 w.sendWaitResponseAndRemove(index, result, system.MojoHa
ndleSignalsState{}) |
| 194 } else { |
| 195 w.respondToSatisfiedWaits(states) |
| 196 } |
| 197 } |
| 198 } |
| 199 |
| 200 // asyncWaiterImpl is an implementation of |AsyncWaiter| interface. |
| 201 // Runs a worker in a separate goroutine and comunicates with it by sending a |
| 202 // message to |wakingHandle| to wake worker from |WaitMany()| call and |
| 203 // sending request via |waitChan| and |cancelChan|. |
| 204 type asyncWaiterImpl struct { |
| 205 wakingHandle system.MessagePipeHandle |
| 206 |
| 207 // Flag shared between waiterImpl and worker that is 1 iff the worker is |
| 208 // already notified by waiterImpl. The worker sets it to 0 as soon as |
| 209 // |WaitMany()| succeeds. |
| 210 isWorkerNotified *int32 |
| 211 waitChan chan<- waitRequest // should have a non-empty buffer |
| 212 cancelChan chan<- AsyncWaitId // should have a non-empty buffer |
| 213 } |
| 214 |
| 215 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. |
| 216 func newAsyncWaiter() *asyncWaiterImpl { |
| 217 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) |
| 218 if result != system.MOJO_RESULT_OK { |
| 219 panic(fmt.Sprintf("can't create message pipe %v", result)) |
| 220 } |
| 221 waitChan := make(chan waitRequest, 10) |
| 222 cancelChan := make(chan AsyncWaitId, 10) |
| 223 isNotified := new(int32) |
| 224 worker := asyncWaiterWorker{ |
| 225 []system.Handle{h1}, |
| 226 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, |
| 227 []AsyncWaitId{0}, |
| 228 []chan<- WaitResponse{make(chan WaitResponse)}, |
| 229 isNotified, |
| 230 waitChan, |
| 231 cancelChan, |
| 232 0, |
| 233 } |
| 234 go worker.runLoop() |
| 235 return &asyncWaiterImpl{ |
| 236 wakingHandle: h0, |
| 237 isWorkerNotified: isNotified, |
| 238 waitChan: waitChan, |
| 239 cancelChan: cancelChan, |
| 240 } |
| 241 } |
| 242 |
| 243 // wakeWorker wakes the worker from |WaitMany()| call. This should be called |
| 244 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. |
| 245 func (w *asyncWaiterImpl) wakeWorker() { |
| 246 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { |
| 247 w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJO_WRITE_ME
SSAGE_FLAG_NONE) |
| 248 } |
| 249 } |
| 250 |
| 251 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan
dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { |
| 252 idChan := make(chan AsyncWaitId, 1) |
| 253 w.waitChan <- waitRequest{ |
| 254 handle, |
| 255 signals, |
| 256 idChan, |
| 257 responseChan, |
| 258 } |
| 259 w.wakeWorker() |
| 260 return <-idChan |
| 261 } |
| 262 |
| 263 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { |
| 264 w.cancelChan <- id |
| 265 w.wakeWorker() |
| 266 } |
| OLD | NEW |