Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: lib/src/utils.dart

Issue 1652413002: Use StreamChannel. (Closed) Base URL: git@github.com:dart-lang/json_rpc_2.git@master
Patch Set: Code review changes Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/two_way_stream.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 6
7 import 'package:stack_trace/stack_trace.dart'; 7 import 'package:stack_trace/stack_trace.dart';
8 import 'package:stream_channel/stream_channel.dart';
9
10 import '../error_code.dart' as error_code;
11 import 'exception.dart';
8 12
9 typedef ZeroArgumentFunction(); 13 typedef ZeroArgumentFunction();
10 14
11 /// Like [new Future.sync], but automatically wraps the future in a 15 /// Like [new Future.sync], but automatically wraps the future in a
12 /// [Chain.track] call. 16 /// [Chain.track] call.
13 Future syncFuture(callback()) => Chain.track(new Future.sync(callback)); 17 Future syncFuture(callback()) => Chain.track(new Future.sync(callback));
14 18
15 /// Returns a sentence fragment listing the elements of [iter]. 19 /// Returns a sentence fragment listing the elements of [iter].
16 /// 20 ///
17 /// This converts each element of [iter] to a string and separates them with 21 /// This converts each element of [iter] to a string and separates them with
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
57 } 61 }
58 62
59 if (result is! Future) { 63 if (result is! Future) {
60 whenComplete(); 64 whenComplete();
61 return result; 65 return result;
62 } else { 66 } else {
63 return result.whenComplete(whenComplete); 67 return result.whenComplete(whenComplete);
64 } 68 }
65 } 69 }
66 70
71 /// A transformer that silently drops [FormatException]s.
72 final ignoreFormatExceptions = new StreamTransformer.fromHandlers(
73 handleError: (error, stackTrace, sink) {
74 if (error is FormatException) return;
75 sink.addError(error, stackTrace);
76 });
77
78 /// A transformer that sends error responses on [FormatException]s.
79 final StreamChannelTransformer respondToFormatExceptions =
80 new _RespondToFormatExceptionsTransformer();
81
82 /// The implementation of [respondToFormatExceptions].
83 class _RespondToFormatExceptionsTransformer
84 implements StreamChannelTransformer {
85 StreamChannel bind(StreamChannel channel) {
86 var transformed;
87 transformed = channel.changeStream((stream) {
88 return stream.handleError((error) {
89 if (error is! FormatException) throw error;
90
91 var exception = new RpcException(
92 error_code.PARSE_ERROR, 'Invalid JSON: ${error.message}');
93 transformed.sink.add(exception.serialize(error.source));
94 });
95 });
96 return transformed;
97 }
98 }
99
67 /// Returns a [StreamSink] that wraps [sink] and maps each event added using 100 /// Returns a [StreamSink] that wraps [sink] and maps each event added using
68 /// [callback]. 101 /// [callback].
69 StreamSink mapStreamSink(StreamSink sink, callback(event)) => 102 StreamSink mapStreamSink(StreamSink sink, callback(event)) =>
70 new _MappedStreamSink(sink, callback); 103 new _MappedStreamSink(sink, callback);
71 104
72 /// A [StreamSink] wrapper that maps each event added to the sink. 105 /// A [StreamSink] wrapper that maps each event added to the sink.
73 class _MappedStreamSink implements StreamSink { 106 class _MappedStreamSink implements StreamSink {
74 final StreamSink _inner; 107 final StreamSink _inner;
75 final Function _callback; 108 final Function _callback;
76 109
77 Future get done => _inner.done; 110 Future get done => _inner.done;
78 111
79 _MappedStreamSink(this._inner, this._callback); 112 _MappedStreamSink(this._inner, this._callback);
80 113
81 void add(event) => _inner.add(_callback(event)); 114 void add(event) => _inner.add(_callback(event));
82 void addError(error, [StackTrace stackTrace]) => 115 void addError(error, [StackTrace stackTrace]) =>
83 _inner.addError(error, stackTrace); 116 _inner.addError(error, stackTrace);
84 Future addStream(Stream stream) => _inner.addStream(stream.map(_callback)); 117 Future addStream(Stream stream) => _inner.addStream(stream.map(_callback));
85 Future close() => _inner.close(); 118 Future close() => _inner.close();
86 } 119 }
OLDNEW
« no previous file with comments | « lib/src/two_way_stream.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698