Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(175)

Side by Side Diff: third_party/mojo/src/mojo/public/go/bindings/async_waiter.go

Issue 1019173002: Update mojo sdk to rev 7214b7ec7d27563b2666afad86cf1c5895c56c18 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Keep permission service alive if embedder drops requests Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package bindings 5 package bindings
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "runtime"
9 "sync" 10 "sync"
10 "sync/atomic" 11 "sync/atomic"
11 12
12 "mojo/public/go/system" 13 "mojo/public/go/system"
13 ) 14 )
14 15
15 var defaultWaiter *asyncWaiterImpl 16 var defaultWaiter *asyncWaiterImpl
16 var once sync.Once 17 var once sync.Once
17 18
18 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface. 19 // GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface.
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 signals []system.MojoHandleSignals 80 signals []system.MojoHandleSignals
80 asyncWaitIds []AsyncWaitId 81 asyncWaitIds []AsyncWaitId
81 responses []chan<- WaitResponse 82 responses []chan<- WaitResponse
82 83
83 // Flag shared between waiterImpl and worker that is 1 iff the worker is 84 // Flag shared between waiterImpl and worker that is 1 iff the worker is
84 // already notified by waiterImpl. The worker sets it to 0 as soon as 85 // already notified by waiterImpl. The worker sets it to 0 as soon as
85 // |WaitMany()| succeeds. 86 // |WaitMany()| succeeds.
86 isNotified *int32 87 isNotified *int32
87 waitChan <-chan waitRequest // should have a non-empty buffer 88 waitChan <-chan waitRequest // should have a non-empty buffer
88 cancelChan <-chan AsyncWaitId // should have a non-empty buffer 89 cancelChan <-chan AsyncWaitId // should have a non-empty buffer
89 » ids Counter // is incremented each |AsyncWait()| call 90 » ids uint64 // is incremented each |AsyncWait()| call
90 } 91 }
91 92
92 // removeHandle removes handle at provided index without sending response by 93 // removeHandle removes handle at provided index without sending response by
93 // swapping all information associated with index-th handle with the last one 94 // swapping all information associated with index-th handle with the last one
94 // and removing the last one. 95 // and removing the last one.
95 func (w *asyncWaiterWorker) removeHandle(index int) { 96 func (w *asyncWaiterWorker) removeHandle(index int) {
96 l := len(w.handles) - 1 97 l := len(w.handles) - 1
97 // Swap with the last and remove last. 98 // Swap with the last and remove last.
98 w.handles[index] = w.handles[l] 99 w.handles[index] = w.handles[l]
99 w.handles = w.handles[0:l] 100 w.handles = w.handles[0:l]
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
136 // processIncomingRequests processes all queued async wait or cancel requests 137 // processIncomingRequests processes all queued async wait or cancel requests
137 // sent by asyncWaiterImpl. 138 // sent by asyncWaiterImpl.
138 func (w *asyncWaiterWorker) processIncomingRequests() { 139 func (w *asyncWaiterWorker) processIncomingRequests() {
139 for { 140 for {
140 select { 141 select {
141 case request := <-w.waitChan: 142 case request := <-w.waitChan:
142 w.handles = append(w.handles, request.handle) 143 w.handles = append(w.handles, request.handle)
143 w.signals = append(w.signals, request.signals) 144 w.signals = append(w.signals, request.signals)
144 w.responses = append(w.responses, request.responseChan) 145 w.responses = append(w.responses, request.responseChan)
145 146
146 » » » id := AsyncWaitId(w.ids.Next()) 147 » » » w.ids++
148 » » » id := AsyncWaitId(w.ids)
147 w.asyncWaitIds = append(w.asyncWaitIds, id) 149 w.asyncWaitIds = append(w.asyncWaitIds, id)
148 request.idChan <- id 150 request.idChan <- id
149 case AsyncWaitId := <-w.cancelChan: 151 case AsyncWaitId := <-w.cancelChan:
150 // Zero index is reserved for the waking message pipe ha ndle. 152 // Zero index is reserved for the waking message pipe ha ndle.
151 index := 0 153 index := 0
152 for i := 1; i < len(w.asyncWaitIds); i++ { 154 for i := 1; i < len(w.asyncWaitIds); i++ {
153 if w.asyncWaitIds[i] == AsyncWaitId { 155 if w.asyncWaitIds[i] == AsyncWaitId {
154 index = i 156 index = i
155 break 157 break
156 } 158 }
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
204 wakingHandle system.MessagePipeHandle 206 wakingHandle system.MessagePipeHandle
205 207
206 // Flag shared between waiterImpl and worker that is 1 iff the worker is 208 // Flag shared between waiterImpl and worker that is 1 iff the worker is
207 // already notified by waiterImpl. The worker sets it to 0 as soon as 209 // already notified by waiterImpl. The worker sets it to 0 as soon as
208 // |WaitMany()| succeeds. 210 // |WaitMany()| succeeds.
209 isWorkerNotified *int32 211 isWorkerNotified *int32
210 waitChan chan<- waitRequest // should have a non-empty buffer 212 waitChan chan<- waitRequest // should have a non-empty buffer
211 cancelChan chan<- AsyncWaitId // should have a non-empty buffer 213 cancelChan chan<- AsyncWaitId // should have a non-empty buffer
212 } 214 }
213 215
216 func finalizeWorker(worker *asyncWaiterWorker) {
217 // Close waking handle on worker side.
218 worker.handles[0].Close()
219 }
220
221 func finalizeAsyncWaiter(waiter *asyncWaiterImpl) {
222 waiter.wakingHandle.Close()
223 }
224
214 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine. 225 // newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine.
215 func newAsyncWaiter() *asyncWaiterImpl { 226 func newAsyncWaiter() *asyncWaiterImpl {
216 result, h0, h1 := system.GetCore().CreateMessagePipe(nil) 227 result, h0, h1 := system.GetCore().CreateMessagePipe(nil)
217 if result != system.MOJO_RESULT_OK { 228 if result != system.MOJO_RESULT_OK {
218 panic(fmt.Sprintf("can't create message pipe %v", result)) 229 panic(fmt.Sprintf("can't create message pipe %v", result))
219 } 230 }
220 waitChan := make(chan waitRequest, 10) 231 waitChan := make(chan waitRequest, 10)
221 cancelChan := make(chan AsyncWaitId, 10) 232 cancelChan := make(chan AsyncWaitId, 10)
222 isNotified := new(int32) 233 isNotified := new(int32)
223 » worker := asyncWaiterWorker{ 234 » worker := &asyncWaiterWorker{
224 []system.Handle{h1}, 235 []system.Handle{h1},
225 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE}, 236 []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE},
226 []AsyncWaitId{0}, 237 []AsyncWaitId{0},
227 []chan<- WaitResponse{make(chan WaitResponse)}, 238 []chan<- WaitResponse{make(chan WaitResponse)},
228 isNotified, 239 isNotified,
229 waitChan, 240 waitChan,
230 cancelChan, 241 cancelChan,
231 » » Counter{}, 242 » » 0,
232 } 243 }
244 runtime.SetFinalizer(worker, finalizeWorker)
233 go worker.runLoop() 245 go worker.runLoop()
234 » return &asyncWaiterImpl{ 246 » waiter := &asyncWaiterImpl{
235 wakingHandle: h0, 247 wakingHandle: h0,
236 isWorkerNotified: isNotified, 248 isWorkerNotified: isNotified,
237 waitChan: waitChan, 249 waitChan: waitChan,
238 cancelChan: cancelChan, 250 cancelChan: cancelChan,
239 } 251 }
252 runtime.SetFinalizer(waiter, finalizeAsyncWaiter)
253 return waiter
240 } 254 }
241 255
242 // wakeWorker wakes the worker from |WaitMany()| call. This should be called 256 // wakeWorker wakes the worker from |WaitMany()| call. This should be called
243 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock. 257 // after sending a message to |waitChan| or |cancelChan| to avoid deadlock.
244 func (w *asyncWaiterImpl) wakeWorker() { 258 func (w *asyncWaiterImpl) wakeWorker() {
245 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) { 259 if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) {
246 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ O_WRITE_MESSAGE_FLAG_NONE) 260 result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJ O_WRITE_MESSAGE_FLAG_NONE)
247 if result != system.MOJO_RESULT_OK { 261 if result != system.MOJO_RESULT_OK {
248 panic("can't write to a message pipe") 262 panic("can't write to a message pipe")
249 } 263 }
250 } 264 }
251 } 265 }
252 266
253 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan dleSignals, responseChan chan<- WaitResponse) AsyncWaitId { 267 func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHan dleSignals, responseChan chan<- WaitResponse) AsyncWaitId {
254 idChan := make(chan AsyncWaitId, 1) 268 idChan := make(chan AsyncWaitId, 1)
255 w.waitChan <- waitRequest{ 269 w.waitChan <- waitRequest{
256 handle, 270 handle,
257 signals, 271 signals,
258 idChan, 272 idChan,
259 responseChan, 273 responseChan,
260 } 274 }
261 w.wakeWorker() 275 w.wakeWorker()
262 return <-idChan 276 return <-idChan
263 } 277 }
264 278
265 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) { 279 func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) {
266 w.cancelChan <- id 280 w.cancelChan <- id
267 w.wakeWorker() 281 w.wakeWorker()
268 } 282 }
OLDNEW
« no previous file with comments | « third_party/mojo/src/mojo/public/dart/src/types.dart ('k') | third_party/mojo/src/mojo/public/go/bindings/decoder.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698