Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Unified Diff: pkg/dev_compiler/tool/input_sdk/lib/io/websocket_impl.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: pkg/dev_compiler/tool/input_sdk/lib/io/websocket_impl.dart
diff --git a/pkg/dev_compiler/tool/input_sdk/lib/io/websocket_impl.dart b/pkg/dev_compiler/tool/input_sdk/lib/io/websocket_impl.dart
deleted file mode 100644
index 18d1cf22bc2407a2bce1ea4461cc9431ddebe43d..0000000000000000000000000000000000000000
--- a/pkg/dev_compiler/tool/input_sdk/lib/io/websocket_impl.dart
+++ /dev/null
@@ -1,1258 +0,0 @@
-// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-part of dart.io;
-
-const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
-const String _clientNoContextTakeover = "client_no_context_takeover";
-const String _serverNoContextTakeover = "server_no_context_takeover";
-const String _clientMaxWindowBits = "client_max_window_bits";
-const String _serverMaxWindowBits = "server_max_window_bits";
-
-// Matches _WebSocketOpcode.
-class _WebSocketMessageType {
- static const int NONE = 0;
- static const int TEXT = 1;
- static const int BINARY = 2;
-}
-
-class _WebSocketOpcode {
- static const int CONTINUATION = 0;
- static const int TEXT = 1;
- static const int BINARY = 2;
- static const int RESERVED_3 = 3;
- static const int RESERVED_4 = 4;
- static const int RESERVED_5 = 5;
- static const int RESERVED_6 = 6;
- static const int RESERVED_7 = 7;
- static const int CLOSE = 8;
- static const int PING = 9;
- static const int PONG = 10;
- static const int RESERVED_B = 11;
- static const int RESERVED_C = 12;
- static const int RESERVED_D = 13;
- static const int RESERVED_E = 14;
- static const int RESERVED_F = 15;
-}
-
-/**
- * Stores the header and integer value derived from negotiation of
- * client_max_window_bits and server_max_window_bits. headerValue will be
- * set in the Websocket response headers.
- */
-class _CompressionMaxWindowBits {
- String headerValue;
- int maxWindowBits;
- _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]);
- String toString() => headerValue;
-}
-
-/**
- * The web socket protocol transformer handles the protocol byte stream
- * which is supplied through the [:handleData:]. As the protocol is processed,
- * it'll output frame data as either a List<int> or String.
- *
- * Important information about usage: Be sure you use cancelOnError, so the
- * socket will be closed when the processor encounter an error. Not using it
- * will lead to undefined behaviour.
- */
-// TODO(ajohnsen): make this transformer reusable?
-class _WebSocketProtocolTransformer
- implements StreamTransformer<List<int>, dynamic>, EventSink<Uint8List> {
- static const int START = 0;
- static const int LEN_FIRST = 1;
- static const int LEN_REST = 2;
- static const int MASK = 3;
- static const int PAYLOAD = 4;
- static const int CLOSED = 5;
- static const int FAILURE = 6;
- static const int FIN = 0x80;
- static const int RSV1 = 0x40;
- static const int RSV2 = 0x20;
- static const int RSV3 = 0x10;
- static const int OPCODE = 0xF;
-
- int _state = START;
- bool _fin = false;
- bool _compressed = false;
- int _opcode = -1;
- int _len = -1;
- bool _masked = false;
- int _remainingLenBytes = -1;
- int _remainingMaskingKeyBytes = 4;
- int _remainingPayloadBytes = -1;
- int _unmaskingIndex = 0;
- int _currentMessageType = _WebSocketMessageType.NONE;
- int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
- String closeReason = "";
-
- EventSink _eventSink;
-
- final bool _serverSide;
- final List _maskingBytes = new List(4);
- final BytesBuilder _payload = new BytesBuilder(copy: false);
-
- _WebSocketPerMessageDeflate _deflate;
- _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
-
- Stream bind(Stream stream) {
- return new Stream.eventTransformed(stream, (EventSink eventSink) {
- if (_eventSink != null) {
- throw new StateError("WebSocket transformer already used.");
- }
- _eventSink = eventSink;
- return this;
- });
- }
-
- void addError(Object error, [StackTrace stackTrace]) {
- _eventSink.addError(error, stackTrace);
- }
-
- void close() { _eventSink.close(); }
-
- /**
- * Process data received from the underlying communication channel.
- */
- void add(List<int> bytes) {
- var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes);
- int index = 0;
- int lastIndex = buffer.length;
- if (_state == CLOSED) {
- throw new WebSocketException("Data on closed connection");
- }
- if (_state == FAILURE) {
- throw new WebSocketException("Data on failed connection");
- }
- while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
- int byte = buffer[index];
- if (_state <= LEN_REST) {
- if (_state == START) {
- _fin = (byte & FIN) != 0;
-
- if((byte & (RSV2 | RSV3)) != 0) {
- // The RSV2, RSV3 bits must both be zero.
- throw new WebSocketException("Protocol error");
- }
-
- _opcode = (byte & OPCODE);
-
- if (_opcode != _WebSocketOpcode.CONTINUATION) {
- if ((byte & RSV1) != 0) {
- _compressed = true;
- } else {
- _compressed = false;
- }
- }
-
- if (_opcode <= _WebSocketOpcode.BINARY) {
- if (_opcode == _WebSocketOpcode.CONTINUATION) {
- if (_currentMessageType == _WebSocketMessageType.NONE) {
- throw new WebSocketException("Protocol error");
- }
- } else {
- assert(_opcode == _WebSocketOpcode.TEXT ||
- _opcode == _WebSocketOpcode.BINARY);
- if (_currentMessageType != _WebSocketMessageType.NONE) {
- throw new WebSocketException("Protocol error");
- }
- _currentMessageType = _opcode;
- }
- } else if (_opcode >= _WebSocketOpcode.CLOSE &&
- _opcode <= _WebSocketOpcode.PONG) {
- // Control frames cannot be fragmented.
- if (!_fin) throw new WebSocketException("Protocol error");
- } else {
- throw new WebSocketException("Protocol error");
- }
- _state = LEN_FIRST;
- } else if (_state == LEN_FIRST) {
- _masked = (byte & 0x80) != 0;
- _len = byte & 0x7F;
- if (_isControlFrame() && _len > 125) {
- throw new WebSocketException("Protocol error");
- }
- if (_len == 126) {
- _len = 0;
- _remainingLenBytes = 2;
- _state = LEN_REST;
- } else if (_len == 127) {
- _len = 0;
- _remainingLenBytes = 8;
- _state = LEN_REST;
- } else {
- assert(_len < 126);
- _lengthDone();
- }
- } else {
- assert(_state == LEN_REST);
- _len = _len << 8 | byte;
- _remainingLenBytes--;
- if (_remainingLenBytes == 0) {
- _lengthDone();
- }
- }
- } else {
- if (_state == MASK) {
- _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte;
- if (_remainingMaskingKeyBytes == 0) {
- _maskDone();
- }
- } else {
- assert(_state == PAYLOAD);
- // The payload is not handled one byte at a time but in blocks.
- int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
- _remainingPayloadBytes -= payloadLength;
- // Unmask payload if masked.
- if (_masked) {
- _unmask(index, payloadLength, buffer);
- }
- // Control frame and data frame share _payloads.
- _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
- index += payloadLength;
- if (_isControlFrame()) {
- if (_remainingPayloadBytes == 0) _controlFrameEnd();
- } else {
- if (_currentMessageType != _WebSocketMessageType.TEXT &&
- _currentMessageType != _WebSocketMessageType.BINARY) {
- throw new WebSocketException("Protocol error");
- }
- if (_remainingPayloadBytes == 0) _messageFrameEnd();
- }
-
- // Hack - as we always do index++ below.
- index--;
- }
- }
-
- // Move to the next byte.
- index++;
- }
- }
-
- void _unmask(int index, int length, Uint8List buffer) {
- const int BLOCK_SIZE = 16;
- // Skip Int32x4-version if message is small.
- if (length >= BLOCK_SIZE) {
- // Start by aligning to 16 bytes.
- final int startOffset = BLOCK_SIZE - (index & 15);
- final int end = index + startOffset;
- for (int i = index; i < end; i++) {
- buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
- }
- index += startOffset;
- length -= startOffset;
- final int blockCount = length ~/ BLOCK_SIZE;
- if (blockCount > 0) {
- // Create mask block.
- int mask = 0;
- for (int i = 3; i >= 0; i--) {
- mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
- }
- Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
- Int32x4List blockBuffer =
- new Int32x4List.view(buffer.buffer, index, blockCount);
- for (int i = 0; i < blockBuffer.length; i++) {
- blockBuffer[i] ^= blockMask;
- }
- final int bytes = blockCount * BLOCK_SIZE;
- index += bytes;
- length -= bytes;
- }
- }
- // Handle end.
- final int end = index + length;
- for (int i = index; i < end; i++) {
- buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
- }
- }
-
- void _lengthDone() {
- if (_masked) {
- if (!_serverSide) {
- throw new WebSocketException("Received masked frame from server");
- }
- _state = MASK;
- } else {
- if (_serverSide) {
- throw new WebSocketException("Received unmasked frame from client");
- }
- _remainingPayloadBytes = _len;
- _startPayload();
- }
- }
-
- void _maskDone() {
- _remainingPayloadBytes = _len;
- _startPayload();
- }
-
- void _startPayload() {
- // If there is no actual payload perform perform callbacks without
- // going through the PAYLOAD state.
- if (_remainingPayloadBytes == 0) {
- if (_isControlFrame()) {
- switch (_opcode) {
- case _WebSocketOpcode.CLOSE:
- _state = CLOSED;
- _eventSink.close();
- break;
- case _WebSocketOpcode.PING:
- _eventSink.add(new _WebSocketPing());
- break;
- case _WebSocketOpcode.PONG:
- _eventSink.add(new _WebSocketPong());
- break;
- }
- _prepareForNextFrame();
- } else {
- _messageFrameEnd();
- }
- } else {
- _state = PAYLOAD;
- }
- }
-
- void _messageFrameEnd() {
- if (_fin) {
- var bytes = _payload.takeBytes();
- if (_deflate != null && _compressed) {
- bytes = _deflate.processIncomingMessage(bytes);
- }
-
- switch (_currentMessageType) {
- case _WebSocketMessageType.TEXT:
- _eventSink.add(UTF8.decode(bytes));
- break;
- case _WebSocketMessageType.BINARY:
- _eventSink.add(bytes);
- break;
- }
- _currentMessageType = _WebSocketMessageType.NONE;
- }
- _prepareForNextFrame();
- }
-
- void _controlFrameEnd() {
- switch (_opcode) {
- case _WebSocketOpcode.CLOSE:
- closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
- var payload = _payload.takeBytes();
- if (payload.length > 0) {
- if (payload.length == 1) {
- throw new WebSocketException("Protocol error");
- }
- closeCode = payload[0] << 8 | payload[1];
- if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
- throw new WebSocketException("Protocol error");
- }
- if (payload.length > 2) {
- closeReason = UTF8.decode(payload.sublist(2));
- }
- }
- _state = CLOSED;
- _eventSink.close();
- break;
-
- case _WebSocketOpcode.PING:
- _eventSink.add(new _WebSocketPing(_payload.takeBytes()));
- break;
-
- case _WebSocketOpcode.PONG:
- _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
- break;
- }
- _prepareForNextFrame();
- }
-
- bool _isControlFrame() {
- return _opcode == _WebSocketOpcode.CLOSE ||
- _opcode == _WebSocketOpcode.PING ||
- _opcode == _WebSocketOpcode.PONG;
- }
-
- void _prepareForNextFrame() {
- if (_state != CLOSED && _state != FAILURE) _state = START;
- _fin = false;
- _opcode = -1;
- _len = -1;
- _remainingLenBytes = -1;
- _remainingMaskingKeyBytes = 4;
- _remainingPayloadBytes = -1;
- _unmaskingIndex = 0;
- }
-}
-
-class _WebSocketPing {
- final List<int> payload;
- _WebSocketPing([this.payload = null]);
-}
-
-class _WebSocketPong {
- final List<int> payload;
- _WebSocketPong([this.payload = null]);
-}
-
-class _WebSocketTransformerImpl implements WebSocketTransformer {
- final StreamController<WebSocket> _controller =
- new StreamController<WebSocket>(sync: true);
- final Function _protocolSelector;
- final CompressionOptions _compression;
-
- _WebSocketTransformerImpl(this._protocolSelector, this._compression);
-
- Stream<WebSocket> bind(Stream<HttpRequest> stream) {
- stream.listen((request) {
- _upgrade(request, _protocolSelector, _compression)
- .then((WebSocket webSocket) => _controller.add(webSocket))
- .catchError(_controller.addError);
- }, onDone: () {
- _controller.close();
- });
-
- return _controller.stream;
- }
-
- static Future<WebSocket> _upgrade(
- HttpRequest request, _protocolSelector, CompressionOptions compression) {
- var response = request.response;
- if (!_isUpgradeRequest(request)) {
- // Send error response.
- response
- ..statusCode = HttpStatus.BAD_REQUEST
- ..close();
- return new Future.error(
- new WebSocketException("Invalid WebSocket upgrade request"));
- }
-
- Future upgrade(String protocol) {
- // Send the upgrade response.
- response
- ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
- ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
- ..headers.add(HttpHeaders.UPGRADE, "websocket");
- String key = request.headers.value("Sec-WebSocket-Key");
- _SHA1 sha1 = new _SHA1();
- sha1.add("$key$_webSocketGUID".codeUnits);
- String accept = _CryptoUtils.bytesToBase64(sha1.close());
- response.headers.add("Sec-WebSocket-Accept", accept);
- if (protocol != null) {
- response.headers.add("Sec-WebSocket-Protocol", protocol);
- }
-
- var deflate = _negotiateCompression(request, response, compression);
-
- response.headers.contentLength = 0;
- return response.detachSocket().then((socket) =>
- new _WebSocketImpl._fromSocket(
- socket, protocol, compression, true, deflate));
- }
-
- var protocols = request.headers['Sec-WebSocket-Protocol'];
- if (protocols != null && _protocolSelector != null) {
- // The suggested protocols can be spread over multiple lines, each
- // consisting of multiple protocols. To unify all of them, first join
- // the lists with ', ' and then tokenize.
- protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
- return new Future(() => _protocolSelector(protocols)).then((protocol) {
- if (protocols.indexOf(protocol) < 0) {
- throw new WebSocketException(
- "Selected protocol is not in the list of available protocols");
- }
- return protocol;
- }).catchError((error) {
- response
- ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
- ..close();
- throw error;
- }).then(upgrade);
- } else {
- return upgrade(null);
- }
- }
-
- static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
- HttpResponse response, CompressionOptions compression) {
- var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
-
- extensionHeader ??= "";
-
- var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
- if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
- var info = compression._createHeader(hv);
-
- response.headers.add("Sec-WebSocket-Extensions", info.headerValue);
- var serverNoContextTakeover =
- (hv.parameters.containsKey(_serverNoContextTakeover) &&
- compression.serverNoContextTakeover);
- var clientNoContextTakeover =
- (hv.parameters.containsKey(_clientNoContextTakeover) &&
- compression.clientNoContextTakeover);
- var deflate = new _WebSocketPerMessageDeflate(
- serverNoContextTakeover: serverNoContextTakeover,
- clientNoContextTakeover: clientNoContextTakeover,
- serverMaxWindowBits: info.maxWindowBits,
- clientMaxWindowBits: info.maxWindowBits,
- serverSide: true);
-
- return deflate;
- }
-
- return null;
- }
-
- static bool _isUpgradeRequest(HttpRequest request) {
- if (request.method != "GET") {
- return false;
- }
- if (request.headers[HttpHeaders.CONNECTION] == null) {
- return false;
- }
- bool isUpgrade = false;
- request.headers[HttpHeaders.CONNECTION].forEach((String value) {
- if (value.toLowerCase() == "upgrade") isUpgrade = true;
- });
- if (!isUpgrade) return false;
- String upgrade = request.headers.value(HttpHeaders.UPGRADE);
- if (upgrade == null || upgrade.toLowerCase() != "websocket") {
- return false;
- }
- String version = request.headers.value("Sec-WebSocket-Version");
- if (version == null || version != "13") {
- return false;
- }
- String key = request.headers.value("Sec-WebSocket-Key");
- if (key == null) {
- return false;
- }
- return true;
- }
-}
-
-class _WebSocketPerMessageDeflate {
- bool serverNoContextTakeover;
- bool clientNoContextTakeover;
- int clientMaxWindowBits;
- int serverMaxWindowBits;
- bool serverSide;
-
- _Filter decoder;
- _Filter encoder;
-
- _WebSocketPerMessageDeflate(
- {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
- this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
- this.serverNoContextTakeover: false,
- this.clientNoContextTakeover: false,
- this.serverSide: false});
-
- void _ensureDecoder() {
- if (decoder == null) {
- decoder = _Filter._newZLibInflateFilter(
- serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
- }
- }
-
- void _ensureEncoder() {
- if (encoder == null) {
- encoder = _Filter._newZLibDeflateFilter(
- false,
- ZLibOption.DEFAULT_LEVEL,
- serverSide ? serverMaxWindowBits : clientMaxWindowBits,
- ZLibOption.DEFAULT_MEM_LEVEL,
- ZLibOption.STRATEGY_DEFAULT,
- null,
- true);
- }
- }
-
- Uint8List processIncomingMessage(List<int> msg) {
- _ensureDecoder();
-
- var data = [];
- data.addAll(msg);
- data.addAll(const [0x00, 0x00, 0xff, 0xff]);
-
- decoder.process(data, 0, data.length);
- var result = [];
- var out;
-
- while ((out = decoder.processed()) != null) {
- result.addAll(out);
- }
-
- if ((serverSide && clientNoContextTakeover) ||
- (!serverSide && serverNoContextTakeover)) {
- decoder = null;
- }
-
- return new Uint8List.fromList(result);
- }
-
- List<int> processOutgoingMessage(List<int> msg) {
- _ensureEncoder();
- var result = [];
- Uint8List buffer;
- var out;
-
- if (msg is! Uint8List) {
- for (var i = 0; i < msg.length; i++) {
- if (msg[i] < 0 || 255 < msg[i]) {
- throw new ArgumentError("List element is not a byte value "
- "(value ${msg[i]} at index $i)");
- }
- }
- buffer = new Uint8List.fromList(msg);
- } else {
- buffer = msg;
- }
-
- encoder.process(buffer, 0, buffer.length);
-
- while ((out = encoder.processed()) != null) {
- result.addAll(out);
- }
-
- if ((!serverSide && clientNoContextTakeover) ||
- (serverSide && serverNoContextTakeover)) {
- encoder = null;
- }
-
- if (result.length > 4) {
- result = result.sublist(0, result.length - 4);
- }
-
- return result;
- }
-}
-
-// TODO(ajohnsen): Make this transformer reusable.
-class _WebSocketOutgoingTransformer
- implements StreamTransformer<dynamic, List<int>>, EventSink {
- final _WebSocketImpl webSocket;
- EventSink<List<int>> _eventSink;
-
- _WebSocketPerMessageDeflate _deflateHelper;
-
- _WebSocketOutgoingTransformer(this.webSocket) {
- _deflateHelper = webSocket._deflate;
- }
-
- Stream<List<int>> bind(Stream stream) {
- return new Stream.eventTransformed(stream, (eventSink) {
- if (_eventSink != null) {
- throw new StateError("WebSocket transformer already used");
- }
- _eventSink = eventSink;
- return this;
- });
- }
-
- void add(message) {
- if (message is _WebSocketPong) {
- addFrame(_WebSocketOpcode.PONG, message.payload);
- return;
- }
- if (message is _WebSocketPing) {
- addFrame(_WebSocketOpcode.PING, message.payload);
- return;
- }
- List<int> data;
- int opcode;
- if (message != null) {
- if (message is String) {
- opcode = _WebSocketOpcode.TEXT;
- data = UTF8.encode(message);
- } else {
- if (message is List<int>) {
- data = message;
- opcode = _WebSocketOpcode.BINARY;
- } else {
- throw new ArgumentError(message);
- }
- }
-
- if (_deflateHelper != null) {
- data = _deflateHelper.processOutgoingMessage(data);
- }
- } else {
- opcode = _WebSocketOpcode.TEXT;
- }
- addFrame(opcode, data);
- }
-
- void addError(Object error, [StackTrace stackTrace]) {
- _eventSink.addError(error, stackTrace);
- }
-
- void close() {
- int code = webSocket._outCloseCode;
- String reason = webSocket._outCloseReason;
- List<int> data;
- if (code != null) {
- data = new List<int>();
- data.add((code >> 8) & 0xFF);
- data.add(code & 0xFF);
- if (reason != null) {
- data.addAll(UTF8.encode(reason));
- }
- }
- addFrame(_WebSocketOpcode.CLOSE, data);
- _eventSink.close();
- }
-
- void addFrame(int opcode, List<int> data) => createFrame(
- opcode,
- data,
- webSocket._serverSide,
- _deflateHelper != null &&
- (opcode == _WebSocketOpcode.TEXT ||
- opcode == _WebSocketOpcode.BINARY)).forEach((e) {
- _eventSink.add(e);
- });
-
- static Iterable<List<int>> createFrame(
- int opcode, List<int> data, bool serverSide, bool compressed) {
- bool mask = !serverSide; // Masking not implemented for server.
- int dataLength = data == null ? 0 : data.length;
- // Determine the header size.
- int headerSize = (mask) ? 6 : 2;
- if (dataLength > 65535) {
- headerSize += 8;
- } else if (dataLength > 125) {
- headerSize += 2;
- }
- Uint8List header = new Uint8List(headerSize);
- int index = 0;
-
- // Set FIN and opcode.
- var hoc = _WebSocketProtocolTransformer.FIN
- | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
- | (opcode & _WebSocketProtocolTransformer.OPCODE);
-
- header[index++] = hoc;
- // Determine size and position of length field.
- int lengthBytes = 1;
- if (dataLength > 65535) {
- header[index++] = 127;
- lengthBytes = 8;
- } else if (dataLength > 125) {
- header[index++] = 126;
- lengthBytes = 2;
- }
- // Write the length in network byte order into the header.
- for (int i = 0; i < lengthBytes; i++) {
- header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
- }
- if (mask) {
- header[1] |= 1 << 7;
- var maskBytes = _IOCrypto.getRandomBytes(4);
- header.setRange(index, index + 4, maskBytes);
- index += 4;
- if (data != null) {
- Uint8List list;
- // If this is a text message just do the masking inside the
- // encoded data.
- if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
- list = data;
- } else {
- if (data is Uint8List) {
- list = new Uint8List.fromList(data);
- } else {
- list = new Uint8List(data.length);
- for (int i = 0; i < data.length; i++) {
- if (data[i] < 0 || 255 < data[i]) {
- throw new ArgumentError("List element is not a byte value "
- "(value ${data[i]} at index $i)");
- }
- list[i] = data[i];
- }
- }
- }
- const int BLOCK_SIZE = 16;
- int blockCount = list.length ~/ BLOCK_SIZE;
- if (blockCount > 0) {
- // Create mask block.
- int mask = 0;
- for (int i = 3; i >= 0; i--) {
- mask = (mask << 8) | maskBytes[i];
- }
- Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
- Int32x4List blockBuffer =
- new Int32x4List.view(list.buffer, 0, blockCount);
- for (int i = 0; i < blockBuffer.length; i++) {
- blockBuffer[i] ^= blockMask;
- }
- }
- // Handle end.
- for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
- list[i] ^= maskBytes[i & 3];
- }
- data = list;
- }
- }
- assert(index == headerSize);
- if (data == null) {
- return [header];
- } else {
- return [header, data];
- }
- }
-}
-
-class _WebSocketConsumer implements StreamConsumer {
- final _WebSocketImpl webSocket;
- final Socket socket;
- StreamController _controller;
- StreamSubscription _subscription;
- bool _issuedPause = false;
- bool _closed = false;
- Completer _closeCompleter = new Completer();
- Completer _completer;
-
- _WebSocketConsumer(this.webSocket, this.socket);
-
- void _onListen() {
- if (_subscription != null) {
- _subscription.cancel();
- }
- }
-
- void _onPause() {
- if (_subscription != null) {
- _subscription.pause();
- } else {
- _issuedPause = true;
- }
- }
-
- void _onResume() {
- if (_subscription != null) {
- _subscription.resume();
- } else {
- _issuedPause = false;
- }
- }
-
- void _cancel() {
- if (_subscription != null) {
- var subscription = _subscription;
- _subscription = null;
- subscription.cancel();
- }
- }
-
- _ensureController() {
- if (_controller != null) return;
- _controller = new StreamController(
- sync: true,
- onPause: _onPause,
- onResume: _onResume,
- onCancel: _onListen);
- var stream = _controller.stream
- .transform(new _WebSocketOutgoingTransformer(webSocket));
- socket.addStream(stream).then((_) {
- _done();
- _closeCompleter.complete(webSocket);
- }, onError: (error, StackTrace stackTrace) {
- _closed = true;
- _cancel();
- if (error is ArgumentError) {
- if (!_done(error, stackTrace)) {
- _closeCompleter.completeError(error, stackTrace);
- }
- } else {
- _done();
- _closeCompleter.complete(webSocket);
- }
- });
- }
-
- bool _done([error, StackTrace stackTrace]) {
- if (_completer == null) return false;
- if (error != null) {
- _completer.completeError(error, stackTrace);
- } else {
- _completer.complete(webSocket);
- }
- _completer = null;
- return true;
- }
-
- Future addStream(var stream) {
- if (_closed) {
- stream.listen(null).cancel();
- return new Future.value(webSocket);
- }
- _ensureController();
- _completer = new Completer();
- _subscription = stream.listen((data) {
- _controller.add(data);
- }, onDone: _done, onError: _done, cancelOnError: true);
- if (_issuedPause) {
- _subscription.pause();
- _issuedPause = false;
- }
- return _completer.future;
- }
-
- Future close() {
- _ensureController();
- Future closeSocket() {
- return socket.close().catchError((_) {}).then((_) => webSocket);
- }
- _controller.close();
- return _closeCompleter.future.then((_) => closeSocket());
- }
-
- void add(data) {
- if (_closed) return;
- _ensureController();
- _controller.add(data);
- }
-
- void closeSocket() {
- _closed = true;
- _cancel();
- close();
- }
-}
-
-class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
- // Use default Map so we keep order.
- static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
- static const int DEFAULT_WINDOW_BITS = 15;
- static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
-
- final String protocol;
-
- StreamController _controller;
- StreamSubscription _subscription;
- StreamSink _sink;
-
- final _socket;
- final bool _serverSide;
- int _readyState = WebSocket.CONNECTING;
- bool _writeClosed = false;
- int _closeCode;
- String _closeReason;
- Duration _pingInterval;
- Timer _pingTimer;
- _WebSocketConsumer _consumer;
-
- int _outCloseCode;
- String _outCloseReason;
- Timer _closeTimer;
- _WebSocketPerMessageDeflate _deflate;
-
- static final HttpClient _httpClient = new HttpClient();
-
- static Future<WebSocket> connect(
- String url, Iterable<String> protocols, Map<String, dynamic> headers,
- {CompressionOptions compression: CompressionOptions.DEFAULT}) {
- Uri uri = Uri.parse(url);
- if (uri.scheme != "ws" && uri.scheme != "wss") {
- throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
- }
-
- Random random = new Random();
- // Generate 16 random bytes.
- Uint8List nonceData = new Uint8List(16);
- for (int i = 0; i < 16; i++) {
- nonceData[i] = random.nextInt(256);
- }
- String nonce = _CryptoUtils.bytesToBase64(nonceData);
-
- uri = new Uri(
- scheme: uri.scheme == "wss" ? "https" : "http",
- userInfo: uri.userInfo,
- host: uri.host,
- port: uri.port,
- path: uri.path,
- query: uri.query,
- fragment: uri.fragment);
- return _httpClient.openUrl("GET", uri).then((request) {
- if (uri.userInfo != null && !uri.userInfo.isEmpty) {
- // If the URL contains user information use that for basic
- // authorization.
- String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
- request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
- }
- if (headers != null) {
- headers.forEach((field, value) => request.headers.add(field, value));
- }
- // Setup the initial handshake.
- request.headers
- ..set(HttpHeaders.CONNECTION, "Upgrade")
- ..set(HttpHeaders.UPGRADE, "websocket")
- ..set("Sec-WebSocket-Key", nonce)
- ..set("Cache-Control", "no-cache")
- ..set("Sec-WebSocket-Version", "13");
- if (protocols != null) {
- request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
- }
-
- if (compression.enabled) {
- request.headers
- .add("Sec-WebSocket-Extensions", compression._createHeader());
- }
-
- return request.close();
- }).then((response) {
-
- void error(String message) {
- // Flush data.
- response.detachSocket().then((socket) {
- socket.destroy();
- });
- throw new WebSocketException(message);
- }
-
- if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
- response.headers[HttpHeaders.CONNECTION] == null ||
- !response.headers[HttpHeaders.CONNECTION]
- .any((value) => value.toLowerCase() == "upgrade") ||
- response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
- "websocket") {
- error("Connection to '$uri' was not upgraded to websocket");
- }
- String accept = response.headers.value("Sec-WebSocket-Accept");
- if (accept == null) {
- error("Response did not contain a 'Sec-WebSocket-Accept' header");
- }
- _SHA1 sha1 = new _SHA1();
- sha1.add("$nonce$_webSocketGUID".codeUnits);
- List<int> expectedAccept = sha1.close();
- List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
- if (expectedAccept.length != receivedAccept.length) {
- error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
- }
- for (int i = 0; i < expectedAccept.length; i++) {
- if (expectedAccept[i] != receivedAccept[i]) {
- error("Bad response 'Sec-WebSocket-Accept' header");
- }
- }
- var protocol = response.headers.value('Sec-WebSocket-Protocol');
-
- _WebSocketPerMessageDeflate deflate =
- negotiateClientCompression(response, compression);
-
- return response.detachSocket().then/*<WebSocket>*/((socket) =>
- new _WebSocketImpl._fromSocket(
- socket, protocol, compression, false, deflate));
- });
- }
-
- static _WebSocketPerMessageDeflate negotiateClientCompression(
- HttpClientResponse response, CompressionOptions compression) {
- String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
-
- if (extensionHeader == null) {
- extensionHeader = "";
- }
-
- var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
-
- if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
- var serverNoContextTakeover =
- hv.parameters.containsKey(_serverNoContextTakeover);
- var clientNoContextTakeover =
- hv.parameters.containsKey(_clientNoContextTakeover);
-
- int getWindowBits(String type) {
- var o = hv.parameters[type];
- if (o == null) {
- return DEFAULT_WINDOW_BITS;
- }
-
- return int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
- }
-
- return new _WebSocketPerMessageDeflate(
- clientMaxWindowBits: getWindowBits(_clientMaxWindowBits),
- serverMaxWindowBits: getWindowBits(_serverMaxWindowBits),
- clientNoContextTakeover: clientNoContextTakeover,
- serverNoContextTakeover: serverNoContextTakeover);
- }
-
- return null;
- }
-
- _WebSocketImpl._fromSocket(
- this._socket, this.protocol, CompressionOptions compression,
- [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
- _consumer = new _WebSocketConsumer(this, _socket);
- _sink = new _StreamSinkImpl(_consumer);
- _readyState = WebSocket.OPEN;
- _deflate = deflate;
-
- var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
- _subscription = _socket.transform(transformer).listen((data) {
- if (data is _WebSocketPing) {
- if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
- } else if (data is _WebSocketPong) {
- // Simply set pingInterval, as it'll cancel any timers.
- pingInterval = _pingInterval;
- } else {
- _controller.add(data);
- }
- }, onError: (error, stackTrace) {
- if (_closeTimer != null) _closeTimer.cancel();
- if (error is FormatException) {
- _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
- } else {
- _close(WebSocketStatus.PROTOCOL_ERROR);
- }
- // An error happened, set the close code set above.
- _closeCode = _outCloseCode;
- _closeReason = _outCloseReason;
- _controller.close();
- }, onDone: () {
- if (_closeTimer != null) _closeTimer.cancel();
- if (_readyState == WebSocket.OPEN) {
- _readyState = WebSocket.CLOSING;
- if (!_isReservedStatusCode(transformer.closeCode)) {
- _close(transformer.closeCode, transformer.closeReason);
- } else {
- _close();
- }
- _readyState = WebSocket.CLOSED;
- }
- // Protocol close, use close code from transformer.
- _closeCode = transformer.closeCode;
- _closeReason = transformer.closeReason;
- _controller.close();
- }, cancelOnError: true);
- _subscription.pause();
- _controller = new StreamController(
- sync: true, onListen: _subscription.resume, onCancel: () {
- _subscription.cancel();
- _subscription = null;
- }, onPause: _subscription.pause, onResume: _subscription.resume);
-
- _webSockets[_serviceId] = this;
- try {
- _socket._owner = this;
- } catch (_) {}
- }
-
- StreamSubscription listen(void onData(message),
- {Function onError, void onDone(), bool cancelOnError}) {
- return _controller.stream.listen(onData,
- onError: onError, onDone: onDone, cancelOnError: cancelOnError);
- }
-
- Duration get pingInterval => _pingInterval;
-
- void set pingInterval(Duration interval) {
- if (_writeClosed) return;
- if (_pingTimer != null) _pingTimer.cancel();
- _pingInterval = interval;
-
- if (_pingInterval == null) return;
-
- _pingTimer = new Timer(_pingInterval, () {
- if (_writeClosed) return;
- _consumer.add(new _WebSocketPing());
- _pingTimer = new Timer(_pingInterval, () {
- // No pong received.
- _close(WebSocketStatus.GOING_AWAY);
- });
- });
- }
-
- int get readyState => _readyState;
-
- String get extensions => null;
- int get closeCode => _closeCode;
- String get closeReason => _closeReason;
-
- void add(data) { _sink.add(data); }
- void addError(error, [StackTrace stackTrace]) {
- _sink.addError(error, stackTrace);
- }
- Future addStream(Stream stream) => _sink.addStream(stream);
- Future get done => _sink.done;
-
- Future close([int code, String reason]) {
- if (_isReservedStatusCode(code)) {
- throw new WebSocketException("Reserved status code $code");
- }
- if (_outCloseCode == null) {
- _outCloseCode = code;
- _outCloseReason = reason;
- }
- if (!_controller.isClosed) {
- // If a close has not yet been received from the other end then
- // 1) make sure to listen on the stream so the close frame will be
- // processed if received.
- // 2) set a timer terminate the connection if a close frame is
- // not received.
- if (!_controller.hasListener && _subscription != null) {
- _controller.stream.drain().catchError((_) => {});
- }
- if (_closeTimer == null) {
- // When closing the web-socket, we no longer accept data.
- _closeTimer = new Timer(const Duration(seconds: 5), () {
- // Reuse code and reason from the local close.
- _closeCode = _outCloseCode;
- _closeReason = _outCloseReason;
- if (_subscription != null) _subscription.cancel();
- _controller.close();
- _webSockets.remove(_serviceId);
- });
- }
- }
- return _sink.close();
- }
-
- void _close([int code, String reason]) {
- if (_writeClosed) return;
- if (_outCloseCode == null) {
- _outCloseCode = code;
- _outCloseReason = reason;
- }
- _writeClosed = true;
- _consumer.closeSocket();
- _webSockets.remove(_serviceId);
- }
-
- String get _serviceTypePath => 'io/websockets';
- String get _serviceTypeName => 'WebSocket';
-
- Map<String, dynamic> _toJSON(bool ref) {
- var name = '${_socket.address.host}:${_socket.port}';
- var r = <String, dynamic>{
- 'id': _servicePath,
- 'type': _serviceType(ref),
- 'name': name,
- 'user_name': name,
- };
- if (ref) {
- return r;
- }
- try {
- r['socket'] = _socket._toJSON(true);
- } catch (_) {
- r['socket'] = {
- 'id': _servicePath,
- 'type': '@Socket',
- 'name': 'UserSocket',
- 'user_name': 'UserSocket',
- };
- }
- return r;
- }
-
- static bool _isReservedStatusCode(int code) {
- return code != null &&
- (code < WebSocketStatus.NORMAL_CLOSURE ||
- code == WebSocketStatus.RESERVED_1004 ||
- code == WebSocketStatus.NO_STATUS_RECEIVED ||
- code == WebSocketStatus.ABNORMAL_CLOSURE ||
- (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
- code < WebSocketStatus.RESERVED_1015) ||
- (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
- }
-}
« no previous file with comments | « pkg/dev_compiler/tool/input_sdk/lib/io/websocket.dart ('k') | pkg/dev_compiler/tool/input_sdk/lib/isolate/capability.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698