OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library pub.load_transformers; | 5 library pub.load_transformers; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:convert'; | 8 import 'dart:convert'; |
9 import 'dart:isolate'; | 9 import 'dart:isolate'; |
10 | 10 |
(...skipping 14 matching lines...) Expand all Loading... |
25 import 'dart:async'; | 25 import 'dart:async'; |
26 import 'dart:isolate'; | 26 import 'dart:isolate'; |
27 import 'dart:convert'; | 27 import 'dart:convert'; |
28 import 'dart:mirrors'; | 28 import 'dart:mirrors'; |
29 | 29 |
30 import 'http://<<HOST_AND_PORT>>/packages/source_maps/span.dart'; | 30 import 'http://<<HOST_AND_PORT>>/packages/source_maps/span.dart'; |
31 import 'http://<<HOST_AND_PORT>>/packages/stack_trace/stack_trace.dart'; | 31 import 'http://<<HOST_AND_PORT>>/packages/stack_trace/stack_trace.dart'; |
32 import 'http://<<HOST_AND_PORT>>/packages/barback/barback.dart'; | 32 import 'http://<<HOST_AND_PORT>>/packages/barback/barback.dart'; |
33 | 33 |
34 /// Sets up the initial communication with the host isolate. | 34 /// Sets up the initial communication with the host isolate. |
35 void main() { | 35 void main({message}) { |
36 port.receive((args, replyTo) { | 36 var port = new ReceivePort(); |
37 _sendFuture(replyTo, new Future.sync(() { | 37 message.send(['success', port.sendPort]); |
| 38 port.listen((args) { |
| 39 _sendFuture(args['replyTo'], new Future.sync(() { |
38 var library = Uri.parse(args['library']); | 40 var library = Uri.parse(args['library']); |
39 var configuration = JSON.decode(args['configuration']); | 41 var configuration = JSON.decode(args['configuration']); |
40 return initialize(library, configuration). | 42 return initialize(library, configuration). |
41 map(_serializeTransformerOrGroup).toList(); | 43 map(_serializeTransformerOrGroup).toList(); |
42 })); | 44 })); |
43 }); | 45 }); |
44 } | 46 } |
45 | 47 |
46 /// Loads all the transformers and groups defined in [uri]. | 48 /// Loads all the transformers and groups defined in [uri]. |
47 /// | 49 /// |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
105 'type': 'log', | 107 'type': 'log', |
106 'level': level.name, | 108 'level': level.name, |
107 'message': message, | 109 'message': message, |
108 'assetId': assetId == null ? null : _serializeId(assetId), | 110 'assetId': assetId == null ? null : _serializeId(assetId), |
109 'span': span == null ? null : _serializeSpan(span) | 111 'span': span == null ? null : _serializeSpan(span) |
110 }); | 112 }); |
111 }); | 113 }); |
112 } | 114 } |
113 | 115 |
114 Future<Asset> getInput(AssetId id) { | 116 Future<Asset> getInput(AssetId id) { |
115 return _receiveFuture(_port.call({ | 117 return _callAndReceiveFuture(_port, { |
116 'type': 'getInput', | 118 'type': 'getInput', |
117 'id': _serializeId(id) | 119 'id': _serializeId(id) |
118 })).then(_deserializeAsset); | 120 }).then(_deserializeAsset); |
119 } | 121 } |
120 | 122 |
121 Future<String> readInputAsString(AssetId id, {Encoding encoding}) { | 123 Future<String> readInputAsString(AssetId id, {Encoding encoding}) { |
122 if (encoding == null) encoding = UTF8; | 124 if (encoding == null) encoding = UTF8; |
123 return getInput(id).then((input) => input.readAsString(encoding: encoding)); | 125 return getInput(id).then((input) => input.readAsString(encoding: encoding)); |
124 } | 126 } |
125 | 127 |
126 Stream<List<int>> readInput(AssetId id) => | 128 Stream<List<int>> readInput(AssetId id) => |
127 _futureStream(getInput(id).then((input) => input.read())); | 129 _futureStream(getInput(id).then((input) => input.read())); |
128 | 130 |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
174 return _serializeTransformer(transformerOrGroup); | 176 return _serializeTransformer(transformerOrGroup); |
175 } else { | 177 } else { |
176 assert(transformerOrGroup is TransformerGroup); | 178 assert(transformerOrGroup is TransformerGroup); |
177 return _serializeTransformerGroup(transformerOrGroup); | 179 return _serializeTransformerGroup(transformerOrGroup); |
178 } | 180 } |
179 } | 181 } |
180 | 182 |
181 /// Converts [transformer] into a serializable map. | 183 /// Converts [transformer] into a serializable map. |
182 Map _serializeTransformer(Transformer transformer) { | 184 Map _serializeTransformer(Transformer transformer) { |
183 var port = new ReceivePort(); | 185 var port = new ReceivePort(); |
184 port.receive((message, replyTo) { | 186 port.listen((message) { |
| 187 var replyTo = message['replyTo']; |
185 _sendFuture(replyTo, new Future.sync(() { | 188 _sendFuture(replyTo, new Future.sync(() { |
186 if (message['type'] == 'isPrimary') { | 189 if (message['type'] == 'isPrimary') { |
187 return transformer.isPrimary(_deserializeAsset(message['asset'])); | 190 return transformer.isPrimary(_deserializeAsset(message['asset'])); |
188 } else { | 191 } else { |
189 assert(message['type'] == 'apply'); | 192 assert(message['type'] == 'apply'); |
190 return transformer.apply( | 193 return transformer.apply( |
191 new ForeignTransform(message['transform'])); | 194 new ForeignTransform(message['transform'])); |
192 } | 195 } |
193 })); | 196 })); |
194 }); | 197 }); |
195 | 198 |
196 return { | 199 return { |
197 'type': 'Transformer', | 200 'type': 'Transformer', |
198 'toString': transformer.toString(), | 201 'toString': transformer.toString(), |
199 'port': port.toSendPort() | 202 'port': port.sendPort |
200 }; | 203 }; |
201 } | 204 } |
202 | 205 |
203 // Converts [group] into a serializable map. | 206 // Converts [group] into a serializable map. |
204 Map _serializeTransformerGroup(TransformerGroup group) { | 207 Map _serializeTransformerGroup(TransformerGroup group) { |
205 return { | 208 return { |
206 'type': 'TransformerGroup', | 209 'type': 'TransformerGroup', |
207 'toString': group.toString(), | 210 'toString': group.toString(), |
208 'phases': group.phases.map((phase) { | 211 'phases': group.phases.map((phase) { |
209 return phase.map(_serializeTransformerOrGroup).toList(); | 212 return phase.map(_serializeTransformerOrGroup).toList(); |
210 }).toList() | 213 }).toList() |
211 }; | 214 }; |
212 } | 215 } |
213 | 216 |
| 217 /// When the input receives a 'done' as data-event, transforms it to a |
| 218 /// done event and cancels the subscription. |
| 219 StreamSubscription doneTransformer(Stream input, bool cancelOnError) { |
| 220 var subscription; |
| 221 var transformed = input.transform(new StreamTransformer.fromHandlers( |
| 222 handleData: (data, sink) { |
| 223 if (data == 'done') { |
| 224 sink.close(); |
| 225 subscription.cancel(); |
| 226 } else { |
| 227 sink.add(data); |
| 228 } |
| 229 })); |
| 230 subscription = transformed.listen(null, cancelOnError: cancelOnError); |
| 231 return subscription; |
| 232 } |
| 233 |
214 /// Converts a serializable map into an [Asset]. | 234 /// Converts a serializable map into an [Asset]. |
215 Asset _deserializeAsset(Map asset) { | 235 Asset _deserializeAsset(Map asset) { |
216 var box = new MessageBox(); | 236 var receivePort = new ReceivePort(); |
217 asset['sink'].add(box.sink); | 237 asset['sendPort'].send(receivePort.sendPort); |
218 return new Asset.fromStream(_deserializeId(asset['id']), box.stream); | 238 var stream = receivePort.transform(const StreamTransformer(doneTransformer)); |
| 239 return new Asset.fromStream(_deserializeId(asset['id']), stream); |
219 } | 240 } |
220 | 241 |
221 /// Converts a serializable map into an [AssetId]. | 242 /// Converts a serializable map into an [AssetId]. |
222 AssetId _deserializeId(Map id) => new AssetId(id['package'], id['path']); | 243 AssetId _deserializeId(Map id) => new AssetId(id['package'], id['path']); |
223 | 244 |
224 /// Converts [asset] into a serializable map. | 245 /// Converts [asset] into a serializable map. |
225 Map _serializeAsset(Asset asset) { | 246 Map _serializeAsset(Asset asset) { |
226 // We can't send IsolateStreams (issue 12437), so instead we send a sink and | 247 // We can't send IsolateStreams (issue 12437), so instead we send a sink and |
227 // get the isolate to send us back another sink. | 248 // get the isolate to send us back another sink. |
228 var box = new MessageBox(); | 249 var receivePort = new ReceivePort(); |
229 box.stream.first.then((sink) { | 250 receivePort.first.then((sendPort) { |
230 asset.read().listen(sink.add, | 251 asset.read().listen(sendPort.send, |
231 onError: sink.addError, | 252 onError: (error, stackTrace) { |
232 onDone: sink.close); | 253 throw new UnimplementedError('Error during asset serialization'); |
| 254 }, |
| 255 onDone: () { sendPort.send('done'); }); |
233 }); | 256 }); |
234 | 257 |
235 return { | 258 return { |
236 'id': _serializeId(asset.id), | 259 'id': _serializeId(asset.id), |
237 'sink': box.sink | 260 'sendPort': receivePort.sendPort |
238 }; | 261 }; |
239 } | 262 } |
240 | 263 |
241 /// Converts [id] into a serializable map. | 264 /// Converts [id] into a serializable map. |
242 Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path}; | 265 Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path}; |
243 | 266 |
244 /// Converts [span] into a serializable map. | 267 /// Converts [span] into a serializable map. |
245 Map _serializeSpan(Span span) { | 268 Map _serializeSpan(Span span) { |
246 // TODO(nweiz): convert FileSpans to FileSpans. | 269 // TODO(nweiz): convert FileSpans to FileSpans. |
247 return { | 270 return { |
(...skipping 25 matching lines...) Expand all Loading... |
273 future.then((result) { | 296 future.then((result) { |
274 port.send({'success': result}); | 297 port.send({'success': result}); |
275 }).catchError((error) { | 298 }).catchError((error) { |
276 // TODO(nweiz): at least MissingInputException should be preserved here. | 299 // TODO(nweiz): at least MissingInputException should be preserved here. |
277 port.send({'error': CrossIsolateException.serialize(error)}); | 300 port.send({'error': CrossIsolateException.serialize(error)}); |
278 }); | 301 }); |
279 } | 302 } |
280 | 303 |
281 /// Receives the result of [_sendFuture] from [portCall], which should be the | 304 /// Receives the result of [_sendFuture] from [portCall], which should be the |
282 /// return value of [SendPort.call]. | 305 /// return value of [SendPort.call]. |
283 Future _receiveFuture(Future portCall) { | 306 /// |
284 return portCall.then((response) { | 307 /// The [message] argument is modified to include the [replyTo] port. |
285 if (response.containsKey('success')) return response['success']; | 308 Future _callAndReceiveFuture(SendPort port, Map message) { |
286 return new Future.error( | 309 var responsePort = new ReceivePort(); |
287 new CrossIsolateException.deserialize(response['error'])); | 310 message['replyTo'] = responsePort.sendPort; |
| 311 return new Future.sync(() { |
| 312 port.send(message); |
| 313 return responsePort.first.then((response) { |
| 314 if (response.containsKey('success')) return response['success']; |
| 315 return new Future.error( |
| 316 new CrossIsolateException.deserialize(response['error'])); |
| 317 }); |
288 }); | 318 }); |
289 } | 319 } |
290 | 320 |
291 /// An exception that was originally raised in another isolate. | 321 /// An exception that was originally raised in another isolate. |
292 /// | 322 /// |
293 /// Exception objects can't cross isolate boundaries in general, so this class | 323 /// Exception objects can't cross isolate boundaries in general, so this class |
294 /// wraps as much information as can be consistently serialized. | 324 /// wraps as much information as can be consistently serialized. |
295 class CrossIsolateException implements Exception { | 325 class CrossIsolateException implements Exception { |
296 /// The name of the type of exception thrown. | 326 /// The name of the type of exception thrown. |
297 /// | 327 /// |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
372 return id.getAssetId(server.barback).then((assetId) { | 402 return id.getAssetId(server.barback).then((assetId) { |
373 var path = assetId.path.replaceFirst('lib/', ''); | 403 var path = assetId.path.replaceFirst('lib/', ''); |
374 // TODO(nweiz): load from a "package:" URI when issue 12474 is fixed. | 404 // TODO(nweiz): load from a "package:" URI when issue 12474 is fixed. |
375 var hostAndPort = '${server.address.address}:${server.port}'; | 405 var hostAndPort = '${server.address.address}:${server.port}'; |
376 var uri = 'http://$hostAndPort/packages/${id.package}/$path'; | 406 var uri = 'http://$hostAndPort/packages/${id.package}/$path'; |
377 var code = 'import "$uri";' + | 407 var code = 'import "$uri";' + |
378 _TRANSFORMER_ISOLATE.replaceAll('<<HOST_AND_PORT>>', hostAndPort); | 408 _TRANSFORMER_ISOLATE.replaceAll('<<HOST_AND_PORT>>', hostAndPort); |
379 log.fine("Loading transformers from $assetId"); | 409 log.fine("Loading transformers from $assetId"); |
380 | 410 |
381 return dart.runInIsolate(code).then((sendPort) { | 411 return dart.runInIsolate(code).then((sendPort) { |
382 return _receiveFuture(sendPort.call({ | 412 return _callAndReceiveFuture(sendPort, { |
383 'library': uri, | 413 'library': uri, |
384 // TODO(nweiz): support non-JSON-encodable configuration maps. | 414 // TODO(nweiz): support non-JSON-encodable configuration maps. |
385 'configuration': JSON.encode(id.configuration) | 415 'configuration': JSON.encode(id.configuration) |
386 })).then((transformers) { | 416 }).then((transformers) { |
387 transformers = transformers.map(_deserializeTransformerOrGroup).toSet(); | 417 transformers = transformers.map(_deserializeTransformerOrGroup).toSet(); |
388 log.fine("Transformers from $assetId: $transformers"); | 418 log.fine("Transformers from $assetId: $transformers"); |
389 return transformers; | 419 return transformers; |
390 }); | 420 }); |
391 }).catchError((error) { | 421 }).catchError((error) { |
392 if (error is! dart.CrossIsolateException) throw error; | 422 if (error is! dart.CrossIsolateException) throw error; |
393 if (error.type != 'IsolateSpawnException') throw error; | 423 if (error.type != 'IsolateSpawnException') throw error; |
394 // TODO(nweiz): don't parse this as a string once issues 12617 and 12689 | 424 // TODO(nweiz): don't parse this as a string once issues 12617 and 12689 |
395 // are fixed. | 425 // are fixed. |
396 if (!error.message.split('\n')[1].startsWith('import "$uri";')) { | 426 if (!error.message.split('\n')[1].startsWith('import "$uri";')) { |
(...skipping 18 matching lines...) Expand all Loading... |
415 final SendPort _port; | 445 final SendPort _port; |
416 | 446 |
417 /// The result of calling [toString] on the transformer in the isolate. | 447 /// The result of calling [toString] on the transformer in the isolate. |
418 final String _toString; | 448 final String _toString; |
419 | 449 |
420 _ForeignTransformer(Map map) | 450 _ForeignTransformer(Map map) |
421 : _port = map['port'], | 451 : _port = map['port'], |
422 _toString = map['toString']; | 452 _toString = map['toString']; |
423 | 453 |
424 Future<bool> isPrimary(Asset asset) { | 454 Future<bool> isPrimary(Asset asset) { |
425 return _receiveFuture(_port.call({ | 455 return _callAndReceiveFuture(_port, { |
426 'type': 'isPrimary', | 456 'type': 'isPrimary', |
427 'asset': _serializeAsset(asset) | 457 'asset': _serializeAsset(asset) |
428 })); | 458 }); |
429 } | 459 } |
430 | 460 |
431 Future apply(Transform transform) { | 461 Future apply(Transform transform) { |
432 return _receiveFuture(_port.call({ | 462 return _callAndReceiveFuture(_port, { |
433 'type': 'apply', | 463 'type': 'apply', |
434 'transform': _serializeTransform(transform) | 464 'transform': _serializeTransform(transform) |
435 })); | 465 }); |
436 } | 466 } |
437 | 467 |
438 String toString() => _toString; | 468 String toString() => _toString; |
439 } | 469 } |
440 | 470 |
441 /// A wrapper for a transformer group that's in a different isolate. | 471 /// A wrapper for a transformer group that's in a different isolate. |
442 class _ForeignGroup implements TransformerGroup { | 472 class _ForeignGroup implements TransformerGroup { |
443 final Iterable<Iterable> phases; | 473 final Iterable<Iterable> phases; |
444 | 474 |
445 /// The result of calling [toString] on the transformer group in the isolate. | 475 /// The result of calling [toString] on the transformer group in the isolate. |
(...skipping 11 matching lines...) Expand all Loading... |
457 /// Converts a serializable map into a [Transformer] or a [TransformerGroup]. | 487 /// Converts a serializable map into a [Transformer] or a [TransformerGroup]. |
458 _deserializeTransformerOrGroup(Map map) { | 488 _deserializeTransformerOrGroup(Map map) { |
459 if (map['type'] == 'Transformer') return new _ForeignTransformer(map); | 489 if (map['type'] == 'Transformer') return new _ForeignTransformer(map); |
460 assert(map['type'] == 'TransformerGroup'); | 490 assert(map['type'] == 'TransformerGroup'); |
461 return new _ForeignGroup(map); | 491 return new _ForeignGroup(map); |
462 } | 492 } |
463 | 493 |
464 /// Converts [transform] into a serializable map. | 494 /// Converts [transform] into a serializable map. |
465 Map _serializeTransform(Transform transform) { | 495 Map _serializeTransform(Transform transform) { |
466 var receivePort = new ReceivePort(); | 496 var receivePort = new ReceivePort(); |
467 receivePort.receive((message, replyTo) { | 497 receivePort.listen((message) { |
| 498 var replyTo = message['replyTo']; |
468 if (message['type'] == 'getInput') { | 499 if (message['type'] == 'getInput') { |
469 _sendFuture(replyTo, transform.getInput(_deserializeId(message['id'])) | 500 _sendFuture(replyTo, transform.getInput(_deserializeId(message['id'])) |
470 .then(_serializeAsset)); | 501 .then(_serializeAsset)); |
471 } else if (message['type'] == 'addOutput') { | 502 } else if (message['type'] == 'addOutput') { |
472 transform.addOutput(_deserializeAsset(message['output'])); | 503 transform.addOutput(_deserializeAsset(message['output'])); |
473 } else { | 504 } else { |
474 assert(message['type'] == 'log'); | 505 assert(message['type'] == 'log'); |
475 | 506 |
476 var method; | 507 var method; |
477 if (message['level'] == 'Info') { | 508 if (message['level'] == 'Info') { |
478 method = transform.logger.info; | 509 method = transform.logger.info; |
479 } else if (message['level'] == 'Warning') { | 510 } else if (message['level'] == 'Warning') { |
480 method = transform.logger.warning; | 511 method = transform.logger.warning; |
481 } else { | 512 } else { |
482 assert(message['level'] == 'Error'); | 513 assert(message['level'] == 'Error'); |
483 method = transform.logger.error; | 514 method = transform.logger.error; |
484 } | 515 } |
485 | 516 |
486 var assetId = message['assetId'] == null ? null : | 517 var assetId = message['assetId'] == null ? null : |
487 _deserializeId(message['assetId']); | 518 _deserializeId(message['assetId']); |
488 var span = message['span'] == null ? null : | 519 var span = message['span'] == null ? null : |
489 _deserializeSpan(message['span']); | 520 _deserializeSpan(message['span']); |
490 method(message['message'], asset: assetId, span: span); | 521 method(message['message'], asset: assetId, span: span); |
491 } | 522 } |
492 }); | 523 }); |
493 | 524 |
494 return { | 525 return { |
495 'port': receivePort.toSendPort(), | 526 'port': receivePort.sendPort, |
496 'primaryInput': _serializeAsset(transform.primaryInput) | 527 'primaryInput': _serializeAsset(transform.primaryInput) |
497 }; | 528 }; |
498 } | 529 } |
499 | 530 |
| 531 /// When the input receives a 'done' as data-event, transforms it to a |
| 532 /// done event and cancels the subscription. |
| 533 StreamSubscription doneTransformer(Stream input, bool cancelOnError) { |
| 534 var subscription; |
| 535 var transformed = input.transform(new StreamTransformer.fromHandlers( |
| 536 handleData: (data, sink) { |
| 537 if (data == 'done') { |
| 538 sink.close(); |
| 539 subscription.cancel(); |
| 540 } else { |
| 541 sink.add(data); |
| 542 } |
| 543 })); |
| 544 subscription = transformed.listen(null, cancelOnError: cancelOnError); |
| 545 return subscription; |
| 546 } |
| 547 |
500 /// Converts a serializable map into an [Asset]. | 548 /// Converts a serializable map into an [Asset]. |
501 Asset _deserializeAsset(Map asset) { | 549 Asset _deserializeAsset(Map asset) { |
502 var box = new MessageBox(); | 550 var receivePort = new ReceivePort(); |
503 asset['sink'].add(box.sink); | 551 asset['sendPort'].send(receivePort.sendPort); |
504 return new Asset.fromStream(_deserializeId(asset['id']), box.stream); | 552 var stream = receivePort.transform(const StreamTransformer(doneTransformer)); |
| 553 return new Asset.fromStream(_deserializeId(asset['id']), stream); |
505 } | 554 } |
506 | 555 |
507 /// Converts a serializable map into an [AssetId]. | 556 /// Converts a serializable map into an [AssetId]. |
508 AssetId _deserializeId(Map id) => new AssetId(id['package'], id['path']); | 557 AssetId _deserializeId(Map id) => new AssetId(id['package'], id['path']); |
509 | 558 |
510 /// Converts a serializable map into a [Span]. | 559 /// Converts a serializable map into a [Span]. |
511 Span _deserializeSpan(Map span) { | 560 Span _deserializeSpan(Map span) { |
512 assert(span['type'] == 'fixed'); | 561 assert(span['type'] == 'fixed'); |
513 var location = _deserializeLocation(span['start']); | 562 var location = _deserializeLocation(span['start']); |
514 return new FixedSpan(span['sourceUrl'], location.offset, location.line, | 563 return new FixedSpan(span['sourceUrl'], location.offset, location.line, |
515 location.column, text: span['text'], isIdentifier: span['isIdentifier']); | 564 location.column, text: span['text'], isIdentifier: span['isIdentifier']); |
516 } | 565 } |
517 | 566 |
518 /// Converts a serializable map into a [Location]. | 567 /// Converts a serializable map into a [Location]. |
519 Location _deserializeLocation(Map location) { | 568 Location _deserializeLocation(Map location) { |
520 assert(location['type'] == 'fixed'); | 569 assert(location['type'] == 'fixed'); |
521 return new FixedLocation(location['offset'], location['sourceUrl'], | 570 return new FixedLocation(location['offset'], location['sourceUrl'], |
522 location['line'], location['column']); | 571 location['line'], location['column']); |
523 } | 572 } |
524 | 573 |
525 // TODO(nweiz): add custom serialization code for assets that can be more | 574 // TODO(nweiz): add custom serialization code for assets that can be more |
526 // efficiently serialized. | 575 // efficiently serialized. |
527 /// Converts [asset] into a serializable map. | 576 /// Converts [asset] into a serializable map. |
528 Map _serializeAsset(Asset asset) { | 577 Map _serializeAsset(Asset asset) { |
529 // We can't send IsolateStreams (issue 12437), so instead we send a sink and | 578 // We can't send IsolateStreams (issue 12437), so instead we send a sink and |
530 // get the isolate to send us back another sink. | 579 // get the isolate to send us back another sink. |
531 var box = new MessageBox(); | 580 var receivePort = new ReceivePort(); |
532 box.stream.first.then((sink) { | 581 receivePort.first.then((sendPort) { |
533 asset.read().listen(sink.add, | 582 asset.read().listen(sendPort.send, |
534 onError: sink.addError, | 583 onError: (error, stackTrace) { |
535 onDone: sink.close); | 584 throw new UnimplementedError('Error during asset serialization'); |
| 585 }, |
| 586 onDone: () { sendPort.send('done'); }); |
536 }); | 587 }); |
537 | 588 |
538 return { | 589 return { |
539 'id': _serializeId(asset.id), | 590 'id': _serializeId(asset.id), |
540 'sink': box.sink | 591 'sendPort': receivePort.sendPort |
541 }; | 592 }; |
542 } | 593 } |
543 | 594 |
544 /// Converts [id] into a serializable map. | 595 /// Converts [id] into a serializable map. |
545 Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path}; | 596 Map _serializeId(AssetId id) => {'package': id.package, 'path': id.path}; |
546 | 597 |
547 /// Sends the result of [future] through [port]. | 598 /// Sends the result of [future] through [port]. |
548 /// | 599 /// |
549 /// This should be received on the other end using [_receiveFuture]. It | 600 /// This should be received on the other end using [_callAndReceiveFuture]. It |
550 /// re-raises any exceptions on the other side as [dart.CrossIsolateException]s. | 601 /// re-raises any exceptions on the other side as [dart.CrossIsolateException]s. |
551 void _sendFuture(SendPort port, Future future) { | 602 void _sendFuture(SendPort port, Future future) { |
552 future.then((result) { | 603 future.then((result) { |
553 port.send({'success': result}); | 604 port.send({'success': result}); |
554 }).catchError((error) { | 605 }).catchError((error) { |
555 // TODO(nweiz): at least MissingInputException should be preserved here. | 606 // TODO(nweiz): at least MissingInputException should be preserved here. |
556 port.send({'error': dart.CrossIsolateException.serialize(error)}); | 607 port.send({'error': dart.CrossIsolateException.serialize(error)}); |
557 }); | 608 }); |
558 } | 609 } |
559 | 610 |
560 /// Receives the result of [_sendFuture] from [portCall], which should be the | 611 /// Receives the result of [_sendFuture] from [portCall], which should be the |
561 /// return value of [SendPort.call]. | 612 /// return value of [SendPort.call]. |
562 Future _receiveFuture(Future portCall) { | 613 /// |
563 return portCall.then((response) { | 614 /// The [message] argument is modified to include the [replyTo] port. |
564 if (response.containsKey('success')) return response['success']; | 615 Future _callAndReceiveFuture(SendPort port, Map message) { |
565 return new Future.error( | 616 var responsePort = new ReceivePort(); |
566 new dart.CrossIsolateException.deserialize(response['error'])); | 617 message['replyTo'] = responsePort.sendPort; |
| 618 return new Future.sync(() { |
| 619 port.send(message); |
| 620 return responsePort.first.then((response) { |
| 621 if (response.containsKey('success')) return response['success']; |
| 622 return new Future.error( |
| 623 new dart.CrossIsolateException.deserialize(response['error'])); |
| 624 }); |
567 }); | 625 }); |
568 } | 626 } |
OLD | NEW |