OLD | NEW |
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import 'dart:async'; | 5 import 'dart:async'; |
6 import 'dart:convert'; | |
7 | 6 |
8 import 'package:async/async.dart'; | 7 import 'src/stream_channel_transformer.dart'; |
9 | 8 |
10 export 'src/delegating_stream_channel.dart'; | 9 export 'src/delegating_stream_channel.dart'; |
11 export 'src/isolate_channel.dart'; | 10 export 'src/isolate_channel.dart'; |
12 export 'src/multi_channel.dart'; | 11 export 'src/multi_channel.dart'; |
13 export 'src/stream_channel_completer.dart'; | 12 export 'src/stream_channel_completer.dart'; |
| 13 export 'src/stream_channel_transformer.dart'; |
14 | 14 |
15 /// An abstract class representing a two-way communication channel. | 15 /// An abstract class representing a two-way communication channel. |
16 /// | 16 /// |
17 /// Users should consider the [stream] emitting a "done" event to be the | 17 /// Users should consider the [stream] emitting a "done" event to be the |
18 /// canonical indicator that the channel has closed. If they wish to close the | 18 /// canonical indicator that the channel has closed. If they wish to close the |
19 /// channel, they should close the [sink]—canceling the stream subscription is | 19 /// channel, they should close the [sink]—canceling the stream subscription is |
20 /// not sufficient. Protocol errors may be emitted through the stream or through | 20 /// not sufficient. Protocol errors may be emitted through the stream or through |
21 /// [Sink.done], depending on their underlying cause. Note that the sink may | 21 /// [Sink.done], depending on their underlying cause. Note that the sink may |
22 /// silently drop events if the channel closes before [Sink.close] is called. | 22 /// silently drop events if the channel closes before [Sink.close] is called. |
23 /// | 23 /// |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
63 /// | 63 /// |
64 /// Note that this stream/sink pair must provide the guarantees listed in the | 64 /// Note that this stream/sink pair must provide the guarantees listed in the |
65 /// [StreamChannel] documentation. | 65 /// [StreamChannel] documentation. |
66 factory StreamChannel(Stream<T> stream, StreamSink<T> sink) => | 66 factory StreamChannel(Stream<T> stream, StreamSink<T> sink) => |
67 new _StreamChannel<T>(stream, sink); | 67 new _StreamChannel<T>(stream, sink); |
68 | 68 |
69 /// Connects [this] to [other], so that any values emitted by either are sent | 69 /// Connects [this] to [other], so that any values emitted by either are sent |
70 /// directly to the other. | 70 /// directly to the other. |
71 void pipe(StreamChannel<T> other); | 71 void pipe(StreamChannel<T> other); |
72 | 72 |
73 /// Transforms [this] using [codec]. | 73 /// Transforms [this] using [transformer]. |
74 /// | 74 /// |
75 /// This returns a stream channel that encodes all input using [Codec.encoder] | 75 /// This is identical to calling `transformer.bind(channel)`. |
76 /// before passing it to this channel's [sink], and decodes all output from | 76 StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer); |
77 /// this channel's [stream] using [Codec.decoder]. | |
78 StreamChannel transform(Codec<dynamic, T> codec); | |
79 } | 77 } |
80 | 78 |
81 /// An implementation of [StreamChannel] that simply takes a stream and a sink | 79 /// An implementation of [StreamChannel] that simply takes a stream and a sink |
82 /// as parameters. | 80 /// as parameters. |
83 /// | 81 /// |
84 /// This is distinct from [StreamChannel] so that it can use | 82 /// This is distinct from [StreamChannel] so that it can use |
85 /// [StreamChannelMixin]. | 83 /// [StreamChannelMixin]. |
86 class _StreamChannel<T> extends StreamChannelMixin<T> { | 84 class _StreamChannel<T> extends StreamChannelMixin<T> { |
87 final Stream<T> stream; | 85 final Stream<T> stream; |
88 final StreamSink<T> sink; | 86 final StreamSink<T> sink; |
89 | 87 |
90 _StreamChannel(this.stream, this.sink); | 88 _StreamChannel(this.stream, this.sink); |
91 } | 89 } |
92 | 90 |
93 /// A mixin that implements the instance methods of [StreamChannel] in terms of | 91 /// A mixin that implements the instance methods of [StreamChannel] in terms of |
94 /// [stream] and [sink]. | 92 /// [stream] and [sink]. |
95 abstract class StreamChannelMixin<T> implements StreamChannel<T> { | 93 abstract class StreamChannelMixin<T> implements StreamChannel<T> { |
96 void pipe(StreamChannel<T> other) { | 94 void pipe(StreamChannel<T> other) { |
97 stream.pipe(other.sink); | 95 stream.pipe(other.sink); |
98 other.stream.pipe(sink); | 96 other.stream.pipe(sink); |
99 } | 97 } |
100 | 98 |
101 StreamChannel transform(Codec<dynamic, T> codec) { | 99 StreamChannel transform(StreamChannelTransformer<dynamic, T> transformer) => |
102 var sinkTransformer = | 100 transformer.bind(this); |
103 new StreamSinkTransformer.fromStreamTransformer(codec.encoder); | |
104 return new _StreamChannel( | |
105 stream.transform(codec.decoder), | |
106 sinkTransformer.bind(sink)); | |
107 } | |
108 } | 101 } |
OLD | NEW |