OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 | 8 |
9 class _WebSocketMessageType { | 9 class _WebSocketMessageType { |
10 static const int NONE = 0; | 10 static const int NONE = 0; |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
92 if (_currentMessageType != _WebSocketMessageType.NONE) { | 92 if (_currentMessageType != _WebSocketMessageType.NONE) { |
93 throw new WebSocketException("Protocol error"); | 93 throw new WebSocketException("Protocol error"); |
94 } | 94 } |
95 _currentMessageType = _WebSocketMessageType.TEXT; | 95 _currentMessageType = _WebSocketMessageType.TEXT; |
96 _controller = new StreamController(sync: true); | 96 _controller = new StreamController(sync: true); |
97 _controller.stream | 97 _controller.stream |
98 .transform(UTF8.decoder) | 98 .transform(UTF8.decoder) |
99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) | 99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) |
100 .then((buffer) { | 100 .then((buffer) { |
101 sink.add(buffer.toString()); | 101 sink.add(buffer.toString()); |
102 }, onError: (error) { | 102 }, onError: sink.addError); |
103 sink.addError(error); | |
104 }); | |
105 break; | 103 break; |
106 | 104 |
107 case _WebSocketOpcode.BINARY: | 105 case _WebSocketOpcode.BINARY: |
108 if (_currentMessageType != _WebSocketMessageType.NONE) { | 106 if (_currentMessageType != _WebSocketMessageType.NONE) { |
109 throw new WebSocketException("Protocol error"); | 107 throw new WebSocketException("Protocol error"); |
110 } | 108 } |
111 _currentMessageType = _WebSocketMessageType.BINARY; | 109 _currentMessageType = _WebSocketMessageType.BINARY; |
112 _controller = new StreamController(sync: true); | 110 _controller = new StreamController(sync: true); |
113 _controller.stream | 111 _controller.stream |
114 .fold(new BytesBuilder(), (buffer, data) => buffer..add(data)) | 112 .fold(new BytesBuilder(), (buffer, data) => buffer..add(data)) |
115 .then((buffer) { | 113 .then((buffer) { |
116 sink.add(buffer.takeBytes()); | 114 sink.add(buffer.takeBytes()); |
117 }, onError: (error) { | 115 }, onError: sink.addError); |
118 sink.addError(error); | |
119 }); | |
120 break; | 116 break; |
121 | 117 |
122 case _WebSocketOpcode.CLOSE: | 118 case _WebSocketOpcode.CLOSE: |
123 case _WebSocketOpcode.PING: | 119 case _WebSocketOpcode.PING: |
124 case _WebSocketOpcode.PONG: | 120 case _WebSocketOpcode.PONG: |
125 // Control frames cannot be fragmented. | 121 // Control frames cannot be fragmented. |
126 if (!_fin) throw new WebSocketException("Protocol error"); | 122 if (!_fin) throw new WebSocketException("Protocol error"); |
127 break; | 123 break; |
128 | 124 |
129 default: | 125 default: |
(...skipping 245 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
375 | 371 |
376 | 372 |
377 class _WebSocketTransformerImpl implements WebSocketTransformer { | 373 class _WebSocketTransformerImpl implements WebSocketTransformer { |
378 final StreamController<WebSocket> _controller = | 374 final StreamController<WebSocket> _controller = |
379 new StreamController<WebSocket>(sync: true); | 375 new StreamController<WebSocket>(sync: true); |
380 | 376 |
381 Stream<WebSocket> bind(Stream<HttpRequest> stream) { | 377 Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
382 stream.listen((request) { | 378 stream.listen((request) { |
383 _upgrade(request) | 379 _upgrade(request) |
384 .then((WebSocket webSocket) => _controller.add(webSocket)) | 380 .then((WebSocket webSocket) => _controller.add(webSocket)) |
385 .catchError((error) => _controller.addError(error)); | 381 .catchError(_controller.addError); |
386 }); | 382 }); |
387 | 383 |
388 return _controller.stream; | 384 return _controller.stream; |
389 } | 385 } |
390 | 386 |
391 static Future<WebSocket> _upgrade(HttpRequest request) { | 387 static Future<WebSocket> _upgrade(HttpRequest request) { |
392 var response = request.response; | 388 var response = request.response; |
393 if (!_isUpgradeRequest(request)) { | 389 if (!_isUpgradeRequest(request)) { |
394 // Send error response and drain the request. | 390 // Send error response and drain the request. |
395 request.listen((_) {}, onDone: () { | 391 request.listen((_) {}, onDone: () { |
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
613 _controller = new StreamController(sync: true, | 609 _controller = new StreamController(sync: true, |
614 onPause: _onPause, | 610 onPause: _onPause, |
615 onResume: _onResume, | 611 onResume: _onResume, |
616 onCancel: _onListen); | 612 onCancel: _onListen); |
617 var stream = _controller.stream.transform( | 613 var stream = _controller.stream.transform( |
618 new _WebSocketOutgoingTransformer(webSocket)); | 614 new _WebSocketOutgoingTransformer(webSocket)); |
619 socket.addStream(stream) | 615 socket.addStream(stream) |
620 .then((_) { | 616 .then((_) { |
621 _done(); | 617 _done(); |
622 _closeCompleter.complete(webSocket); | 618 _closeCompleter.complete(webSocket); |
623 }, onError: (error) { | 619 }, onError: (error, StackTrace stackTrace) { |
624 _closed = true; | 620 _closed = true; |
625 _cancel(); | 621 _cancel(); |
626 if (error is ArgumentError) { | 622 if (error is ArgumentError) { |
627 if (!_done(error)) { | 623 if (!_done(error, stackTrace)) { |
628 _closeCompleter.completeError(error); | 624 _closeCompleter.completeError(error, stackTrace); |
629 } | 625 } |
630 } else { | 626 } else { |
631 _done(); | 627 _done(); |
632 _closeCompleter.complete(webSocket); | 628 _closeCompleter.complete(webSocket); |
633 } | 629 } |
634 }); | 630 }); |
635 } | 631 } |
636 | 632 |
637 bool _done([error]) { | 633 bool _done([error, StackTrace stackTrace]) { |
638 if (_completer == null) return false; | 634 if (_completer == null) return false; |
639 if (error != null) { | 635 if (error != null) { |
640 _completer.completeError(error); | 636 _completer.completeError(error, stackTrace); |
641 } else { | 637 } else { |
642 _completer.complete(webSocket); | 638 _completer.complete(webSocket); |
643 } | 639 } |
644 _completer = null; | 640 _completer = null; |
645 return true; | 641 return true; |
646 } | 642 } |
647 | 643 |
648 Future addStream(var stream) { | 644 Future addStream(var stream) { |
649 if (_closed) { | 645 if (_closed) { |
650 stream.listen(null).cancel(); | 646 stream.listen(null).cancel(); |
651 return new Future.value(webSocket); | 647 return new Future.value(webSocket); |
652 } | 648 } |
653 _ensureController(); | 649 _ensureController(); |
654 _completer = new Completer(); | 650 _completer = new Completer(); |
655 _subscription = stream.listen( | 651 _subscription = stream.listen( |
656 (data) { | 652 (data) { |
657 _controller.add(data); | 653 _controller.add(data); |
658 }, | 654 }, |
659 onDone: () { | 655 onDone: _done, |
660 _done(); | 656 onError: _done, |
661 }, | |
662 onError: (error) { | |
663 _done(error); | |
664 }, | |
665 cancelOnError: true); | 657 cancelOnError: true); |
666 if (_issuedPause) { | 658 if (_issuedPause) { |
667 _subscription.pause(); | 659 _subscription.pause(); |
668 _issuedPause = false; | 660 _issuedPause = false; |
669 } | 661 } |
670 return _completer.future; | 662 return _completer.future; |
671 } | 663 } |
672 | 664 |
673 Future close() { | 665 Future close() { |
674 _ensureController(); | 666 _ensureController(); |
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
825 }, | 817 }, |
826 cancelOnError: true); | 818 cancelOnError: true); |
827 _subscription.pause(); | 819 _subscription.pause(); |
828 _controller = new StreamController(sync: true, | 820 _controller = new StreamController(sync: true, |
829 onListen: _subscription.resume, | 821 onListen: _subscription.resume, |
830 onPause: _subscription.pause, | 822 onPause: _subscription.pause, |
831 onResume: _subscription.resume); | 823 onResume: _subscription.resume); |
832 } | 824 } |
833 | 825 |
834 StreamSubscription listen(void onData(message), | 826 StreamSubscription listen(void onData(message), |
835 {void onError(error), | 827 {Function onError, |
836 void onDone(), | 828 void onDone(), |
837 bool cancelOnError}) { | 829 bool cancelOnError}) { |
838 return _controller.stream.listen(onData, | 830 return _controller.stream.listen(onData, |
839 onError: onError, | 831 onError: onError, |
840 onDone: onDone, | 832 onDone: onDone, |
841 cancelOnError: cancelOnError); | 833 cancelOnError: cancelOnError); |
842 } | 834 } |
843 | 835 |
844 Duration get pingInterval => _pingInterval; | 836 Duration get pingInterval => _pingInterval; |
845 | 837 |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
898 (code < WebSocketStatus.NORMAL_CLOSURE || | 890 (code < WebSocketStatus.NORMAL_CLOSURE || |
899 code == WebSocketStatus.RESERVED_1004 || | 891 code == WebSocketStatus.RESERVED_1004 || |
900 code == WebSocketStatus.NO_STATUS_RECEIVED || | 892 code == WebSocketStatus.NO_STATUS_RECEIVED || |
901 code == WebSocketStatus.ABNORMAL_CLOSURE || | 893 code == WebSocketStatus.ABNORMAL_CLOSURE || |
902 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 894 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
903 code < WebSocketStatus.RESERVED_1015) || | 895 code < WebSocketStatus.RESERVED_1015) || |
904 (code >= WebSocketStatus.RESERVED_1015 && | 896 (code >= WebSocketStatus.RESERVED_1015 && |
905 code < 3000)); | 897 code < 3000)); |
906 } | 898 } |
907 } | 899 } |
OLD | NEW |