Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(800)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 25094002: Adapt streams for additional stackTrace argument. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Remove types in closures. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/stdio.dart ('k') | tests/lib/async/stack_trace09_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 class _WebSocketMessageType { 9 class _WebSocketMessageType {
10 static const int NONE = 0; 10 static const int NONE = 0;
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 if (_currentMessageType != _WebSocketMessageType.NONE) { 92 if (_currentMessageType != _WebSocketMessageType.NONE) {
93 throw new WebSocketException("Protocol error"); 93 throw new WebSocketException("Protocol error");
94 } 94 }
95 _currentMessageType = _WebSocketMessageType.TEXT; 95 _currentMessageType = _WebSocketMessageType.TEXT;
96 _controller = new StreamController(sync: true); 96 _controller = new StreamController(sync: true);
97 _controller.stream 97 _controller.stream
98 .transform(UTF8.decoder) 98 .transform(UTF8.decoder)
99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) 99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str))
100 .then((buffer) { 100 .then((buffer) {
101 sink.add(buffer.toString()); 101 sink.add(buffer.toString());
102 }, onError: (error) { 102 }, onError: sink.addError);
103 sink.addError(error);
104 });
105 break; 103 break;
106 104
107 case _WebSocketOpcode.BINARY: 105 case _WebSocketOpcode.BINARY:
108 if (_currentMessageType != _WebSocketMessageType.NONE) { 106 if (_currentMessageType != _WebSocketMessageType.NONE) {
109 throw new WebSocketException("Protocol error"); 107 throw new WebSocketException("Protocol error");
110 } 108 }
111 _currentMessageType = _WebSocketMessageType.BINARY; 109 _currentMessageType = _WebSocketMessageType.BINARY;
112 _controller = new StreamController(sync: true); 110 _controller = new StreamController(sync: true);
113 _controller.stream 111 _controller.stream
114 .fold(new BytesBuilder(), (buffer, data) => buffer..add(data)) 112 .fold(new BytesBuilder(), (buffer, data) => buffer..add(data))
115 .then((buffer) { 113 .then((buffer) {
116 sink.add(buffer.takeBytes()); 114 sink.add(buffer.takeBytes());
117 }, onError: (error) { 115 }, onError: sink.addError);
118 sink.addError(error);
119 });
120 break; 116 break;
121 117
122 case _WebSocketOpcode.CLOSE: 118 case _WebSocketOpcode.CLOSE:
123 case _WebSocketOpcode.PING: 119 case _WebSocketOpcode.PING:
124 case _WebSocketOpcode.PONG: 120 case _WebSocketOpcode.PONG:
125 // Control frames cannot be fragmented. 121 // Control frames cannot be fragmented.
126 if (!_fin) throw new WebSocketException("Protocol error"); 122 if (!_fin) throw new WebSocketException("Protocol error");
127 break; 123 break;
128 124
129 default: 125 default:
(...skipping 245 matching lines...) Expand 10 before | Expand all | Expand 10 after
375 371
376 372
377 class _WebSocketTransformerImpl implements WebSocketTransformer { 373 class _WebSocketTransformerImpl implements WebSocketTransformer {
378 final StreamController<WebSocket> _controller = 374 final StreamController<WebSocket> _controller =
379 new StreamController<WebSocket>(sync: true); 375 new StreamController<WebSocket>(sync: true);
380 376
381 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 377 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
382 stream.listen((request) { 378 stream.listen((request) {
383 _upgrade(request) 379 _upgrade(request)
384 .then((WebSocket webSocket) => _controller.add(webSocket)) 380 .then((WebSocket webSocket) => _controller.add(webSocket))
385 .catchError((error) => _controller.addError(error)); 381 .catchError(_controller.addError);
386 }); 382 });
387 383
388 return _controller.stream; 384 return _controller.stream;
389 } 385 }
390 386
391 static Future<WebSocket> _upgrade(HttpRequest request) { 387 static Future<WebSocket> _upgrade(HttpRequest request) {
392 var response = request.response; 388 var response = request.response;
393 if (!_isUpgradeRequest(request)) { 389 if (!_isUpgradeRequest(request)) {
394 // Send error response and drain the request. 390 // Send error response and drain the request.
395 request.listen((_) {}, onDone: () { 391 request.listen((_) {}, onDone: () {
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after
613 _controller = new StreamController(sync: true, 609 _controller = new StreamController(sync: true,
614 onPause: _onPause, 610 onPause: _onPause,
615 onResume: _onResume, 611 onResume: _onResume,
616 onCancel: _onListen); 612 onCancel: _onListen);
617 var stream = _controller.stream.transform( 613 var stream = _controller.stream.transform(
618 new _WebSocketOutgoingTransformer(webSocket)); 614 new _WebSocketOutgoingTransformer(webSocket));
619 socket.addStream(stream) 615 socket.addStream(stream)
620 .then((_) { 616 .then((_) {
621 _done(); 617 _done();
622 _closeCompleter.complete(webSocket); 618 _closeCompleter.complete(webSocket);
623 }, onError: (error) { 619 }, onError: (error, StackTrace stackTrace) {
624 _closed = true; 620 _closed = true;
625 _cancel(); 621 _cancel();
626 if (error is ArgumentError) { 622 if (error is ArgumentError) {
627 if (!_done(error)) { 623 if (!_done(error, stackTrace)) {
628 _closeCompleter.completeError(error); 624 _closeCompleter.completeError(error, stackTrace);
629 } 625 }
630 } else { 626 } else {
631 _done(); 627 _done();
632 _closeCompleter.complete(webSocket); 628 _closeCompleter.complete(webSocket);
633 } 629 }
634 }); 630 });
635 } 631 }
636 632
637 bool _done([error]) { 633 bool _done([error, StackTrace stackTrace]) {
638 if (_completer == null) return false; 634 if (_completer == null) return false;
639 if (error != null) { 635 if (error != null) {
640 _completer.completeError(error); 636 _completer.completeError(error, stackTrace);
641 } else { 637 } else {
642 _completer.complete(webSocket); 638 _completer.complete(webSocket);
643 } 639 }
644 _completer = null; 640 _completer = null;
645 return true; 641 return true;
646 } 642 }
647 643
648 Future addStream(var stream) { 644 Future addStream(var stream) {
649 if (_closed) { 645 if (_closed) {
650 stream.listen(null).cancel(); 646 stream.listen(null).cancel();
651 return new Future.value(webSocket); 647 return new Future.value(webSocket);
652 } 648 }
653 _ensureController(); 649 _ensureController();
654 _completer = new Completer(); 650 _completer = new Completer();
655 _subscription = stream.listen( 651 _subscription = stream.listen(
656 (data) { 652 (data) {
657 _controller.add(data); 653 _controller.add(data);
658 }, 654 },
659 onDone: () { 655 onDone: _done,
660 _done(); 656 onError: _done,
661 },
662 onError: (error) {
663 _done(error);
664 },
665 cancelOnError: true); 657 cancelOnError: true);
666 if (_issuedPause) { 658 if (_issuedPause) {
667 _subscription.pause(); 659 _subscription.pause();
668 _issuedPause = false; 660 _issuedPause = false;
669 } 661 }
670 return _completer.future; 662 return _completer.future;
671 } 663 }
672 664
673 Future close() { 665 Future close() {
674 _ensureController(); 666 _ensureController();
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after
825 }, 817 },
826 cancelOnError: true); 818 cancelOnError: true);
827 _subscription.pause(); 819 _subscription.pause();
828 _controller = new StreamController(sync: true, 820 _controller = new StreamController(sync: true,
829 onListen: _subscription.resume, 821 onListen: _subscription.resume,
830 onPause: _subscription.pause, 822 onPause: _subscription.pause,
831 onResume: _subscription.resume); 823 onResume: _subscription.resume);
832 } 824 }
833 825
834 StreamSubscription listen(void onData(message), 826 StreamSubscription listen(void onData(message),
835 {void onError(error), 827 {Function onError,
836 void onDone(), 828 void onDone(),
837 bool cancelOnError}) { 829 bool cancelOnError}) {
838 return _controller.stream.listen(onData, 830 return _controller.stream.listen(onData,
839 onError: onError, 831 onError: onError,
840 onDone: onDone, 832 onDone: onDone,
841 cancelOnError: cancelOnError); 833 cancelOnError: cancelOnError);
842 } 834 }
843 835
844 Duration get pingInterval => _pingInterval; 836 Duration get pingInterval => _pingInterval;
845 837
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
898 (code < WebSocketStatus.NORMAL_CLOSURE || 890 (code < WebSocketStatus.NORMAL_CLOSURE ||
899 code == WebSocketStatus.RESERVED_1004 || 891 code == WebSocketStatus.RESERVED_1004 ||
900 code == WebSocketStatus.NO_STATUS_RECEIVED || 892 code == WebSocketStatus.NO_STATUS_RECEIVED ||
901 code == WebSocketStatus.ABNORMAL_CLOSURE || 893 code == WebSocketStatus.ABNORMAL_CLOSURE ||
902 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 894 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
903 code < WebSocketStatus.RESERVED_1015) || 895 code < WebSocketStatus.RESERVED_1015) ||
904 (code >= WebSocketStatus.RESERVED_1015 && 896 (code >= WebSocketStatus.RESERVED_1015 &&
905 code < 3000)); 897 code < 3000));
906 } 898 }
907 } 899 }
OLDNEW
« no previous file with comments | « sdk/lib/io/stdio.dart ('k') | tests/lib/async/stack_trace09_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698