Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(8)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 124753002: Code cleanup (mostly io lib and some http lib). (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Merge to head. Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« sdk/lib/io/http_date.dart ('K') | « sdk/lib/io/websocket.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 class _WebSocketMessageType { 9 class _WebSocketMessageType {
10 static const int NONE = 0; 10 static const int NONE = 0;
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
44 // TODO(ajohnsen): make this transformer reusable? 44 // TODO(ajohnsen): make this transformer reusable?
45 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 45 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
46 static const int START = 0; 46 static const int START = 0;
47 static const int LEN_FIRST = 1; 47 static const int LEN_FIRST = 1;
48 static const int LEN_REST = 2; 48 static const int LEN_REST = 2;
49 static const int MASK = 3; 49 static const int MASK = 3;
50 static const int PAYLOAD = 4; 50 static const int PAYLOAD = 4;
51 static const int CLOSED = 5; 51 static const int CLOSED = 5;
52 static const int FAILURE = 6; 52 static const int FAILURE = 6;
53 53
54 int _state;
55 bool _fin;
56 int _opcode;
57 int _len;
58 bool _masked;
59 int _maskingKey;
60 int _remainingLenBytes;
61 int _remainingMaskingKeyBytes;
62 int _remainingPayloadBytes;
63 int _unmaskingIndex;
64
65 int _currentMessageType;
66 List<int> _controlPayload;
67 StreamController _controller;
68
69 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
70 String closeReason = "";
71
54 bool _serverSide; 72 bool _serverSide;
55 EventSink _eventSink; 73 EventSink _eventSink;
56 74
57 _WebSocketProtocolTransformer([bool this._serverSide = false]) { 75 _WebSocketProtocolTransformer([this._serverSide = false]) {
58 _prepareForNextFrame(); 76 _prepareForNextFrame();
59 _currentMessageType = _WebSocketMessageType.NONE; 77 _currentMessageType = _WebSocketMessageType.NONE;
60 } 78 }
61 79
62 Stream bind(Stream stream) { 80 Stream bind(Stream stream) {
63 return new Stream.eventTransformed( 81 return new Stream.eventTransformed(
64 stream, 82 stream,
65 (EventSink eventSink) { 83 (EventSink eventSink) {
66 if (_eventSink != null) { 84 if (_eventSink != null) {
67 throw new StateError("WebSocket transformer already used."); 85 throw new StateError("WebSocket transformer already used.");
68 } 86 }
69 _eventSink = eventSink; 87 _eventSink = eventSink;
70 return this; 88 return this;
71 }); 89 });
72 } 90 }
73 91
74 void addError(Object error, [StackTrace stackTrace]) { 92 void addError(Object error, [StackTrace stackTrace]) =>
75 _eventSink.addError(error, stackTrace); 93 _eventSink.addError(error, stackTrace);
76 }
77 94
78 void close() => _eventSink.close(); 95 void close() => _eventSink.close();
79 96
80 /** 97 /**
81 * Process data received from the underlying communication channel. 98 * Process data received from the underlying communication channel.
82 */ 99 */
83 void add(Uint8List buffer) { 100 void add(Uint8List buffer) {
84 int count = buffer.length; 101 int count = buffer.length;
85 int index = 0; 102 int index = 0;
86 int lastIndex = count; 103 int lastIndex = count;
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after
350 _opcode = null; 367 _opcode = null;
351 _len = null; 368 _len = null;
352 _masked = null; 369 _masked = null;
353 _maskingKey = 0; 370 _maskingKey = 0;
354 _remainingLenBytes = null; 371 _remainingLenBytes = null;
355 _remainingMaskingKeyBytes = null; 372 _remainingMaskingKeyBytes = null;
356 _remainingPayloadBytes = null; 373 _remainingPayloadBytes = null;
357 _unmaskingIndex = 0; 374 _unmaskingIndex = 0;
358 _controlPayload = null; 375 _controlPayload = null;
359 } 376 }
360
361 int _state;
362 bool _fin;
363 int _opcode;
364 int _len;
365 bool _masked;
366 int _maskingKey;
367 int _remainingLenBytes;
368 int _remainingMaskingKeyBytes;
369 int _remainingPayloadBytes;
370 int _unmaskingIndex;
371
372 int _currentMessageType;
373 List<int> _controlPayload;
374 StreamController _controller;
375
376 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
377 String closeReason = "";
378 } 377 }
379 378
380 379
381 class _WebSocketPing { 380 class _WebSocketPing {
382 final List<int> payload; 381 final List<int> payload;
383 _WebSocketPing([this.payload = null]); 382 _WebSocketPing([this.payload = null]);
384 } 383 }
385 384
386 385
387 class _WebSocketPong { 386 class _WebSocketPong {
(...skipping 16 matching lines...) Expand all
404 .catchError(_controller.addError); 403 .catchError(_controller.addError);
405 }); 404 });
406 405
407 return _controller.stream; 406 return _controller.stream;
408 } 407 }
409 408
410 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { 409 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
411 var response = request.response; 410 var response = request.response;
412 if (!_isUpgradeRequest(request)) { 411 if (!_isUpgradeRequest(request)) {
413 // Send error response. 412 // Send error response.
414 response.statusCode = HttpStatus.BAD_REQUEST; 413 response
415 response.close(); 414 ..statusCode = HttpStatus.BAD_REQUEST
415 ..close();
416 return new Future.error( 416 return new Future.error(
417 new WebSocketException("Invalid WebSocket upgrade request")); 417 new WebSocketException("Invalid WebSocket upgrade request"));
418 } 418 }
419 419
420 Future upgrade(String protocol) { 420 Future upgrade(String protocol) {
421 // Send the upgrade response. 421 // Send the upgrade response.
422 response.statusCode = HttpStatus.SWITCHING_PROTOCOLS; 422 response
423 response.headers.add(HttpHeaders.CONNECTION, "Upgrade"); 423 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
424 response.headers.add(HttpHeaders.UPGRADE, "websocket"); 424 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
425 ..headers.add(HttpHeaders.UPGRADE, "websocket");
425 String key = request.headers.value("Sec-WebSocket-Key"); 426 String key = request.headers.value("Sec-WebSocket-Key");
426 _SHA1 sha1 = new _SHA1(); 427 _SHA1 sha1 = new _SHA1();
427 sha1.add("$key$_webSocketGUID".codeUnits); 428 sha1.add("$key$_webSocketGUID".codeUnits);
428 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 429 String accept = _CryptoUtils.bytesToBase64(sha1.close());
429 response.headers.add("Sec-WebSocket-Accept", accept); 430 response.headers.add("Sec-WebSocket-Accept", accept);
430 if (protocol != null && protocol.isNotEmpty) { 431 if (protocol != null && protocol.isNotEmpty) {
431 response.headers.add("Sec-WebSocket-Protocol", protocol); 432 response.headers.add("Sec-WebSocket-Protocol", protocol);
432 } 433 }
433 response.headers.contentLength = 0; 434 response.headers.contentLength = 0;
434 return response.detachSocket() 435 return response.detachSocket()
435 .then((socket) => new _WebSocketImpl._fromSocket( 436 .then((socket) => new _WebSocketImpl._fromSocket(
436 socket, protocol, true)); 437 socket, protocol, true));
437 } 438 }
438 439
439 var protocols = request.headers['Sec-WebSocket-Protocol']; 440 var protocols = request.headers['Sec-WebSocket-Protocol'];
440 if (protocols != null && _protocolSelector != null) { 441 if (protocols != null && _protocolSelector != null) {
441 // The suggested protocols can be spread over multiple lines, each 442 // The suggested protocols can be spread over multiple lines, each
442 // consisting of multiple protocols. To unify all of them, first join 443 // consisting of multiple protocols. To unify all of them, first join
443 // the lists with ', ' and then tokenize. 444 // the lists with ', ' and then tokenize.
444 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); 445 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
445 return new Future(() => _protocolSelector(protocols)) 446 return new Future(() => _protocolSelector(protocols))
446 .then((protocol) { 447 .then((protocol) {
447 if (protocols.indexOf(protocol) < 0) { 448 if (protocols.indexOf(protocol) < 0) {
448 throw new WebSocketException( 449 throw new WebSocketException(
449 "Selected protocol is not in the list of available protocols"); 450 "Selected protocol is not in the list of available protocols");
450 } 451 }
451 return protocol; 452 return protocol;
452 }) 453 })
453 .catchError((error) { 454 .catchError((error) {
454 response.statusCode = HttpStatus.INTERNAL_SERVER_ERROR; 455 response
455 response.close(); 456 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
457 ..close();
456 throw error; 458 throw error;
457 }) 459 })
458 .then(upgrade); 460 .then(upgrade);
459 } else { 461 } else {
460 return upgrade(null); 462 return upgrade(null);
461 } 463 }
462 } 464 }
463 465
464 static bool _isUpgradeRequest(HttpRequest request) { 466 static bool _isUpgradeRequest(HttpRequest request) {
465 if (request.method != "GET") { 467 if (request.method != "GET") {
(...skipping 22 matching lines...) Expand all
488 return true; 490 return true;
489 } 491 }
490 } 492 }
491 493
492 494
493 // TODO(ajohnsen): Make this transformer reusable. 495 // TODO(ajohnsen): Make this transformer reusable.
494 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 496 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
495 final _WebSocketImpl webSocket; 497 final _WebSocketImpl webSocket;
496 EventSink _eventSink; 498 EventSink _eventSink;
497 499
498 _WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket); 500 _WebSocketOutgoingTransformer(this.webSocket);
499 501
500 Stream bind(Stream stream) { 502 Stream bind(Stream stream) {
501 return new Stream.eventTransformed( 503 return new Stream.eventTransformed(
502 stream, 504 stream,
503 (EventSink eventSink) { 505 (EventSink eventSink) {
504 if (_eventSink != null) { 506 if (_eventSink != null) {
505 throw new StateError("WebSocket transformer already used"); 507 throw new StateError("WebSocket transformer already used");
506 } 508 }
507 _eventSink = eventSink; 509 _eventSink = eventSink;
508 return this; 510 return this;
(...skipping 21 matching lines...) Expand all
530 } 532 }
531 opcode = _WebSocketOpcode.BINARY; 533 opcode = _WebSocketOpcode.BINARY;
532 data = message; 534 data = message;
533 } 535 }
534 } else { 536 } else {
535 opcode = _WebSocketOpcode.TEXT; 537 opcode = _WebSocketOpcode.TEXT;
536 } 538 }
537 addFrame(opcode, data); 539 addFrame(opcode, data);
538 } 540 }
539 541
540 void addError(Object error, [StackTrace stackTrace]) { 542 void addError(Object error, [StackTrace stackTrace]) =>
541 _eventSink.addError(error, stackTrace); 543 _eventSink.addError(error, stackTrace);
542 }
543 544
544 void close() { 545 void close() {
545 int code = webSocket._outCloseCode; 546 int code = webSocket._outCloseCode;
546 String reason = webSocket._outCloseReason; 547 String reason = webSocket._outCloseReason;
547 List<int> data; 548 List<int> data;
548 if (code != null) { 549 if (code != null) {
549 data = new List<int>(); 550 data = new List<int>();
550 data.add((code >> 8) & 0xFF); 551 data.add((code >> 8) & 0xFF);
551 data.add(code & 0xFF); 552 data.add(code & 0xFF);
552 if (reason != null) { 553 if (reason != null) {
553 data.addAll(UTF8.encode(reason)); 554 data.addAll(UTF8.encode(reason));
554 } 555 }
555 } 556 }
556 addFrame(_WebSocketOpcode.CLOSE, data); 557 addFrame(_WebSocketOpcode.CLOSE, data);
557 _eventSink.close(); 558 _eventSink.close();
558 } 559 }
559 560
560 void addFrame(int opcode, List<int> data) { 561 void addFrame(int opcode, List<int> data) =>
561 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); 562 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
562 }
563 563
564 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 564 static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
565 bool mask = !serverSide; // Masking not implemented for server. 565 bool mask = !serverSide; // Masking not implemented for server.
566 int dataLength = data == null ? 0 : data.length; 566 int dataLength = data == null ? 0 : data.length;
567 // Determine the header size. 567 // Determine the header size.
568 int headerSize = (mask) ? 6 : 2; 568 int headerSize = (mask) ? 6 : 2;
569 if (dataLength > 65535) { 569 if (dataLength > 65535) {
570 headerSize += 8; 570 headerSize += 8;
571 } else if (dataLength > 125) { 571 } else if (dataLength > 125) {
572 headerSize += 2; 572 headerSize += 2;
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
633 class _WebSocketConsumer implements StreamConsumer { 633 class _WebSocketConsumer implements StreamConsumer {
634 final _WebSocketImpl webSocket; 634 final _WebSocketImpl webSocket;
635 final Socket socket; 635 final Socket socket;
636 StreamController _controller; 636 StreamController _controller;
637 StreamSubscription _subscription; 637 StreamSubscription _subscription;
638 bool _issuedPause = false; 638 bool _issuedPause = false;
639 bool _closed = false; 639 bool _closed = false;
640 Completer _closeCompleter = new Completer(); 640 Completer _closeCompleter = new Completer();
641 Completer _completer; 641 Completer _completer;
642 642
643 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); 643 _WebSocketConsumer(this.webSocket, this.socket);
644 644
645 void _onListen() { 645 void _onListen() {
646 if (_subscription != null) { 646 if (_subscription != null) {
647 _subscription.cancel(); 647 _subscription.cancel();
648 } 648 }
649 } 649 }
650 650
651 void _onPause() { 651 void _onPause() {
652 if (_subscription != null) { 652 if (_subscription != null) {
653 _subscription.pause(); 653 _subscription.pause();
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after
795 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", 795 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http",
796 userInfo: uri.userInfo, 796 userInfo: uri.userInfo,
797 host: uri.host, 797 host: uri.host,
798 port: uri.port, 798 port: uri.port,
799 path: uri.path, 799 path: uri.path,
800 query: uri.query, 800 query: uri.query,
801 fragment: uri.fragment); 801 fragment: uri.fragment);
802 return _httpClient.openUrl("GET", uri) 802 return _httpClient.openUrl("GET", uri)
803 .then((request) { 803 .then((request) {
804 // Setup the initial handshake. 804 // Setup the initial handshake.
805 request.headers.add(HttpHeaders.CONNECTION, "upgrade"); 805 request.headers
806 request.headers.set(HttpHeaders.UPGRADE, "websocket"); 806 ..add(HttpHeaders.CONNECTION, "upgrade")
807 request.headers.set("Sec-WebSocket-Key", nonce); 807 ..set(HttpHeaders.UPGRADE, "websocket")
808 request.headers.set("Sec-WebSocket-Version", "13"); 808 ..set("Sec-WebSocket-Key", nonce)
809 ..set("Sec-WebSocket-Version", "13");
809 if (protocols.isNotEmpty) { 810 if (protocols.isNotEmpty) {
810 request.headers.add("Sec-WebSocket-Protocol", protocols); 811 request.headers.add("Sec-WebSocket-Protocol", protocols);
811 } 812 }
812 return request.close(); 813 return request.close();
813 }) 814 })
814 .then((response) { 815 .then((response) {
815 void error(String message) { 816 void error(String message) {
816 // Flush data. 817 // Flush data.
817 response.detachSocket().then((socket) { 818 response.detachSocket().then((socket) {
818 socket.destroy(); 819 socket.destroy();
(...skipping 23 matching lines...) Expand all
842 if (expectedAccept[i] != receivedAccept[i]) { 843 if (expectedAccept[i] != receivedAccept[i]) {
843 error("Bad response 'Sec-WebSocket-Accept' header"); 844 error("Bad response 'Sec-WebSocket-Accept' header");
844 } 845 }
845 } 846 }
846 var protocol = response.headers.value('Sec-WebSocket-Protocol'); 847 var protocol = response.headers.value('Sec-WebSocket-Protocol');
847 return response.detachSocket() 848 return response.detachSocket()
848 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); 849 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
849 }); 850 });
850 } 851 }
851 852
852 _WebSocketImpl._fromSocket(Socket this._socket, 853 _WebSocketImpl._fromSocket(this._socket, this.protocol,
853 String this.protocol, 854 [this._serverSide = false]) {
854 [bool this._serverSide = false]) {
855 _consumer = new _WebSocketConsumer(this, _socket); 855 _consumer = new _WebSocketConsumer(this, _socket);
856 _sink = new _StreamSinkImpl(_consumer); 856 _sink = new _StreamSinkImpl(_consumer);
857 _readyState = WebSocket.OPEN; 857 _readyState = WebSocket.OPEN;
858 858
859 var transformer = new _WebSocketProtocolTransformer(_serverSide); 859 var transformer = new _WebSocketProtocolTransformer(_serverSide);
860 _subscription = _socket.transform(transformer).listen( 860 _subscription = _socket.transform(transformer).listen(
861 (data) { 861 (data) {
862 if (data is _WebSocketPing) { 862 if (data is _WebSocketPing) {
863 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 863 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
864 } else if (data is _WebSocketPong) { 864 } else if (data is _WebSocketPong) {
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
965 (code < WebSocketStatus.NORMAL_CLOSURE || 965 (code < WebSocketStatus.NORMAL_CLOSURE ||
966 code == WebSocketStatus.RESERVED_1004 || 966 code == WebSocketStatus.RESERVED_1004 ||
967 code == WebSocketStatus.NO_STATUS_RECEIVED || 967 code == WebSocketStatus.NO_STATUS_RECEIVED ||
968 code == WebSocketStatus.ABNORMAL_CLOSURE || 968 code == WebSocketStatus.ABNORMAL_CLOSURE ||
969 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 969 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
970 code < WebSocketStatus.RESERVED_1015) || 970 code < WebSocketStatus.RESERVED_1015) ||
971 (code >= WebSocketStatus.RESERVED_1015 && 971 (code >= WebSocketStatus.RESERVED_1015 &&
972 code < 3000)); 972 code < 3000));
973 } 973 }
974 } 974 }
OLDNEW
« sdk/lib/io/http_date.dart ('K') | « sdk/lib/io/websocket.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698