| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bindings | 5 package bindings |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "runtime" |
| 9 "sync" | 10 "sync" |
| 10 "sync/atomic" | 11 "sync/atomic" |
| 11 | 12 |
| 12 "mojo/public/go/system" | 13 "mojo/public/go/system" |
| 13 ) | 14 ) |
| 14 | 15 |
| 15 var defaultWaiter *asyncWaiterImpl | 16 var defaultWaiter *asyncWaiterImpl |
| 16 var once sync.Once | 17 var once sync.Once |
| 17 | 18 |
| 18 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. | 19 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 79 signals []system.MojoHandleSignals | 80 signals []system.MojoHandleSignals |
| 80 asyncWaitIds []AsyncWaitId | 81 asyncWaitIds []AsyncWaitId |
| 81 responses []chan<- WaitResponse | 82 responses []chan<- WaitResponse |
| 82 | 83 |
| 83 // Flag shared between waiterImpl and worker that is 1 iff the worker is | 84 // Flag shared between waiterImpl and worker that is 1 iff the worker is |
| 84 // already notified by waiterImpl. The worker sets it to 0 as soon as | 85 // already notified by waiterImpl. The worker sets it to 0 as soon as |
| 85 // |WaitMany()| succeeds. | 86 // |WaitMany()| succeeds. |
| 86 isNotified *int32 | 87 isNotified *int32 |
| 87 waitChan <-chan waitRequest // should have a non-empty buffer | 88 waitChan <-chan waitRequest // should have a non-empty buffer |
| 88 cancelChan <-chan AsyncWaitId // should have a non-empty buffer | 89 cancelChan <-chan AsyncWaitId // should have a non-empty buffer |
| 89 » ids Counter // is incremented each |AsyncWait()| call | 90 » ids uint64 // is incremented each |AsyncWait()| call |
| 90 } | 91 } |
| 91 | 92 |
| 92 // removeHandle removes handle at provided index without sending response by | 93 // removeHandle removes handle at provided index without sending response by |
| 93 // swapping all information associated with index-th handle with the last one | 94 // swapping all information associated with index-th handle with the last one |
| 94 // and removing the last one. | 95 // and removing the last one. |
| 95 func (w *asyncWaiterWorker) removeHandle(index int) { | 96 func (w *asyncWaiterWorker) removeHandle(index int) { |
| 96 l := len(w.handles) - 1 | 97 l := len(w.handles) - 1 |
| 97 // Swap with the last and remove last. | 98 // Swap with the last and remove last. |
| 98 w.handles[index] = w.handles[l] | 99 w.handles[index] = w.handles[l] |
| 99 w.handles = w.handles[0:l] | 100 w.handles = w.handles[0:l] |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 136 // processIncomingRequests processes all queued async wait or cancel requests | 137 // processIncomingRequests processes all queued async wait or cancel requests |
| 137 // sent by asyncWaiterImpl. | 138 // sent by asyncWaiterImpl. |
| 138 func (w *asyncWaiterWorker) processIncomingRequests() { | 139 func (w *asyncWaiterWorker) processIncomingRequests() { |
| 139 for { | 140 for { |
| 140 select { | 141 select { |
| 141 case request := <-w.waitChan: | 142 case request := <-w.waitChan: |
| 142 w.handles = append(w.handles, request.handle) | 143 w.handles = append(w.handles, request.handle) |
| 143 w.signals = append(w.signals, request.signals) | 144 w.signals = append(w.signals, request.signals) |
| 144 w.responses = append(w.responses, request.responseChan) | 145 w.responses = append(w.responses, request.responseChan) |
| 145 | 146 |
| 146 » » » id := AsyncWaitId(w.ids.Next()) | 147 » » » w.ids++ |
| 148 » » » id := AsyncWaitId(w.ids) |
| 147 w.asyncWaitIds = append(w.asyncWaitIds, id) | 149 w.asyncWaitIds = append(w.asyncWaitIds, id) |
| 148 request.idChan <- id | 150 request.idChan <- id |
| 149 case AsyncWaitId := <-w.cancelChan: | 151 case AsyncWaitId := <-w.cancelChan: |
| 150 // Zero index is reserved for the waking message pipe ha
ndle. | 152 // Zero index is reserved for the waking message pipe ha
ndle. |
| 151 index := 0 | 153 index := 0 |
| 152 for i := 1; i < len(w.asyncWaitIds); i++ { | 154 for i := 1; i < len(w.asyncWaitIds); i++ { |
| 153 if w.asyncWaitIds[i] == AsyncWaitId { | 155 if w.asyncWaitIds[i] == AsyncWaitId { |
| 154 index = i | 156 index = i |
| 155 break | 157 break |
| 156 } | 158 } |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 204 wakingHandle system.MessagePipeHandle | 206 wakingHandle system.MessagePipeHandle |
| 205 | 207 |
| 206 // Flag shared between waiterImpl and worker that is 1 iff the worker is | 208 // Flag shared between waiterImpl and worker that is 1 iff the worker is |
| 207 // already notified by waiterImpl. The worker sets it to 0 as soon as | 209 // already notified by waiterImpl. The worker sets it to 0 as soon as |
| 208 // |WaitMany()| succeeds. | 210 // |WaitMany()| succeeds. |
| 209 isWorkerNotified *int32 | 211 isWorkerNotified *int32 |
| 210 waitChan chan<- waitRequest // should have a non-empty buffer | 212 waitChan chan<- waitRequest // should have a non-empty buffer |
| 211 cancelChan chan<- AsyncWaitId // should have a non-empty buffer | 213 cancelChan chan<- AsyncWaitId // should have a non-empty buffer |
| 212 } | 214 } |
| 213 | 215 |
| 216 func finalizeWorker(worker *asyncWaiterWorker) { |
| 217 // Close waking handle on worker side. |
| 218 worker.handles[0].Close() |
| 219 } |
| 220 |
| 221 func finalizeAsyncWaiter(waiter *asyncWaiterImpl) { |
| 222 waiter.wakingHandle.Close() |
| 223 } |
| 224 |
| 214 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. | 225 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. |
| 215 func newAsyncWaiter() *asyncWaiterImpl { | 226 func newAsyncWaiter() *asyncWaiterImpl { |
| 216 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) | 227 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) |
| 217 if result != system.MOJO_RESULT_OK { | 228 if result != system.MOJO_RESULT_OK { |
| 218 panic(fmt.Sprintf("can't create message pipe %v", result)) | 229 panic(fmt.Sprintf("can't create message pipe %v", result)) |
| 219 } | 230 } |
| 220 waitChan := make(chan waitRequest, 10) | 231 waitChan := make(chan waitRequest, 10) |
| 221 cancelChan := make(chan AsyncWaitId, 10) | 232 cancelChan := make(chan AsyncWaitId, 10) |
| 222 isNotified := new(int32) | 233 isNotified := new(int32) |
| 223 » worker := asyncWaiterWorker{ | 234 » worker := &asyncWaiterWorker{ |
| 224 []system.Handle{h1}, | 235 []system.Handle{h1}, |
| 225 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, | 236 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, |
| 226 []AsyncWaitId{0}, | 237 []AsyncWaitId{0}, |
| 227 []chan<- WaitResponse{make(chan WaitResponse)}, | 238 []chan<- WaitResponse{make(chan WaitResponse)}, |
| 228 isNotified, | 239 isNotified, |
| 229 waitChan, | 240 waitChan, |
| 230 cancelChan, | 241 cancelChan, |
| 231 » » Counter{}, | 242 » » 0, |
| 232 } | 243 } |
| 244 runtime.SetFinalizer(worker, finalizeWorker) |
| 233 go worker.runLoop() | 245 go worker.runLoop() |
| 234 » return &asyncWaiterImpl{ | 246 » waiter := &asyncWaiterImpl{ |
| 235 wakingHandle: h0, | 247 wakingHandle: h0, |
| 236 isWorkerNotified: isNotified, | 248 isWorkerNotified: isNotified, |
| 237 waitChan: waitChan, | 249 waitChan: waitChan, |
| 238 cancelChan: cancelChan, | 250 cancelChan: cancelChan, |
| 239 } | 251 } |
| 252 runtime.SetFinalizer(waiter, finalizeAsyncWaiter) |
| 253 return waiter |
| 240 } | 254 } |
| 241 | 255 |
| 242 // wakeWorker wakes the worker from |WaitMany()| call. This should be called | 256 // wakeWorker wakes the worker from |WaitMany()| call. This should be called |
| 243 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. | 257 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. |
| 244 func (w *asyncWaiterImpl) wakeWorker() { | 258 func (w *asyncWaiterImpl) wakeWorker() { |
| 245 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { | 259 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { |
| 246 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ
O_WRITE_MESSAGE_FLAG_NONE) | 260 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ
O_WRITE_MESSAGE_FLAG_NONE) |
| 247 if result != system.MOJO_RESULT_OK { | 261 if result != system.MOJO_RESULT_OK { |
| 248 panic("can't write to a message pipe") | 262 panic("can't write to a message pipe") |
| 249 } | 263 } |
| 250 } | 264 } |
| 251 } | 265 } |
| 252 | 266 |
| 253 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan
dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { | 267 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan
dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { |
| 254 idChan := make(chan AsyncWaitId, 1) | 268 idChan := make(chan AsyncWaitId, 1) |
| 255 w.waitChan <- waitRequest{ | 269 w.waitChan <- waitRequest{ |
| 256 handle, | 270 handle, |
| 257 signals, | 271 signals, |
| 258 idChan, | 272 idChan, |
| 259 responseChan, | 273 responseChan, |
| 260 } | 274 } |
| 261 w.wakeWorker() | 275 w.wakeWorker() |
| 262 return <-idChan | 276 return <-idChan |
| 263 } | 277 } |
| 264 | 278 |
| 265 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { | 279 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { |
| 266 w.cancelChan <- id | 280 w.cancelChan <- id |
| 267 w.wakeWorker() | 281 w.wakeWorker() |
| 268 } | 282 } |
| OLD | NEW |