Index: runtime/observatory/lib/src/service/object.dart |
diff --git a/runtime/observatory/lib/src/service/object.dart b/runtime/observatory/lib/src/service/object.dart |
index e9f9b867f92afc7cff97b58c20a65203b85eff61..aa1079a847406702bb9daaec591369f3181fe0b5 100644 |
--- a/runtime/observatory/lib/src/service/object.dart |
+++ b/runtime/observatory/lib/src/service/object.dart |
@@ -432,10 +432,53 @@ abstract class VM extends ServiceObjectOwner { |
update(toObservable({'id':'vm', 'type':'@VM'})); |
} |
- final StreamController<ServiceEvent> events = |
- new StreamController.broadcast(); |
+ Future streamListen(String streamId) { |
+ return invokeRpc('streamListen', {'streamId': streamId}); |
+ } |
+ |
+ Future streamCancel(String streamId) { |
+ return invokeRpc('streamCancel', {'streamId': streamId}); |
+ } |
+ |
+ static const kIsolateEventStreamId = 'Isolate'; |
+ static const kDebugEventStreamId = 'Debug'; |
+ static const kGCEventStreamId = 'GC'; |
+ |
+ Map<String,StreamController> _streamControllers = {}; |
+ |
+ void _streamListenHandleError(String streamId) { |
+ streamListen(streamId).catchError((e, st) { |
+ _getEventStreamController(streamId).addError(e, st); |
+ }); |
+ } |
+ |
+ void _streamCancelHandleError(String streamId) { |
+ // TODO(turnidge): Consider removing the controller from the map here. |
+ streamCancel(streamId).catchError((e, st) { |
+ _getEventStreamController(streamId).addError(e, st); |
+ }); |
+ } |
+ |
+ StreamController _getEventStreamController(String streamId) { |
+ var controller = _streamControllers.putIfAbsent( |
+ streamId, () { |
+ return new StreamController.broadcast( |
+ onListen: () => _streamListenHandleError(streamId), |
+ onCancel: () => _streamCancelHandleError(streamId)); |
+ }); |
+ return controller; |
+ } |
+ |
+ Stream getEventStream(String streamId) { |
+ return _getEventStreamController(streamId).stream; |
+ } |
+ |
+ Stream get isolateEvents => getEventStream(kIsolateEventStreamId); |
+ Stream get debugEvents => getEventStream(kDebugEventStreamId); |
+ Stream get gcEvents => getEventStream(kGCEventStreamId); |
- void postServiceEvent(Map response, ByteData data) { |
+ void postServiceEvent(String streamId, Map response, ByteData data) { |
+ assert(streamId != null); |
var map = toObservable(response); |
assert(!map.containsKey('_data')); |
if (data != null) { |
@@ -448,19 +491,21 @@ abstract class VM extends ServiceObjectOwner { |
} |
var eventIsolate = map['isolate']; |
+ var event; |
if (eventIsolate == null) { |
- var event = new ServiceObject._fromMap(vm, map); |
- events.add(event); |
+ event = new ServiceObject._fromMap(vm, map); |
} else { |
// getFromMap creates the Isolate if it hasn't been seen already. |
var isolate = getFromMap(map['isolate']); |
- var event = new ServiceObject._fromMap(isolate, map); |
+ event = new ServiceObject._fromMap(isolate, map); |
if (event.eventType == ServiceEvent.kIsolateExit) { |
_removeIsolate(isolate.id); |
} |
+ // Give the isolate a chance to process the event first before |
+ // dispatching it to others. |
isolate._onEvent(event); |
- events.add(event); |
} |
+ _getEventStreamController(streamId).add(event); |
} |
void _removeIsolate(String isolateId) { |