Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(283)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 91223002: Add support for Websocket protocols. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Fix selecting invalid protocol and document. Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/socket_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 class _WebSocketMessageType { 9 class _WebSocketMessageType {
10 static const int NONE = 0; 10 static const int NONE = 0;
(...skipping 375 matching lines...) Expand 10 before | Expand all | Expand 10 after
386 386
387 class _WebSocketPong { 387 class _WebSocketPong {
388 final List<int> payload; 388 final List<int> payload;
389 _WebSocketPong([this.payload = null]); 389 _WebSocketPong([this.payload = null]);
390 } 390 }
391 391
392 392
393 class _WebSocketTransformerImpl implements WebSocketTransformer { 393 class _WebSocketTransformerImpl implements WebSocketTransformer {
394 final StreamController<WebSocket> _controller = 394 final StreamController<WebSocket> _controller =
395 new StreamController<WebSocket>(sync: true); 395 new StreamController<WebSocket>(sync: true);
396 final Function _protocolSelector;
397
398 _WebSocketTransformerImpl(this._protocolSelector);
396 399
397 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 400 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
398 stream.listen((request) { 401 stream.listen((request) {
399 _upgrade(request) 402 _upgrade(request, _protocolSelector)
400 .then((WebSocket webSocket) => _controller.add(webSocket)) 403 .then((WebSocket webSocket) => _controller.add(webSocket))
401 .catchError(_controller.addError); 404 .catchError(_controller.addError);
402 }); 405 });
403 406
404 return _controller.stream; 407 return _controller.stream;
405 } 408 }
406 409
407 static Future<WebSocket> _upgrade(HttpRequest request) { 410 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
408 var response = request.response; 411 var response = request.response;
409 if (!_isUpgradeRequest(request)) { 412 if (!_isUpgradeRequest(request)) {
410 // Send error response and drain the request. 413 // Send error response.
411 request.listen((_) {}, onDone: () { 414 response.statusCode = HttpStatus.BAD_REQUEST;
412 response.statusCode = HttpStatus.BAD_REQUEST; 415 response.close();
413 response.contentLength = 0;
414 response.close();
415 });
416 return new Future.error( 416 return new Future.error(
417 new WebSocketException("Invalid WebSocket upgrade request")); 417 new WebSocketException("Invalid WebSocket upgrade request"));
418 } 418 }
419 419
420 // Send the upgrade response. 420 Future upgrade(String protocol) {
421 response.statusCode = HttpStatus.SWITCHING_PROTOCOLS; 421 // Send the upgrade response.
422 response.headers.add(HttpHeaders.CONNECTION, "Upgrade"); 422 response.statusCode = HttpStatus.SWITCHING_PROTOCOLS;
423 response.headers.add(HttpHeaders.UPGRADE, "websocket"); 423 response.headers.add(HttpHeaders.CONNECTION, "Upgrade");
424 String key = request.headers.value("Sec-WebSocket-Key"); 424 response.headers.add(HttpHeaders.UPGRADE, "websocket");
425 _SHA1 sha1 = new _SHA1(); 425 String key = request.headers.value("Sec-WebSocket-Key");
426 sha1.add("$key$_webSocketGUID".codeUnits); 426 _SHA1 sha1 = new _SHA1();
427 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 427 sha1.add("$key$_webSocketGUID".codeUnits);
428 response.headers.add("Sec-WebSocket-Accept", accept); 428 String accept = _CryptoUtils.bytesToBase64(sha1.close());
429 response.headers.contentLength = 0; 429 response.headers.add("Sec-WebSocket-Accept", accept);
430 return response.detachSocket() 430 if (protocol != null && protocol.isNotEmpty) {
431 .then((socket) => new _WebSocketImpl._fromSocket(socket, true)); 431 response.headers.add("Sec-WebSocket-Protocol", protocol);
432 }
433 response.headers.contentLength = 0;
434 return response.detachSocket()
435 .then((socket) => new _WebSocketImpl._fromSocket(
436 socket, protocol, true));
437 }
438
439 var protocols = request.headers['Sec-WebSocket-Protocol'];
440 if (protocols != null && _protocolSelector != null) {
441 // The suggested protocols can be spread over multiple lines, each
442 // consisting of multiple protocols. To unify all of them, first join
443 // the lists with ', ' and then tokenize.
444 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
445 return new Future(() => _protocolSelector(protocols))
446 .then((protocol) {
447 if (protocols.indexOf(protocol) < 0) {
448 throw new WebSocketException(
449 "Selected protocol is not in the list of available protocols");
450 }
451 return protocol;
452 })
453 .catchError((error) {
454 response.statusCode = HttpStatus.INTERNAL_SERVER_ERROR;
455 response.close();
456 throw error;
457 })
458 .then(upgrade);
459 } else {
460 return upgrade(null);
461 }
432 } 462 }
433 463
434 static bool _isUpgradeRequest(HttpRequest request) { 464 static bool _isUpgradeRequest(HttpRequest request) {
435 if (request.method != "GET") { 465 if (request.method != "GET") {
436 return false; 466 return false;
437 } 467 }
438 if (request.headers[HttpHeaders.CONNECTION] == null) { 468 if (request.headers[HttpHeaders.CONNECTION] == null) {
439 return false; 469 return false;
440 } 470 }
441 bool isUpgrade = false; 471 bool isUpgrade = false;
(...skipping 275 matching lines...) Expand 10 before | Expand all | Expand 10 after
717 747
718 void closeSocket() { 748 void closeSocket() {
719 _closed = true; 749 _closed = true;
720 _cancel(); 750 _cancel();
721 close(); 751 close();
722 } 752 }
723 } 753 }
724 754
725 755
726 class _WebSocketImpl extends Stream implements WebSocket { 756 class _WebSocketImpl extends Stream implements WebSocket {
757 final String protocol;
758
727 StreamController _controller; 759 StreamController _controller;
728 StreamSubscription _subscription; 760 StreamSubscription _subscription;
729 StreamSink _sink; 761 StreamSink _sink;
730 762
731 final Socket _socket; 763 final Socket _socket;
732 final bool _serverSide; 764 final bool _serverSide;
733 int _readyState = WebSocket.CONNECTING; 765 int _readyState = WebSocket.CONNECTING;
734 bool _writeClosed = false; 766 bool _writeClosed = false;
735 int _closeCode; 767 int _closeCode;
736 String _closeReason; 768 String _closeReason;
737 Duration _pingInterval; 769 Duration _pingInterval;
738 Timer _pingTimer; 770 Timer _pingTimer;
739 _WebSocketConsumer _consumer; 771 _WebSocketConsumer _consumer;
740 772
741 int _outCloseCode; 773 int _outCloseCode;
742 String _outCloseReason; 774 String _outCloseReason;
743 775
744 static final HttpClient _httpClient = new HttpClient(); 776 static final HttpClient _httpClient = new HttpClient();
745 777
746 static Future<WebSocket> connect(String url, [protocols]) { 778 static Future<WebSocket> connect(String url, List<String> protocols) {
747 Uri uri = Uri.parse(url); 779 Uri uri = Uri.parse(url);
748 if (uri.scheme != "ws" && uri.scheme != "wss") { 780 if (uri.scheme != "ws" && uri.scheme != "wss") {
749 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 781 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
750 } 782 }
751 if (uri.userInfo != "") { 783 if (uri.userInfo != "") {
752 throw new WebSocketException("Unsupported user info '${uri.userInfo}'"); 784 throw new WebSocketException("Unsupported user info '${uri.userInfo}'");
753 } 785 }
754 786
755 Random random = new Random(); 787 Random random = new Random();
756 // Generate 16 random bytes. 788 // Generate 16 random bytes.
(...skipping 10 matching lines...) Expand all
767 path: uri.path, 799 path: uri.path,
768 query: uri.query, 800 query: uri.query,
769 fragment: uri.fragment); 801 fragment: uri.fragment);
770 return _httpClient.openUrl("GET", uri) 802 return _httpClient.openUrl("GET", uri)
771 .then((request) { 803 .then((request) {
772 // Setup the initial handshake. 804 // Setup the initial handshake.
773 request.headers.add(HttpHeaders.CONNECTION, "upgrade"); 805 request.headers.add(HttpHeaders.CONNECTION, "upgrade");
774 request.headers.set(HttpHeaders.UPGRADE, "websocket"); 806 request.headers.set(HttpHeaders.UPGRADE, "websocket");
775 request.headers.set("Sec-WebSocket-Key", nonce); 807 request.headers.set("Sec-WebSocket-Key", nonce);
776 request.headers.set("Sec-WebSocket-Version", "13"); 808 request.headers.set("Sec-WebSocket-Version", "13");
809 if (protocols.isNotEmpty) {
810 request.headers.add("Sec-WebSocket-Protocol", protocols);
811 }
777 return request.close(); 812 return request.close();
778 }) 813 })
779 .then((response) { 814 .then((response) {
780 void error(String message) { 815 void error(String message) {
781 // Flush data. 816 // Flush data.
782 response.detachSocket().then((socket) { 817 response.detachSocket().then((socket) {
783 socket.destroy(); 818 socket.destroy();
784 }); 819 });
785 throw new WebSocketException(message); 820 throw new WebSocketException(message);
786 } 821 }
(...skipping 14 matching lines...) Expand all
801 List<int> expectedAccept = sha1.close(); 836 List<int> expectedAccept = sha1.close();
802 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); 837 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
803 if (expectedAccept.length != receivedAccept.length) { 838 if (expectedAccept.length != receivedAccept.length) {
804 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); 839 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
805 } 840 }
806 for (int i = 0; i < expectedAccept.length; i++) { 841 for (int i = 0; i < expectedAccept.length; i++) {
807 if (expectedAccept[i] != receivedAccept[i]) { 842 if (expectedAccept[i] != receivedAccept[i]) {
808 error("Bad response 'Sec-WebSocket-Accept' header"); 843 error("Bad response 'Sec-WebSocket-Accept' header");
809 } 844 }
810 } 845 }
846 var protocol = response.headers.value('Sec-WebSocket-Protocol');
811 return response.detachSocket() 847 return response.detachSocket()
812 .then((socket) => new _WebSocketImpl._fromSocket(socket)); 848 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
813 }); 849 });
814 } 850 }
815 851
816 _WebSocketImpl._fromSocket(Socket this._socket, 852 _WebSocketImpl._fromSocket(Socket this._socket,
853 String this.protocol,
817 [bool this._serverSide = false]) { 854 [bool this._serverSide = false]) {
818 _consumer = new _WebSocketConsumer(this, _socket); 855 _consumer = new _WebSocketConsumer(this, _socket);
819 _sink = new _StreamSinkImpl(_consumer); 856 _sink = new _StreamSinkImpl(_consumer);
820 _readyState = WebSocket.OPEN; 857 _readyState = WebSocket.OPEN;
821 858
822 var transformer = new _WebSocketProtocolTransformer(_serverSide); 859 var transformer = new _WebSocketProtocolTransformer(_serverSide);
823 _subscription = _socket.transform(transformer).listen( 860 _subscription = _socket.transform(transformer).listen(
824 (data) { 861 (data) {
825 if (data is _WebSocketPing) { 862 if (data is _WebSocketPing) {
826 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 863 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
886 _pingTimer = new Timer(_pingInterval, () { 923 _pingTimer = new Timer(_pingInterval, () {
887 // No pong received. 924 // No pong received.
888 _close(WebSocketStatus.GOING_AWAY); 925 _close(WebSocketStatus.GOING_AWAY);
889 }); 926 });
890 }); 927 });
891 } 928 }
892 929
893 int get readyState => _readyState; 930 int get readyState => _readyState;
894 931
895 String get extensions => null; 932 String get extensions => null;
896 String get protocol => null;
897 int get closeCode => _closeCode; 933 int get closeCode => _closeCode;
898 String get closeReason => _closeReason; 934 String get closeReason => _closeReason;
899 935
900 void add(data) => _sink.add(data); 936 void add(data) => _sink.add(data);
901 void addError(error, [StackTrace stackTrace]) => 937 void addError(error, [StackTrace stackTrace]) =>
902 _sink.addError(error, stackTrace); 938 _sink.addError(error, stackTrace);
903 Future addStream(Stream stream) => _sink.addStream(stream); 939 Future addStream(Stream stream) => _sink.addStream(stream);
904 Future get done => _sink.done; 940 Future get done => _sink.done;
905 941
906 Future close([int code, String reason]) { 942 Future close([int code, String reason]) {
(...skipping 22 matching lines...) Expand all
929 (code < WebSocketStatus.NORMAL_CLOSURE || 965 (code < WebSocketStatus.NORMAL_CLOSURE ||
930 code == WebSocketStatus.RESERVED_1004 || 966 code == WebSocketStatus.RESERVED_1004 ||
931 code == WebSocketStatus.NO_STATUS_RECEIVED || 967 code == WebSocketStatus.NO_STATUS_RECEIVED ||
932 code == WebSocketStatus.ABNORMAL_CLOSURE || 968 code == WebSocketStatus.ABNORMAL_CLOSURE ||
933 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 969 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
934 code < WebSocketStatus.RESERVED_1015) || 970 code < WebSocketStatus.RESERVED_1015) ||
935 (code >= WebSocketStatus.RESERVED_1015 && 971 (code >= WebSocketStatus.RESERVED_1015 &&
936 code < 3000)); 972 code < 3000));
937 } 973 }
938 } 974 }
OLDNEW
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/socket_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698