Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1807)

Side by Side Diff: mojo/public/dart/src/event_stream.dart

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/dart/src/data_pipe.dart ('k') | mojo/public/dart/src/handle.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 part of core;
6
7 class MojoEventStream extends Stream<int> {
8 // The underlying Mojo handle.
9 MojoHandle _handle;
10
11 // Providing our own stream controller allows us to take custom actions when
12 // listeners pause/resume/etc. their StreamSubscription.
13 StreamController _controller;
14
15 // The send port that we give to the handle watcher to notify us of handle
16 // events.
17 SendPort _sendPort;
18
19 // The receive port on which we listen and receive events from the handle
20 // watcher.
21 ReceivePort _receivePort;
22
23 // The signals on this handle that we're interested in.
24 MojoHandleSignals _signals;
25
26 // Whether listen has been called.
27 bool _isListening;
28
29 MojoEventStream(MojoHandle handle,
30 [MojoHandleSignals signals = MojoHandleSignals.READABLE]) :
31 _handle = handle,
32 _signals = signals,
33 _isListening = false {
34 MojoResult result = MojoHandle.register(this);
35 if (!result.isOk) {
36 throw "Failed to register the MojoHandle: $result.";
37 }
38 }
39
40 void close() {
41 if (_handle != null) {
42 MojoHandleWatcher.close(_handle);
43 _handle = null;
44 }
45 if (_receivePort != null) {
46 _receivePort.close();
47 _receivePort = null;
48 }
49 }
50
51 StreamSubscription<List<int>> listen(
52 void onData(List event),
53 {Function onError, void onDone(), bool cancelOnError}) {
54 if (_isListening) {
55 throw "Listen has already been called: $_handle.";
56 }
57 _receivePort = new ReceivePort();
58 _sendPort = _receivePort.sendPort;
59 _controller = new StreamController(sync: true,
60 onListen: _onSubscriptionStateChange,
61 onCancel: _onSubscriptionStateChange,
62 onPause: _onPauseStateChange,
63 onResume: _onPauseStateChange);
64 _controller.addStream(_receivePort);
65
66 if (_signals != MojoHandleSignals.NONE) {
67 var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value);
68 if (!res.isOk) {
69 throw "MojoHandleWatcher add failed: $res";
70 }
71 }
72
73 _isListening = true;
74 return _controller.stream.listen(
75 onData,
76 onError: onError,
77 onDone: onDone,
78 cancelOnError: cancelOnError);
79 }
80
81 void enableSignals(MojoHandleSignals signals) {
82 _signals = signals;
83 if (_isListening) {
84 var res = MojoHandleWatcher.add(_handle, _sendPort, signals.value);
85 if (!res.isOk) {
86 throw "MojoHandleWatcher add failed: $res";
87 }
88 }
89 }
90
91 void enableReadEvents() => enableSignals(MojoHandleSignals.READABLE);
92 void enableWriteEvents() => enableSignals(MojoHandleSignals.WRITABLE);
93 void enableAllEvents() => enableSignals(MojoHandleSignals.READWRITE);
94
95 void _onSubscriptionStateChange() {
96 if (!_controller.hasListener) {
97 close();
98 }
99 }
100
101 void _onPauseStateChange() {
102 if (_controller.isPaused) {
103 var res = MojoHandleWatcher.remove(_handle);
104 if (!res.isOk) {
105 throw "MojoHandleWatcher add failed: $res";
106 }
107 } else {
108 var res = MojoHandleWatcher.add(_handle, _sendPort, _signals.value);
109 if (!res.isOk) {
110 throw "MojoHandleWatcher add failed: $res";
111 }
112 }
113 }
114
115 bool get readyRead => _handle.readyRead;
116 bool get readyWrite => _handle.readyWrite;
117
118 String toString() => "$_handle";
119 }
120
121
122 class MojoEventStreamListener {
123 MojoMessagePipeEndpoint _endpoint;
124 MojoEventStream _eventStream;
125 bool _isOpen = false;
126 bool _isInHandler = false;
127
128 MojoEventStreamListener(MojoMessagePipeEndpoint endpoint) :
129 _endpoint = endpoint,
130 _eventStream = new MojoEventStream(endpoint.handle),
131 _isOpen = false;
132
133 MojoEventStreamListener.fromHandle(MojoHandle handle) {
134 _endpoint = new MojoMessagePipeEndpoint(handle);
135 _eventStream = new MojoEventStream(handle);
136 _isOpen = false;
137 }
138
139 MojoEventStreamListener.unbound() :
140 _endpoint = null,
141 _eventStream = null,
142 _isOpen = false;
143
144 void bind(MojoMessagePipeEndpoint endpoint) {
145 assert(!isBound);
146 _endpoint = endpoint;
147 _eventStream = new MojoEventStream(endpoint.handle);
148 _isOpen = false;
149 }
150
151 void bindFromHandle(MojoHandle handle) {
152 assert(!isBound);
153 _endpoint = new MojoMessagePipeEndpoint(handle);
154 _eventStream = new MojoEventStream(handle);
155 _isOpen = false;
156 }
157
158 StreamSubscription<int> listen() {
159 _isOpen = true;
160 return _eventStream.listen((List<int> event) {
161 var signalsWatched = new MojoHandleSignals(event[0]);
162 var signalsReceived = new MojoHandleSignals(event[1]);
163 if (signalsReceived.isPeerClosed) {
164 handlePeerClosed();
165 // The peer being closed obviates any other signal we might
166 // have received since we won't be able to read or write the handle.
167 // Thus, we just return before invoking other handlers.
168 return;
169 }
170 _isInHandler = true;
171 if (signalsReceived.isReadable) {
172 assert(_eventStream.readyRead);
173 handleRead();
174 }
175 if (signalsReceived.isWritable) {
176 assert(_eventStream.readyWrite);
177 handleWrite();
178 }
179 _eventStream.enableSignals(enableSignals(
180 signalsWatched, signalsReceived));
181 _isInHandler = false;
182 });
183 }
184
185 void close() {
186 if (_isOpen) {
187 _eventStream.close();
188 _isOpen = false;
189 _eventStream = null;
190 _endpoint = null;
191 }
192 }
193
194 void handleRead() {}
195 void handleWrite() {}
196 void handlePeerClosed() {
197 close();
198 }
199
200 MojoHandleSignals enableSignals(MojoHandleSignals watched,
201 MojoHandleSignals received) => watched;
202
203 MojoMessagePipeEndpoint get endpoint => _endpoint;
204 bool get isOpen => _isOpen;
205 bool get isInHandler => _isInHandler;
206 bool get isBound => _endpoint != null;
207 }
OLDNEW
« no previous file with comments | « mojo/public/dart/src/data_pipe.dart ('k') | mojo/public/dart/src/handle.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698