| Index: lib/src/web_socket.dart
|
| diff --git a/lib/src/web_socket.dart b/lib/src/web_socket.dart
|
| index 968c70b0e13dd0606b2e75309425c41b84fc5cf6..e52c5f52909fbc9f1a79823617395ec54569d9ff 100644
|
| --- a/lib/src/web_socket.dart
|
| +++ b/lib/src/web_socket.dart
|
| @@ -5,13 +5,10 @@
|
| library http_parser.web_socket;
|
|
|
| import 'dart:async';
|
| -import 'dart:convert';
|
| -import 'dart:math';
|
| -import 'dart:typed_data';
|
|
|
| import 'package:crypto/crypto.dart';
|
|
|
| -import 'bytes_builder.dart';
|
| +import 'copy/web_socket_impl.dart';
|
|
|
| /// An implementation of the WebSocket protocol that's not specific to "dart:io"
|
| /// or to any particular HTTP API.
|
| @@ -62,7 +59,7 @@ abstract class CompatibleWebSocket implements Stream, StreamSink {
|
| var hash = new SHA1();
|
| // We use [codeUnits] here rather than UTF-8-decoding the string because
|
| // [key] is expected to be base64 encoded, and so will be pure ASCII.
|
| - hash.add((key + _webSocketGUID).codeUnits);
|
| + hash.add((key + webSocketGUID).codeUnits);
|
| return CryptoUtils.bytesToBase64(hash.close());
|
| }
|
|
|
| @@ -75,12 +72,14 @@ abstract class CompatibleWebSocket implements Stream, StreamSink {
|
| /// `Socket`), it will be used for both sending and receiving data. Otherwise,
|
| /// it will be used for receiving data and [sink] will be used for sending it.
|
| ///
|
| + /// [protocol] should be the protocol negotiated by this handshake, if any.
|
| + ///
|
| /// If this is a WebSocket server, [serverSide] should be `true` (the
|
| /// default); if it's a client, [serverSide] should be `false`.
|
| ///
|
| /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
|
| factory CompatibleWebSocket(Stream<List<int>> stream,
|
| - {StreamSink<List<int>> sink, bool serverSide: true}) {
|
| + {StreamSink<List<int>> sink, String protocol, bool serverSide: true}) {
|
| if (sink == null) {
|
| if (stream is! StreamSink) {
|
| throw new ArgumentError("If stream isn't also a StreamSink, sink must "
|
| @@ -89,7 +88,7 @@ abstract class CompatibleWebSocket implements Stream, StreamSink {
|
| sink = stream as StreamSink;
|
| }
|
|
|
| - return new _WebSocketImpl._fromSocket(stream, sink, serverSide);
|
| + return new WebSocketImpl.fromSocket(stream, sink, protocol, serverSide);
|
| }
|
|
|
| /// Closes the web socket connection.
|
| @@ -113,823 +112,3 @@ class CompatibleWebSocketException implements Exception {
|
| ? "CompatibleWebSocketException" :
|
| "CompatibleWebSocketException: $message";
|
| }
|
| -
|
| -// The following code is copied from sdk/lib/io/websocket_impl.dart. The
|
| -// "dart:io" implementation isn't used directly both to support non-"dart:io"
|
| -// applications, and because it's incompatible with non-"dart:io" HTTP requests
|
| -// (issue 18172).
|
| -//
|
| -// Because it's copied directly, only modifications necessary to support the
|
| -// desired public API and to remove "dart:io" dependencies have been made.
|
| -
|
| -/**
|
| - * Web socket status codes used when closing a web socket connection.
|
| - */
|
| -abstract class _WebSocketStatus {
|
| - static const int NORMAL_CLOSURE = 1000;
|
| - static const int GOING_AWAY = 1001;
|
| - static const int PROTOCOL_ERROR = 1002;
|
| - static const int UNSUPPORTED_DATA = 1003;
|
| - static const int RESERVED_1004 = 1004;
|
| - static const int NO_STATUS_RECEIVED = 1005;
|
| - static const int ABNORMAL_CLOSURE = 1006;
|
| - static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
|
| - static const int POLICY_VIOLATION = 1008;
|
| - static const int MESSAGE_TOO_BIG = 1009;
|
| - static const int MISSING_MANDATORY_EXTENSION = 1010;
|
| - static const int INTERNAL_SERVER_ERROR = 1011;
|
| - static const int RESERVED_1015 = 1015;
|
| -}
|
| -
|
| -abstract class _WebSocketState {
|
| - static const int CONNECTING = 0;
|
| - static const int OPEN = 1;
|
| - static const int CLOSING = 2;
|
| - static const int CLOSED = 3;
|
| -}
|
| -
|
| -const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
|
| -
|
| -final _random = new Random();
|
| -
|
| -// Matches _WebSocketOpcode.
|
| -class _WebSocketMessageType {
|
| - static const int NONE = 0;
|
| - static const int TEXT = 1;
|
| - static const int BINARY = 2;
|
| -}
|
| -
|
| -
|
| -class _WebSocketOpcode {
|
| - static const int CONTINUATION = 0;
|
| - static const int TEXT = 1;
|
| - static const int BINARY = 2;
|
| - static const int RESERVED_3 = 3;
|
| - static const int RESERVED_4 = 4;
|
| - static const int RESERVED_5 = 5;
|
| - static const int RESERVED_6 = 6;
|
| - static const int RESERVED_7 = 7;
|
| - static const int CLOSE = 8;
|
| - static const int PING = 9;
|
| - static const int PONG = 10;
|
| - static const int RESERVED_B = 11;
|
| - static const int RESERVED_C = 12;
|
| - static const int RESERVED_D = 13;
|
| - static const int RESERVED_E = 14;
|
| - static const int RESERVED_F = 15;
|
| -}
|
| -
|
| -/**
|
| - * The web socket protocol transformer handles the protocol byte stream
|
| - * which is supplied through the [:handleData:]. As the protocol is processed,
|
| - * it'll output frame data as either a List<int> or String.
|
| - *
|
| - * Important infomation about usage: Be sure you use cancelOnError, so the
|
| - * socket will be closed when the processer encounter an error. Not using it
|
| - * will lead to undefined behaviour.
|
| - */
|
| -// TODO(ajohnsen): make this transformer reusable?
|
| -class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| - static const int START = 0;
|
| - static const int LEN_FIRST = 1;
|
| - static const int LEN_REST = 2;
|
| - static const int MASK = 3;
|
| - static const int PAYLOAD = 4;
|
| - static const int CLOSED = 5;
|
| - static const int FAILURE = 6;
|
| -
|
| - int _state = START;
|
| - bool _fin = false;
|
| - int _opcode = -1;
|
| - int _len = -1;
|
| - bool _masked = false;
|
| - int _remainingLenBytes = -1;
|
| - int _remainingMaskingKeyBytes = 4;
|
| - int _remainingPayloadBytes = -1;
|
| - int _unmaskingIndex = 0;
|
| - int _currentMessageType = _WebSocketMessageType.NONE;
|
| - int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
|
| - String closeReason = "";
|
| -
|
| - EventSink _eventSink;
|
| -
|
| - final bool _serverSide;
|
| - final List _maskingBytes = new List(4);
|
| - final BytesBuilder _payload = new BytesBuilder(copy: false);
|
| -
|
| - _WebSocketProtocolTransformer([this._serverSide = false]);
|
| -
|
| - Stream bind(Stream stream) {
|
| - return new Stream.eventTransformed(
|
| - stream,
|
| - (EventSink eventSink) {
|
| - if (_eventSink != null) {
|
| - throw new StateError("WebSocket transformer already used.");
|
| - }
|
| - _eventSink = eventSink;
|
| - return this;
|
| - });
|
| - }
|
| -
|
| - void addError(Object error, [StackTrace stackTrace]) =>
|
| - _eventSink.addError(error, stackTrace);
|
| -
|
| - void close() => _eventSink.close();
|
| -
|
| - /**
|
| - * Process data received from the underlying communication channel.
|
| - */
|
| - void add(Uint8List buffer) {
|
| - int count = buffer.length;
|
| - int index = 0;
|
| - int lastIndex = count;
|
| - if (_state == CLOSED) {
|
| - throw new CompatibleWebSocketException("Data on closed connection");
|
| - }
|
| - if (_state == FAILURE) {
|
| - throw new CompatibleWebSocketException("Data on failed connection");
|
| - }
|
| - while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
|
| - int byte = buffer[index];
|
| - if (_state <= LEN_REST) {
|
| - if (_state == START) {
|
| - _fin = (byte & 0x80) != 0;
|
| - if ((byte & 0x70) != 0) {
|
| - // The RSV1, RSV2 bits RSV3 must be all zero.
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - _opcode = (byte & 0xF);
|
| - if (_opcode <= _WebSocketOpcode.BINARY) {
|
| - if (_opcode == _WebSocketOpcode.CONTINUATION) {
|
| - if (_currentMessageType == _WebSocketMessageType.NONE) {
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - } else {
|
| - assert(_opcode == _WebSocketOpcode.TEXT ||
|
| - _opcode == _WebSocketOpcode.BINARY);
|
| - if (_currentMessageType != _WebSocketMessageType.NONE) {
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - _currentMessageType = _opcode;
|
| - }
|
| - } else if (_opcode >= _WebSocketOpcode.CLOSE &&
|
| - _opcode <= _WebSocketOpcode.PONG) {
|
| - // Control frames cannot be fragmented.
|
| - if (!_fin) throw new CompatibleWebSocketException("Protocol error");
|
| - } else {
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - _state = LEN_FIRST;
|
| - } else if (_state == LEN_FIRST) {
|
| - _masked = (byte & 0x80) != 0;
|
| - _len = byte & 0x7F;
|
| - if (_isControlFrame() && _len > 125) {
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - if (_len == 126) {
|
| - _len = 0;
|
| - _remainingLenBytes = 2;
|
| - _state = LEN_REST;
|
| - } else if (_len == 127) {
|
| - _len = 0;
|
| - _remainingLenBytes = 8;
|
| - _state = LEN_REST;
|
| - } else {
|
| - assert(_len < 126);
|
| - _lengthDone();
|
| - }
|
| - } else {
|
| - assert(_state == LEN_REST);
|
| - _len = _len << 8 | byte;
|
| - _remainingLenBytes--;
|
| - if (_remainingLenBytes == 0) {
|
| - _lengthDone();
|
| - }
|
| - }
|
| - } else {
|
| - if (_state == MASK) {
|
| - _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte;
|
| - if (_remainingMaskingKeyBytes == 0) {
|
| - _maskDone();
|
| - }
|
| - } else {
|
| - assert(_state == PAYLOAD);
|
| - // The payload is not handled one byte at a time but in blocks.
|
| - int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
|
| - _remainingPayloadBytes -= payloadLength;
|
| - // Unmask payload if masked.
|
| - if (_masked) {
|
| - _unmask(index, payloadLength, buffer);
|
| - }
|
| - // Control frame and data frame share _payloads.
|
| - _payload.add(
|
| - new Uint8List.view(buffer.buffer, index, payloadLength));
|
| - index += payloadLength;
|
| - if (_isControlFrame()) {
|
| - if (_remainingPayloadBytes == 0) _controlFrameEnd();
|
| - } else {
|
| - if (_currentMessageType != _WebSocketMessageType.TEXT &&
|
| - _currentMessageType != _WebSocketMessageType.BINARY) {
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - if (_remainingPayloadBytes == 0) _messageFrameEnd();
|
| - }
|
| -
|
| - // Hack - as we always do index++ below.
|
| - index--;
|
| - }
|
| - }
|
| -
|
| - // Move to the next byte.
|
| - index++;
|
| - }
|
| - }
|
| -
|
| - void _unmask(int index, int length, Uint8List buffer) {
|
| - const int BLOCK_SIZE = 16;
|
| - // Skip Int32x4-version if message is small.
|
| - if (length >= BLOCK_SIZE) {
|
| - // Start by aligning to 16 bytes.
|
| - final int startOffset = BLOCK_SIZE - (index & 15);
|
| - final int end = index + startOffset;
|
| - for (int i = index; i < end; i++) {
|
| - buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
|
| - }
|
| - index += startOffset;
|
| - length -= startOffset;
|
| - final int blockCount = length ~/ BLOCK_SIZE;
|
| - if (blockCount > 0) {
|
| - // Create mask block.
|
| - int mask = 0;
|
| - for (int i = 3; i >= 0; i--) {
|
| - mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
|
| - }
|
| - Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
|
| - Int32x4List blockBuffer = new Int32x4List.view(
|
| - buffer.buffer, index, blockCount);
|
| - for (int i = 0; i < blockBuffer.length; i++) {
|
| - blockBuffer[i] ^= blockMask;
|
| - }
|
| - final int bytes = blockCount * BLOCK_SIZE;
|
| - index += bytes;
|
| - length -= bytes;
|
| - }
|
| - }
|
| - // Handle end.
|
| - final int end = index + length;
|
| - for (int i = index; i < end; i++) {
|
| - buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
|
| - }
|
| - }
|
| -
|
| - void _lengthDone() {
|
| - if (_masked) {
|
| - if (!_serverSide) {
|
| - throw new CompatibleWebSocketException(
|
| - "Received masked frame from server");
|
| - }
|
| - _state = MASK;
|
| - } else {
|
| - if (_serverSide) {
|
| - throw new CompatibleWebSocketException(
|
| - "Received unmasked frame from client");
|
| - }
|
| - _remainingPayloadBytes = _len;
|
| - _startPayload();
|
| - }
|
| - }
|
| -
|
| - void _maskDone() {
|
| - _remainingPayloadBytes = _len;
|
| - _startPayload();
|
| - }
|
| -
|
| - void _startPayload() {
|
| - // If there is no actual payload perform perform callbacks without
|
| - // going through the PAYLOAD state.
|
| - if (_remainingPayloadBytes == 0) {
|
| - if (_isControlFrame()) {
|
| - switch (_opcode) {
|
| - case _WebSocketOpcode.CLOSE:
|
| - _state = CLOSED;
|
| - _eventSink.close();
|
| - break;
|
| - case _WebSocketOpcode.PING:
|
| - _eventSink.add(new _WebSocketPing());
|
| - break;
|
| - case _WebSocketOpcode.PONG:
|
| - _eventSink.add(new _WebSocketPong());
|
| - break;
|
| - }
|
| - _prepareForNextFrame();
|
| - } else {
|
| - _messageFrameEnd();
|
| - }
|
| - } else {
|
| - _state = PAYLOAD;
|
| - }
|
| - }
|
| -
|
| - void _messageFrameEnd() {
|
| - if (_fin) {
|
| - switch (_currentMessageType) {
|
| - case _WebSocketMessageType.TEXT:
|
| - _eventSink.add(UTF8.decode(_payload.takeBytes()));
|
| - break;
|
| - case _WebSocketMessageType.BINARY:
|
| - _eventSink.add(_payload.takeBytes());
|
| - break;
|
| - }
|
| - _currentMessageType = _WebSocketMessageType.NONE;
|
| - }
|
| - _prepareForNextFrame();
|
| - }
|
| -
|
| - void _controlFrameEnd() {
|
| - switch (_opcode) {
|
| - case _WebSocketOpcode.CLOSE:
|
| - closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
|
| - var payload = _payload.takeBytes();
|
| - if (payload.length > 0) {
|
| - if (payload.length == 1) {
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - closeCode = payload[0] << 8 | payload[1];
|
| - if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) {
|
| - throw new CompatibleWebSocketException("Protocol error");
|
| - }
|
| - if (payload.length > 2) {
|
| - closeReason = UTF8.decode(payload.sublist(2));
|
| - }
|
| - }
|
| - _state = CLOSED;
|
| - _eventSink.close();
|
| - break;
|
| -
|
| - case _WebSocketOpcode.PING:
|
| - _eventSink.add(new _WebSocketPing(_payload.takeBytes()));
|
| - break;
|
| -
|
| - case _WebSocketOpcode.PONG:
|
| - _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
|
| - break;
|
| - }
|
| - _prepareForNextFrame();
|
| - }
|
| -
|
| - bool _isControlFrame() {
|
| - return _opcode == _WebSocketOpcode.CLOSE ||
|
| - _opcode == _WebSocketOpcode.PING ||
|
| - _opcode == _WebSocketOpcode.PONG;
|
| - }
|
| -
|
| - void _prepareForNextFrame() {
|
| - if (_state != CLOSED && _state != FAILURE) _state = START;
|
| - _fin = false;
|
| - _opcode = -1;
|
| - _len = -1;
|
| - _remainingLenBytes = -1;
|
| - _remainingMaskingKeyBytes = 4;
|
| - _remainingPayloadBytes = -1;
|
| - _unmaskingIndex = 0;
|
| - }
|
| -}
|
| -
|
| -
|
| -class _WebSocketPing {
|
| - final List<int> payload;
|
| - _WebSocketPing([this.payload = null]);
|
| -}
|
| -
|
| -
|
| -class _WebSocketPong {
|
| - final List<int> payload;
|
| - _WebSocketPong([this.payload = null]);
|
| -}
|
| -
|
| -// TODO(ajohnsen): Make this transformer reusable.
|
| -class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| - final _WebSocketImpl webSocket;
|
| - EventSink _eventSink;
|
| -
|
| - _WebSocketOutgoingTransformer(this.webSocket);
|
| -
|
| - Stream bind(Stream stream) {
|
| - return new Stream.eventTransformed(
|
| - stream,
|
| - (EventSink eventSink) {
|
| - if (_eventSink != null) {
|
| - throw new StateError("WebSocket transformer already used");
|
| - }
|
| - _eventSink = eventSink;
|
| - return this;
|
| - });
|
| - }
|
| -
|
| - void add(message) {
|
| - if (message is _WebSocketPong) {
|
| - addFrame(_WebSocketOpcode.PONG, message.payload);
|
| - return;
|
| - }
|
| - if (message is _WebSocketPing) {
|
| - addFrame(_WebSocketOpcode.PING, message.payload);
|
| - return;
|
| - }
|
| - List<int> data;
|
| - int opcode;
|
| - if (message != null) {
|
| - if (message is String) {
|
| - opcode = _WebSocketOpcode.TEXT;
|
| - data = UTF8.encode(message);
|
| - } else {
|
| - if (message is !List<int>) {
|
| - throw new ArgumentError(message);
|
| - }
|
| - opcode = _WebSocketOpcode.BINARY;
|
| - data = message;
|
| - }
|
| - } else {
|
| - opcode = _WebSocketOpcode.TEXT;
|
| - }
|
| - addFrame(opcode, data);
|
| - }
|
| -
|
| - void addError(Object error, [StackTrace stackTrace]) =>
|
| - _eventSink.addError(error, stackTrace);
|
| -
|
| - void close() {
|
| - int code = webSocket._outCloseCode;
|
| - String reason = webSocket._outCloseReason;
|
| - List<int> data;
|
| - if (code != null) {
|
| - data = new List<int>();
|
| - data.add((code >> 8) & 0xFF);
|
| - data.add(code & 0xFF);
|
| - if (reason != null) {
|
| - data.addAll(UTF8.encode(reason));
|
| - }
|
| - }
|
| - addFrame(_WebSocketOpcode.CLOSE, data);
|
| - _eventSink.close();
|
| - }
|
| -
|
| - void addFrame(int opcode, List<int> data) =>
|
| - createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
|
| -
|
| - static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
|
| - bool mask = !serverSide; // Masking not implemented for server.
|
| - int dataLength = data == null ? 0 : data.length;
|
| - // Determine the header size.
|
| - int headerSize = (mask) ? 6 : 2;
|
| - if (dataLength > 65535) {
|
| - headerSize += 8;
|
| - } else if (dataLength > 125) {
|
| - headerSize += 2;
|
| - }
|
| - Uint8List header = new Uint8List(headerSize);
|
| - int index = 0;
|
| - // Set FIN and opcode.
|
| - header[index++] = 0x80 | opcode;
|
| - // Determine size and position of length field.
|
| - int lengthBytes = 1;
|
| - if (dataLength > 65535) {
|
| - header[index++] = 127;
|
| - lengthBytes = 8;
|
| - } else if (dataLength > 125) {
|
| - header[index++] = 126;
|
| - lengthBytes = 2;
|
| - }
|
| - // Write the length in network byte order into the header.
|
| - for (int i = 0; i < lengthBytes; i++) {
|
| - header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
|
| - }
|
| - if (mask) {
|
| - header[1] |= 1 << 7;
|
| - var maskBytes = [_random.nextInt(256), _random.nextInt(256),
|
| - _random.nextInt(256), _random.nextInt(256)];
|
| - header.setRange(index, index + 4, maskBytes);
|
| - index += 4;
|
| - if (data != null) {
|
| - Uint8List list;
|
| - // If this is a text message just do the masking inside the
|
| - // encoded data.
|
| - if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
|
| - list = data;
|
| - } else {
|
| - if (data is Uint8List) {
|
| - list = new Uint8List.fromList(data);
|
| - } else {
|
| - list = new Uint8List(data.length);
|
| - for (int i = 0; i < data.length; i++) {
|
| - if (data[i] < 0 || 255 < data[i]) {
|
| - throw new ArgumentError(
|
| - "List element is not a byte value "
|
| - "(value ${data[i]} at index $i)");
|
| - }
|
| - list[i] = data[i];
|
| - }
|
| - }
|
| - }
|
| - const int BLOCK_SIZE = 16;
|
| - int blockCount = list.length ~/ BLOCK_SIZE;
|
| - if (blockCount > 0) {
|
| - // Create mask block.
|
| - int mask = 0;
|
| - for (int i = 3; i >= 0; i--) {
|
| - mask = (mask << 8) | maskBytes[i];
|
| - }
|
| - Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
|
| - Int32x4List blockBuffer = new Int32x4List.view(
|
| - list.buffer, 0, blockCount);
|
| - for (int i = 0; i < blockBuffer.length; i++) {
|
| - blockBuffer[i] ^= blockMask;
|
| - }
|
| - }
|
| - // Handle end.
|
| - for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
|
| - list[i] ^= maskBytes[i & 3];
|
| - }
|
| - data = list;
|
| - }
|
| - }
|
| - assert(index == headerSize);
|
| - if (data == null) {
|
| - return [header];
|
| - } else {
|
| - return [header, data];
|
| - }
|
| - }
|
| -}
|
| -
|
| -
|
| -class _WebSocketConsumer implements StreamConsumer {
|
| - final _WebSocketImpl webSocket;
|
| - final StreamSink<List<int>> sink;
|
| - StreamController _controller;
|
| - StreamSubscription _subscription;
|
| - bool _issuedPause = false;
|
| - bool _closed = false;
|
| - Completer _closeCompleter = new Completer();
|
| - Completer _completer;
|
| -
|
| - _WebSocketConsumer(this.webSocket, this.sink);
|
| -
|
| - void _onListen() {
|
| - if (_subscription != null) {
|
| - _subscription.cancel();
|
| - }
|
| - }
|
| -
|
| - void _onPause() {
|
| - if (_subscription != null) {
|
| - _subscription.pause();
|
| - } else {
|
| - _issuedPause = true;
|
| - }
|
| - }
|
| -
|
| - void _onResume() {
|
| - if (_subscription != null) {
|
| - _subscription.resume();
|
| - } else {
|
| - _issuedPause = false;
|
| - }
|
| - }
|
| -
|
| - void _cancel() {
|
| - if (_subscription != null) {
|
| - var subscription = _subscription;
|
| - _subscription = null;
|
| - subscription.cancel();
|
| - }
|
| - }
|
| -
|
| - _ensureController() {
|
| - if (_controller != null) return;
|
| - _controller = new StreamController(sync: true,
|
| - onPause: _onPause,
|
| - onResume: _onResume,
|
| - onCancel: _onListen);
|
| - var stream = _controller.stream.transform(
|
| - new _WebSocketOutgoingTransformer(webSocket));
|
| - sink.addStream(stream)
|
| - .then((_) {
|
| - _done();
|
| - _closeCompleter.complete(webSocket);
|
| - }, onError: (error, StackTrace stackTrace) {
|
| - _closed = true;
|
| - _cancel();
|
| - if (error is ArgumentError) {
|
| - if (!_done(error, stackTrace)) {
|
| - _closeCompleter.completeError(error, stackTrace);
|
| - }
|
| - } else {
|
| - _done();
|
| - _closeCompleter.complete(webSocket);
|
| - }
|
| - });
|
| - }
|
| -
|
| - bool _done([error, StackTrace stackTrace]) {
|
| - if (_completer == null) return false;
|
| - if (error != null) {
|
| - _completer.completeError(error, stackTrace);
|
| - } else {
|
| - _completer.complete(webSocket);
|
| - }
|
| - _completer = null;
|
| - return true;
|
| - }
|
| -
|
| - Future addStream(var stream) {
|
| - if (_closed) {
|
| - stream.listen(null).cancel();
|
| - return new Future.value(webSocket);
|
| - }
|
| - _ensureController();
|
| - _completer = new Completer();
|
| - _subscription = stream.listen(
|
| - (data) {
|
| - _controller.add(data);
|
| - },
|
| - onDone: _done,
|
| - onError: _done,
|
| - cancelOnError: true);
|
| - if (_issuedPause) {
|
| - _subscription.pause();
|
| - _issuedPause = false;
|
| - }
|
| - return _completer.future;
|
| - }
|
| -
|
| - Future close() {
|
| - _ensureController();
|
| - Future closeSocket() {
|
| - return sink.close().catchError((_) {}).then((_) => webSocket);
|
| - }
|
| - _controller.close();
|
| - return _closeCompleter.future.then((_) => closeSocket());
|
| - }
|
| -
|
| - void add(data) {
|
| - if (_closed) return;
|
| - _ensureController();
|
| - _controller.add(data);
|
| - }
|
| -
|
| - void closeSocket() {
|
| - _closed = true;
|
| - _cancel();
|
| - close();
|
| - }
|
| -}
|
| -
|
| -
|
| -class _WebSocketImpl extends Stream implements CompatibleWebSocket {
|
| - StreamController _controller;
|
| - StreamSubscription _subscription;
|
| - StreamController _sink;
|
| -
|
| - final bool _serverSide;
|
| - int _readyState = _WebSocketState.CONNECTING;
|
| - bool _writeClosed = false;
|
| - int _closeCode;
|
| - String _closeReason;
|
| - Duration _pingInterval;
|
| - Timer _pingTimer;
|
| - _WebSocketConsumer _consumer;
|
| -
|
| - int _outCloseCode;
|
| - String _outCloseReason;
|
| - Timer _closeTimer;
|
| -
|
| - _WebSocketImpl._fromSocket(Stream<List<int>> stream,
|
| - StreamSink<List<int>> sink, [this._serverSide = false]) {
|
| - _consumer = new _WebSocketConsumer(this, sink);
|
| - _sink = new StreamController();
|
| - _sink.stream.pipe(_consumer);
|
| - _readyState = _WebSocketState.OPEN;
|
| -
|
| - var transformer = new _WebSocketProtocolTransformer(_serverSide);
|
| - _subscription = stream.transform(transformer).listen(
|
| - (data) {
|
| - if (data is _WebSocketPing) {
|
| - if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
|
| - } else if (data is _WebSocketPong) {
|
| - // Simply set pingInterval, as it'll cancel any timers.
|
| - pingInterval = _pingInterval;
|
| - } else {
|
| - _controller.add(data);
|
| - }
|
| - },
|
| - onError: (error) {
|
| - if (_closeTimer != null) _closeTimer.cancel();
|
| - if (error is FormatException) {
|
| - _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
|
| - } else {
|
| - _close(_WebSocketStatus.PROTOCOL_ERROR);
|
| - }
|
| - _controller.close();
|
| - },
|
| - onDone: () {
|
| - if (_closeTimer != null) _closeTimer.cancel();
|
| - if (_readyState == _WebSocketState.OPEN) {
|
| - _readyState = _WebSocketState.CLOSING;
|
| - if (!_isReservedStatusCode(transformer.closeCode)) {
|
| - _close(transformer.closeCode);
|
| - } else {
|
| - _close();
|
| - }
|
| - _readyState = _WebSocketState.CLOSED;
|
| - }
|
| - _closeCode = transformer.closeCode;
|
| - _closeReason = transformer.closeReason;
|
| - _controller.close();
|
| - },
|
| - cancelOnError: true);
|
| - _subscription.pause();
|
| - _controller = new StreamController(sync: true,
|
| - onListen: _subscription.resume,
|
| - onPause: _subscription.pause,
|
| - onResume: _subscription.resume);
|
| - }
|
| -
|
| - StreamSubscription listen(void onData(message),
|
| - {Function onError,
|
| - void onDone(),
|
| - bool cancelOnError}) {
|
| - return _controller.stream.listen(onData,
|
| - onError: onError,
|
| - onDone: onDone,
|
| - cancelOnError: cancelOnError);
|
| - }
|
| -
|
| - Duration get pingInterval => _pingInterval;
|
| -
|
| - void set pingInterval(Duration interval) {
|
| - if (_writeClosed) return;
|
| - if (_pingTimer != null) _pingTimer.cancel();
|
| - _pingInterval = interval;
|
| -
|
| - if (_pingInterval == null) return;
|
| -
|
| - _pingTimer = new Timer(_pingInterval, () {
|
| - if (_writeClosed) return;
|
| - _consumer.add(new _WebSocketPing());
|
| - _pingTimer = new Timer(_pingInterval, () {
|
| - // No pong received.
|
| - _close(_WebSocketStatus.GOING_AWAY);
|
| - });
|
| - });
|
| - }
|
| -
|
| - int get closeCode => _closeCode;
|
| - String get closeReason => _closeReason;
|
| -
|
| - void add(data) => _sink.add(data);
|
| - void addError(error, [StackTrace stackTrace]) =>
|
| - _sink.addError(error, stackTrace);
|
| - Future addStream(Stream stream) => _sink.addStream(stream);
|
| - Future get done => _sink.done;
|
| -
|
| - Future close([int code, String reason]) {
|
| - if (_isReservedStatusCode(code)) {
|
| - throw new CompatibleWebSocketException("Reserved status code $code");
|
| - }
|
| - if (_outCloseCode == null) {
|
| - _outCloseCode = code;
|
| - _outCloseReason = reason;
|
| - }
|
| - if (_closeTimer == null && !_controller.isClosed) {
|
| - // When closing the web-socket, we no longer accept data.
|
| - _closeTimer = new Timer(const Duration(seconds: 5), () {
|
| - _subscription.cancel();
|
| - _controller.close();
|
| - });
|
| - }
|
| - return _sink.close();
|
| - }
|
| -
|
| - void _close([int code, String reason]) {
|
| - if (_writeClosed) return;
|
| - if (_outCloseCode == null) {
|
| - _outCloseCode = code;
|
| - _outCloseReason = reason;
|
| - }
|
| - _writeClosed = true;
|
| - _consumer.closeSocket();
|
| - }
|
| -
|
| - static bool _isReservedStatusCode(int code) {
|
| - return code != null &&
|
| - (code < _WebSocketStatus.NORMAL_CLOSURE ||
|
| - code == _WebSocketStatus.RESERVED_1004 ||
|
| - code == _WebSocketStatus.NO_STATUS_RECEIVED ||
|
| - code == _WebSocketStatus.ABNORMAL_CLOSURE ||
|
| - (code > _WebSocketStatus.INTERNAL_SERVER_ERROR &&
|
| - code < _WebSocketStatus.RESERVED_1015) ||
|
| - (code >= _WebSocketStatus.RESERVED_1015 &&
|
| - code < 3000));
|
| - }
|
| -}
|
| -
|
|
|