Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1716)

Unified Diff: sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart

Issue 50303005: Clean up some pub and scheduled_test integration with the new isolate API. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
diff --git a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
index 7a5e0f32c54efd09540340298c45e1244e429d4a..621e85a1bb7df2446bc9f1be9c67079eeabdc9b1 100644
--- a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
+++ b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
@@ -32,16 +32,16 @@ import 'http://<<HOST_AND_PORT>>/packages/stack_trace/stack_trace.dart';
import 'http://<<HOST_AND_PORT>>/packages/barback/barback.dart';
/// Sets up the initial communication with the host isolate.
-void main(List<String> args, SendPort replyTo) {
+void main(_, SendPort replyTo) {
var port = new ReceivePort();
- replyTo.send(['success', port.sendPort]);
- port.listen((args) {
- _sendFuture(args['replyTo'], new Future.sync(() {
- var library = Uri.parse(args['library']);
- var configuration = JSON.decode(args['configuration']);
+ replyTo.send(port.sendPort);
+ port.first.then((wrappedMessage) {
+ _respond(wrappedMessage, (message) {
+ var library = Uri.parse(message['library']);
+ var configuration = JSON.decode(message['configuration']);
return initialize(library, configuration).
map(_serializeTransformerOrGroup).toList();
- }));
+ });
});
}
@@ -103,7 +103,7 @@ class ForeignTransform implements Transform {
: _port = transform['port'],
primaryInput = _deserializeAsset(transform['primaryInput']) {
_logger = new TransformLogger((assetId, level, message, span) {
- _port.send({
+ _call(_port, {
'type': 'log',
'level': level.name,
'message': message,
@@ -114,7 +114,7 @@ class ForeignTransform implements Transform {
}
Future<Asset> getInput(AssetId id) {
- return _callAndReceiveFuture(_port, {
+ return _call(_port, {
'type': 'getInput',
'id': _serializeId(id)
}).then(_deserializeAsset);
@@ -129,7 +129,7 @@ class ForeignTransform implements Transform {
_futureStream(getInput(id).then((input) => input.read()));
void addOutput(Asset output) {
- _port.send({
+ _call(_port, {
'type': 'addOutput',
'output': _serializeAsset(output)
});
@@ -183,9 +183,8 @@ Map _serializeTransformerOrGroup(transformerOrGroup) {
/// Converts [transformer] into a serializable map.
Map _serializeTransformer(Transformer transformer) {
var port = new ReceivePort();
- port.listen((message) {
- var replyTo = message['replyTo'];
- _sendFuture(replyTo, new Future.sync(() {
+ port.listen((wrappedMessage) {
+ _respond(wrappedMessage, (message) {
if (message['type'] == 'isPrimary') {
return transformer.isPrimary(_deserializeAsset(message['asset']));
} else {
@@ -193,7 +192,7 @@ Map _serializeTransformer(Transformer transformer) {
return transformer.apply(
new ForeignTransform(message['transform']));
}
- }));
+ });
});
return {
@@ -311,33 +310,42 @@ Map _serializeLocation(Location location) {
};
}
-/// Sends the result of [future] through [port].
+/// Responds to a message sent by [_call].
///
-/// This should be received on the other end using [_receiveFuture]. It
-/// re-raises any exceptions on the other side as [CrossIsolateException]s.
-void _sendFuture(SendPort port, Future future) {
- future.then((result) {
- port.send({'success': result});
- }).catchError((error) {
+/// [wrappedMessage] is the raw message sent by [_call]. This unwraps it and
+/// passes the contents of the message to [callback], then sends the return
+/// value of [callback] back to [_call]. If [callback] returns a Future or
+/// throws an error, that will also be sent.
+void _respond(wrappedMessage, callback(message)) {
+ var replyTo = wrappedMessage['replyTo'];
+ new Future.sync(() => callback(wrappedMessage['message']))
+ .then((result) => replyTo.send({'type': 'success', 'value': result}))
+ .catchError((error, stackTrace) {
// TODO(nweiz): at least MissingInputException should be preserved here.
- port.send({'error': CrossIsolateException.serialize(error)});
+ replyTo.send({
+ 'type': 'error',
+ 'error': CrossIsolateException.serialize(error, stackTrace)
+ });
});
}
-/// Receives the result of [_sendFuture] from [portCall], which should be the
-/// return value of [SendPort.call].
+/// Wraps [message] and sends it across [port], then waits for a response which
+/// should be sent using [_respond].
///
-/// The [message] argument is modified to include the [replyTo] port.
-Future _callAndReceiveFuture(SendPort port, Map message) {
- var responsePort = new ReceivePort();
- message['replyTo'] = responsePort.sendPort;
- return new Future.sync(() {
- port.send(message);
- return responsePort.first.then((response) {
- if (response.containsKey('success')) return response['success'];
- return new Future.error(
- new CrossIsolateException.deserialize(response['error']));
- });
+/// The returned Future will complete to the value or error returned by
+/// [_respond].
+Future _call(SendPort port, message) {
+ var receivePort = new ReceivePort();
+ port.send({
+ 'message': message,
+ 'replyTo': receivePort.sendPort
+ });
+
+ return receivePort.first.then((response) {
+ if (response['type'] == 'success') return response['value'];
+ assert(response['type'] == 'error');
+ return new Future.error(
+ new CrossIsolateException.deserialize(response['error']));
});
}
@@ -446,8 +454,11 @@ Future<Set> loadTransformers(BarbackServer server, TransformerId id) {
_TRANSFORMER_ISOLATE.replaceAll('<<HOST_AND_PORT>>', hostAndPort);
log.fine("Loading transformers from $assetId");
- return dart.runInIsolate(code).then((sendPort) {
- return _callAndReceiveFuture(sendPort, {
+ var port = new ReceivePort();
+ return dart.runInIsolate(code, port.sendPort)
+ .then((_) => port.first)
+ .then((sendPort) {
+ return _call(sendPort, {
'library': uri,
// TODO(nweiz): support non-JSON-encodable configuration maps.
'configuration': JSON.encode(id.configuration)
@@ -490,14 +501,14 @@ class _ForeignTransformer extends Transformer {
_toString = map['toString'];
Future<bool> isPrimary(Asset asset) {
- return _callAndReceiveFuture(_port, {
+ return _call(_port, {
'type': 'isPrimary',
'asset': _serializeAsset(asset)
});
}
Future apply(Transform transform) {
- return _callAndReceiveFuture(_port, {
+ return _call(_port, {
'type': 'apply',
'transform': _serializeTransform(transform)
});
@@ -532,16 +543,19 @@ _deserializeTransformerOrGroup(Map map) {
/// Converts [transform] into a serializable map.
Map _serializeTransform(Transform transform) {
var receivePort = new ReceivePort();
- receivePort.listen((message) {
- var replyTo = message['replyTo'];
- if (message['type'] == 'getInput') {
- _sendFuture(replyTo, transform.getInput(_deserializeId(message['id']))
- .then(_serializeAsset));
- } else if (message['type'] == 'addOutput') {
- transform.addOutput(_deserializeAsset(message['output']));
- } else {
- assert(message['type'] == 'log');
+ receivePort.listen((wrappedMessage) {
+ _respond(wrappedMessage, (message) {
+ if (message['type'] == 'getInput') {
+ return transform.getInput(_deserializeId(message['id']))
+ .then(_serializeAsset);
+ }
+ if (message['type'] == 'addOutput') {
+ transform.addOutput(_deserializeAsset(message['output']));
+ return;
+ }
+
+ assert(message['type'] == 'log');
var method;
if (message['level'] == 'Info') {
method = transform.logger.info;
@@ -557,7 +571,7 @@ Map _serializeTransform(Transform transform) {
var span = message['span'] == null ? null :
_deserializeSpan(message['span']);
method(message['message'], asset: assetId, span: span);
- }
+ });
});
return {
@@ -656,32 +670,41 @@ SendPort _serializeStream(Stream stream) {
/// Converts [id] into a serializable map.
Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path};
-/// Sends the result of [future] through [port].
+/// Responds to a message sent by [_call].
///
-/// This should be received on the other end using [_callAndReceiveFuture]. It
-/// re-raises any exceptions on the other side as [dart.CrossIsolateException]s.
-void _sendFuture(SendPort port, Future future) {
- future.then((result) {
- port.send({'success': result});
- }).catchError((error) {
+/// [wrappedMessage] is the raw message sent by [_call]. This unwraps it and
+/// passes the contents of the message to [callback], then sends the return
+/// value of [callback] back to [_call]. If [callback] returns a Future or
+/// throws an error, that will also be sent.
+void _respond(wrappedMessage, callback(message)) {
+ var replyTo = wrappedMessage['replyTo'];
+ new Future.sync(() => callback(wrappedMessage['message']))
+ .then((result) => replyTo.send({'type': 'success', 'value': result}))
+ .catchError((error, stackTrace) {
// TODO(nweiz): at least MissingInputException should be preserved here.
- port.send({'error': dart.CrossIsolateException.serialize(error)});
+ replyTo.send({
+ 'type': 'error',
+ 'error': dart.CrossIsolateException.serialize(error, stackTrace)
+ });
});
}
-/// Receives the result of [_sendFuture] from [portCall], which should be the
-/// return value of [SendPort.call].
+/// Wraps [message] and sends it across [port], then waits for a response which
+/// should be sent using [_respond].
///
-/// The [message] argument is modified to include the [replyTo] port.
-Future _callAndReceiveFuture(SendPort port, Map message) {
- var responsePort = new ReceivePort();
- message['replyTo'] = responsePort.sendPort;
- return new Future.sync(() {
- port.send(message);
- return responsePort.first.then((response) {
- if (response.containsKey('success')) return response['success'];
- return new Future.error(
- new dart.CrossIsolateException.deserialize(response['error']));
- });
+/// The returned Future will complete to the value or error returned by
+/// [_respond].
+Future _call(SendPort port, message) {
+ var receivePort = new ReceivePort();
+ port.send({
+ 'message': message,
+ 'replyTo': receivePort.sendPort
+ });
+
+ return receivePort.first.then((response) {
+ if (response['type'] == 'success') return response['value'];
+ assert(response['type'] == 'error');
+ return new Future.error(
+ new dart.CrossIsolateException.deserialize(response['error']));
});
}
« no previous file with comments | « pkg/scheduled_test/test/scheduled_test/wrap_future_test.dart ('k') | sdk/lib/_internal/pub/lib/src/dart.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698