Index: sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
diff --git a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
index 5481a50c0f69a73788b72ea90ed251d83972d45d..3eb02b7219878d829d38d9ca8e5fdeb69f79f8af 100644 |
--- a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
+++ b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
@@ -32,9 +32,11 @@ import 'http://<<HOST_AND_PORT>>/packages/stack_trace/stack_trace.dart'; |
import 'http://<<HOST_AND_PORT>>/packages/barback/barback.dart'; |
/// Sets up the initial communication with the host isolate. |
-void main() { |
- port.receive((args, replyTo) { |
- _sendFuture(replyTo, new Future.sync(() { |
+void main(List<String> args, SendPort replyTo) { |
+ var port = new ReceivePort(); |
+ replyTo.send(['success', port.sendPort]); |
+ port.listen((args) { |
+ _sendFuture(args['replyTo'], new Future.sync(() { |
var library = Uri.parse(args['library']); |
var configuration = JSON.decode(args['configuration']); |
return initialize(library, configuration). |
@@ -112,10 +114,10 @@ class ForeignTransform implements Transform { |
} |
Future<Asset> getInput(AssetId id) { |
- return _receiveFuture(_port.call({ |
+ return _callAndReceiveFuture(_port, { |
'type': 'getInput', |
'id': _serializeId(id) |
- })).then(_deserializeAsset); |
+ }).then(_deserializeAsset); |
} |
Future<String> readInputAsString(AssetId id, {Encoding encoding}) { |
@@ -181,7 +183,8 @@ Map _serializeTransformerOrGroup(transformerOrGroup) { |
/// Converts [transformer] into a serializable map. |
Map _serializeTransformer(Transformer transformer) { |
var port = new ReceivePort(); |
- port.receive((message, replyTo) { |
+ port.listen((message) { |
+ var replyTo = message['replyTo']; |
_sendFuture(replyTo, new Future.sync(() { |
if (message['type'] == 'isPrimary') { |
return transformer.isPrimary(_deserializeAsset(message['asset'])); |
@@ -196,7 +199,7 @@ Map _serializeTransformer(Transformer transformer) { |
return { |
'type': 'Transformer', |
'toString': transformer.toString(), |
- 'port': port.toSendPort() |
+ 'port': port.sendPort |
}; |
} |
@@ -211,11 +214,29 @@ Map _serializeTransformerGroup(TransformerGroup group) { |
}; |
} |
+/// When the input receives a 'done' as data-event, transforms it to a |
+/// done event and cancels the subscription. |
+StreamSubscription doneTransformer(Stream input, bool cancelOnError) { |
+ var subscription; |
+ var transformed = input.transform(new StreamTransformer.fromHandlers( |
+ handleData: (data, sink) { |
+ if (data == 'done') { |
+ sink.close(); |
+ subscription.cancel(); |
+ } else { |
+ sink.add(data); |
+ } |
+ })); |
+ subscription = transformed.listen(null, cancelOnError: cancelOnError); |
+ return subscription; |
+} |
+ |
/// Converts a serializable map into an [Asset]. |
Asset _deserializeAsset(Map asset) { |
- var box = new MessageBox(); |
- asset['sink'].add(box.sink); |
- return new Asset.fromStream(_deserializeId(asset['id']), box.stream); |
+ var receivePort = new ReceivePort(); |
+ asset['sendPort'].send(receivePort.sendPort); |
+ var stream = receivePort.transform(const StreamTransformer(doneTransformer)); |
+ return new Asset.fromStream(_deserializeId(asset['id']), stream); |
} |
/// Converts a serializable map into an [AssetId]. |
@@ -225,16 +246,18 @@ AssetId _deserializeId(Map id) => new AssetId(id['package'], id['path']); |
Map _serializeAsset(Asset asset) { |
// We can't send IsolateStreams (issue 12437), so instead we send a sink and |
// get the isolate to send us back another sink. |
- var box = new MessageBox(); |
- box.stream.first.then((sink) { |
- asset.read().listen(sink.add, |
- onError: sink.addError, |
- onDone: sink.close); |
+ var receivePort = new ReceivePort(); |
+ receivePort.first.then((sendPort) { |
+ asset.read().listen(sendPort.send, |
+ onError: (error, stackTrace) { |
+ throw new UnimplementedError('Error during asset serialization'); |
+ }, |
+ onDone: () { sendPort.send('done'); }); |
}); |
return { |
'id': _serializeId(asset.id), |
- 'sink': box.sink |
+ 'sendPort': receivePort.sendPort |
}; |
} |
@@ -280,11 +303,18 @@ void _sendFuture(SendPort port, Future future) { |
/// Receives the result of [_sendFuture] from [portCall], which should be the |
/// return value of [SendPort.call]. |
-Future _receiveFuture(Future portCall) { |
- return portCall.then((response) { |
- if (response.containsKey('success')) return response['success']; |
- return new Future.error( |
- new CrossIsolateException.deserialize(response['error'])); |
+/// |
+/// The [message] argument is modified to include the [replyTo] port. |
+Future _callAndReceiveFuture(SendPort port, Map message) { |
+ var responsePort = new ReceivePort(); |
+ message['replyTo'] = responsePort.sendPort; |
+ return new Future.sync(() { |
+ port.send(message); |
+ return responsePort.first.then((response) { |
+ if (response.containsKey('success')) return response['success']; |
+ return new Future.error( |
+ new CrossIsolateException.deserialize(response['error'])); |
+ }); |
}); |
} |
@@ -379,11 +409,11 @@ Future<Set> loadTransformers(BarbackServer server, TransformerId id) { |
log.fine("Loading transformers from $assetId"); |
return dart.runInIsolate(code).then((sendPort) { |
- return _receiveFuture(sendPort.call({ |
+ return _callAndReceiveFuture(sendPort, { |
'library': uri, |
// TODO(nweiz): support non-JSON-encodable configuration maps. |
'configuration': JSON.encode(id.configuration) |
- })).then((transformers) { |
+ }).then((transformers) { |
transformers = transformers.map(_deserializeTransformerOrGroup).toSet(); |
log.fine("Transformers from $assetId: $transformers"); |
return transformers; |
@@ -422,17 +452,17 @@ class _ForeignTransformer extends Transformer { |
_toString = map['toString']; |
Future<bool> isPrimary(Asset asset) { |
- return _receiveFuture(_port.call({ |
+ return _callAndReceiveFuture(_port, { |
'type': 'isPrimary', |
'asset': _serializeAsset(asset) |
- })); |
+ }); |
} |
Future apply(Transform transform) { |
- return _receiveFuture(_port.call({ |
+ return _callAndReceiveFuture(_port, { |
'type': 'apply', |
'transform': _serializeTransform(transform) |
- })); |
+ }); |
} |
String toString() => _toString; |
@@ -464,7 +494,8 @@ _deserializeTransformerOrGroup(Map map) { |
/// Converts [transform] into a serializable map. |
Map _serializeTransform(Transform transform) { |
var receivePort = new ReceivePort(); |
- receivePort.receive((message, replyTo) { |
+ receivePort.listen((message) { |
+ var replyTo = message['replyTo']; |
if (message['type'] == 'getInput') { |
_sendFuture(replyTo, transform.getInput(_deserializeId(message['id'])) |
.then(_serializeAsset)); |
@@ -492,16 +523,34 @@ Map _serializeTransform(Transform transform) { |
}); |
return { |
- 'port': receivePort.toSendPort(), |
+ 'port': receivePort.sendPort, |
'primaryInput': _serializeAsset(transform.primaryInput) |
}; |
} |
+/// When the input receives a 'done' as data-event, transforms it to a |
+/// done event and cancels the subscription. |
+StreamSubscription doneTransformer(Stream input, bool cancelOnError) { |
+ var subscription; |
+ var transformed = input.transform(new StreamTransformer.fromHandlers( |
+ handleData: (data, sink) { |
+ if (data == 'done') { |
+ sink.close(); |
+ subscription.cancel(); |
+ } else { |
+ sink.add(data); |
+ } |
+ })); |
+ subscription = transformed.listen(null, cancelOnError: cancelOnError); |
+ return subscription; |
+} |
+ |
/// Converts a serializable map into an [Asset]. |
Asset _deserializeAsset(Map asset) { |
- var box = new MessageBox(); |
- asset['sink'].add(box.sink); |
- return new Asset.fromStream(_deserializeId(asset['id']), box.stream); |
+ var receivePort = new ReceivePort(); |
+ asset['sendPort'].send(receivePort.sendPort); |
+ var stream = receivePort.transform(const StreamTransformer(doneTransformer)); |
+ return new Asset.fromStream(_deserializeId(asset['id']), stream); |
} |
/// Converts a serializable map into an [AssetId]. |
@@ -528,16 +577,18 @@ Location _deserializeLocation(Map location) { |
Map _serializeAsset(Asset asset) { |
// We can't send IsolateStreams (issue 12437), so instead we send a sink and |
// get the isolate to send us back another sink. |
- var box = new MessageBox(); |
- box.stream.first.then((sink) { |
- asset.read().listen(sink.add, |
- onError: sink.addError, |
- onDone: sink.close); |
+ var receivePort = new ReceivePort(); |
+ receivePort.first.then((sendPort) { |
+ asset.read().listen(sendPort.send, |
+ onError: (error, stackTrace) { |
+ throw new UnimplementedError('Error during asset serialization'); |
+ }, |
+ onDone: () { sendPort.send('done'); }); |
}); |
return { |
'id': _serializeId(asset.id), |
- 'sink': box.sink |
+ 'sendPort': receivePort.sendPort |
}; |
} |
@@ -546,7 +597,7 @@ Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path}; |
/// Sends the result of [future] through [port]. |
/// |
-/// This should be received on the other end using [_receiveFuture]. It |
+/// This should be received on the other end using [_callAndReceiveFuture]. It |
/// re-raises any exceptions on the other side as [dart.CrossIsolateException]s. |
void _sendFuture(SendPort port, Future future) { |
future.then((result) { |
@@ -559,10 +610,17 @@ void _sendFuture(SendPort port, Future future) { |
/// Receives the result of [_sendFuture] from [portCall], which should be the |
/// return value of [SendPort.call]. |
-Future _receiveFuture(Future portCall) { |
- return portCall.then((response) { |
- if (response.containsKey('success')) return response['success']; |
- return new Future.error( |
- new dart.CrossIsolateException.deserialize(response['error'])); |
+/// |
+/// The [message] argument is modified to include the [replyTo] port. |
+Future _callAndReceiveFuture(SendPort port, Map message) { |
+ var responsePort = new ReceivePort(); |
+ message['replyTo'] = responsePort.sendPort; |
+ return new Future.sync(() { |
+ port.send(message); |
+ return responsePort.first.then((response) { |
+ if (response.containsKey('success')) return response['success']; |
+ return new Future.error( |
+ new dart.CrossIsolateException.deserialize(response['error'])); |
+ }); |
}); |
} |