Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(626)

Unified Diff: sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart

Issue 27215002: Very simple version of Isolates. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/_internal/lib/isolate_patch.dart ('k') | sdk/lib/_internal/pub/lib/src/dart.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
diff --git a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
index 5481a50c0f69a73788b72ea90ed251d83972d45d..3eb02b7219878d829d38d9ca8e5fdeb69f79f8af 100644
--- a/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
+++ b/sdk/lib/_internal/pub/lib/src/barback/load_transformers.dart
@@ -32,9 +32,11 @@ import 'http://<<HOST_AND_PORT>>/packages/stack_trace/stack_trace.dart';
import 'http://<<HOST_AND_PORT>>/packages/barback/barback.dart';
/// Sets up the initial communication with the host isolate.
-void main() {
- port.receive((args, replyTo) {
- _sendFuture(replyTo, new Future.sync(() {
+void main(List<String> args, SendPort replyTo) {
+ var port = new ReceivePort();
+ replyTo.send(['success', port.sendPort]);
+ port.listen((args) {
+ _sendFuture(args['replyTo'], new Future.sync(() {
var library = Uri.parse(args['library']);
var configuration = JSON.decode(args['configuration']);
return initialize(library, configuration).
@@ -112,10 +114,10 @@ class ForeignTransform implements Transform {
}
Future<Asset> getInput(AssetId id) {
- return _receiveFuture(_port.call({
+ return _callAndReceiveFuture(_port, {
'type': 'getInput',
'id': _serializeId(id)
- })).then(_deserializeAsset);
+ }).then(_deserializeAsset);
}
Future<String> readInputAsString(AssetId id, {Encoding encoding}) {
@@ -181,7 +183,8 @@ Map _serializeTransformerOrGroup(transformerOrGroup) {
/// Converts [transformer] into a serializable map.
Map _serializeTransformer(Transformer transformer) {
var port = new ReceivePort();
- port.receive((message, replyTo) {
+ port.listen((message) {
+ var replyTo = message['replyTo'];
_sendFuture(replyTo, new Future.sync(() {
if (message['type'] == 'isPrimary') {
return transformer.isPrimary(_deserializeAsset(message['asset']));
@@ -196,7 +199,7 @@ Map _serializeTransformer(Transformer transformer) {
return {
'type': 'Transformer',
'toString': transformer.toString(),
- 'port': port.toSendPort()
+ 'port': port.sendPort
};
}
@@ -211,11 +214,29 @@ Map _serializeTransformerGroup(TransformerGroup group) {
};
}
+/// When the input receives a 'done' as data-event, transforms it to a
+/// done event and cancels the subscription.
+StreamSubscription doneTransformer(Stream input, bool cancelOnError) {
+ var subscription;
+ var transformed = input.transform(new StreamTransformer.fromHandlers(
+ handleData: (data, sink) {
+ if (data == 'done') {
+ sink.close();
+ subscription.cancel();
+ } else {
+ sink.add(data);
+ }
+ }));
+ subscription = transformed.listen(null, cancelOnError: cancelOnError);
+ return subscription;
+}
+
/// Converts a serializable map into an [Asset].
Asset _deserializeAsset(Map asset) {
- var box = new MessageBox();
- asset['sink'].add(box.sink);
- return new Asset.fromStream(_deserializeId(asset['id']), box.stream);
+ var receivePort = new ReceivePort();
+ asset['sendPort'].send(receivePort.sendPort);
+ var stream = receivePort.transform(const StreamTransformer(doneTransformer));
+ return new Asset.fromStream(_deserializeId(asset['id']), stream);
}
/// Converts a serializable map into an [AssetId].
@@ -225,16 +246,18 @@ AssetId _deserializeId(Map id) => new AssetId(id['package'], id['path']);
Map _serializeAsset(Asset asset) {
// We can't send IsolateStreams (issue 12437), so instead we send a sink and
// get the isolate to send us back another sink.
- var box = new MessageBox();
- box.stream.first.then((sink) {
- asset.read().listen(sink.add,
- onError: sink.addError,
- onDone: sink.close);
+ var receivePort = new ReceivePort();
+ receivePort.first.then((sendPort) {
+ asset.read().listen(sendPort.send,
+ onError: (error, stackTrace) {
+ throw new UnimplementedError('Error during asset serialization');
+ },
+ onDone: () { sendPort.send('done'); });
});
return {
'id': _serializeId(asset.id),
- 'sink': box.sink
+ 'sendPort': receivePort.sendPort
};
}
@@ -280,11 +303,18 @@ void _sendFuture(SendPort port, Future future) {
/// Receives the result of [_sendFuture] from [portCall], which should be the
/// return value of [SendPort.call].
-Future _receiveFuture(Future portCall) {
- return portCall.then((response) {
- if (response.containsKey('success')) return response['success'];
- return new Future.error(
- new CrossIsolateException.deserialize(response['error']));
+///
+/// The [message] argument is modified to include the [replyTo] port.
+Future _callAndReceiveFuture(SendPort port, Map message) {
+ var responsePort = new ReceivePort();
+ message['replyTo'] = responsePort.sendPort;
+ return new Future.sync(() {
+ port.send(message);
+ return responsePort.first.then((response) {
+ if (response.containsKey('success')) return response['success'];
+ return new Future.error(
+ new CrossIsolateException.deserialize(response['error']));
+ });
});
}
@@ -379,11 +409,11 @@ Future<Set> loadTransformers(BarbackServer server, TransformerId id) {
log.fine("Loading transformers from $assetId");
return dart.runInIsolate(code).then((sendPort) {
- return _receiveFuture(sendPort.call({
+ return _callAndReceiveFuture(sendPort, {
'library': uri,
// TODO(nweiz): support non-JSON-encodable configuration maps.
'configuration': JSON.encode(id.configuration)
- })).then((transformers) {
+ }).then((transformers) {
transformers = transformers.map(_deserializeTransformerOrGroup).toSet();
log.fine("Transformers from $assetId: $transformers");
return transformers;
@@ -422,17 +452,17 @@ class _ForeignTransformer extends Transformer {
_toString = map['toString'];
Future<bool> isPrimary(Asset asset) {
- return _receiveFuture(_port.call({
+ return _callAndReceiveFuture(_port, {
'type': 'isPrimary',
'asset': _serializeAsset(asset)
- }));
+ });
}
Future apply(Transform transform) {
- return _receiveFuture(_port.call({
+ return _callAndReceiveFuture(_port, {
'type': 'apply',
'transform': _serializeTransform(transform)
- }));
+ });
}
String toString() => _toString;
@@ -464,7 +494,8 @@ _deserializeTransformerOrGroup(Map map) {
/// Converts [transform] into a serializable map.
Map _serializeTransform(Transform transform) {
var receivePort = new ReceivePort();
- receivePort.receive((message, replyTo) {
+ receivePort.listen((message) {
+ var replyTo = message['replyTo'];
if (message['type'] == 'getInput') {
_sendFuture(replyTo, transform.getInput(_deserializeId(message['id']))
.then(_serializeAsset));
@@ -492,16 +523,34 @@ Map _serializeTransform(Transform transform) {
});
return {
- 'port': receivePort.toSendPort(),
+ 'port': receivePort.sendPort,
'primaryInput': _serializeAsset(transform.primaryInput)
};
}
+/// When the input receives a 'done' as data-event, transforms it to a
+/// done event and cancels the subscription.
+StreamSubscription doneTransformer(Stream input, bool cancelOnError) {
+ var subscription;
+ var transformed = input.transform(new StreamTransformer.fromHandlers(
+ handleData: (data, sink) {
+ if (data == 'done') {
+ sink.close();
+ subscription.cancel();
+ } else {
+ sink.add(data);
+ }
+ }));
+ subscription = transformed.listen(null, cancelOnError: cancelOnError);
+ return subscription;
+}
+
/// Converts a serializable map into an [Asset].
Asset _deserializeAsset(Map asset) {
- var box = new MessageBox();
- asset['sink'].add(box.sink);
- return new Asset.fromStream(_deserializeId(asset['id']), box.stream);
+ var receivePort = new ReceivePort();
+ asset['sendPort'].send(receivePort.sendPort);
+ var stream = receivePort.transform(const StreamTransformer(doneTransformer));
+ return new Asset.fromStream(_deserializeId(asset['id']), stream);
}
/// Converts a serializable map into an [AssetId].
@@ -528,16 +577,18 @@ Location _deserializeLocation(Map location) {
Map _serializeAsset(Asset asset) {
// We can't send IsolateStreams (issue 12437), so instead we send a sink and
// get the isolate to send us back another sink.
- var box = new MessageBox();
- box.stream.first.then((sink) {
- asset.read().listen(sink.add,
- onError: sink.addError,
- onDone: sink.close);
+ var receivePort = new ReceivePort();
+ receivePort.first.then((sendPort) {
+ asset.read().listen(sendPort.send,
+ onError: (error, stackTrace) {
+ throw new UnimplementedError('Error during asset serialization');
+ },
+ onDone: () { sendPort.send('done'); });
});
return {
'id': _serializeId(asset.id),
- 'sink': box.sink
+ 'sendPort': receivePort.sendPort
};
}
@@ -546,7 +597,7 @@ Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path};
/// Sends the result of [future] through [port].
///
-/// This should be received on the other end using [_receiveFuture]. It
+/// This should be received on the other end using [_callAndReceiveFuture]. It
/// re-raises any exceptions on the other side as [dart.CrossIsolateException]s.
void _sendFuture(SendPort port, Future future) {
future.then((result) {
@@ -559,10 +610,17 @@ void _sendFuture(SendPort port, Future future) {
/// Receives the result of [_sendFuture] from [portCall], which should be the
/// return value of [SendPort.call].
-Future _receiveFuture(Future portCall) {
- return portCall.then((response) {
- if (response.containsKey('success')) return response['success'];
- return new Future.error(
- new dart.CrossIsolateException.deserialize(response['error']));
+///
+/// The [message] argument is modified to include the [replyTo] port.
+Future _callAndReceiveFuture(SendPort port, Map message) {
+ var responsePort = new ReceivePort();
+ message['replyTo'] = responsePort.sendPort;
+ return new Future.sync(() {
+ port.send(message);
+ return responsePort.first.then((response) {
+ if (response.containsKey('success')) return response['success'];
+ return new Future.error(
+ new dart.CrossIsolateException.deserialize(response['error']));
+ });
});
}
« no previous file with comments | « sdk/lib/_internal/lib/isolate_patch.dart ('k') | sdk/lib/_internal/pub/lib/src/dart.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698