OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library barback.serialize; | |
6 | |
7 import 'dart:async'; | |
8 import 'dart:isolate'; | |
9 | |
10 import 'asset_id.dart'; | |
11 import 'utils.dart'; | |
12 | |
13 /// Converts [id] into a serializable map. | |
14 Map serializeId(AssetId id) => {'package': id.package, 'path': id.path}; | |
15 | |
16 /// Converts [stream] into a [SendPort] with which another isolate can request | |
17 /// the data from [stream]. | |
18 SendPort serializeStream(Stream stream) { | |
19 var receivePort = new ReceivePort(); | |
Bob Nystrom
2013/11/06 21:29:08
Does this get closed by .first?
nweiz
2013/11/07 00:44:48
Yes, thank goodness. [Stream.first] cancels its su
| |
20 receivePort.first.then((sendPort) { | |
21 stream.listen((data) => sendPort.send({'type': 'data', 'data': data}), | |
22 onDone: () => sendPort.send({'type': 'done'}), | |
23 onError: (error, stackTrace) { | |
24 sendPort.send({ | |
25 'type': 'error', | |
26 'error': CrossIsolateException.serialize(error, stackTrace) | |
27 }); | |
28 }); | |
29 }); | |
30 | |
31 return receivePort.sendPort; | |
32 } | |
33 | |
34 /// Converts a serializable map into an [AssetId]. | |
35 AssetId deserializeId(Map id) => new AssetId(id['package'], id['path']); | |
36 | |
37 /// Convert a [SendPort] whose opposite is waiting to send us a stream into a | |
38 /// [Stream]. | |
39 /// | |
40 /// No stream data will actually be sent across the isolate boundary until | |
41 /// someone subscribes to the returned stream. | |
42 Stream deserializeStream(SendPort sendPort) { | |
43 return callbackStream(() { | |
44 var receivePort = new ReceivePort(); | |
45 sendPort.send(receivePort.sendPort); | |
46 return receivePort.transform( | |
47 const StreamTransformer(_deserializeTransformer)); | |
48 }); | |
49 } | |
50 | |
51 /// The body of a [StreamTransformer] that deserializes the values in a stream | |
52 /// sent by [serializeStream]. | |
53 StreamSubscription _deserializeTransformer(Stream input, bool cancelOnError) { | |
54 var subscription; | |
55 var transformed = input.transform(new StreamTransformer.fromHandlers( | |
56 handleData: (data, sink) { | |
57 if (data['type'] == 'data') { | |
58 sink.add(data['data']); | |
59 } else if (data['type'] == 'error') { | |
60 sink.addError(CrossIsolateException.deserialize(data['error'])); | |
61 } else { | |
62 assert(data['type'] == 'done'); | |
63 sink.close(); | |
64 subscription.cancel(); | |
65 } | |
66 })); | |
67 subscription = transformed.listen(null, cancelOnError: cancelOnError); | |
68 return subscription; | |
69 } | |
OLD | NEW |