| Index: third_party/mojo/src/mojo/public/go/bindings/async_waiter.go
|
| diff --git a/third_party/mojo/src/mojo/public/go/bindings/async_waiter.go b/third_party/mojo/src/mojo/public/go/bindings/async_waiter.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..15fd82278c01ade60dd13ece79c5655ccbcdc62e
|
| --- /dev/null
|
| +++ b/third_party/mojo/src/mojo/public/go/bindings/async_waiter.go
|
| @@ -0,0 +1,266 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package bindings
|
| +
|
| +import (
|
| + "fmt"
|
| + "sync"
|
| + "sync/atomic"
|
| +
|
| + "mojo/public/go/system"
|
| +)
|
| +
|
| +var waiter *asyncWaiterImpl
|
| +var once sync.Once
|
| +
|
| +// GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface.
|
| +func GetAsyncWaiter() AsyncWaiter {
|
| + once.Do(func() {
|
| + waiter = newAsyncWaiter()
|
| + })
|
| + return waiter
|
| +}
|
| +
|
| +// AsyncWaitId is an id returned by |AsyncWait()| used to cancel it.
|
| +type AsyncWaitId uint64
|
| +
|
| +// WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to
|
| +// finish. It contains the same information as if |Wait()| was called on a
|
| +// handle.
|
| +type WaitResponse struct {
|
| + Result system.MojoResult
|
| + State system.MojoHandleSignalsState
|
| +}
|
| +
|
| +// AsyncWaiter defines an interface for asynchronously waiting (and cancelling
|
| +// asynchronous waits) on a handle.
|
| +type AsyncWaiter interface {
|
| + // AsyncWait asynchronously waits on a given handle until a signal
|
| + // indicated by |signals| is satisfied or it becomes known that no
|
| + // signal indicated by |signals| will ever be satisified. The wait
|
| + // response will be sent to |responseChan|.
|
| + //
|
| + // |handle| must not be closed or transferred until the wait response
|
| + // is received from |responseChan|.
|
| + AsyncWait(handle system.Handle, signals system.MojoHandleSignals, responseChan chan<- WaitResponse) AsyncWaitId
|
| +
|
| + // CancelWait cancels an outstanding async wait (specified by |id|)
|
| + // initiated by |AsyncWait()|. A response with Mojo result
|
| + // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|.
|
| + CancelWait(id AsyncWaitId)
|
| +}
|
| +
|
| +// waitRequest is a struct sent to asyncWaiterWorker to add another handle to
|
| +// the list of waiting handles.
|
| +type waitRequest struct {
|
| + handle system.Handle
|
| + signals system.MojoHandleSignals
|
| +
|
| + // Used for |CancelWait()| calls. The worker should issue IDs so that
|
| + // you can't cancel the wait until the worker received the wait request.
|
| + idChan chan<- AsyncWaitId
|
| +
|
| + // A channel end to send wait results.
|
| + responseChan chan<- WaitResponse
|
| +}
|
| +
|
| +// asyncWaiterWorker does the actual work, in its own goroutine. It calls
|
| +// |WaitMany()| on all provided handles. New handles a added via |waitChan|
|
| +// and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl
|
| +// sends mojo messages to a dedicated message pipe, the other end of which has
|
| +// index 0 in all slices of the worker.
|
| +type asyncWaiterWorker struct {
|
| + // |handles| and |signals| are used to make |WaitMany()| calls directly.
|
| + // All these arrays should be operated simultaneously; i-th element
|
| + // of each refers to i-th handle.
|
| + handles []system.Handle
|
| + signals []system.MojoHandleSignals
|
| + asyncWaitIds []AsyncWaitId
|
| + responses []chan<- WaitResponse
|
| +
|
| + // Flag shared between waiterImpl and worker that is 1 iff the worker is
|
| + // already notified by waiterImpl. The worker sets it to 0 as soon as
|
| + // |WaitMany()| succeeds.
|
| + isNotified *int32
|
| + waitChan <-chan waitRequest // should have a non-empty buffer
|
| + cancelChan <-chan AsyncWaitId // should have a non-empty buffer
|
| + lastUsedId AsyncWaitId // is incremented each |AsyncWait()| call
|
| +}
|
| +
|
| +// removeHandle removes handle at provided index without sending response by
|
| +// swapping all information associated with index-th handle with the last one
|
| +// and removing the last one.
|
| +func (w *asyncWaiterWorker) removeHandle(index int) {
|
| + l := len(w.handles) - 1
|
| + // Swap with the last and remove last.
|
| + w.handles[index] = w.handles[l]
|
| + w.handles = w.handles[0:l]
|
| + w.signals[index] = w.signals[l]
|
| + w.signals = w.signals[0:l]
|
| +
|
| + w.asyncWaitIds[index] = w.asyncWaitIds[l]
|
| + w.asyncWaitIds = w.asyncWaitIds[0:l]
|
| + w.responses[index] = w.responses[l]
|
| + w.responses = w.responses[0:l]
|
| +}
|
| +
|
| +// sendWaitResponseAndRemove send response to corresponding channel and removes
|
| +// index-th waiting handle.
|
| +func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.MojoResult, state system.MojoHandleSignalsState) {
|
| + w.responses[index] <- WaitResponse{
|
| + result,
|
| + state,
|
| + }
|
| + w.removeHandle(index)
|
| +}
|
| +
|
| +// respondToSatisfiedWaits responds to all wait requests that have at least
|
| +// one satisfied signal and removes them.
|
| +func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSignalsState) {
|
| + // Don't touch handle at index 0 as it is the waking handle.
|
| + for i := 1; i < len(states); {
|
| + if (states[i].SatisfiedSignals & w.signals[i]) != 0 {
|
| + // Respond and swap i-th with last and remove last.
|
| + w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, states[i])
|
| + // Swap i-th with last and remove last.
|
| + states[i] = states[len(states)-1]
|
| + states = states[:len(states)-1]
|
| + } else {
|
| + i++
|
| + }
|
| + }
|
| +}
|
| +
|
| +// processIncomingRequests processes all queued async wait or cancel requests
|
| +// sent by asyncWaiterImpl.
|
| +func (w *asyncWaiterWorker) processIncomingRequests() {
|
| + for {
|
| + select {
|
| + case request := <-w.waitChan:
|
| + w.handles = append(w.handles, request.handle)
|
| + w.signals = append(w.signals, request.signals)
|
| + w.responses = append(w.responses, request.responseChan)
|
| +
|
| + w.lastUsedId++
|
| + id := w.lastUsedId
|
| + w.asyncWaitIds = append(w.asyncWaitIds, id)
|
| + request.idChan <- id
|
| + case AsyncWaitId := <-w.cancelChan:
|
| + // Zero index is reserved for the waking message pipe handle.
|
| + index := 0
|
| + for i := 1; i < len(w.asyncWaitIds); i++ {
|
| + if w.asyncWaitIds[i] == AsyncWaitId {
|
| + index = i
|
| + break
|
| + }
|
| + }
|
| + // Do nothing if the id was not found as wait response may be
|
| + // already sent if the async wait was successful.
|
| + if index > 0 {
|
| + w.sendWaitResponseAndRemove(index, system.MOJO_RESULT_ABORTED, system.MojoHandleSignalsState{})
|
| + }
|
| + default:
|
| + return
|
| + }
|
| + }
|
| +}
|
| +
|
| +// runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the
|
| +// wait is interrupted by waking handle (index 0) then it means that the worker
|
| +// was woken by waiterImpl, so the worker processes incoming requests from
|
| +// waiterImpl; otherwise responses to corresponding wait request.
|
| +func (w *asyncWaiterWorker) runLoop() {
|
| + for {
|
| + result, index, states := system.GetCore().WaitMany(w.handles, w.signals, system.MOJO_DEADLINE_INDEFINITE)
|
| + // Set flag to 0, so that the next incoming request to
|
| + // waiterImpl would explicitly wake worker by sending a message
|
| + // to waking message pipe.
|
| + atomic.StoreInt32(w.isNotified, 0)
|
| + if index == -1 {
|
| + panic(fmt.Sprintf("error waiting on handles: %v", result))
|
| + break
|
| + }
|
| + // Zero index means that the worker was signaled by asyncWaiterImpl.
|
| + if index == 0 {
|
| + if result != system.MOJO_RESULT_OK {
|
| + panic(fmt.Sprintf("error waiting on waking handle: %v", result))
|
| + }
|
| + w.handles[0].(system.MessagePipeHandle).ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
|
| + w.processIncomingRequests()
|
| + } else if result != system.MOJO_RESULT_OK {
|
| + w.sendWaitResponseAndRemove(index, result, system.MojoHandleSignalsState{})
|
| + } else {
|
| + w.respondToSatisfiedWaits(states)
|
| + }
|
| + }
|
| +}
|
| +
|
| +// asyncWaiterImpl is an implementation of |AsyncWaiter| interface.
|
| +// Runs a worker in a separate goroutine and comunicates with it by sending a
|
| +// message to |wakingHandle| to wake worker from |WaitMany()| call and
|
| +// sending request via |waitChan| and |cancelChan|.
|
| +type asyncWaiterImpl struct {
|
| + wakingHandle system.MessagePipeHandle
|
| +
|
| + // Flag shared between waiterImpl and worker that is 1 iff the worker is
|
| + // already notified by waiterImpl. The worker sets it to 0 as soon as
|
| + // |WaitMany()| succeeds.
|
| + isWorkerNotified *int32
|
| + waitChan chan<- waitRequest // should have a non-empty buffer
|
| + cancelChan chan<- AsyncWaitId // should have a non-empty buffer
|
| +}
|
| +
|
| +// newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine.
|
| +func newAsyncWaiter() *asyncWaiterImpl {
|
| + result, h0, h1 := system.GetCore().CreateMessagePipe(nil)
|
| + if result != system.MOJO_RESULT_OK {
|
| + panic(fmt.Sprintf("can't create message pipe %v", result))
|
| + }
|
| + waitChan := make(chan waitRequest, 10)
|
| + cancelChan := make(chan AsyncWaitId, 10)
|
| + isNotified := new(int32)
|
| + worker := asyncWaiterWorker{
|
| + []system.Handle{h1},
|
| + []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE},
|
| + []AsyncWaitId{0},
|
| + []chan<- WaitResponse{make(chan WaitResponse)},
|
| + isNotified,
|
| + waitChan,
|
| + cancelChan,
|
| + 0,
|
| + }
|
| + go worker.runLoop()
|
| + return &asyncWaiterImpl{
|
| + wakingHandle: h0,
|
| + isWorkerNotified: isNotified,
|
| + waitChan: waitChan,
|
| + cancelChan: cancelChan,
|
| + }
|
| +}
|
| +
|
| +// wakeWorker wakes the worker from |WaitMany()| call. This should be called
|
| +// after sending a message to |waitChan| or |cancelChan| to avoid deadlock.
|
| +func (w *asyncWaiterImpl) wakeWorker() {
|
| + if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) {
|
| + w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJO_WRITE_MESSAGE_FLAG_NONE)
|
| + }
|
| +}
|
| +
|
| +func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHandleSignals, responseChan chan<- WaitResponse) AsyncWaitId {
|
| + idChan := make(chan AsyncWaitId, 1)
|
| + w.waitChan <- waitRequest{
|
| + handle,
|
| + signals,
|
| + idChan,
|
| + responseChan,
|
| + }
|
| + w.wakeWorker()
|
| + return <-idChan
|
| +}
|
| +
|
| +func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) {
|
| + w.cancelChan <- id
|
| + w.wakeWorker()
|
| +}
|
|
|