| Index: runtime/observatory/lib/src/service/object.dart
|
| diff --git a/runtime/observatory/lib/src/service/object.dart b/runtime/observatory/lib/src/service/object.dart
|
| index e9f9b867f92afc7cff97b58c20a65203b85eff61..aa1079a847406702bb9daaec591369f3181fe0b5 100644
|
| --- a/runtime/observatory/lib/src/service/object.dart
|
| +++ b/runtime/observatory/lib/src/service/object.dart
|
| @@ -432,10 +432,53 @@ abstract class VM extends ServiceObjectOwner {
|
| update(toObservable({'id':'vm', 'type':'@VM'}));
|
| }
|
|
|
| - final StreamController<ServiceEvent> events =
|
| - new StreamController.broadcast();
|
| + Future streamListen(String streamId) {
|
| + return invokeRpc('streamListen', {'streamId': streamId});
|
| + }
|
| +
|
| + Future streamCancel(String streamId) {
|
| + return invokeRpc('streamCancel', {'streamId': streamId});
|
| + }
|
| +
|
| + static const kIsolateEventStreamId = 'Isolate';
|
| + static const kDebugEventStreamId = 'Debug';
|
| + static const kGCEventStreamId = 'GC';
|
| +
|
| + Map<String,StreamController> _streamControllers = {};
|
| +
|
| + void _streamListenHandleError(String streamId) {
|
| + streamListen(streamId).catchError((e, st) {
|
| + _getEventStreamController(streamId).addError(e, st);
|
| + });
|
| + }
|
| +
|
| + void _streamCancelHandleError(String streamId) {
|
| + // TODO(turnidge): Consider removing the controller from the map here.
|
| + streamCancel(streamId).catchError((e, st) {
|
| + _getEventStreamController(streamId).addError(e, st);
|
| + });
|
| + }
|
| +
|
| + StreamController _getEventStreamController(String streamId) {
|
| + var controller = _streamControllers.putIfAbsent(
|
| + streamId, () {
|
| + return new StreamController.broadcast(
|
| + onListen: () => _streamListenHandleError(streamId),
|
| + onCancel: () => _streamCancelHandleError(streamId));
|
| + });
|
| + return controller;
|
| + }
|
| +
|
| + Stream getEventStream(String streamId) {
|
| + return _getEventStreamController(streamId).stream;
|
| + }
|
| +
|
| + Stream get isolateEvents => getEventStream(kIsolateEventStreamId);
|
| + Stream get debugEvents => getEventStream(kDebugEventStreamId);
|
| + Stream get gcEvents => getEventStream(kGCEventStreamId);
|
|
|
| - void postServiceEvent(Map response, ByteData data) {
|
| + void postServiceEvent(String streamId, Map response, ByteData data) {
|
| + assert(streamId != null);
|
| var map = toObservable(response);
|
| assert(!map.containsKey('_data'));
|
| if (data != null) {
|
| @@ -448,19 +491,21 @@ abstract class VM extends ServiceObjectOwner {
|
| }
|
|
|
| var eventIsolate = map['isolate'];
|
| + var event;
|
| if (eventIsolate == null) {
|
| - var event = new ServiceObject._fromMap(vm, map);
|
| - events.add(event);
|
| + event = new ServiceObject._fromMap(vm, map);
|
| } else {
|
| // getFromMap creates the Isolate if it hasn't been seen already.
|
| var isolate = getFromMap(map['isolate']);
|
| - var event = new ServiceObject._fromMap(isolate, map);
|
| + event = new ServiceObject._fromMap(isolate, map);
|
| if (event.eventType == ServiceEvent.kIsolateExit) {
|
| _removeIsolate(isolate.id);
|
| }
|
| + // Give the isolate a chance to process the event first before
|
| + // dispatching it to others.
|
| isolate._onEvent(event);
|
| - events.add(event);
|
| }
|
| + _getEventStreamController(streamId).add(event);
|
| }
|
|
|
| void _removeIsolate(String isolateId) {
|
|
|