Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 16125005: Make new StreamController be async by default. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address review comments Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/string_transformer.dart ('k') | sdk/lib/isolate/isolate_stream.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 class _WebSocketMessageType { 9 class _WebSocketMessageType {
10 static const int NONE = 0; 10 static const int NONE = 0;
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
86 if (_currentMessageType == _WebSocketMessageType.NONE) { 86 if (_currentMessageType == _WebSocketMessageType.NONE) {
87 throw new WebSocketException("Protocol error"); 87 throw new WebSocketException("Protocol error");
88 } 88 }
89 break; 89 break;
90 90
91 case _WebSocketOpcode.TEXT: 91 case _WebSocketOpcode.TEXT:
92 if (_currentMessageType != _WebSocketMessageType.NONE) { 92 if (_currentMessageType != _WebSocketMessageType.NONE) {
93 throw new WebSocketException("Protocol error"); 93 throw new WebSocketException("Protocol error");
94 } 94 }
95 _currentMessageType = _WebSocketMessageType.TEXT; 95 _currentMessageType = _WebSocketMessageType.TEXT;
96 _controller = new StreamController(); 96 _controller = new StreamController(sync: true);
97 _controller.stream 97 _controller.stream
98 .transform(new Utf8DecoderTransformer(null)) 98 .transform(new Utf8DecoderTransformer(null))
99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) 99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str))
100 .then((buffer) { 100 .then((buffer) {
101 sink.add(buffer.toString()); 101 sink.add(buffer.toString());
102 }, onError: (error) { 102 }, onError: (error) {
103 sink.addError(error); 103 sink.addError(error);
104 }); 104 });
105 break; 105 break;
106 106
107 case _WebSocketOpcode.BINARY: 107 case _WebSocketOpcode.BINARY:
108 if (_currentMessageType != _WebSocketMessageType.NONE) { 108 if (_currentMessageType != _WebSocketMessageType.NONE) {
109 throw new WebSocketException("Protocol error"); 109 throw new WebSocketException("Protocol error");
110 } 110 }
111 _currentMessageType = _WebSocketMessageType.BINARY; 111 _currentMessageType = _WebSocketMessageType.BINARY;
112 _controller = new StreamController(); 112 _controller = new StreamController(sync: true);
113 _controller.stream 113 _controller.stream
114 .fold(new _BufferList(), (buffer, data) => buffer..add(data)) 114 .fold(new _BufferList(), (buffer, data) => buffer..add(data))
115 .then((buffer) { 115 .then((buffer) {
116 sink.add(buffer.readBytes()); 116 sink.add(buffer.readBytes());
117 }, onError: (error) { 117 }, onError: (error) {
118 sink.addError(error); 118 sink.addError(error);
119 }); 119 });
120 break; 120 break;
121 121
122 case _WebSocketOpcode.CLOSE: 122 case _WebSocketOpcode.CLOSE:
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after
369 369
370 370
371 class _WebSocketPong { 371 class _WebSocketPong {
372 final List<int> payload; 372 final List<int> payload;
373 _WebSocketPong([this.payload = null]); 373 _WebSocketPong([this.payload = null]);
374 } 374 }
375 375
376 376
377 class _WebSocketTransformerImpl implements WebSocketTransformer { 377 class _WebSocketTransformerImpl implements WebSocketTransformer {
378 final StreamController<WebSocket> _controller = 378 final StreamController<WebSocket> _controller =
379 new StreamController<WebSocket>(); 379 new StreamController<WebSocket>(sync: true);
380 380
381 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 381 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
382 stream.listen((request) { 382 stream.listen((request) {
383 _upgrade(request) 383 _upgrade(request)
384 .then((WebSocket webSocket) => _controller.add(webSocket)) 384 .then((WebSocket webSocket) => _controller.add(webSocket))
385 .catchError((error) => _controller.addError(error)); 385 .catchError((error) => _controller.addError(error));
386 }); 386 });
387 387
388 return _controller.stream; 388 return _controller.stream;
389 } 389 }
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
559 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); 559 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket);
560 560
561 void _onListen() { 561 void _onListen() {
562 if (_subscription != null) { 562 if (_subscription != null) {
563 _subscription.cancel(); 563 _subscription.cancel();
564 } 564 }
565 } 565 }
566 566
567 _ensureController() { 567 _ensureController() {
568 if (_controller != null) return; 568 if (_controller != null) return;
569 _controller = new StreamController(onPause: () => _subscription.pause(), 569 _controller = new StreamController(sync: true,
570 onPause: () => _subscription.pause(),
570 onResume: () => _subscription.resume(), 571 onResume: () => _subscription.resume(),
571 onCancel: _onListen); 572 onCancel: _onListen);
572 var stream = _controller.stream.transform( 573 var stream = _controller.stream.transform(
573 new _WebSocketOutgoingTransformer(webSocket)); 574 new _WebSocketOutgoingTransformer(webSocket));
574 socket.addStream(stream) 575 socket.addStream(stream)
575 .then((_) { 576 .then((_) {
576 _done(); 577 _done();
577 _closeCompleter.complete(webSocket); 578 _closeCompleter.complete(webSocket);
578 }, 579 },
579 onError: (error) { 580 onError: (error) {
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
621 } 622 }
622 623
623 void add(data) { 624 void add(data) {
624 _ensureController(); 625 _ensureController();
625 _controller.add(data); 626 _controller.add(data);
626 } 627 }
627 } 628 }
628 629
629 630
630 class _WebSocketImpl extends Stream implements WebSocket { 631 class _WebSocketImpl extends Stream implements WebSocket {
631 final StreamController _controller = new StreamController(); 632 final StreamController _controller = new StreamController(sync: true);
632 StreamSink _sink; 633 StreamSink _sink;
633 634
634 final Socket _socket; 635 final Socket _socket;
635 final bool _serverSide; 636 final bool _serverSide;
636 int _readyState = WebSocket.CONNECTING; 637 int _readyState = WebSocket.CONNECTING;
637 bool _writeClosed = false; 638 bool _writeClosed = false;
638 int _closeCode; 639 int _closeCode;
639 String _closeReason; 640 String _closeReason;
640 641
641 int _outCloseCode; 642 int _outCloseCode;
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after
795 (code < WebSocketStatus.NORMAL_CLOSURE || 796 (code < WebSocketStatus.NORMAL_CLOSURE ||
796 code == WebSocketStatus.RESERVED_1004 || 797 code == WebSocketStatus.RESERVED_1004 ||
797 code == WebSocketStatus.NO_STATUS_RECEIVED || 798 code == WebSocketStatus.NO_STATUS_RECEIVED ||
798 code == WebSocketStatus.ABNORMAL_CLOSURE || 799 code == WebSocketStatus.ABNORMAL_CLOSURE ||
799 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 800 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
800 code < WebSocketStatus.RESERVED_1015) || 801 code < WebSocketStatus.RESERVED_1015) ||
801 (code >= WebSocketStatus.RESERVED_1015 && 802 (code >= WebSocketStatus.RESERVED_1015 &&
802 code < 3000)); 803 code < 3000));
803 } 804 }
804 } 805 }
OLDNEW
« no previous file with comments | « sdk/lib/io/string_transformer.dart ('k') | sdk/lib/isolate/isolate_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698