OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 | 8 |
9 class _WebSocketMessageType { | 9 class _WebSocketMessageType { |
10 static const int NONE = 0; | 10 static const int NONE = 0; |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
44 // TODO(ajohnsen): make this transformer reusable? | 44 // TODO(ajohnsen): make this transformer reusable? |
45 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 45 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
46 static const int START = 0; | 46 static const int START = 0; |
47 static const int LEN_FIRST = 1; | 47 static const int LEN_FIRST = 1; |
48 static const int LEN_REST = 2; | 48 static const int LEN_REST = 2; |
49 static const int MASK = 3; | 49 static const int MASK = 3; |
50 static const int PAYLOAD = 4; | 50 static const int PAYLOAD = 4; |
51 static const int CLOSED = 5; | 51 static const int CLOSED = 5; |
52 static const int FAILURE = 6; | 52 static const int FAILURE = 6; |
53 | 53 |
| 54 int _state; |
| 55 bool _fin; |
| 56 int _opcode; |
| 57 int _len; |
| 58 bool _masked; |
| 59 int _maskingKey; |
| 60 int _remainingLenBytes; |
| 61 int _remainingMaskingKeyBytes; |
| 62 int _remainingPayloadBytes; |
| 63 int _unmaskingIndex; |
| 64 |
| 65 int _currentMessageType; |
| 66 List<int> _controlPayload; |
| 67 StreamController _controller; |
| 68 |
| 69 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
| 70 String closeReason = ""; |
| 71 |
54 bool _serverSide; | 72 bool _serverSide; |
55 EventSink _eventSink; | 73 EventSink _eventSink; |
56 | 74 |
57 _WebSocketProtocolTransformer([bool this._serverSide = false]) { | 75 _WebSocketProtocolTransformer([this._serverSide = false]) { |
58 _prepareForNextFrame(); | 76 _prepareForNextFrame(); |
59 _currentMessageType = _WebSocketMessageType.NONE; | 77 _currentMessageType = _WebSocketMessageType.NONE; |
60 } | 78 } |
61 | 79 |
62 Stream bind(Stream stream) { | 80 Stream bind(Stream stream) { |
63 return new Stream.eventTransformed( | 81 return new Stream.eventTransformed( |
64 stream, | 82 stream, |
65 (EventSink eventSink) { | 83 (EventSink eventSink) { |
66 if (_eventSink != null) { | 84 if (_eventSink != null) { |
67 throw new StateError("WebSocket transformer already used."); | 85 throw new StateError("WebSocket transformer already used."); |
68 } | 86 } |
69 _eventSink = eventSink; | 87 _eventSink = eventSink; |
70 return this; | 88 return this; |
71 }); | 89 }); |
72 } | 90 } |
73 | 91 |
74 void addError(Object error, [StackTrace stackTrace]) { | 92 void addError(Object error, [StackTrace stackTrace]) => |
75 _eventSink.addError(error, stackTrace); | 93 _eventSink.addError(error, stackTrace); |
76 } | |
77 | 94 |
78 void close() => _eventSink.close(); | 95 void close() => _eventSink.close(); |
79 | 96 |
80 /** | 97 /** |
81 * Process data received from the underlying communication channel. | 98 * Process data received from the underlying communication channel. |
82 */ | 99 */ |
83 void add(Uint8List buffer) { | 100 void add(Uint8List buffer) { |
84 int count = buffer.length; | 101 int count = buffer.length; |
85 int index = 0; | 102 int index = 0; |
86 int lastIndex = count; | 103 int lastIndex = count; |
(...skipping 263 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
350 _opcode = null; | 367 _opcode = null; |
351 _len = null; | 368 _len = null; |
352 _masked = null; | 369 _masked = null; |
353 _maskingKey = 0; | 370 _maskingKey = 0; |
354 _remainingLenBytes = null; | 371 _remainingLenBytes = null; |
355 _remainingMaskingKeyBytes = null; | 372 _remainingMaskingKeyBytes = null; |
356 _remainingPayloadBytes = null; | 373 _remainingPayloadBytes = null; |
357 _unmaskingIndex = 0; | 374 _unmaskingIndex = 0; |
358 _controlPayload = null; | 375 _controlPayload = null; |
359 } | 376 } |
360 | |
361 int _state; | |
362 bool _fin; | |
363 int _opcode; | |
364 int _len; | |
365 bool _masked; | |
366 int _maskingKey; | |
367 int _remainingLenBytes; | |
368 int _remainingMaskingKeyBytes; | |
369 int _remainingPayloadBytes; | |
370 int _unmaskingIndex; | |
371 | |
372 int _currentMessageType; | |
373 List<int> _controlPayload; | |
374 StreamController _controller; | |
375 | |
376 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | |
377 String closeReason = ""; | |
378 } | 377 } |
379 | 378 |
380 | 379 |
381 class _WebSocketPing { | 380 class _WebSocketPing { |
382 final List<int> payload; | 381 final List<int> payload; |
383 _WebSocketPing([this.payload = null]); | 382 _WebSocketPing([this.payload = null]); |
384 } | 383 } |
385 | 384 |
386 | 385 |
387 class _WebSocketPong { | 386 class _WebSocketPong { |
(...skipping 16 matching lines...) Expand all Loading... |
404 .catchError(_controller.addError); | 403 .catchError(_controller.addError); |
405 }); | 404 }); |
406 | 405 |
407 return _controller.stream; | 406 return _controller.stream; |
408 } | 407 } |
409 | 408 |
410 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { | 409 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { |
411 var response = request.response; | 410 var response = request.response; |
412 if (!_isUpgradeRequest(request)) { | 411 if (!_isUpgradeRequest(request)) { |
413 // Send error response. | 412 // Send error response. |
414 response.statusCode = HttpStatus.BAD_REQUEST; | 413 response |
415 response.close(); | 414 ..statusCode = HttpStatus.BAD_REQUEST |
| 415 ..close(); |
416 return new Future.error( | 416 return new Future.error( |
417 new WebSocketException("Invalid WebSocket upgrade request")); | 417 new WebSocketException("Invalid WebSocket upgrade request")); |
418 } | 418 } |
419 | 419 |
420 Future upgrade(String protocol) { | 420 Future upgrade(String protocol) { |
421 // Send the upgrade response. | 421 // Send the upgrade response. |
422 response.statusCode = HttpStatus.SWITCHING_PROTOCOLS; | 422 response |
423 response.headers.add(HttpHeaders.CONNECTION, "Upgrade"); | 423 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
424 response.headers.add(HttpHeaders.UPGRADE, "websocket"); | 424 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
| 425 ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
425 String key = request.headers.value("Sec-WebSocket-Key"); | 426 String key = request.headers.value("Sec-WebSocket-Key"); |
426 _SHA1 sha1 = new _SHA1(); | 427 _SHA1 sha1 = new _SHA1(); |
427 sha1.add("$key$_webSocketGUID".codeUnits); | 428 sha1.add("$key$_webSocketGUID".codeUnits); |
428 String accept = _CryptoUtils.bytesToBase64(sha1.close()); | 429 String accept = _CryptoUtils.bytesToBase64(sha1.close()); |
429 response.headers.add("Sec-WebSocket-Accept", accept); | 430 response.headers.add("Sec-WebSocket-Accept", accept); |
430 if (protocol != null && protocol.isNotEmpty) { | 431 if (protocol != null && protocol.isNotEmpty) { |
431 response.headers.add("Sec-WebSocket-Protocol", protocol); | 432 response.headers.add("Sec-WebSocket-Protocol", protocol); |
432 } | 433 } |
433 response.headers.contentLength = 0; | 434 response.headers.contentLength = 0; |
434 return response.detachSocket() | 435 return response.detachSocket() |
435 .then((socket) => new _WebSocketImpl._fromSocket( | 436 .then((socket) => new _WebSocketImpl._fromSocket( |
436 socket, protocol, true)); | 437 socket, protocol, true)); |
437 } | 438 } |
438 | 439 |
439 var protocols = request.headers['Sec-WebSocket-Protocol']; | 440 var protocols = request.headers['Sec-WebSocket-Protocol']; |
440 if (protocols != null && _protocolSelector != null) { | 441 if (protocols != null && _protocolSelector != null) { |
441 // The suggested protocols can be spread over multiple lines, each | 442 // The suggested protocols can be spread over multiple lines, each |
442 // consisting of multiple protocols. To unify all of them, first join | 443 // consisting of multiple protocols. To unify all of them, first join |
443 // the lists with ', ' and then tokenize. | 444 // the lists with ', ' and then tokenize. |
444 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); | 445 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); |
445 return new Future(() => _protocolSelector(protocols)) | 446 return new Future(() => _protocolSelector(protocols)) |
446 .then((protocol) { | 447 .then((protocol) { |
447 if (protocols.indexOf(protocol) < 0) { | 448 if (protocols.indexOf(protocol) < 0) { |
448 throw new WebSocketException( | 449 throw new WebSocketException( |
449 "Selected protocol is not in the list of available protocols"); | 450 "Selected protocol is not in the list of available protocols"); |
450 } | 451 } |
451 return protocol; | 452 return protocol; |
452 }) | 453 }) |
453 .catchError((error) { | 454 .catchError((error) { |
454 response.statusCode = HttpStatus.INTERNAL_SERVER_ERROR; | 455 response |
455 response.close(); | 456 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
| 457 ..close(); |
456 throw error; | 458 throw error; |
457 }) | 459 }) |
458 .then(upgrade); | 460 .then(upgrade); |
459 } else { | 461 } else { |
460 return upgrade(null); | 462 return upgrade(null); |
461 } | 463 } |
462 } | 464 } |
463 | 465 |
464 static bool _isUpgradeRequest(HttpRequest request) { | 466 static bool _isUpgradeRequest(HttpRequest request) { |
465 if (request.method != "GET") { | 467 if (request.method != "GET") { |
(...skipping 22 matching lines...) Expand all Loading... |
488 return true; | 490 return true; |
489 } | 491 } |
490 } | 492 } |
491 | 493 |
492 | 494 |
493 // TODO(ajohnsen): Make this transformer reusable. | 495 // TODO(ajohnsen): Make this transformer reusable. |
494 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 496 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
495 final _WebSocketImpl webSocket; | 497 final _WebSocketImpl webSocket; |
496 EventSink _eventSink; | 498 EventSink _eventSink; |
497 | 499 |
498 _WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket); | 500 _WebSocketOutgoingTransformer(this.webSocket); |
499 | 501 |
500 Stream bind(Stream stream) { | 502 Stream bind(Stream stream) { |
501 return new Stream.eventTransformed( | 503 return new Stream.eventTransformed( |
502 stream, | 504 stream, |
503 (EventSink eventSink) { | 505 (EventSink eventSink) { |
504 if (_eventSink != null) { | 506 if (_eventSink != null) { |
505 throw new StateError("WebSocket transformer already used"); | 507 throw new StateError("WebSocket transformer already used"); |
506 } | 508 } |
507 _eventSink = eventSink; | 509 _eventSink = eventSink; |
508 return this; | 510 return this; |
(...skipping 21 matching lines...) Expand all Loading... |
530 } | 532 } |
531 opcode = _WebSocketOpcode.BINARY; | 533 opcode = _WebSocketOpcode.BINARY; |
532 data = message; | 534 data = message; |
533 } | 535 } |
534 } else { | 536 } else { |
535 opcode = _WebSocketOpcode.TEXT; | 537 opcode = _WebSocketOpcode.TEXT; |
536 } | 538 } |
537 addFrame(opcode, data); | 539 addFrame(opcode, data); |
538 } | 540 } |
539 | 541 |
540 void addError(Object error, [StackTrace stackTrace]) { | 542 void addError(Object error, [StackTrace stackTrace]) => |
541 _eventSink.addError(error, stackTrace); | 543 _eventSink.addError(error, stackTrace); |
542 } | |
543 | 544 |
544 void close() { | 545 void close() { |
545 int code = webSocket._outCloseCode; | 546 int code = webSocket._outCloseCode; |
546 String reason = webSocket._outCloseReason; | 547 String reason = webSocket._outCloseReason; |
547 List<int> data; | 548 List<int> data; |
548 if (code != null) { | 549 if (code != null) { |
549 data = new List<int>(); | 550 data = new List<int>(); |
550 data.add((code >> 8) & 0xFF); | 551 data.add((code >> 8) & 0xFF); |
551 data.add(code & 0xFF); | 552 data.add(code & 0xFF); |
552 if (reason != null) { | 553 if (reason != null) { |
553 data.addAll(UTF8.encode(reason)); | 554 data.addAll(UTF8.encode(reason)); |
554 } | 555 } |
555 } | 556 } |
556 addFrame(_WebSocketOpcode.CLOSE, data); | 557 addFrame(_WebSocketOpcode.CLOSE, data); |
557 _eventSink.close(); | 558 _eventSink.close(); |
558 } | 559 } |
559 | 560 |
560 void addFrame(int opcode, List<int> data) { | 561 void addFrame(int opcode, List<int> data) => |
561 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); | 562 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
562 } | |
563 | 563 |
564 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { | 564 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
565 bool mask = !serverSide; // Masking not implemented for server. | 565 bool mask = !serverSide; // Masking not implemented for server. |
566 int dataLength = data == null ? 0 : data.length; | 566 int dataLength = data == null ? 0 : data.length; |
567 // Determine the header size. | 567 // Determine the header size. |
568 int headerSize = (mask) ? 6 : 2; | 568 int headerSize = (mask) ? 6 : 2; |
569 if (dataLength > 65535) { | 569 if (dataLength > 65535) { |
570 headerSize += 8; | 570 headerSize += 8; |
571 } else if (dataLength > 125) { | 571 } else if (dataLength > 125) { |
572 headerSize += 2; | 572 headerSize += 2; |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
633 class _WebSocketConsumer implements StreamConsumer { | 633 class _WebSocketConsumer implements StreamConsumer { |
634 final _WebSocketImpl webSocket; | 634 final _WebSocketImpl webSocket; |
635 final Socket socket; | 635 final Socket socket; |
636 StreamController _controller; | 636 StreamController _controller; |
637 StreamSubscription _subscription; | 637 StreamSubscription _subscription; |
638 bool _issuedPause = false; | 638 bool _issuedPause = false; |
639 bool _closed = false; | 639 bool _closed = false; |
640 Completer _closeCompleter = new Completer(); | 640 Completer _closeCompleter = new Completer(); |
641 Completer _completer; | 641 Completer _completer; |
642 | 642 |
643 _WebSocketConsumer(_WebSocketImpl this.webSocket, Socket this.socket); | 643 _WebSocketConsumer(this.webSocket, this.socket); |
644 | 644 |
645 void _onListen() { | 645 void _onListen() { |
646 if (_subscription != null) { | 646 if (_subscription != null) { |
647 _subscription.cancel(); | 647 _subscription.cancel(); |
648 } | 648 } |
649 } | 649 } |
650 | 650 |
651 void _onPause() { | 651 void _onPause() { |
652 if (_subscription != null) { | 652 if (_subscription != null) { |
653 _subscription.pause(); | 653 _subscription.pause(); |
(...skipping 141 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
795 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", | 795 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", |
796 userInfo: uri.userInfo, | 796 userInfo: uri.userInfo, |
797 host: uri.host, | 797 host: uri.host, |
798 port: uri.port, | 798 port: uri.port, |
799 path: uri.path, | 799 path: uri.path, |
800 query: uri.query, | 800 query: uri.query, |
801 fragment: uri.fragment); | 801 fragment: uri.fragment); |
802 return _httpClient.openUrl("GET", uri) | 802 return _httpClient.openUrl("GET", uri) |
803 .then((request) { | 803 .then((request) { |
804 // Setup the initial handshake. | 804 // Setup the initial handshake. |
805 request.headers.add(HttpHeaders.CONNECTION, "upgrade"); | 805 request.headers |
806 request.headers.set(HttpHeaders.UPGRADE, "websocket"); | 806 ..add(HttpHeaders.CONNECTION, "upgrade") |
807 request.headers.set("Sec-WebSocket-Key", nonce); | 807 ..set(HttpHeaders.UPGRADE, "websocket") |
808 request.headers.set("Sec-WebSocket-Version", "13"); | 808 ..set("Sec-WebSocket-Key", nonce) |
| 809 ..set("Sec-WebSocket-Version", "13"); |
809 if (protocols.isNotEmpty) { | 810 if (protocols.isNotEmpty) { |
810 request.headers.add("Sec-WebSocket-Protocol", protocols); | 811 request.headers.add("Sec-WebSocket-Protocol", protocols); |
811 } | 812 } |
812 return request.close(); | 813 return request.close(); |
813 }) | 814 }) |
814 .then((response) { | 815 .then((response) { |
815 void error(String message) { | 816 void error(String message) { |
816 // Flush data. | 817 // Flush data. |
817 response.detachSocket().then((socket) { | 818 response.detachSocket().then((socket) { |
818 socket.destroy(); | 819 socket.destroy(); |
(...skipping 23 matching lines...) Expand all Loading... |
842 if (expectedAccept[i] != receivedAccept[i]) { | 843 if (expectedAccept[i] != receivedAccept[i]) { |
843 error("Bad response 'Sec-WebSocket-Accept' header"); | 844 error("Bad response 'Sec-WebSocket-Accept' header"); |
844 } | 845 } |
845 } | 846 } |
846 var protocol = response.headers.value('Sec-WebSocket-Protocol'); | 847 var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
847 return response.detachSocket() | 848 return response.detachSocket() |
848 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); | 849 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); |
849 }); | 850 }); |
850 } | 851 } |
851 | 852 |
852 _WebSocketImpl._fromSocket(Socket this._socket, | 853 _WebSocketImpl._fromSocket(this._socket, this.protocol, |
853 String this.protocol, | 854 [this._serverSide = false]) { |
854 [bool this._serverSide = false]) { | |
855 _consumer = new _WebSocketConsumer(this, _socket); | 855 _consumer = new _WebSocketConsumer(this, _socket); |
856 _sink = new _StreamSinkImpl(_consumer); | 856 _sink = new _StreamSinkImpl(_consumer); |
857 _readyState = WebSocket.OPEN; | 857 _readyState = WebSocket.OPEN; |
858 | 858 |
859 var transformer = new _WebSocketProtocolTransformer(_serverSide); | 859 var transformer = new _WebSocketProtocolTransformer(_serverSide); |
860 _subscription = _socket.transform(transformer).listen( | 860 _subscription = _socket.transform(transformer).listen( |
861 (data) { | 861 (data) { |
862 if (data is _WebSocketPing) { | 862 if (data is _WebSocketPing) { |
863 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 863 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
864 } else if (data is _WebSocketPong) { | 864 } else if (data is _WebSocketPong) { |
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
965 (code < WebSocketStatus.NORMAL_CLOSURE || | 965 (code < WebSocketStatus.NORMAL_CLOSURE || |
966 code == WebSocketStatus.RESERVED_1004 || | 966 code == WebSocketStatus.RESERVED_1004 || |
967 code == WebSocketStatus.NO_STATUS_RECEIVED || | 967 code == WebSocketStatus.NO_STATUS_RECEIVED || |
968 code == WebSocketStatus.ABNORMAL_CLOSURE || | 968 code == WebSocketStatus.ABNORMAL_CLOSURE || |
969 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 969 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
970 code < WebSocketStatus.RESERVED_1015) || | 970 code < WebSocketStatus.RESERVED_1015) || |
971 (code >= WebSocketStatus.RESERVED_1015 && | 971 (code >= WebSocketStatus.RESERVED_1015 && |
972 code < 3000)); | 972 code < 3000)); |
973 } | 973 } |
974 } | 974 } |
OLD | NEW |