Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Unified Diff: mojo/public/go/bindings/async_waiter.go

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/public/go/application/describer.go ('k') | mojo/public/go/bindings/connector.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/public/go/bindings/async_waiter.go
diff --git a/mojo/public/go/bindings/async_waiter.go b/mojo/public/go/bindings/async_waiter.go
deleted file mode 100644
index bcd4a7cd3e2223944949d96a114d31237245896c..0000000000000000000000000000000000000000
--- a/mojo/public/go/bindings/async_waiter.go
+++ /dev/null
@@ -1,282 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-package bindings
-
-import (
- "fmt"
- "runtime"
- "sync"
- "sync/atomic"
-
- "mojo/public/go/system"
-)
-
-var defaultWaiter *asyncWaiterImpl
-var once sync.Once
-
-// GetAsyncWaiter returns a default implementation of |AsyncWaiter| interface.
-func GetAsyncWaiter() AsyncWaiter {
- once.Do(func() {
- defaultWaiter = newAsyncWaiter()
- })
- return defaultWaiter
-}
-
-// AsyncWaitId is an id returned by |AsyncWait()| used to cancel it.
-type AsyncWaitId uint64
-
-// WaitResponse is a struct sent to a channel waiting for |AsyncWait()| to
-// finish. It contains the same information as if |Wait()| was called on a
-// handle.
-type WaitResponse struct {
- Result system.MojoResult
- State system.MojoHandleSignalsState
-}
-
-// AsyncWaiter defines an interface for asynchronously waiting (and cancelling
-// asynchronous waits) on a handle.
-type AsyncWaiter interface {
- // AsyncWait asynchronously waits on a given handle until a signal
- // indicated by |signals| is satisfied or it becomes known that no
- // signal indicated by |signals| will ever be satisified. The wait
- // response will be sent to |responseChan|.
- //
- // |handle| must not be closed or transferred until the wait response
- // is received from |responseChan|.
- AsyncWait(handle system.Handle, signals system.MojoHandleSignals, responseChan chan<- WaitResponse) AsyncWaitId
-
- // CancelWait cancels an outstanding async wait (specified by |id|)
- // initiated by |AsyncWait()|. A response with Mojo result
- // |MOJO_RESULT_ABORTED| is sent to the corresponding |responseChan|.
- CancelWait(id AsyncWaitId)
-}
-
-// waitRequest is a struct sent to asyncWaiterWorker to add another handle to
-// the list of waiting handles.
-type waitRequest struct {
- handle system.Handle
- signals system.MojoHandleSignals
-
- // Used for |CancelWait()| calls. The worker should issue IDs so that
- // you can't cancel the wait until the worker received the wait request.
- idChan chan<- AsyncWaitId
-
- // A channel end to send wait results.
- responseChan chan<- WaitResponse
-}
-
-// asyncWaiterWorker does the actual work, in its own goroutine. It calls
-// |WaitMany()| on all provided handles. New handles a added via |waitChan|
-// and removed via |cancelChan| messages. To wake the worker asyncWaiterImpl
-// sends mojo messages to a dedicated message pipe, the other end of which has
-// index 0 in all slices of the worker.
-type asyncWaiterWorker struct {
- // |handles| and |signals| are used to make |WaitMany()| calls directly.
- // All these arrays should be operated simultaneously; i-th element
- // of each refers to i-th handle.
- handles []system.Handle
- signals []system.MojoHandleSignals
- asyncWaitIds []AsyncWaitId
- responses []chan<- WaitResponse
-
- // Flag shared between waiterImpl and worker that is 1 iff the worker is
- // already notified by waiterImpl. The worker sets it to 0 as soon as
- // |WaitMany()| succeeds.
- isNotified *int32
- waitChan <-chan waitRequest // should have a non-empty buffer
- cancelChan <-chan AsyncWaitId // should have a non-empty buffer
- ids uint64 // is incremented each |AsyncWait()| call
-}
-
-// removeHandle removes handle at provided index without sending response by
-// swapping all information associated with index-th handle with the last one
-// and removing the last one.
-func (w *asyncWaiterWorker) removeHandle(index int) {
- l := len(w.handles) - 1
- // Swap with the last and remove last.
- w.handles[index] = w.handles[l]
- w.handles = w.handles[0:l]
- w.signals[index] = w.signals[l]
- w.signals = w.signals[0:l]
-
- w.asyncWaitIds[index] = w.asyncWaitIds[l]
- w.asyncWaitIds = w.asyncWaitIds[0:l]
- w.responses[index] = w.responses[l]
- w.responses = w.responses[0:l]
-}
-
-// sendWaitResponseAndRemove send response to corresponding channel and removes
-// index-th waiting handle.
-func (w *asyncWaiterWorker) sendWaitResponseAndRemove(index int, result system.MojoResult, state system.MojoHandleSignalsState) {
- w.responses[index] <- WaitResponse{
- result,
- state,
- }
- w.removeHandle(index)
-}
-
-// respondToSatisfiedWaits responds to all wait requests that have at least
-// one satisfied signal and removes them.
-func (w *asyncWaiterWorker) respondToSatisfiedWaits(states []system.MojoHandleSignalsState) {
- // Don't touch handle at index 0 as it is the waking handle.
- for i := 1; i < len(states); {
- if (states[i].SatisfiedSignals & w.signals[i]) != 0 {
- // Respond and swap i-th with last and remove last.
- w.sendWaitResponseAndRemove(i, system.MOJO_RESULT_OK, states[i])
- // Swap i-th with last and remove last.
- states[i] = states[len(states)-1]
- states = states[:len(states)-1]
- } else {
- i++
- }
- }
-}
-
-// processIncomingRequests processes all queued async wait or cancel requests
-// sent by asyncWaiterImpl.
-func (w *asyncWaiterWorker) processIncomingRequests() {
- for {
- select {
- case request := <-w.waitChan:
- w.handles = append(w.handles, request.handle)
- w.signals = append(w.signals, request.signals)
- w.responses = append(w.responses, request.responseChan)
-
- w.ids++
- id := AsyncWaitId(w.ids)
- w.asyncWaitIds = append(w.asyncWaitIds, id)
- request.idChan <- id
- case AsyncWaitId := <-w.cancelChan:
- // Zero index is reserved for the waking message pipe handle.
- index := 0
- for i := 1; i < len(w.asyncWaitIds); i++ {
- if w.asyncWaitIds[i] == AsyncWaitId {
- index = i
- break
- }
- }
- // Do nothing if the id was not found as wait response may be
- // already sent if the async wait was successful.
- if index > 0 {
- w.sendWaitResponseAndRemove(index, system.MOJO_RESULT_ABORTED, system.MojoHandleSignalsState{})
- }
- default:
- return
- }
- }
-}
-
-// runLoop run loop of the asyncWaiterWorker. Blocks on |WaitMany()|. If the
-// wait is interrupted by waking handle (index 0) then it means that the worker
-// was woken by waiterImpl, so the worker processes incoming requests from
-// waiterImpl; otherwise responses to corresponding wait request.
-func (w *asyncWaiterWorker) runLoop() {
- for {
- result, index, states := system.GetCore().WaitMany(w.handles, w.signals, system.MOJO_DEADLINE_INDEFINITE)
- // Set flag to 0, so that the next incoming request to
- // waiterImpl would explicitly wake worker by sending a message
- // to waking message pipe.
- atomic.StoreInt32(w.isNotified, 0)
- if index == -1 {
- panic(fmt.Sprintf("error waiting on handles: %v", result))
- break
- }
- // Zero index means that the worker was signaled by asyncWaiterImpl.
- if index == 0 {
- if result != system.MOJO_RESULT_OK {
- panic(fmt.Sprintf("error waiting on waking handle: %v", result))
- }
- w.handles[0].(system.MessagePipeHandle).ReadMessage(system.MOJO_READ_MESSAGE_FLAG_NONE)
- w.processIncomingRequests()
- } else if result != system.MOJO_RESULT_OK {
- w.sendWaitResponseAndRemove(index, result, system.MojoHandleSignalsState{})
- } else {
- w.respondToSatisfiedWaits(states)
- }
- }
-}
-
-// asyncWaiterImpl is an implementation of |AsyncWaiter| interface.
-// Runs a worker in a separate goroutine and comunicates with it by sending a
-// message to |wakingHandle| to wake worker from |WaitMany()| call and
-// sending request via |waitChan| and |cancelChan|.
-type asyncWaiterImpl struct {
- wakingHandle system.MessagePipeHandle
-
- // Flag shared between waiterImpl and worker that is 1 iff the worker is
- // already notified by waiterImpl. The worker sets it to 0 as soon as
- // |WaitMany()| succeeds.
- isWorkerNotified *int32
- waitChan chan<- waitRequest // should have a non-empty buffer
- cancelChan chan<- AsyncWaitId // should have a non-empty buffer
-}
-
-func finalizeWorker(worker *asyncWaiterWorker) {
- // Close waking handle on worker side.
- worker.handles[0].Close()
-}
-
-func finalizeAsyncWaiter(waiter *asyncWaiterImpl) {
- waiter.wakingHandle.Close()
-}
-
-// newAsyncWaiter creates an asyncWaiterImpl and starts its worker goroutine.
-func newAsyncWaiter() *asyncWaiterImpl {
- result, h0, h1 := system.GetCore().CreateMessagePipe(nil)
- if result != system.MOJO_RESULT_OK {
- panic(fmt.Sprintf("can't create message pipe %v", result))
- }
- waitChan := make(chan waitRequest, 10)
- cancelChan := make(chan AsyncWaitId, 10)
- isNotified := new(int32)
- worker := &asyncWaiterWorker{
- []system.Handle{h1},
- []system.MojoHandleSignals{system.MOJO_HANDLE_SIGNAL_READABLE},
- []AsyncWaitId{0},
- []chan<- WaitResponse{make(chan WaitResponse)},
- isNotified,
- waitChan,
- cancelChan,
- 0,
- }
- runtime.SetFinalizer(worker, finalizeWorker)
- go worker.runLoop()
- waiter := &asyncWaiterImpl{
- wakingHandle: h0,
- isWorkerNotified: isNotified,
- waitChan: waitChan,
- cancelChan: cancelChan,
- }
- runtime.SetFinalizer(waiter, finalizeAsyncWaiter)
- return waiter
-}
-
-// wakeWorker wakes the worker from |WaitMany()| call. This should be called
-// after sending a message to |waitChan| or |cancelChan| to avoid deadlock.
-func (w *asyncWaiterImpl) wakeWorker() {
- if atomic.CompareAndSwapInt32(w.isWorkerNotified, 0, 1) {
- result := w.wakingHandle.WriteMessage([]byte{0}, nil, system.MOJO_WRITE_MESSAGE_FLAG_NONE)
- if result != system.MOJO_RESULT_OK {
- panic("can't write to a message pipe")
- }
- }
-}
-
-func (w *asyncWaiterImpl) AsyncWait(handle system.Handle, signals system.MojoHandleSignals, responseChan chan<- WaitResponse) AsyncWaitId {
- idChan := make(chan AsyncWaitId, 1)
- w.waitChan <- waitRequest{
- handle,
- signals,
- idChan,
- responseChan,
- }
- w.wakeWorker()
- return <-idChan
-}
-
-func (w *asyncWaiterImpl) CancelWait(id AsyncWaitId) {
- w.cancelChan <- id
- w.wakeWorker()
-}
« no previous file with comments | « mojo/public/go/application/describer.go ('k') | mojo/public/go/bindings/connector.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698