| Index: sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
|
| diff --git a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
|
| index 5481a50c0f69a73788b72ea90ed251d83972d45d..3eb02b7219878d829d38d9ca8e5fdeb69f79f8af 100644
|
| --- a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
|
| +++ b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
|
| @@ -32,9 +32,11 @@ import 'http://<<HOST_AND_PORT>>/packages/stack_trace/stack_trace.dart';
|
| import 'http://<<HOST_AND_PORT>>/packages/barback/barback.dart';
|
|
|
| /// Sets up the initial communication with the host isolate.
|
| -void main() {
|
| - port.receive((args, replyTo) {
|
| - _sendFuture(replyTo, new Future.sync(() {
|
| +void main(List<String> args, SendPort replyTo) {
|
| + var port = new ReceivePort();
|
| + replyTo.send(['success', port.sendPort]);
|
| + port.listen((args) {
|
| + _sendFuture(args['replyTo'], new Future.sync(() {
|
| var library = Uri.parse(args['library']);
|
| var configuration = JSON.decode(args['configuration']);
|
| return initialize(library, configuration).
|
| @@ -112,10 +114,10 @@ class ForeignTransform implements Transform {
|
| }
|
|
|
| Future<Asset> getInput(AssetId id) {
|
| - return _receiveFuture(_port.call({
|
| + return _callAndReceiveFuture(_port, {
|
| 'type': 'getInput',
|
| 'id': _serializeId(id)
|
| - })).then(_deserializeAsset);
|
| + }).then(_deserializeAsset);
|
| }
|
|
|
| Future<String> readInputAsString(AssetId id, {Encoding encoding}) {
|
| @@ -181,7 +183,8 @@ Map _serializeTransformerOrGroup(transformerOrGroup) {
|
| /// Converts [transformer] into a serializable map.
|
| Map _serializeTransformer(Transformer transformer) {
|
| var port = new ReceivePort();
|
| - port.receive((message, replyTo) {
|
| + port.listen((message) {
|
| + var replyTo = message['replyTo'];
|
| _sendFuture(replyTo, new Future.sync(() {
|
| if (message['type'] == 'isPrimary') {
|
| return transformer.isPrimary(_deserializeAsset(message['asset']));
|
| @@ -196,7 +199,7 @@ Map _serializeTransformer(Transformer transformer) {
|
| return {
|
| 'type': 'Transformer',
|
| 'toString': transformer.toString(),
|
| - 'port': port.toSendPort()
|
| + 'port': port.sendPort
|
| };
|
| }
|
|
|
| @@ -211,11 +214,29 @@ Map _serializeTransformerGroup(TransformerGroup group) {
|
| };
|
| }
|
|
|
| +/// When the input receives a 'done' as data-event, transforms it to a
|
| +/// done event and cancels the subscription.
|
| +StreamSubscription doneTransformer(Stream input, bool cancelOnError) {
|
| + var subscription;
|
| + var transformed = input.transform(new StreamTransformer.fromHandlers(
|
| + handleData: (data, sink) {
|
| + if (data == 'done') {
|
| + sink.close();
|
| + subscription.cancel();
|
| + } else {
|
| + sink.add(data);
|
| + }
|
| + }));
|
| + subscription = transformed.listen(null, cancelOnError: cancelOnError);
|
| + return subscription;
|
| +}
|
| +
|
| /// Converts a serializable map into an [Asset].
|
| Asset _deserializeAsset(Map asset) {
|
| - var box = new MessageBox();
|
| - asset['sink'].add(box.sink);
|
| - return new Asset.fromStream(_deserializeId(asset['id']), box.stream);
|
| + var receivePort = new ReceivePort();
|
| + asset['sendPort'].send(receivePort.sendPort);
|
| + var stream = receivePort.transform(const StreamTransformer(doneTransformer));
|
| + return new Asset.fromStream(_deserializeId(asset['id']), stream);
|
| }
|
|
|
| /// Converts a serializable map into an [AssetId].
|
| @@ -225,16 +246,18 @@ AssetId _deserializeId(Map id) => new AssetId(id['package'], id['path']);
|
| Map _serializeAsset(Asset asset) {
|
| // We can't send IsolateStreams (issue 12437), so instead we send a sink and
|
| // get the isolate to send us back another sink.
|
| - var box = new MessageBox();
|
| - box.stream.first.then((sink) {
|
| - asset.read().listen(sink.add,
|
| - onError: sink.addError,
|
| - onDone: sink.close);
|
| + var receivePort = new ReceivePort();
|
| + receivePort.first.then((sendPort) {
|
| + asset.read().listen(sendPort.send,
|
| + onError: (error, stackTrace) {
|
| + throw new UnimplementedError('Error during asset serialization');
|
| + },
|
| + onDone: () { sendPort.send('done'); });
|
| });
|
|
|
| return {
|
| 'id': _serializeId(asset.id),
|
| - 'sink': box.sink
|
| + 'sendPort': receivePort.sendPort
|
| };
|
| }
|
|
|
| @@ -280,11 +303,18 @@ void _sendFuture(SendPort port, Future future) {
|
|
|
| /// Receives the result of [_sendFuture] from [portCall], which should be the
|
| /// return value of [SendPort.call].
|
| -Future _receiveFuture(Future portCall) {
|
| - return portCall.then((response) {
|
| - if (response.containsKey('success')) return response['success'];
|
| - return new Future.error(
|
| - new CrossIsolateException.deserialize(response['error']));
|
| +///
|
| +/// The [message] argument is modified to include the [replyTo] port.
|
| +Future _callAndReceiveFuture(SendPort port, Map message) {
|
| + var responsePort = new ReceivePort();
|
| + message['replyTo'] = responsePort.sendPort;
|
| + return new Future.sync(() {
|
| + port.send(message);
|
| + return responsePort.first.then((response) {
|
| + if (response.containsKey('success')) return response['success'];
|
| + return new Future.error(
|
| + new CrossIsolateException.deserialize(response['error']));
|
| + });
|
| });
|
| }
|
|
|
| @@ -379,11 +409,11 @@ Future<Set> loadTransformers(BarbackServer server, TransformerId id) {
|
| log.fine("Loading transformers from $assetId");
|
|
|
| return dart.runInIsolate(code).then((sendPort) {
|
| - return _receiveFuture(sendPort.call({
|
| + return _callAndReceiveFuture(sendPort, {
|
| 'library': uri,
|
| // TODO(nweiz): support non-JSON-encodable configuration maps.
|
| 'configuration': JSON.encode(id.configuration)
|
| - })).then((transformers) {
|
| + }).then((transformers) {
|
| transformers = transformers.map(_deserializeTransformerOrGroup).toSet();
|
| log.fine("Transformers from $assetId: $transformers");
|
| return transformers;
|
| @@ -422,17 +452,17 @@ class _ForeignTransformer extends Transformer {
|
| _toString = map['toString'];
|
|
|
| Future<bool> isPrimary(Asset asset) {
|
| - return _receiveFuture(_port.call({
|
| + return _callAndReceiveFuture(_port, {
|
| 'type': 'isPrimary',
|
| 'asset': _serializeAsset(asset)
|
| - }));
|
| + });
|
| }
|
|
|
| Future apply(Transform transform) {
|
| - return _receiveFuture(_port.call({
|
| + return _callAndReceiveFuture(_port, {
|
| 'type': 'apply',
|
| 'transform': _serializeTransform(transform)
|
| - }));
|
| + });
|
| }
|
|
|
| String toString() => _toString;
|
| @@ -464,7 +494,8 @@ _deserializeTransformerOrGroup(Map map) {
|
| /// Converts [transform] into a serializable map.
|
| Map _serializeTransform(Transform transform) {
|
| var receivePort = new ReceivePort();
|
| - receivePort.receive((message, replyTo) {
|
| + receivePort.listen((message) {
|
| + var replyTo = message['replyTo'];
|
| if (message['type'] == 'getInput') {
|
| _sendFuture(replyTo, transform.getInput(_deserializeId(message['id']))
|
| .then(_serializeAsset));
|
| @@ -492,16 +523,34 @@ Map _serializeTransform(Transform transform) {
|
| });
|
|
|
| return {
|
| - 'port': receivePort.toSendPort(),
|
| + 'port': receivePort.sendPort,
|
| 'primaryInput': _serializeAsset(transform.primaryInput)
|
| };
|
| }
|
|
|
| +/// When the input receives a 'done' as data-event, transforms it to a
|
| +/// done event and cancels the subscription.
|
| +StreamSubscription doneTransformer(Stream input, bool cancelOnError) {
|
| + var subscription;
|
| + var transformed = input.transform(new StreamTransformer.fromHandlers(
|
| + handleData: (data, sink) {
|
| + if (data == 'done') {
|
| + sink.close();
|
| + subscription.cancel();
|
| + } else {
|
| + sink.add(data);
|
| + }
|
| + }));
|
| + subscription = transformed.listen(null, cancelOnError: cancelOnError);
|
| + return subscription;
|
| +}
|
| +
|
| /// Converts a serializable map into an [Asset].
|
| Asset _deserializeAsset(Map asset) {
|
| - var box = new MessageBox();
|
| - asset['sink'].add(box.sink);
|
| - return new Asset.fromStream(_deserializeId(asset['id']), box.stream);
|
| + var receivePort = new ReceivePort();
|
| + asset['sendPort'].send(receivePort.sendPort);
|
| + var stream = receivePort.transform(const StreamTransformer(doneTransformer));
|
| + return new Asset.fromStream(_deserializeId(asset['id']), stream);
|
| }
|
|
|
| /// Converts a serializable map into an [AssetId].
|
| @@ -528,16 +577,18 @@ Location _deserializeLocation(Map location) {
|
| Map _serializeAsset(Asset asset) {
|
| // We can't send IsolateStreams (issue 12437), so instead we send a sink and
|
| // get the isolate to send us back another sink.
|
| - var box = new MessageBox();
|
| - box.stream.first.then((sink) {
|
| - asset.read().listen(sink.add,
|
| - onError: sink.addError,
|
| - onDone: sink.close);
|
| + var receivePort = new ReceivePort();
|
| + receivePort.first.then((sendPort) {
|
| + asset.read().listen(sendPort.send,
|
| + onError: (error, stackTrace) {
|
| + throw new UnimplementedError('Error during asset serialization');
|
| + },
|
| + onDone: () { sendPort.send('done'); });
|
| });
|
|
|
| return {
|
| 'id': _serializeId(asset.id),
|
| - 'sink': box.sink
|
| + 'sendPort': receivePort.sendPort
|
| };
|
| }
|
|
|
| @@ -546,7 +597,7 @@ Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path};
|
|
|
| /// Sends the result of [future] through [port].
|
| ///
|
| -/// This should be received on the other end using [_receiveFuture]. It
|
| +/// This should be received on the other end using [_callAndReceiveFuture]. It
|
| /// re-raises any exceptions on the other side as [dart.CrossIsolateException]s.
|
| void _sendFuture(SendPort port, Future future) {
|
| future.then((result) {
|
| @@ -559,10 +610,17 @@ void _sendFuture(SendPort port, Future future) {
|
|
|
| /// Receives the result of [_sendFuture] from [portCall], which should be the
|
| /// return value of [SendPort.call].
|
| -Future _receiveFuture(Future portCall) {
|
| - return portCall.then((response) {
|
| - if (response.containsKey('success')) return response['success'];
|
| - return new Future.error(
|
| - new dart.CrossIsolateException.deserialize(response['error']));
|
| +///
|
| +/// The [message] argument is modified to include the [replyTo] port.
|
| +Future _callAndReceiveFuture(SendPort port, Map message) {
|
| + var responsePort = new ReceivePort();
|
| + message['replyTo'] = responsePort.sendPort;
|
| + return new Future.sync(() {
|
| + port.send(message);
|
| + return responsePort.first.then((response) {
|
| + if (response.containsKey('success')) return response['success'];
|
| + return new Future.error(
|
| + new dart.CrossIsolateException.deserialize(response['error']));
|
| + });
|
| });
|
| }
|
|
|