OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package bindings | 5 package bindings |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
| 9 "runtime" |
9 "sync" | 10 "sync" |
10 "sync/atomic" | 11 "sync/atomic" |
11 | 12 |
12 "mojo/public/go/system" | 13 "mojo/public/go/system" |
13 ) | 14 ) |
14 | 15 |
15 var defaultWaiter *asyncWaiterImpl | 16 var defaultWaiter *asyncWaiterImpl |
16 var once sync.Once | 17 var once sync.Once |
17 | 18 |
18 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. | 19 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
79 signals []system.MojoHandleSignals | 80 signals []system.MojoHandleSignals |
80 asyncWaitIds []AsyncWaitId | 81 asyncWaitIds []AsyncWaitId |
81 responses []chan<- WaitResponse | 82 responses []chan<- WaitResponse |
82 | 83 |
83 // Flag shared between waiterImpl and worker that is 1 iff the worker is | 84 // Flag shared between waiterImpl and worker that is 1 iff the worker is |
84 // already notified by waiterImpl. The worker sets it to 0 as soon as | 85 // already notified by waiterImpl. The worker sets it to 0 as soon as |
85 // |WaitMany()| succeeds. | 86 // |WaitMany()| succeeds. |
86 isNotified *int32 | 87 isNotified *int32 |
87 waitChan <-chan waitRequest // should have a non-empty buffer | 88 waitChan <-chan waitRequest // should have a non-empty buffer |
88 cancelChan <-chan AsyncWaitId // should have a non-empty buffer | 89 cancelChan <-chan AsyncWaitId // should have a non-empty buffer |
89 » ids Counter // is incremented each |AsyncWait()| call | 90 » ids uint64 // is incremented each |AsyncWait()| call |
90 } | 91 } |
91 | 92 |
92 // removeHandle removes handle at provided index without sending response by | 93 // removeHandle removes handle at provided index without sending response by |
93 // swapping all information associated with index-th handle with the last one | 94 // swapping all information associated with index-th handle with the last one |
94 // and removing the last one. | 95 // and removing the last one. |
95 func (w *asyncWaiterWorker) removeHandle(index int) { | 96 func (w *asyncWaiterWorker) removeHandle(index int) { |
96 l := len(w.handles) - 1 | 97 l := len(w.handles) - 1 |
97 // Swap with the last and remove last. | 98 // Swap with the last and remove last. |
98 w.handles[index] = w.handles[l] | 99 w.handles[index] = w.handles[l] |
99 w.handles = w.handles[0:l] | 100 w.handles = w.handles[0:l] |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
136 // processIncomingRequests processes all queued async wait or cancel requests | 137 // processIncomingRequests processes all queued async wait or cancel requests |
137 // sent by asyncWaiterImpl. | 138 // sent by asyncWaiterImpl. |
138 func (w *asyncWaiterWorker) processIncomingRequests() { | 139 func (w *asyncWaiterWorker) processIncomingRequests() { |
139 for { | 140 for { |
140 select { | 141 select { |
141 case request := <-w.waitChan: | 142 case request := <-w.waitChan: |
142 w.handles = append(w.handles, request.handle) | 143 w.handles = append(w.handles, request.handle) |
143 w.signals = append(w.signals, request.signals) | 144 w.signals = append(w.signals, request.signals) |
144 w.responses = append(w.responses, request.responseChan) | 145 w.responses = append(w.responses, request.responseChan) |
145 | 146 |
146 » » » id := AsyncWaitId(w.ids.Next()) | 147 » » » w.ids++ |
| 148 » » » id := AsyncWaitId(w.ids) |
147 w.asyncWaitIds = append(w.asyncWaitIds, id) | 149 w.asyncWaitIds = append(w.asyncWaitIds, id) |
148 request.idChan <- id | 150 request.idChan <- id |
149 case AsyncWaitId := <-w.cancelChan: | 151 case AsyncWaitId := <-w.cancelChan: |
150 // Zero index is reserved for the waking message pipe ha
ndle. | 152 // Zero index is reserved for the waking message pipe ha
ndle. |
151 index := 0 | 153 index := 0 |
152 for i := 1; i < len(w.asyncWaitIds); i++ { | 154 for i := 1; i < len(w.asyncWaitIds); i++ { |
153 if w.asyncWaitIds[i] == AsyncWaitId { | 155 if w.asyncWaitIds[i] == AsyncWaitId { |
154 index = i | 156 index = i |
155 break | 157 break |
156 } | 158 } |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
204 wakingHandle system.MessagePipeHandle | 206 wakingHandle system.MessagePipeHandle |
205 | 207 |
206 // Flag shared between waiterImpl and worker that is 1 iff the worker is | 208 // Flag shared between waiterImpl and worker that is 1 iff the worker is |
207 // already notified by waiterImpl. The worker sets it to 0 as soon as | 209 // already notified by waiterImpl. The worker sets it to 0 as soon as |
208 // |WaitMany()| succeeds. | 210 // |WaitMany()| succeeds. |
209 isWorkerNotified *int32 | 211 isWorkerNotified *int32 |
210 waitChan chan<- waitRequest // should have a non-empty buffer | 212 waitChan chan<- waitRequest // should have a non-empty buffer |
211 cancelChan chan<- AsyncWaitId // should have a non-empty buffer | 213 cancelChan chan<- AsyncWaitId // should have a non-empty buffer |
212 } | 214 } |
213 | 215 |
| 216 func finalizeWorker(worker *asyncWaiterWorker) { |
| 217 // Close waking handle on worker side. |
| 218 worker.handles[0].Close() |
| 219 } |
| 220 |
| 221 func finalizeAsyncWaiter(waiter *asyncWaiterImpl) { |
| 222 waiter.wakingHandle.Close() |
| 223 } |
| 224 |
214 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. | 225 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. |
215 func newAsyncWaiter() *asyncWaiterImpl { | 226 func newAsyncWaiter() *asyncWaiterImpl { |
216 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) | 227 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) |
217 if result != system.MOJO_RESULT_OK { | 228 if result != system.MOJO_RESULT_OK { |
218 panic(fmt.Sprintf("can't create message pipe %v", result)) | 229 panic(fmt.Sprintf("can't create message pipe %v", result)) |
219 } | 230 } |
220 waitChan := make(chan waitRequest, 10) | 231 waitChan := make(chan waitRequest, 10) |
221 cancelChan := make(chan AsyncWaitId, 10) | 232 cancelChan := make(chan AsyncWaitId, 10) |
222 isNotified := new(int32) | 233 isNotified := new(int32) |
223 » worker := asyncWaiterWorker{ | 234 » worker := &asyncWaiterWorker{ |
224 []system.Handle{h1}, | 235 []system.Handle{h1}, |
225 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, | 236 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, |
226 []AsyncWaitId{0}, | 237 []AsyncWaitId{0}, |
227 []chan<- WaitResponse{make(chan WaitResponse)}, | 238 []chan<- WaitResponse{make(chan WaitResponse)}, |
228 isNotified, | 239 isNotified, |
229 waitChan, | 240 waitChan, |
230 cancelChan, | 241 cancelChan, |
231 » » Counter{}, | 242 » » 0, |
232 } | 243 } |
| 244 runtime.SetFinalizer(worker, finalizeWorker) |
233 go worker.runLoop() | 245 go worker.runLoop() |
234 » return &asyncWaiterImpl{ | 246 » waiter := &asyncWaiterImpl{ |
235 wakingHandle: h0, | 247 wakingHandle: h0, |
236 isWorkerNotified: isNotified, | 248 isWorkerNotified: isNotified, |
237 waitChan: waitChan, | 249 waitChan: waitChan, |
238 cancelChan: cancelChan, | 250 cancelChan: cancelChan, |
239 } | 251 } |
| 252 runtime.SetFinalizer(waiter, finalizeAsyncWaiter) |
| 253 return waiter |
240 } | 254 } |
241 | 255 |
242 // wakeWorker wakes the worker from |WaitMany()| call. This should be called | 256 // wakeWorker wakes the worker from |WaitMany()| call. This should be called |
243 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. | 257 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. |
244 func (w *asyncWaiterImpl) wakeWorker() { | 258 func (w *asyncWaiterImpl) wakeWorker() { |
245 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { | 259 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { |
246 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ
O_WRITE_MESSAGE_FLAG_NONE) | 260 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ
O_WRITE_MESSAGE_FLAG_NONE) |
247 if result != system.MOJO_RESULT_OK { | 261 if result != system.MOJO_RESULT_OK { |
248 panic("can't write to a message pipe") | 262 panic("can't write to a message pipe") |
249 } | 263 } |
250 } | 264 } |
251 } | 265 } |
252 | 266 |
253 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan
dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { | 267 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan
dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { |
254 idChan := make(chan AsyncWaitId, 1) | 268 idChan := make(chan AsyncWaitId, 1) |
255 w.waitChan <- waitRequest{ | 269 w.waitChan <- waitRequest{ |
256 handle, | 270 handle, |
257 signals, | 271 signals, |
258 idChan, | 272 idChan, |
259 responseChan, | 273 responseChan, |
260 } | 274 } |
261 w.wakeWorker() | 275 w.wakeWorker() |
262 return <-idChan | 276 return <-idChan |
263 } | 277 } |
264 | 278 |
265 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { | 279 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { |
266 w.cancelChan <- id | 280 w.cancelChan <- id |
267 w.wakeWorker() | 281 w.wakeWorker() |
268 } | 282 } |
OLD | NEW |