Index: sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
diff --git a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
index 7a5e0f32c54efd09540340298c45e1244e429d4a..621e85a1bb7df2446bc9f1be9c67079eeabdc9b1 100644 |
--- a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
+++ b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart |
@@ -32,16 +32,16 @@ import 'http://<<HOST_AND_PORT>>/packages/stack_trace/stack_trace.dart'; |
import 'http://<<HOST_AND_PORT>>/packages/barback/barback.dart'; |
/// Sets up the initial communication with the host isolate. |
-void main(List<String> args, SendPort replyTo) { |
+void main(_, SendPort replyTo) { |
var port = new ReceivePort(); |
- replyTo.send(['success', port.sendPort]); |
- port.listen((args) { |
- _sendFuture(args['replyTo'], new Future.sync(() { |
- var library = Uri.parse(args['library']); |
- var configuration = JSON.decode(args['configuration']); |
+ replyTo.send(port.sendPort); |
+ port.first.then((wrappedMessage) { |
+ _respond(wrappedMessage, (message) { |
+ var library = Uri.parse(message['library']); |
+ var configuration = JSON.decode(message['configuration']); |
return initialize(library, configuration). |
map(_serializeTransformerOrGroup).toList(); |
- })); |
+ }); |
}); |
} |
@@ -103,7 +103,7 @@ class ForeignTransform implements Transform { |
: _port = transform['port'], |
primaryInput = _deserializeAsset(transform['primaryInput']) { |
_logger = new TransformLogger((assetId, level, message, span) { |
- _port.send({ |
+ _call(_port, { |
'type': 'log', |
'level': level.name, |
'message': message, |
@@ -114,7 +114,7 @@ class ForeignTransform implements Transform { |
} |
Future<Asset> getInput(AssetId id) { |
- return _callAndReceiveFuture(_port, { |
+ return _call(_port, { |
'type': 'getInput', |
'id': _serializeId(id) |
}).then(_deserializeAsset); |
@@ -129,7 +129,7 @@ class ForeignTransform implements Transform { |
_futureStream(getInput(id).then((input) => input.read())); |
void addOutput(Asset output) { |
- _port.send({ |
+ _call(_port, { |
'type': 'addOutput', |
'output': _serializeAsset(output) |
}); |
@@ -183,9 +183,8 @@ Map _serializeTransformerOrGroup(transformerOrGroup) { |
/// Converts [transformer] into a serializable map. |
Map _serializeTransformer(Transformer transformer) { |
var port = new ReceivePort(); |
- port.listen((message) { |
- var replyTo = message['replyTo']; |
- _sendFuture(replyTo, new Future.sync(() { |
+ port.listen((wrappedMessage) { |
+ _respond(wrappedMessage, (message) { |
if (message['type'] == 'isPrimary') { |
return transformer.isPrimary(_deserializeAsset(message['asset'])); |
} else { |
@@ -193,7 +192,7 @@ Map _serializeTransformer(Transformer transformer) { |
return transformer.apply( |
new ForeignTransform(message['transform'])); |
} |
- })); |
+ }); |
}); |
return { |
@@ -311,33 +310,42 @@ Map _serializeLocation(Location location) { |
}; |
} |
-/// Sends the result of [future] through [port]. |
+/// Responds to a message sent by [_call]. |
/// |
-/// This should be received on the other end using [_receiveFuture]. It |
-/// re-raises any exceptions on the other side as [CrossIsolateException]s. |
-void _sendFuture(SendPort port, Future future) { |
- future.then((result) { |
- port.send({'success': result}); |
- }).catchError((error) { |
+/// [wrappedMessage] is the raw message sent by [_call]. This unwraps it and |
+/// passes the contents of the message to [callback], then sends the return |
+/// value of [callback] back to [_call]. If [callback] returns a Future or |
+/// throws an error, that will also be sent. |
+void _respond(wrappedMessage, callback(message)) { |
+ var replyTo = wrappedMessage['replyTo']; |
+ new Future.sync(() => callback(wrappedMessage['message'])) |
+ .then((result) => replyTo.send({'type': 'success', 'value': result})) |
+ .catchError((error, stackTrace) { |
// TODO(nweiz): at least MissingInputException should be preserved here. |
- port.send({'error': CrossIsolateException.serialize(error)}); |
+ replyTo.send({ |
+ 'type': 'error', |
+ 'error': CrossIsolateException.serialize(error, stackTrace) |
+ }); |
}); |
} |
-/// Receives the result of [_sendFuture] from [portCall], which should be the |
-/// return value of [SendPort.call]. |
+/// Wraps [message] and sends it across [port], then waits for a response which |
+/// should be sent using [_respond]. |
/// |
-/// The [message] argument is modified to include the [replyTo] port. |
-Future _callAndReceiveFuture(SendPort port, Map message) { |
- var responsePort = new ReceivePort(); |
- message['replyTo'] = responsePort.sendPort; |
- return new Future.sync(() { |
- port.send(message); |
- return responsePort.first.then((response) { |
- if (response.containsKey('success')) return response['success']; |
- return new Future.error( |
- new CrossIsolateException.deserialize(response['error'])); |
- }); |
+/// The returned Future will complete to the value or error returned by |
+/// [_respond]. |
+Future _call(SendPort port, message) { |
+ var receivePort = new ReceivePort(); |
+ port.send({ |
+ 'message': message, |
+ 'replyTo': receivePort.sendPort |
+ }); |
+ |
+ return receivePort.first.then((response) { |
+ if (response['type'] == 'success') return response['value']; |
+ assert(response['type'] == 'error'); |
+ return new Future.error( |
+ new CrossIsolateException.deserialize(response['error'])); |
}); |
} |
@@ -446,8 +454,11 @@ Future<Set> loadTransformers(BarbackServer server, TransformerId id) { |
_TRANSFORMER_ISOLATE.replaceAll('<<HOST_AND_PORT>>', hostAndPort); |
log.fine("Loading transformers from $assetId"); |
- return dart.runInIsolate(code).then((sendPort) { |
- return _callAndReceiveFuture(sendPort, { |
+ var port = new ReceivePort(); |
+ return dart.runInIsolate(code, port.sendPort) |
+ .then((_) => port.first) |
+ .then((sendPort) { |
+ return _call(sendPort, { |
'library': uri, |
// TODO(nweiz): support non-JSON-encodable configuration maps. |
'configuration': JSON.encode(id.configuration) |
@@ -490,14 +501,14 @@ class _ForeignTransformer extends Transformer { |
_toString = map['toString']; |
Future<bool> isPrimary(Asset asset) { |
- return _callAndReceiveFuture(_port, { |
+ return _call(_port, { |
'type': 'isPrimary', |
'asset': _serializeAsset(asset) |
}); |
} |
Future apply(Transform transform) { |
- return _callAndReceiveFuture(_port, { |
+ return _call(_port, { |
'type': 'apply', |
'transform': _serializeTransform(transform) |
}); |
@@ -532,16 +543,19 @@ _deserializeTransformerOrGroup(Map map) { |
/// Converts [transform] into a serializable map. |
Map _serializeTransform(Transform transform) { |
var receivePort = new ReceivePort(); |
- receivePort.listen((message) { |
- var replyTo = message['replyTo']; |
- if (message['type'] == 'getInput') { |
- _sendFuture(replyTo, transform.getInput(_deserializeId(message['id'])) |
- .then(_serializeAsset)); |
- } else if (message['type'] == 'addOutput') { |
- transform.addOutput(_deserializeAsset(message['output'])); |
- } else { |
- assert(message['type'] == 'log'); |
+ receivePort.listen((wrappedMessage) { |
+ _respond(wrappedMessage, (message) { |
+ if (message['type'] == 'getInput') { |
+ return transform.getInput(_deserializeId(message['id'])) |
+ .then(_serializeAsset); |
+ } |
+ if (message['type'] == 'addOutput') { |
+ transform.addOutput(_deserializeAsset(message['output'])); |
+ return; |
+ } |
+ |
+ assert(message['type'] == 'log'); |
var method; |
if (message['level'] == 'Info') { |
method = transform.logger.info; |
@@ -557,7 +571,7 @@ Map _serializeTransform(Transform transform) { |
var span = message['span'] == null ? null : |
_deserializeSpan(message['span']); |
method(message['message'], asset: assetId, span: span); |
- } |
+ }); |
}); |
return { |
@@ -656,32 +670,41 @@ SendPort _serializeStream(Stream stream) { |
/// Converts [id] into a serializable map. |
Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path}; |
-/// Sends the result of [future] through [port]. |
+/// Responds to a message sent by [_call]. |
/// |
-/// This should be received on the other end using [_callAndReceiveFuture]. It |
-/// re-raises any exceptions on the other side as [dart.CrossIsolateException]s. |
-void _sendFuture(SendPort port, Future future) { |
- future.then((result) { |
- port.send({'success': result}); |
- }).catchError((error) { |
+/// [wrappedMessage] is the raw message sent by [_call]. This unwraps it and |
+/// passes the contents of the message to [callback], then sends the return |
+/// value of [callback] back to [_call]. If [callback] returns a Future or |
+/// throws an error, that will also be sent. |
+void _respond(wrappedMessage, callback(message)) { |
+ var replyTo = wrappedMessage['replyTo']; |
+ new Future.sync(() => callback(wrappedMessage['message'])) |
+ .then((result) => replyTo.send({'type': 'success', 'value': result})) |
+ .catchError((error, stackTrace) { |
// TODO(nweiz): at least MissingInputException should be preserved here. |
- port.send({'error': dart.CrossIsolateException.serialize(error)}); |
+ replyTo.send({ |
+ 'type': 'error', |
+ 'error': dart.CrossIsolateException.serialize(error, stackTrace) |
+ }); |
}); |
} |
-/// Receives the result of [_sendFuture] from [portCall], which should be the |
-/// return value of [SendPort.call]. |
+/// Wraps [message] and sends it across [port], then waits for a response which |
+/// should be sent using [_respond]. |
/// |
-/// The [message] argument is modified to include the [replyTo] port. |
-Future _callAndReceiveFuture(SendPort port, Map message) { |
- var responsePort = new ReceivePort(); |
- message['replyTo'] = responsePort.sendPort; |
- return new Future.sync(() { |
- port.send(message); |
- return responsePort.first.then((response) { |
- if (response.containsKey('success')) return response['success']; |
- return new Future.error( |
- new dart.CrossIsolateException.deserialize(response['error'])); |
- }); |
+/// The returned Future will complete to the value or error returned by |
+/// [_respond]. |
+Future _call(SendPort port, message) { |
+ var receivePort = new ReceivePort(); |
+ port.send({ |
+ 'message': message, |
+ 'replyTo': receivePort.sendPort |
+ }); |
+ |
+ return receivePort.first.then((response) { |
+ if (response['type'] == 'success') return response['value']; |
+ assert(response['type'] == 'error'); |
+ return new Future.error( |
+ new dart.CrossIsolateException.deserialize(response['error'])); |
}); |
} |