Chromium Code Reviews| Index: dart/sdk/lib/_internal/pub/asset/dart/serialize/aggregate_transform.dart |
| diff --git a/dart/sdk/lib/_internal/pub/asset/dart/serialize/aggregate_transform.dart b/dart/sdk/lib/_internal/pub/asset/dart/serialize/aggregate_transform.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..59c8bab7b19816d213903578f0534f25c9152463 |
| --- /dev/null |
| +++ b/dart/sdk/lib/_internal/pub/asset/dart/serialize/aggregate_transform.dart |
| @@ -0,0 +1,184 @@ |
| +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library pub.asset.serialize.aggregate_transform; |
| + |
| +import 'dart:async'; |
| +import 'dart:isolate'; |
| +import 'dart:convert'; |
| + |
| +import 'package:barback/barback.dart'; |
| +// TODO(nweiz): don't import from "src" once issue 14966 is fixed. |
| +import 'package:barback/src/internal_asset.dart'; |
| + |
| +import '../serialize.dart'; |
| +import '../utils.dart'; |
| + |
| +/// Serialize the methods shared between [AggregateTransform] and |
| +/// [DeclaringAggregateTransform]. |
| +/// |
| +/// [additionalFields] contains additional serialized fields to add to the |
| +/// serialized transform. [methodHandlers] is a set of additional methods. Each |
| +/// value should take a JSON message and return the response (which may be a |
| +/// Future). |
| +Map _serializeBaseAggregateTransform(transform, Map additionalFields, |
| + Map<String, Function> methodHandlers) { |
| + var receivePort = new ReceivePort(); |
| + receivePort.listen((wrappedMessage) { |
| + respond(wrappedMessage, (message) { |
| + var handler = methodHandlers[message['type']]; |
| + if (handler != null) return handler(message); |
| + |
| + if (message['type'] == 'consumePrimary') { |
| + transform.consumePrimary(deserializeId(message['assetId'])); |
| + return null; |
| + } |
| + |
| + assert(message['type'] == 'log'); |
| + var method = { |
| + 'Info': transform.logger.info, |
| + 'Fine': transform.logger.fine, |
| + 'Warning': transform.logger.warning, |
| + 'Error': transform.logger.error |
| + }[message['level']]; |
| + assert(method != null); |
| + |
| + var assetId = message['assetId'] == null ? null : |
| + deserializeId(message['assetId']); |
| + var span = message['span'] == null ? null : |
| + deserializeSpan(message['span']); |
| + method(message['message'], asset: assetId, span: span); |
| + }); |
| + }); |
| + |
| + return { |
| + 'port': receivePort.sendPort, |
| + 'key': transform.key, |
| + 'package': transform.package |
| + }..addAll(additionalFields); |
| +} |
| + |
| +/// Converts [transform] into a serializable map. |
| +Map serializeAggregateTransform(AggregateTransform transform) { |
| + return _serializeBaseAggregateTransform(transform, { |
| + 'primaryInputs': serializeStream(transform.primaryInputs, serializeAsset) |
| + }, { |
| + 'getInput': (message) => transform.getInput(deserializeId(message['id'])) |
| + .then((asset) => serializeAsset(asset)), |
| + 'addOutput': (message) => |
| + transform.addOutput(deserializeAsset(message['output'])) |
| + }); |
| +} |
| + |
| +/// Converts [transform] into a serializable map. |
| +Map serializeDeclaringAggregateTransform( |
| + DeclaringAggregateTransform transform) { |
| + return _serializeBaseAggregateTransform(transform, { |
| + 'primaryIds': serializeStream(transform.primaryIds, serializeId) |
| + }, { |
|
Bob Nystrom
2014/06/09 22:05:48
What about getInput and addOutput?
nweiz
2014/06/10 19:19:36
Those aren't supported for declaring transforms.
|
| + 'declareOutput': (message) => |
| + transform.declareOutput(deserializeId(message['output'])) |
| + }); |
| +} |
| + |
| +/// The base class for wrappers for [AggregateTransform]s that are in the host |
| +/// isolate. |
| +class _ForeignBaseAggregateTransform { |
| + /// The port with which we communicate with the host isolate. |
| + /// |
| + /// This port and all messages sent across it are specific to this transform. |
| + final SendPort _port; |
| + |
| + final String key; |
| + |
| + final String package; |
| + |
| + TransformLogger get logger => _logger; |
| + TransformLogger _logger; |
| + |
| + _ForeignBaseAggregateTransform(Map transform) |
| + : _port = transform['port'], |
| + key = transform['key'], |
| + package = transform['package'] { |
| + _logger = new TransformLogger((assetId, level, message, span) { |
| + call(_port, { |
| + 'type': 'log', |
| + 'level': level.name, |
| + 'message': message, |
| + 'assetId': assetId == null ? null : serializeId(assetId), |
| + 'span': span == null ? null : serializeSpan(span) |
| + }); |
| + }); |
| + } |
| + |
| + void consumePrimary(AssetId id) { |
| + call(_port, {'type': 'consumePrimary', 'assetId': serializeId(id)}); |
| + } |
| +} |
| + |
| +//# if barback >=0.14.1-dev |
|
Bob Nystrom
2014/06/09 22:05:48
Is there are reason this needs to be limited, but
nweiz
2014/06/10 19:19:36
Done.
|
| + /// A wrapper for an [AggregateTransform] that's in the host isolate. |
| + /// |
| + /// This retrieves inputs from and sends outputs and logs to the host isolate. |
| + class ForeignAggregateTransform extends _ForeignBaseAggregateTransform |
|
Bob Nystrom
2014/06/09 22:05:48
Nit, but I don't think it's worth indenting stuff
nweiz
2014/06/10 19:19:36
Done.
|
| + implements AggregateTransform { |
| + final Stream<Asset> primaryInputs; |
| + |
| + /// Creates a transform from a serialized map sent from the host isolate. |
| + ForeignAggregateTransform(Map transform) |
| + : primaryInputs = deserializeStream( |
| + transform['primaryInputs'], deserializeAsset), |
| + super(transform); |
| + |
| + Future<Asset> getInput(AssetId id) { |
| + return call(_port, { |
| + 'type': 'getInput', |
| + 'id': serializeId(id) |
| + }).then(deserializeAsset); |
| + } |
| + |
| + Future<String> readInputAsString(AssetId id, {Encoding encoding}) { |
|
Bob Nystrom
2014/06/09 22:05:48
Can these be put in a mixin? The non-aggregate cla
nweiz
2014/06/10 19:19:36
Done.
|
| + if (encoding == null) encoding = UTF8; |
| + return getInput(id).then((input) => |
| + input.readAsString(encoding: encoding)); |
| + } |
| + |
| + Stream<List<int>> readInput(AssetId id) => |
| + futureStream(getInput(id).then((input) => input.read())); |
| + |
| + Future<bool> hasInput(AssetId id) { |
| + return getInput(id).then((_) => true).catchError((error) { |
| + if (error is AssetNotFoundException && error.id == id) return false; |
| + throw error; |
| + }); |
| + } |
| + |
| + void addOutput(Asset output) { |
| + call(_port, { |
| + 'type': 'addOutput', |
| + 'output': serializeAsset(output) |
| + }); |
| + } |
| + } |
| + |
| + /// A wrapper for a [DeclaringAggregateTransform] that's in the host isolate. |
| + class ForeignDeclaringAggregateTransform |
| + extends _ForeignBaseAggregateTransform |
| + implements DeclaringAggregateTransform { |
| + final Stream<AssetId> primaryIds; |
| + |
| + /// Creates a transform from a serializable map sent from the host isolate. |
| + ForeignDeclaringAggregateTransform(Map transform) |
| + : primaryIds = deserializeStream( |
| + transform['primaryIds'], deserializeId), |
| + super(transform); |
| + |
| + void declareOutput(AssetId id) { |
| + call(_port, { |
| + 'type': 'declareOutput', |
| + 'output': serializeId(id) |
| + }); |
| + } |
| + } |
| +//# end |