OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 | 8 |
9 class _WebSocketMessageType { | 9 class _WebSocketMessageType { |
10 static const int NONE = 0; | 10 static const int NONE = 0; |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
86 if (_currentMessageType == _WebSocketMessageType.NONE) { | 86 if (_currentMessageType == _WebSocketMessageType.NONE) { |
87 throw new WebSocketException("Protocol error"); | 87 throw new WebSocketException("Protocol error"); |
88 } | 88 } |
89 break; | 89 break; |
90 | 90 |
91 case _WebSocketOpcode.TEXT: | 91 case _WebSocketOpcode.TEXT: |
92 if (_currentMessageType != _WebSocketMessageType.NONE) { | 92 if (_currentMessageType != _WebSocketMessageType.NONE) { |
93 throw new WebSocketException("Protocol error"); | 93 throw new WebSocketException("Protocol error"); |
94 } | 94 } |
95 _currentMessageType = _WebSocketMessageType.TEXT; | 95 _currentMessageType = _WebSocketMessageType.TEXT; |
96 _controller = new StreamController(); | 96 _controller = new StreamController(sync: true); |
97 _controller.stream | 97 _controller.stream |
98 .transform(new Utf8DecoderTransformer(null)) | 98 .transform(new Utf8DecoderTransformer(null)) |
99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) | 99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) |
100 .then((buffer) { | 100 .then((buffer) { |
101 sink.add(buffer.toString()); | 101 sink.add(buffer.toString()); |
102 }, onError: (error) { | 102 }, onError: (error) { |
103 sink.addError(error); | 103 sink.addError(error); |
104 }); | 104 }); |
105 break; | 105 break; |
106 | 106 |
107 case _WebSocketOpcode.BINARY: | 107 case _WebSocketOpcode.BINARY: |
108 if (_currentMessageType != _WebSocketMessageType.NONE) { | 108 if (_currentMessageType != _WebSocketMessageType.NONE) { |
109 throw new WebSocketException("Protocol error"); | 109 throw new WebSocketException("Protocol error"); |
110 } | 110 } |
111 _currentMessageType = _WebSocketMessageType.BINARY; | 111 _currentMessageType = _WebSocketMessageType.BINARY; |
112 _controller = new StreamController(); | 112 _controller = new StreamController(sync: true); |
113 _controller.stream | 113 _controller.stream |
114 .fold(new _BufferList(), (buffer, data) => buffer..add(data)) | 114 .fold(new _BufferList(), (buffer, data) => buffer..add(data)) |
115 .then((buffer) { | 115 .then((buffer) { |
116 sink.add(buffer.readBytes()); | 116 sink.add(buffer.readBytes()); |
117 }, onError: (error) { | 117 }, onError: (error) { |
118 sink.addError(error); | 118 sink.addError(error); |
119 }); | 119 }); |
120 break; | 120 break; |
121 | 121 |
122 case _WebSocketOpcode.CLOSE: | 122 case _WebSocketOpcode.CLOSE: |
(...skipping 246 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
369 | 369 |
370 | 370 |
371 class _WebSocketPong { | 371 class _WebSocketPong { |
372 final List<int> payload; | 372 final List<int> payload; |
373 _WebSocketPong([this.payload = null]); | 373 _WebSocketPong([this.payload = null]); |
374 } | 374 } |
375 | 375 |
376 | 376 |
377 class _WebSocketTransformerImpl implements WebSocketTransformer { | 377 class _WebSocketTransformerImpl implements WebSocketTransformer { |
378 final StreamController<WebSocket> _controller = | 378 final StreamController<WebSocket> _controller = |
379 new StreamController<WebSocket>(); | 379 new StreamController<WebSocket>(sync: true); |
380 | 380 |
381 Stream<WebSocket> bind(Stream<HttpRequest> stream) { | 381 Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
382 stream.listen((request) { | 382 stream.listen((request) { |
383 _upgrade(request) | 383 _upgrade(request) |
384 .then((WebSocket webSocket) => _controller.add(webSocket)) | 384 .then((WebSocket webSocket) => _controller.add(webSocket)) |
385 .catchError((error) => _controller.addError(error)); | 385 .catchError((error) => _controller.addError(error)); |
386 }); | 386 }); |
387 | 387 |
388 return _controller.stream; | 388 return _controller.stream; |
389 } | 389 } |
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
559 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); | 559 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); |
560 | 560 |
561 void _onListen() { | 561 void _onListen() { |
562 if (_subscription != null) { | 562 if (_subscription != null) { |
563 _subscription.cancel(); | 563 _subscription.cancel(); |
564 } | 564 } |
565 } | 565 } |
566 | 566 |
567 _ensureController() { | 567 _ensureController() { |
568 if (_controller != null) return; | 568 if (_controller != null) return; |
569 _controller = new StreamController(onPause: () => _subscription.pause(), | 569 _controller = new StreamController(sync: true, |
| 570 onPause: () => _subscription.pause(), |
570 onResume: () => _subscription.resume(), | 571 onResume: () => _subscription.resume(), |
571 onCancel: _onListen); | 572 onCancel: _onListen); |
572 var stream = _controller.stream.transform( | 573 var stream = _controller.stream.transform( |
573 new _WebSocketOutgoingTransformer(webSocket)); | 574 new _WebSocketOutgoingTransformer(webSocket)); |
574 socket.addStream(stream) | 575 socket.addStream(stream) |
575 .then((_) { | 576 .then((_) { |
576 _done(); | 577 _done(); |
577 _closeCompleter.complete(webSocket); | 578 _closeCompleter.complete(webSocket); |
578 }, | 579 }, |
579 onError: (error) { | 580 onError: (error) { |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
621 } | 622 } |
622 | 623 |
623 void add(data) { | 624 void add(data) { |
624 _ensureController(); | 625 _ensureController(); |
625 _controller.add(data); | 626 _controller.add(data); |
626 } | 627 } |
627 } | 628 } |
628 | 629 |
629 | 630 |
630 class _WebSocketImpl extends Stream implements WebSocket { | 631 class _WebSocketImpl extends Stream implements WebSocket { |
631 final StreamController _controller = new StreamController(); | 632 final StreamController _controller = new StreamController(sync: true); |
632 StreamSink _sink; | 633 StreamSink _sink; |
633 | 634 |
634 final Socket _socket; | 635 final Socket _socket; |
635 final bool _serverSide; | 636 final bool _serverSide; |
636 int _readyState = WebSocket.CONNECTING; | 637 int _readyState = WebSocket.CONNECTING; |
637 bool _writeClosed = false; | 638 bool _writeClosed = false; |
638 int _closeCode; | 639 int _closeCode; |
639 String _closeReason; | 640 String _closeReason; |
640 | 641 |
641 int _outCloseCode; | 642 int _outCloseCode; |
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
795 (code < WebSocketStatus.NORMAL_CLOSURE || | 796 (code < WebSocketStatus.NORMAL_CLOSURE || |
796 code == WebSocketStatus.RESERVED_1004 || | 797 code == WebSocketStatus.RESERVED_1004 || |
797 code == WebSocketStatus.NO_STATUS_RECEIVED || | 798 code == WebSocketStatus.NO_STATUS_RECEIVED || |
798 code == WebSocketStatus.ABNORMAL_CLOSURE || | 799 code == WebSocketStatus.ABNORMAL_CLOSURE || |
799 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 800 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
800 code < WebSocketStatus.RESERVED_1015) || | 801 code < WebSocketStatus.RESERVED_1015) || |
801 (code >= WebSocketStatus.RESERVED_1015 && | 802 (code >= WebSocketStatus.RESERVED_1015 && |
802 code < 3000)); | 803 code < 3000)); |
803 } | 804 } |
804 } | 805 } |
OLD | NEW |