Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Unified Diff: third_party/mojo/src/mojo/public/go/bindings/async_waiter.go

Issue 954643002: Update mojo sdk to rev 3d23dae011859a2aae49f1d1adde705c8e85d819 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: use run_renderer_in_process() Created 5 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/public/go/bindings/async_waiter.go
diff --git a/third_party/mojo/src/mojo/public/go/bindings/async_waiter.go b/third_party/mojo/src/mojo/public/go/bindings/async_waiter.go
new file mode 100644
index 0000000000000000000000000000000000000000..15fd82278c01ade60dd13ece79c5655ccbcdc62e
--- /dev/null
+++ b/third_party/mojo/src/mojo/public/go/bindings/async_waiter.go
@@ -0,0 +1,266 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package bindings
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ "mojo/public/go/system"
+)
+
+var waiter *asyncWaiterImpl
+var once sync.Once
+
+// GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface.
+func GetAsyncWaiter() AsyncWaiter {
+ once.Do(func() {
+ waiter = newAsyncWaiter()
+ })
+ return waiter
+}
+
+// AsyncWaitId is an id returned by |AsyncWait()| used to cancel it.
+type AsyncWaitId uint64
+
+// WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to
+// finish. It contains the same information as if |Wait()| was called on a
+// handle.
+type WaitResponse struct {
+ Result system.MojoResult
+ State system.MojoHandleSignalsState
+}
+
+// AsyncWaiter defines an interface for asynchronously waiting (and cancelling
+// asynchronous waits) on a handle.
+type AsyncWaiter interface {
+ // AsyncWait asynchronously waits on a given handle until a signal
+ // indicated by |signals| is satisfied or it becomes known that no
+ // signal indicated by |signals| will ever be satisified. The wait
+ // response will be sent to |responseChan|.
+ //
+ // |handle| must not be closed or transferred until the wait response
+ // is received from |responseChan|.
+ AsyncWait(handle system.Handle, signals system.MojoHandleSignals, responseChan chan<- WaitResponse) AsyncWaitId
+
+ // CancelWait cancels an outstanding async wait (specified by |id|)
+ // initiated by |AsyncWait()|. A response with Mojo result
+ // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|.
+ CancelWait(id AsyncWaitId)
+}
+
+// waitRequest is a struct sent to asyncWaiterWorker to add another handle to
+// the list of waiting handles.
+type waitRequest struct {
+ handle system.Handle
+ signals system.MojoHandleSignals
+
+ // Used for |CancelWait()| calls. The worker should issue IDs so that
+ // you can't cancel the wait until the worker received the wait request.
+ idChan chan<- AsyncWaitId
+
+ // A channel end to send wait results.
+ responseChan chan<- WaitResponse
+}
+
+// asyncWaiterWorker does the actual work, in its own goroutine. It calls
+// |WaitMany()| on all provided handles. New handles a added via |waitChan|
+// and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl
+// sends mojo messages to a dedicated message pipe, the other end of which has
+// index 0 in all slices of the worker.
+type asyncWaiterWorker struct {
+ // |handles| and |signals| are used to make |WaitMany()| calls directly.
+ // All these arrays should be operated simultaneously; i-th element
+ // of each refers to i-th handle.
+ handles []system.Handle
+ signals []system.MojoHandleSignals
+ asyncWaitIds []AsyncWaitId
+ responses []chan<- WaitResponse
+
+ // Flag shared between waiterImpl and worker that is 1 iff the worker is
+ // already notified by waiterImpl. The worker sets it to 0 as soon as
+ // |WaitMany()| succeeds.
+ isNotified *int32
+ waitChan <-chan waitRequest // should have a non-empty buffer
+ cancelChan <-chan AsyncWaitId // should have a non-empty buffer
+ lastUsedId AsyncWaitId // is incremented each |AsyncWait()| call
+}
+
+// removeHandle removes handle at provided index without sending response by
+// swapping all information associated with index-th handle with the last one
+// and removing the last one.
+func (w *asyncWaiterWorker) removeHandle(index int) {
+ l := len(w.handles) - 1
+ // Swap with the last and remove last.
+ w.handles[index] = w.handles[l]
+ w.handles = w.handles[0:l]
+ w.signals[index] = w.signals[l]
+ w.signals = w.signals[0:l]
+
+ w.asyncWaitIds[index] = w.asyncWaitIds[l]
+ w.asyncWaitIds = w.asyncWaitIds[0:l]
+ w.responses[index] = w.responses[l]
+ w.responses = w.responses[0:l]
+}
+
+// sendWaitResponseAndRemove send response to corresponding channel and removes
+// index-th waiting handle.
+func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.MojoResult, state system.MojoHandleSignalsState) {
+ w.responses[index] <- WaitResponse{
+ result,
+ state,
+ }
+ w.removeHandle(index)
+}
+
+// respondToSatisfiedWaits responds to all wait requests that have at least
+// one satisfied signal and removes them.
+func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSignalsState) {
+ // Don't touch handle at index 0 as it is the waking handle.
+ for i := 1; i < len(states); {
+ if (states[i].SatisfiedSignals & w.signals[i]) != 0 {
+ // Respond and swap i-th with last and remove last.
+ w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, states[i])
+ // Swap i-th with last and remove last.
+ states[i] = states[len(states)-1]
+ states = states[:len(states)-1]
+ } else {
+ i++
+ }
+ }
+}
+
+// processIncomingRequests processes all queued async wait or cancel requests
+// sent by asyncWaiterImpl.
+func (w *asyncWaiterWorker) processIncomingRequests() {
+ for {
+ select {
+ case request := <-w.waitChan:
+ w.handles = append(w.handles, request.handle)
+ w.signals = append(w.signals, request.signals)
+ w.responses = append(w.responses, request.responseChan)
+
+ w.lastUsedId++
+ id := w.lastUsedId
+ w.asyncWaitIds = append(w.asyncWaitIds, id)
+ request.idChan <- id
+ case AsyncWaitId := <-w.cancelChan:
+ // Zero index is reserved for the waking message pipe handle.
+ index := 0
+ for i := 1; i < len(w.asyncWaitIds); i++ {
+ if w.asyncWaitIds[i] == AsyncWaitId {
+ index = i
+ break
+ }
+ }
+ // Do nothing if the id was not found as wait response may be
+ // already sent if the async wait was successful.
+ if index > 0 {
+ w.sendWaitResponseAndRemove(index, system.MOJO_RESULT_ABORTED, system.MojoHandleSignalsState{})
+ }
+ default:
+ return
+ }
+ }
+}
+
+// runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the
+// wait is interrupted by waking handle (index 0) then it means that the worker
+// was woken by waiterImpl, so the worker processes incoming requests from
+// waiterImpl; otherwise responses to corresponding wait request.
+func (w *asyncWaiterWorker) runLoop() {
+ for {
+ result, index, states := system.GetCore().WaitMany(w.handles, w.signals, system.MOJO_DEADLINE_INDEFINITE)
+ // Set flag to 0, so that the next incoming request to
+ // waiterImpl would explicitly wake worker by sending a message
+ // to waking message pipe.
+ atomic.StoreInt32(w.isNotified, 0)
+ if index == -1 {
+ panic(fmt.Sprintf("error waiting on handles: %v", result))
+ break
+ }
+ // Zero index means that the worker was signaled by asyncWaiterImpl.
+ if index == 0 {
+ if result != system.MOJO_RESULT_OK {
+ panic(fmt.Sprintf("error waiting on waking handle: %v", result))
+ }
+ w.handles[0].(system.MessagePipeHandle).ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
+ w.processIncomingRequests()
+ } else if result != system.MOJO_RESULT_OK {
+ w.sendWaitResponseAndRemove(index, result, system.MojoHandleSignalsState{})
+ } else {
+ w.respondToSatisfiedWaits(states)
+ }
+ }
+}
+
+// asyncWaiterImpl is an implementation of |AsyncWaiter| interface.
+// Runs a worker in a separate goroutine and comunicates with it by sending a
+// message to |wakingHandle| to wake worker from |WaitMany()| call and
+// sending request via |waitChan| and |cancelChan|.
+type asyncWaiterImpl struct {
+ wakingHandle system.MessagePipeHandle
+
+ // Flag shared between waiterImpl and worker that is 1 iff the worker is
+ // already notified by waiterImpl. The worker sets it to 0 as soon as
+ // |WaitMany()| succeeds.
+ isWorkerNotified *int32
+ waitChan chan<- waitRequest // should have a non-empty buffer
+ cancelChan chan<- AsyncWaitId // should have a non-empty buffer
+}
+
+// newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine.
+func newAsyncWaiter() *asyncWaiterImpl {
+ result, h0, h1 := system.GetCore().CreateMessagePipe(nil)
+ if result != system.MOJO_RESULT_OK {
+ panic(fmt.Sprintf("can't create message pipe %v", result))
+ }
+ waitChan := make(chan waitRequest, 10)
+ cancelChan := make(chan AsyncWaitId, 10)
+ isNotified := new(int32)
+ worker := asyncWaiterWorker{
+ []system.Handle{h1},
+ []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE},
+ []AsyncWaitId{0},
+ []chan<- WaitResponse{make(chan WaitResponse)},
+ isNotified,
+ waitChan,
+ cancelChan,
+ 0,
+ }
+ go worker.runLoop()
+ return &asyncWaiterImpl{
+ wakingHandle: h0,
+ isWorkerNotified: isNotified,
+ waitChan: waitChan,
+ cancelChan: cancelChan,
+ }
+}
+
+// wakeWorker wakes the worker from |WaitMany()| call. This should be called
+// after sending a message to |waitChan| or |cancelChan| to avoid deadlock.
+func (w *asyncWaiterImpl) wakeWorker() {
+ if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) {
+ w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJO_WRITE_MESSAGE_FLAG_NONE)
+ }
+}
+
+func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHandleSignals, responseChan chan<- WaitResponse) AsyncWaitId {
+ idChan := make(chan AsyncWaitId, 1)
+ w.waitChan <- waitRequest{
+ handle,
+ signals,
+ idChan,
+ responseChan,
+ }
+ w.wakeWorker()
+ return <-idChan
+}
+
+func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) {
+ w.cancelChan <- id
+ w.wakeWorker()
+}
« no previous file with comments | « third_party/mojo/src/mojo/public/dart/src/struct.dart ('k') | third_party/mojo/src/mojo/public/go/bindings/decoder.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698