Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(493)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 1158593002: Initial Implementation of WebSocket Compression (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Fix Type Annotations Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 // Matches _WebSocketOpcode. 9 // Matches _WebSocketOpcode.
10 class _WebSocketMessageType { 10 class _WebSocketMessageType {
(...skipping 20 matching lines...) Expand all
31 static const int RESERVED_D = 13; 31 static const int RESERVED_D = 13;
32 static const int RESERVED_E = 14; 32 static const int RESERVED_E = 14;
33 static const int RESERVED_F = 15; 33 static const int RESERVED_F = 15;
34 } 34 }
35 35
36 /** 36 /**
37 * The web socket protocol transformer handles the protocol byte stream 37 * The web socket protocol transformer handles the protocol byte stream
38 * which is supplied through the [:handleData:]. As the protocol is processed, 38 * which is supplied through the [:handleData:]. As the protocol is processed,
39 * it'll output frame data as either a List<int> or String. 39 * it'll output frame data as either a List<int> or String.
40 * 40 *
41 * Important infomation about usage: Be sure you use cancelOnError, so the 41 * Important information about usage: Be sure you use cancelOnError, so the
42 * socket will be closed when the processer encounter an error. Not using it 42 * socket will be closed when the processor encounter an error. Not using it
43 * will lead to undefined behaviour. 43 * will lead to undefined behaviour.
44 */ 44 */
45 // TODO(ajohnsen): make this transformer reusable? 45 // TODO(ajohnsen): make this transformer reusable?
46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
47 static const int START = 0; 47 static const int START = 0;
48 static const int LEN_FIRST = 1; 48 static const int LEN_FIRST = 1;
49 static const int LEN_REST = 2; 49 static const int LEN_REST = 2;
50 static const int MASK = 3; 50 static const int MASK = 3;
51 static const int PAYLOAD = 4; 51 static const int PAYLOAD = 4;
52 static const int CLOSED = 5; 52 static const int CLOSED = 5;
(...skipping 11 matching lines...) Expand all
64 int _currentMessageType = _WebSocketMessageType.NONE; 64 int _currentMessageType = _WebSocketMessageType.NONE;
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
66 String closeReason = ""; 66 String closeReason = "";
67 67
68 EventSink _eventSink; 68 EventSink _eventSink;
69 69
70 final bool _serverSide; 70 final bool _serverSide;
71 final List _maskingBytes = new List(4); 71 final List _maskingBytes = new List(4);
72 final BytesBuilder _payload = new BytesBuilder(copy: false); 72 final BytesBuilder _payload = new BytesBuilder(copy: false);
73 73
74 _WebSocketPerMessageDeflateHelper _deflateHelper;
75
74 _WebSocketProtocolTransformer([this._serverSide = false]); 76 _WebSocketProtocolTransformer([this._serverSide = false]);
75 77
76 Stream bind(Stream stream) { 78 Stream bind(Stream stream) {
77 return new Stream.eventTransformed( 79 return new Stream.eventTransformed(
78 stream, 80 stream,
79 (EventSink eventSink) { 81 (EventSink eventSink) {
80 if (_eventSink != null) { 82 if (_eventSink != null) {
81 throw new StateError("WebSocket transformer already used."); 83 throw new StateError("WebSocket transformer already used.");
82 } 84 }
83 _eventSink = eventSink; 85 _eventSink = eventSink;
(...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 _prepareForNextFrame(); 278 _prepareForNextFrame();
277 } else { 279 } else {
278 _messageFrameEnd(); 280 _messageFrameEnd();
279 } 281 }
280 } else { 282 } else {
281 _state = PAYLOAD; 283 _state = PAYLOAD;
282 } 284 }
283 } 285 }
284 286
285 void _messageFrameEnd() { 287 void _messageFrameEnd() {
286 if (_fin) { 288 if (_fin) {
Søren Gjesse 2015/05/26 09:11:33 Move getting bytes and optionally decompression up
287 switch (_currentMessageType) { 289 switch (_currentMessageType) {
288 case _WebSocketMessageType.TEXT: 290 case _WebSocketMessageType.TEXT:
289 _eventSink.add(UTF8.decode(_payload.takeBytes())); 291 var bytes = _payload.takeBytes();
292 if (_deflateHelper != null) {
293 bytes = _deflateHelper.processIncomingMessage(bytes);
294 }
295 _eventSink.add(UTF8.decode(bytes));
290 break; 296 break;
291 case _WebSocketMessageType.BINARY: 297 case _WebSocketMessageType.BINARY:
292 _eventSink.add(_payload.takeBytes()); 298 var bytes = _payload.takeBytes();
299 if (_deflateHelper != null) {
300 bytes = _deflateHelper.processIncomingMessage(bytes);
301 }
302 _eventSink.add(bytes);
293 break; 303 break;
294 } 304 }
295 _currentMessageType = _WebSocketMessageType.NONE; 305 _currentMessageType = _WebSocketMessageType.NONE;
296 } 306 }
297 _prepareForNextFrame(); 307 _prepareForNextFrame();
298 } 308 }
299 309
300 void _controlFrameEnd() { 310 void _controlFrameEnd() {
301 switch (_opcode) { 311 switch (_opcode) {
302 case _WebSocketOpcode.CLOSE: 312 case _WebSocketOpcode.CLOSE:
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") 407 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
398 ..headers.add(HttpHeaders.UPGRADE, "websocket"); 408 ..headers.add(HttpHeaders.UPGRADE, "websocket");
399 String key = request.headers.value("Sec-WebSocket-Key"); 409 String key = request.headers.value("Sec-WebSocket-Key");
400 _SHA1 sha1 = new _SHA1(); 410 _SHA1 sha1 = new _SHA1();
401 sha1.add("$key$_webSocketGUID".codeUnits); 411 sha1.add("$key$_webSocketGUID".codeUnits);
402 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 412 String accept = _CryptoUtils.bytesToBase64(sha1.close());
403 response.headers.add("Sec-WebSocket-Accept", accept); 413 response.headers.add("Sec-WebSocket-Accept", accept);
404 if (protocol != null) { 414 if (protocol != null) {
405 response.headers.add("Sec-WebSocket-Protocol", protocol); 415 response.headers.add("Sec-WebSocket-Protocol", protocol);
406 } 416 }
417
418 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
419
420 if (extensionHeader == null) {
421 extensionHeader = "";
422 }
423
424 Iterable<List<String>> extensions = extensionHeader.split(",").map((it) => it.split("; "));
Søren Gjesse 2015/05/26 09:11:33 I think it will be helpful to create a class to re
Søren Gjesse 2015/05/26 09:11:33 Long line - more below.
425
426 _WebSocketPerMessageDeflateHelper deflateHelper;
427
428 if (extensions.any((x) => x[0] == "permessage-deflate")) {
429 response.headers.add("Sec-WebSocket-Extensions", "permessage-deflate");
430 deflateHelper = new _WebSocketPerMessageDeflateHelper();
431 }
432
407 response.headers.contentLength = 0; 433 response.headers.contentLength = 0;
408 return response.detachSocket() 434 return response.detachSocket()
409 .then((socket) => new _WebSocketImpl._fromSocket( 435 .then((socket) => new _WebSocketImpl._fromSocket(
410 socket, protocol, true)); 436 socket, protocol, true).._deflateHelper = deflateHelper);
Søren Gjesse 2015/05/26 09:11:33 Please pass the arguments used for handling compre
411 } 437 }
412 438
413 var protocols = request.headers['Sec-WebSocket-Protocol']; 439 var protocols = request.headers['Sec-WebSocket-Protocol'];
414 if (protocols != null && _protocolSelector != null) { 440 if (protocols != null && _protocolSelector != null) {
415 // The suggested protocols can be spread over multiple lines, each 441 // The suggested protocols can be spread over multiple lines, each
416 // consisting of multiple protocols. To unify all of them, first join 442 // consisting of multiple protocols. To unify all of them, first join
417 // the lists with ', ' and then tokenize. 443 // the lists with ', ' and then tokenize.
418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); 444 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
419 return new Future(() => _protocolSelector(protocols)) 445 return new Future(() => _protocolSelector(protocols))
420 .then((protocol) { 446 .then((protocol) {
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
457 return false; 483 return false;
458 } 484 }
459 String key = request.headers.value("Sec-WebSocket-Key"); 485 String key = request.headers.value("Sec-WebSocket-Key");
460 if (key == null) { 486 if (key == null) {
461 return false; 487 return false;
462 } 488 }
463 return true; 489 return true;
464 } 490 }
465 } 491 }
466 492
493 class _WebSocketPerMessageDeflateHelper {
Søren Gjesse 2015/05/26 09:11:33 I would just call this _WebSocketPerMessageDeflate
494 int windowBits;
495
496 ZLibDecoder decoder;
497 ZLibEncoder encoder;
498
499 _WebSocketPerMessageDeflateHelper({this.windowBits}) {
500 decoder = windowBits != null ? new ZLibDecoder(windowBits: windowBits) : new ZLibDecoder();
501 encoder = windowBits != null ? new ZLibEncoder(windowBits: windowBits) : new ZLibEncoder();
502 }
503
504 List<int> processIncomingMessage(List<int> msg) {
505 var builder = new BytesBuilder();
506 builder.add(msg);
507 builder.add(const [0x00, 0x00, 0xff, 0xff]);
508 return decoder.convert(builder.takeBytes());
509 }
510
511 List<int> processOutgoingMessage(List<int> msg) {
512 var c = encoder.convert(msg);
513 c = c.sublist(0, c.length - 4);
514 return c;
515 }
516 }
467 517
468 // TODO(ajohnsen): Make this transformer reusable. 518 // TODO(ajohnsen): Make this transformer reusable.
469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 519 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
470 final _WebSocketImpl webSocket; 520 final _WebSocketImpl webSocket;
471 EventSink _eventSink; 521 EventSink _eventSink;
472 522
523 _WebSocketPerMessageDeflateHelper _deflateHelper;
524
473 _WebSocketOutgoingTransformer(this.webSocket); 525 _WebSocketOutgoingTransformer(this.webSocket);
474 526
475 Stream bind(Stream stream) { 527 Stream bind(Stream stream) {
476 return new Stream.eventTransformed( 528 return new Stream.eventTransformed(
477 stream, 529 stream,
478 (EventSink eventSink) { 530 (EventSink eventSink) {
479 if (_eventSink != null) { 531 if (_eventSink != null) {
480 throw new StateError("WebSocket transformer already used"); 532 throw new StateError("WebSocket transformer already used");
481 } 533 }
482 _eventSink = eventSink; 534 _eventSink = eventSink;
483 return this; 535 return this;
484 }); 536 });
485 } 537 }
486 538
487 void add(message) { 539 void add(message) {
488 if (message is _WebSocketPong) { 540 if (message is _WebSocketPong) {
489 addFrame(_WebSocketOpcode.PONG, message.payload); 541 addFrame(_WebSocketOpcode.PONG, message.payload);
490 return; 542 return;
491 } 543 }
492 if (message is _WebSocketPing) { 544 if (message is _WebSocketPing) {
493 addFrame(_WebSocketOpcode.PING, message.payload); 545 addFrame(_WebSocketOpcode.PING, message.payload);
494 return; 546 return;
495 } 547 }
496 List<int> data; 548 List<int> data;
497 int opcode; 549 int opcode;
498 if (message != null) { 550 if (message != null) {
499 if (message is String) { 551 if (message is String) {
500 opcode = _WebSocketOpcode.TEXT; 552 opcode = _WebSocketOpcode.TEXT;
501 data = UTF8.encode(message); 553 data = UTF8.encode(message);
554
555 if (_deflateHelper != null) {
556 data = _deflateHelper.processOutgoingMessage(data);
557 }
502 } else { 558 } else {
503 if (message is !List<int>) { 559 if (message is !List<int>) {
504 throw new ArgumentError(message); 560 throw new ArgumentError(message);
505 } 561 }
506 opcode = _WebSocketOpcode.BINARY; 562 opcode = _WebSocketOpcode.BINARY;
507 data = message; 563 data = message;
564
565 if (_deflateHelper != null) {
566 data = _deflateHelper.processOutgoingMessage(data);
567 }
508 } 568 }
509 } else { 569 } else {
510 opcode = _WebSocketOpcode.TEXT; 570 opcode = _WebSocketOpcode.TEXT;
511 } 571 }
Søren Gjesse 2015/05/26 09:11:33 Move the compression down here to avoid the duplic
512 addFrame(opcode, data); 572 addFrame(opcode, data);
513 } 573 }
514 574
515 void addError(Object error, [StackTrace stackTrace]) => 575 void addError(Object error, [StackTrace stackTrace]) =>
516 _eventSink.addError(error, stackTrace); 576 _eventSink.addError(error, stackTrace);
517 577
518 void close() { 578 void close() {
519 int code = webSocket._outCloseCode; 579 int code = webSocket._outCloseCode;
520 String reason = webSocket._outCloseReason; 580 String reason = webSocket._outCloseReason;
521 List<int> data; 581 List<int> data;
(...skipping 237 matching lines...) Expand 10 before | Expand all | Expand 10 after
759 bool _writeClosed = false; 819 bool _writeClosed = false;
760 int _closeCode; 820 int _closeCode;
761 String _closeReason; 821 String _closeReason;
762 Duration _pingInterval; 822 Duration _pingInterval;
763 Timer _pingTimer; 823 Timer _pingTimer;
764 _WebSocketConsumer _consumer; 824 _WebSocketConsumer _consumer;
765 825
766 int _outCloseCode; 826 int _outCloseCode;
767 String _outCloseReason; 827 String _outCloseReason;
768 Timer _closeTimer; 828 Timer _closeTimer;
829 _WebSocketPerMessageDeflateHelper _deflateHelper;
769 830
770 static final HttpClient _httpClient = new HttpClient(); 831 static final HttpClient _httpClient = new HttpClient();
771 832
772 static Future<WebSocket> connect( 833 static Future<WebSocket> connect(
773 String url, Iterable<String> protocols, Map<String, dynamic> headers) { 834 String url, Iterable<String> protocols, Map<String, dynamic> headers) {
774 Uri uri = Uri.parse(url); 835 Uri uri = Uri.parse(url);
775 if (uri.scheme != "ws" && uri.scheme != "wss") { 836 if (uri.scheme != "ws" && uri.scheme != "wss") {
776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 837 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
777 } 838 }
778 839
(...skipping 23 matching lines...) Expand all
802 } 863 }
803 if (headers != null) { 864 if (headers != null) {
804 headers.forEach((field, value) => request.headers.add(field, value)); 865 headers.forEach((field, value) => request.headers.add(field, value));
805 } 866 }
806 // Setup the initial handshake. 867 // Setup the initial handshake.
807 request.headers 868 request.headers
808 ..set(HttpHeaders.CONNECTION, "Upgrade") 869 ..set(HttpHeaders.CONNECTION, "Upgrade")
809 ..set(HttpHeaders.UPGRADE, "websocket") 870 ..set(HttpHeaders.UPGRADE, "websocket")
810 ..set("Sec-WebSocket-Key", nonce) 871 ..set("Sec-WebSocket-Key", nonce)
811 ..set("Cache-Control", "no-cache") 872 ..set("Cache-Control", "no-cache")
812 ..set("Sec-WebSocket-Version", "13"); 873 ..set("Sec-WebSocket-Version", "13")
874 ..set("Sec-WebSocket-Extensions", "permessage-deflate");
Søren Gjesse 2015/05/26 09:11:33 There should be some way of controlling aspects of
813 if (protocols != null) { 875 if (protocols != null) {
814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); 876 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
815 } 877 }
816 return request.close(); 878 return request.close();
817 }) 879 })
818 .then((response) { 880 .then((response) {
819 void error(String message) { 881 void error(String message) {
820 // Flush data. 882 // Flush data.
821 response.detachSocket().then((socket) { 883 response.detachSocket().then((socket) {
822 socket.destroy(); 884 socket.destroy();
(...skipping 18 matching lines...) Expand all
841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); 903 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
842 if (expectedAccept.length != receivedAccept.length) { 904 if (expectedAccept.length != receivedAccept.length) {
843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); 905 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
844 } 906 }
845 for (int i = 0; i < expectedAccept.length; i++) { 907 for (int i = 0; i < expectedAccept.length; i++) {
846 if (expectedAccept[i] != receivedAccept[i]) { 908 if (expectedAccept[i] != receivedAccept[i]) {
847 error("Bad response 'Sec-WebSocket-Accept' header"); 909 error("Bad response 'Sec-WebSocket-Accept' header");
848 } 910 }
849 } 911 }
850 var protocol = response.headers.value('Sec-WebSocket-Protocol'); 912 var protocol = response.headers.value('Sec-WebSocket-Protocol');
913
914 String extensionHeader = response.headers.value('Sec-WebSocket-Extension s');
915
916 if (extensionHeader == null) {
917 extensionHeader = "";
918 }
919
920 Iterable<List<String>> extensions = extensionHeader.split(", ").map((it) => it.split("; "));
921
922 _WebSocketPerMessageDeflateHelper deflateHelper;
923
924 if (extensions.any((x) => x[0] == "permessage-deflate")) {
925 response.headers.add("Sec-WebSocket-Extensions", "permessage-deflate") ;
Søren Gjesse 2015/05/26 09:11:33 Even for the simplest implementation you need to s
926 deflateHelper = new _WebSocketPerMessageDeflateHelper();
927 }
928
851 return response.detachSocket() 929 return response.detachSocket()
852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); 930 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)
931 .._deflateHelper = deflateHelper);
853 }); 932 });
854 } 933 }
855 934
856 _WebSocketImpl._fromSocket(this._socket, this.protocol, 935 _WebSocketImpl._fromSocket(this._socket, this.protocol,
857 [this._serverSide = false]) { 936 [this._serverSide = false]) {
858 _consumer = new _WebSocketConsumer(this, _socket); 937 _consumer = new _WebSocketConsumer(this, _socket);
859 _sink = new _StreamSinkImpl(_consumer); 938 _sink = new _StreamSinkImpl(_consumer);
860 _readyState = WebSocket.OPEN; 939 _readyState = WebSocket.OPEN;
861 940
862 var transformer = new _WebSocketProtocolTransformer(_serverSide); 941 var transformer = new _WebSocketProtocolTransformer(_serverSide);
942 transformer._deflateHelper = _deflateHelper;
863 _subscription = _socket.transform(transformer).listen( 943 _subscription = _socket.transform(transformer).listen(
864 (data) { 944 (data) {
865 if (data is _WebSocketPing) { 945 if (data is _WebSocketPing) {
866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 946 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
867 } else if (data is _WebSocketPong) { 947 } else if (data is _WebSocketPong) {
868 // Simply set pingInterval, as it'll cancel any timers. 948 // Simply set pingInterval, as it'll cancel any timers.
869 pingInterval = _pingInterval; 949 pingInterval = _pingInterval;
870 } else { 950 } else {
871 _controller.add(data); 951 _controller.add(data);
872 } 952 }
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after
1026 (code < WebSocketStatus.NORMAL_CLOSURE || 1106 (code < WebSocketStatus.NORMAL_CLOSURE ||
1027 code == WebSocketStatus.RESERVED_1004 || 1107 code == WebSocketStatus.RESERVED_1004 ||
1028 code == WebSocketStatus.NO_STATUS_RECEIVED || 1108 code == WebSocketStatus.NO_STATUS_RECEIVED ||
1029 code == WebSocketStatus.ABNORMAL_CLOSURE || 1109 code == WebSocketStatus.ABNORMAL_CLOSURE ||
1030 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 1110 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
1031 code < WebSocketStatus.RESERVED_1015) || 1111 code < WebSocketStatus.RESERVED_1015) ||
1032 (code >= WebSocketStatus.RESERVED_1015 && 1112 (code >= WebSocketStatus.RESERVED_1015 &&
1033 code < 3000)); 1113 code < 3000));
1034 } 1114 }
1035 } 1115 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698