Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(52)

Side by Side Diff: pkg/http/lib/src/utils.dart

Issue 11825010: Update pkg/http to use the new async APIs. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Code review changes Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/http/lib/src/streamed_response.dart ('k') | pkg/http/test/client_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library utils; 5 library utils;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:crypto'; 8 import 'dart:crypto';
9 import 'dart:io'; 9 import 'dart:io';
10 import 'dart:scalarlist'; 10 import 'dart:scalarlist';
11 import 'dart:uri'; 11 import 'dart:uri';
12 import 'dart:utf'; 12 import 'dart:utf';
13 13
14 import 'byte_stream.dart';
15
14 /// Converts a URL query string (or `application/x-www-form-urlencoded` body) 16 /// Converts a URL query string (or `application/x-www-form-urlencoded` body)
15 /// into a [Map] from parameter names to values. 17 /// into a [Map] from parameter names to values.
16 /// 18 ///
17 /// queryToMap("foo=bar&baz=bang&qux"); 19 /// queryToMap("foo=bar&baz=bang&qux");
18 /// //=> {"foo": "bar", "baz": "bang", "qux": ""} 20 /// //=> {"foo": "bar", "baz": "bang", "qux": ""}
19 Map<String, String> queryToMap(String queryList) { 21 Map<String, String> queryToMap(String queryList) {
20 var map = <String>{}; 22 var map = <String>{};
21 for (var pair in queryList.split("&")) { 23 for (var pair in queryList.split("&")) {
22 var split = split1(pair, "="); 24 var split = split1(pair, "=");
23 if (split.isEmpty) continue; 25 if (split.isEmpty) continue;
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
124 /// [ByteArrayViewable], this just returns a view on [input]. 126 /// [ByteArrayViewable], this just returns a view on [input].
125 Uint8List toUint8List(List<int> input) { 127 Uint8List toUint8List(List<int> input) {
126 if (input is Uint8List) return input; 128 if (input is Uint8List) return input;
127 if (input is ByteArrayViewable) input = input.asByteArray(); 129 if (input is ByteArrayViewable) input = input.asByteArray();
128 if (input is ByteArray) return new Uint8List.view(input); 130 if (input is ByteArray) return new Uint8List.view(input);
129 var output = new Uint8List(input.length); 131 var output = new Uint8List(input.length);
130 output.setRange(0, input.length, input); 132 output.setRange(0, input.length, input);
131 return output; 133 return output;
132 } 134 }
133 135
134 /// Buffers all input from an InputStream and returns it as a future. 136 /// If [stream] is already a [ByteStream], returns it. Otherwise, wraps it in a
135 Future<List<int>> consumeInputStream(InputStream stream) { 137 /// [ByteStream].
136 if (stream.closed) return new Future<List<int>>.immediate(<int>[]); 138 ByteStream toByteStream(Stream<List<int>> stream) {
139 if (stream is ByteStream) return stream;
140 return new ByteStream(stream);
141 }
137 142
138 var completer = new Completer<List<int>>(); 143 /// Calls [onDone] once [stream] (a single-subscription [Stream]) is finished.
139 /// TODO(nweiz): use BufferList when issue 6409 is fixed 144 /// The return value, also a single-subscription [Stream] should be used in
140 var buffer = <int>[]; 145 /// place of [stream] after calling this method.
141 stream.onClosed = () => completer.complete(buffer); 146 Stream onDone(Stream stream, void onDone()) {
142 stream.onData = () => buffer.addAll(stream.read()); 147 var pair = tee(stream);
143 stream.onError = completer.completeError; 148 pair.first.listen((_) {}, onError: (_) {}, onDone: onDone);
149 return pair.last;
150 }
151
152 // TODO(nweiz): remove this once issue 7785 is fixed.
153 /// Wraps [stream] in a single-subscription [ByteStream] that emits the same
154 /// data.
155 ByteStream wrapInputStream(InputStream stream) {
156 if (stream.closed) return emptyStream;
157
158 var controller = new StreamController.singleSubscription();
159 stream.onClosed = controller.close;
160 stream.onData = () => controller.add(stream.read());
161 stream.onError = (e) => controller.signalError(new AsyncError(e));
162 return new ByteStream(controller);
163 }
164
165 // TODO(nweiz): remove this once issue 7785 is fixed.
166 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
167 /// using [Stream.pipe].
168 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
169 new _OutputStreamConsumer(stream);
170
171 /// A [StreamConsumer] that pipes data into an [OutputStream].
172 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
173 final OutputStream _outputStream;
174
175 _OutputStreamConsumer(this._outputStream)
176 : super();
177
178 Future consume(Stream<List<int>> stream) {
179 // TODO(nweiz): we have to manually keep track of whether or not the
180 // completer has completed since the output stream could signal an error
181 // after close() has been called but before it has shut down internally. See
182 // the following TODO.
183 var completed = false;
184 var completer = new Completer();
185 stream.listen((data) => _outputStream.write(data), onDone: () {
186 _outputStream.close();
187 // TODO(nweiz): wait until _outputStream.onClosed is called once issue
188 // 7761 is fixed.
189 if (!completed) completer.complete();
190 completed = true;
191 });
192
193 _outputStream.onError = (e) {
194 if (!completed) completer.completeError(e);
195 completed = true;
196 };
197
198 return completer.future;
199 }
200 }
201
202 // TODO(nweiz): remove this when issue 7786 is fixed.
203 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
204 /// [sink] is closed and the returned [Future] is completed.
205 Future store(Stream stream, StreamSink sink) {
206 var completer = new Completer();
207 stream.listen(sink.add,
208 onError: sink.signalError,
209 onDone: () {
210 sink.close();
211 completer.complete();
212 });
144 return completer.future; 213 return completer.future;
145 } 214 }
146 215
147 /// Takes all input from [source] and writes it to [sink], then closes [sink]. 216 /// Pipes all data and errors from [stream] into [sink]. Completes [Future] once
148 /// Returns a [Future] that completes when [source] is exhausted. 217 /// [stream] is done. Unlike [store], [sink] remains open after [stream] is
149 void pipeInputToInput(InputStream source, ListInputStream sink) { 218 /// done.
150 source.onClosed = sink.markEndOfStream; 219 Future writeStreamToSink(Stream stream, StreamSink sink) {
151 source.onData = () => sink.write(source.read());
152 // TODO(nweiz): propagate source errors to the sink. See issue 3657.
153 // TODO(nweiz): we need to use async here to avoid issue 4974.
154 source.onError = (e) => async.then((_) {
155 throw e;
156 });
157 }
158
159 /// Takes all input from [source] and writes it to [sink], but does not close
160 /// [sink] when [source] is closed. Returns a [Future] that completes when
161 /// [source] is closed.
162 Future writeInputToInput(InputStream source, ListInputStream sink) {
163 var completer = new Completer(); 220 var completer = new Completer();
164 source.onClosed = () => completer.complete(null); 221 stream.listen(sink.add,
165 source.onData = () => sink.write(source.read()); 222 onError: sink.signalError,
166 // TODO(nweiz): propagate source errors to the sink. See issue 3657. 223 onDone: () => completer.complete());
167 return completer.future; 224 return completer.future;
168 } 225 }
169 226
170 /// Returns a [Future] that asynchronously completes to `null`. 227 /// Returns a [Future] that asynchronously completes to `null`.
171 Future get async { 228 Future get async => new Future.immediate(null);
172 var completer = new Completer(); 229
173 new Timer(0, (_) => completer.complete(null)); 230 /// Returns a closed [Stream] with no elements.
174 return completer.future; 231 Stream get emptyStream => streamFromIterable([]);
232
233 /// Creates a single-subscription stream that emits the items in [iter] and then
234 /// ends.
235 Stream streamFromIterable(Iterable iter) {
236 var stream = new StreamController.singleSubscription();
237 iter.forEach(stream.add);
238 stream.close();
239 return stream.stream;
240 }
241
242 // TODO(nweiz): remove this when issue 7787 is fixed.
243 /// Creates two single-subscription [Stream]s that each emit all values and
244 /// errors from [stream]. This is useful if [stream] is single-subscription but
245 /// multiple subscribers are necessary.
246 Pair<Stream, Stream> tee(Stream stream) {
247 var controller1 = new StreamController.singleSubscription();
248 var controller2 = new StreamController.singleSubscription();
249 stream.listen((value) {
250 controller1.add(value);
251 controller2.add(value);
252 }, onError: (error) {
253 controller1.signalError(error);
254 controller2.signalError(error);
255 }, onDone: () {
256 controller1.close();
257 controller2.close();
258 });
259 return new Pair<Stream, Stream>(controller1.stream, controller2.stream);
260 }
261
262 /// A pair of values.
263 class Pair<E, F> {
264 E first;
265 F last;
266
267 Pair(this.first, this.last);
268
269 String toString() => '($first, $last)';
270
271 bool operator==(other) {
272 if (other is! Pair) return false;
273 return other.first == first && other.last == last;
274 }
275
276 int get hashCode => first.hashCode ^ last.hashCode;
277 }
278
279 /// Configures [future] so that its result (success or exception) is passed on
280 /// to [completer].
281 void chainToCompleter(Future future, Completer completer) {
282 future.then((v) => completer.complete(v)).catchError((e) {
283 completer.completeError(e.error, e.stackTrace);
284 });
175 } 285 }
176 286
177 // TOOD(nweiz): Get rid of this once https://codereview.chromium.org/11293132/ 287 // TOOD(nweiz): Get rid of this once https://codereview.chromium.org/11293132/
178 // is in. 288 // is in.
179 /// Runs [fn] for each element in [input] in order, moving to the next element 289 /// Runs [fn] for each element in [input] in order, moving to the next element
180 /// only when the [Future] returned by [fn] completes. Returns a [Future] that 290 /// only when the [Future] returned by [fn] completes. Returns a [Future] that
181 /// completes when all elements have been processed. 291 /// completes when all elements have been processed.
182 /// 292 ///
183 /// The return values of all [Future]s are discarded. Any errors will cause the 293 /// The return values of all [Future]s are discarded. Any errors will cause the
184 /// iteration to stop and will be piped through the return value. 294 /// iteration to stop and will be piped through the return value.
185 Future forEachFuture(Iterable input, Future fn(element)) { 295 Future forEachFuture(Iterable input, Future fn(element)) {
186 var iterator = input.iterator; 296 var iterator = input.iterator;
187 Future nextElement(_) { 297 Future nextElement(_) {
188 if (!iterator.moveNext()) return new Future.immediate(null); 298 if (!iterator.moveNext()) return new Future.immediate(null);
189 return fn(iterator.current).then(nextElement); 299 return fn(iterator.current).then(nextElement);
190 } 300 }
191 return nextElement(null); 301 return nextElement(null);
192 } 302 }
OLDNEW
« no previous file with comments | « pkg/http/lib/src/streamed_response.dart ('k') | pkg/http/test/client_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698