| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.io; | |
| 6 | |
| 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | |
| 8 const String _clientNoContextTakeover = "client_no_context_takeover"; | |
| 9 const String _serverNoContextTakeover = "server_no_context_takeover"; | |
| 10 const String _clientMaxWindowBits = "client_max_window_bits"; | |
| 11 const String _serverMaxWindowBits = "server_max_window_bits"; | |
| 12 | |
| 13 // Matches _WebSocketOpcode. | |
| 14 class _WebSocketMessageType { | |
| 15 static const int NONE = 0; | |
| 16 static const int TEXT = 1; | |
| 17 static const int BINARY = 2; | |
| 18 } | |
| 19 | |
| 20 class _WebSocketOpcode { | |
| 21 static const int CONTINUATION = 0; | |
| 22 static const int TEXT = 1; | |
| 23 static const int BINARY = 2; | |
| 24 static const int RESERVED_3 = 3; | |
| 25 static const int RESERVED_4 = 4; | |
| 26 static const int RESERVED_5 = 5; | |
| 27 static const int RESERVED_6 = 6; | |
| 28 static const int RESERVED_7 = 7; | |
| 29 static const int CLOSE = 8; | |
| 30 static const int PING = 9; | |
| 31 static const int PONG = 10; | |
| 32 static const int RESERVED_B = 11; | |
| 33 static const int RESERVED_C = 12; | |
| 34 static const int RESERVED_D = 13; | |
| 35 static const int RESERVED_E = 14; | |
| 36 static const int RESERVED_F = 15; | |
| 37 } | |
| 38 | |
| 39 /** | |
| 40 * Stores the header and integer value derived from negotiation of | |
| 41 * client_max_window_bits and server_max_window_bits. headerValue will be | |
| 42 * set in the Websocket response headers. | |
| 43 */ | |
| 44 class _CompressionMaxWindowBits { | |
| 45 String headerValue; | |
| 46 int maxWindowBits; | |
| 47 _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]); | |
| 48 String toString() => headerValue; | |
| 49 } | |
| 50 | |
| 51 /** | |
| 52 * The web socket protocol transformer handles the protocol byte stream | |
| 53 * which is supplied through the [:handleData:]. As the protocol is processed, | |
| 54 * it'll output frame data as either a List<int> or String. | |
| 55 * | |
| 56 * Important information about usage: Be sure you use cancelOnError, so the | |
| 57 * socket will be closed when the processor encounter an error. Not using it | |
| 58 * will lead to undefined behaviour. | |
| 59 */ | |
| 60 // TODO(ajohnsen): make this transformer reusable? | |
| 61 class _WebSocketProtocolTransformer | |
| 62 implements StreamTransformer<List<int>, dynamic>, EventSink<Uint8List> { | |
| 63 static const int START = 0; | |
| 64 static const int LEN_FIRST = 1; | |
| 65 static const int LEN_REST = 2; | |
| 66 static const int MASK = 3; | |
| 67 static const int PAYLOAD = 4; | |
| 68 static const int CLOSED = 5; | |
| 69 static const int FAILURE = 6; | |
| 70 static const int FIN = 0x80; | |
| 71 static const int RSV1 = 0x40; | |
| 72 static const int RSV2 = 0x20; | |
| 73 static const int RSV3 = 0x10; | |
| 74 static const int OPCODE = 0xF; | |
| 75 | |
| 76 int _state = START; | |
| 77 bool _fin = false; | |
| 78 bool _compressed = false; | |
| 79 int _opcode = -1; | |
| 80 int _len = -1; | |
| 81 bool _masked = false; | |
| 82 int _remainingLenBytes = -1; | |
| 83 int _remainingMaskingKeyBytes = 4; | |
| 84 int _remainingPayloadBytes = -1; | |
| 85 int _unmaskingIndex = 0; | |
| 86 int _currentMessageType = _WebSocketMessageType.NONE; | |
| 87 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | |
| 88 String closeReason = ""; | |
| 89 | |
| 90 EventSink _eventSink; | |
| 91 | |
| 92 final bool _serverSide; | |
| 93 final List _maskingBytes = new List(4); | |
| 94 final BytesBuilder _payload = new BytesBuilder(copy: false); | |
| 95 | |
| 96 _WebSocketPerMessageDeflate _deflate; | |
| 97 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); | |
| 98 | |
| 99 Stream bind(Stream stream) { | |
| 100 return new Stream.eventTransformed(stream, (EventSink eventSink) { | |
| 101 if (_eventSink != null) { | |
| 102 throw new StateError("WebSocket transformer already used."); | |
| 103 } | |
| 104 _eventSink = eventSink; | |
| 105 return this; | |
| 106 }); | |
| 107 } | |
| 108 | |
| 109 void addError(Object error, [StackTrace stackTrace]) { | |
| 110 _eventSink.addError(error, stackTrace); | |
| 111 } | |
| 112 | |
| 113 void close() { _eventSink.close(); } | |
| 114 | |
| 115 /** | |
| 116 * Process data received from the underlying communication channel. | |
| 117 */ | |
| 118 void add(List<int> bytes) { | |
| 119 var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes); | |
| 120 int index = 0; | |
| 121 int lastIndex = buffer.length; | |
| 122 if (_state == CLOSED) { | |
| 123 throw new WebSocketException("Data on closed connection"); | |
| 124 } | |
| 125 if (_state == FAILURE) { | |
| 126 throw new WebSocketException("Data on failed connection"); | |
| 127 } | |
| 128 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { | |
| 129 int byte = buffer[index]; | |
| 130 if (_state <= LEN_REST) { | |
| 131 if (_state == START) { | |
| 132 _fin = (byte & FIN) != 0; | |
| 133 | |
| 134 if((byte & (RSV2 | RSV3)) != 0) { | |
| 135 // The RSV2, RSV3 bits must both be zero. | |
| 136 throw new WebSocketException("Protocol error"); | |
| 137 } | |
| 138 | |
| 139 _opcode = (byte & OPCODE); | |
| 140 | |
| 141 if (_opcode != _WebSocketOpcode.CONTINUATION) { | |
| 142 if ((byte & RSV1) != 0) { | |
| 143 _compressed = true; | |
| 144 } else { | |
| 145 _compressed = false; | |
| 146 } | |
| 147 } | |
| 148 | |
| 149 if (_opcode <= _WebSocketOpcode.BINARY) { | |
| 150 if (_opcode == _WebSocketOpcode.CONTINUATION) { | |
| 151 if (_currentMessageType == _WebSocketMessageType.NONE) { | |
| 152 throw new WebSocketException("Protocol error"); | |
| 153 } | |
| 154 } else { | |
| 155 assert(_opcode == _WebSocketOpcode.TEXT || | |
| 156 _opcode == _WebSocketOpcode.BINARY); | |
| 157 if (_currentMessageType != _WebSocketMessageType.NONE) { | |
| 158 throw new WebSocketException("Protocol error"); | |
| 159 } | |
| 160 _currentMessageType = _opcode; | |
| 161 } | |
| 162 } else if (_opcode >= _WebSocketOpcode.CLOSE && | |
| 163 _opcode <= _WebSocketOpcode.PONG) { | |
| 164 // Control frames cannot be fragmented. | |
| 165 if (!_fin) throw new WebSocketException("Protocol error"); | |
| 166 } else { | |
| 167 throw new WebSocketException("Protocol error"); | |
| 168 } | |
| 169 _state = LEN_FIRST; | |
| 170 } else if (_state == LEN_FIRST) { | |
| 171 _masked = (byte & 0x80) != 0; | |
| 172 _len = byte & 0x7F; | |
| 173 if (_isControlFrame() && _len > 125) { | |
| 174 throw new WebSocketException("Protocol error"); | |
| 175 } | |
| 176 if (_len == 126) { | |
| 177 _len = 0; | |
| 178 _remainingLenBytes = 2; | |
| 179 _state = LEN_REST; | |
| 180 } else if (_len == 127) { | |
| 181 _len = 0; | |
| 182 _remainingLenBytes = 8; | |
| 183 _state = LEN_REST; | |
| 184 } else { | |
| 185 assert(_len < 126); | |
| 186 _lengthDone(); | |
| 187 } | |
| 188 } else { | |
| 189 assert(_state == LEN_REST); | |
| 190 _len = _len << 8 | byte; | |
| 191 _remainingLenBytes--; | |
| 192 if (_remainingLenBytes == 0) { | |
| 193 _lengthDone(); | |
| 194 } | |
| 195 } | |
| 196 } else { | |
| 197 if (_state == MASK) { | |
| 198 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; | |
| 199 if (_remainingMaskingKeyBytes == 0) { | |
| 200 _maskDone(); | |
| 201 } | |
| 202 } else { | |
| 203 assert(_state == PAYLOAD); | |
| 204 // The payload is not handled one byte at a time but in blocks. | |
| 205 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); | |
| 206 _remainingPayloadBytes -= payloadLength; | |
| 207 // Unmask payload if masked. | |
| 208 if (_masked) { | |
| 209 _unmask(index, payloadLength, buffer); | |
| 210 } | |
| 211 // Control frame and data frame share _payloads. | |
| 212 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); | |
| 213 index += payloadLength; | |
| 214 if (_isControlFrame()) { | |
| 215 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | |
| 216 } else { | |
| 217 if (_currentMessageType != _WebSocketMessageType.TEXT && | |
| 218 _currentMessageType != _WebSocketMessageType.BINARY) { | |
| 219 throw new WebSocketException("Protocol error"); | |
| 220 } | |
| 221 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | |
| 222 } | |
| 223 | |
| 224 // Hack - as we always do index++ below. | |
| 225 index--; | |
| 226 } | |
| 227 } | |
| 228 | |
| 229 // Move to the next byte. | |
| 230 index++; | |
| 231 } | |
| 232 } | |
| 233 | |
| 234 void _unmask(int index, int length, Uint8List buffer) { | |
| 235 const int BLOCK_SIZE = 16; | |
| 236 // Skip Int32x4-version if message is small. | |
| 237 if (length >= BLOCK_SIZE) { | |
| 238 // Start by aligning to 16 bytes. | |
| 239 final int startOffset = BLOCK_SIZE - (index & 15); | |
| 240 final int end = index + startOffset; | |
| 241 for (int i = index; i < end; i++) { | |
| 242 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
| 243 } | |
| 244 index += startOffset; | |
| 245 length -= startOffset; | |
| 246 final int blockCount = length ~/ BLOCK_SIZE; | |
| 247 if (blockCount > 0) { | |
| 248 // Create mask block. | |
| 249 int mask = 0; | |
| 250 for (int i = 3; i >= 0; i--) { | |
| 251 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | |
| 252 } | |
| 253 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
| 254 Int32x4List blockBuffer = | |
| 255 new Int32x4List.view(buffer.buffer, index, blockCount); | |
| 256 for (int i = 0; i < blockBuffer.length; i++) { | |
| 257 blockBuffer[i] ^= blockMask; | |
| 258 } | |
| 259 final int bytes = blockCount * BLOCK_SIZE; | |
| 260 index += bytes; | |
| 261 length -= bytes; | |
| 262 } | |
| 263 } | |
| 264 // Handle end. | |
| 265 final int end = index + length; | |
| 266 for (int i = index; i < end; i++) { | |
| 267 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
| 268 } | |
| 269 } | |
| 270 | |
| 271 void _lengthDone() { | |
| 272 if (_masked) { | |
| 273 if (!_serverSide) { | |
| 274 throw new WebSocketException("Received masked frame from server"); | |
| 275 } | |
| 276 _state = MASK; | |
| 277 } else { | |
| 278 if (_serverSide) { | |
| 279 throw new WebSocketException("Received unmasked frame from client"); | |
| 280 } | |
| 281 _remainingPayloadBytes = _len; | |
| 282 _startPayload(); | |
| 283 } | |
| 284 } | |
| 285 | |
| 286 void _maskDone() { | |
| 287 _remainingPayloadBytes = _len; | |
| 288 _startPayload(); | |
| 289 } | |
| 290 | |
| 291 void _startPayload() { | |
| 292 // If there is no actual payload perform perform callbacks without | |
| 293 // going through the PAYLOAD state. | |
| 294 if (_remainingPayloadBytes == 0) { | |
| 295 if (_isControlFrame()) { | |
| 296 switch (_opcode) { | |
| 297 case _WebSocketOpcode.CLOSE: | |
| 298 _state = CLOSED; | |
| 299 _eventSink.close(); | |
| 300 break; | |
| 301 case _WebSocketOpcode.PING: | |
| 302 _eventSink.add(new _WebSocketPing()); | |
| 303 break; | |
| 304 case _WebSocketOpcode.PONG: | |
| 305 _eventSink.add(new _WebSocketPong()); | |
| 306 break; | |
| 307 } | |
| 308 _prepareForNextFrame(); | |
| 309 } else { | |
| 310 _messageFrameEnd(); | |
| 311 } | |
| 312 } else { | |
| 313 _state = PAYLOAD; | |
| 314 } | |
| 315 } | |
| 316 | |
| 317 void _messageFrameEnd() { | |
| 318 if (_fin) { | |
| 319 var bytes = _payload.takeBytes(); | |
| 320 if (_deflate != null && _compressed) { | |
| 321 bytes = _deflate.processIncomingMessage(bytes); | |
| 322 } | |
| 323 | |
| 324 switch (_currentMessageType) { | |
| 325 case _WebSocketMessageType.TEXT: | |
| 326 _eventSink.add(UTF8.decode(bytes)); | |
| 327 break; | |
| 328 case _WebSocketMessageType.BINARY: | |
| 329 _eventSink.add(bytes); | |
| 330 break; | |
| 331 } | |
| 332 _currentMessageType = _WebSocketMessageType.NONE; | |
| 333 } | |
| 334 _prepareForNextFrame(); | |
| 335 } | |
| 336 | |
| 337 void _controlFrameEnd() { | |
| 338 switch (_opcode) { | |
| 339 case _WebSocketOpcode.CLOSE: | |
| 340 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | |
| 341 var payload = _payload.takeBytes(); | |
| 342 if (payload.length > 0) { | |
| 343 if (payload.length == 1) { | |
| 344 throw new WebSocketException("Protocol error"); | |
| 345 } | |
| 346 closeCode = payload[0] << 8 | payload[1]; | |
| 347 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { | |
| 348 throw new WebSocketException("Protocol error"); | |
| 349 } | |
| 350 if (payload.length > 2) { | |
| 351 closeReason = UTF8.decode(payload.sublist(2)); | |
| 352 } | |
| 353 } | |
| 354 _state = CLOSED; | |
| 355 _eventSink.close(); | |
| 356 break; | |
| 357 | |
| 358 case _WebSocketOpcode.PING: | |
| 359 _eventSink.add(new _WebSocketPing(_payload.takeBytes())); | |
| 360 break; | |
| 361 | |
| 362 case _WebSocketOpcode.PONG: | |
| 363 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); | |
| 364 break; | |
| 365 } | |
| 366 _prepareForNextFrame(); | |
| 367 } | |
| 368 | |
| 369 bool _isControlFrame() { | |
| 370 return _opcode == _WebSocketOpcode.CLOSE || | |
| 371 _opcode == _WebSocketOpcode.PING || | |
| 372 _opcode == _WebSocketOpcode.PONG; | |
| 373 } | |
| 374 | |
| 375 void _prepareForNextFrame() { | |
| 376 if (_state != CLOSED && _state != FAILURE) _state = START; | |
| 377 _fin = false; | |
| 378 _opcode = -1; | |
| 379 _len = -1; | |
| 380 _remainingLenBytes = -1; | |
| 381 _remainingMaskingKeyBytes = 4; | |
| 382 _remainingPayloadBytes = -1; | |
| 383 _unmaskingIndex = 0; | |
| 384 } | |
| 385 } | |
| 386 | |
| 387 class _WebSocketPing { | |
| 388 final List<int> payload; | |
| 389 _WebSocketPing([this.payload = null]); | |
| 390 } | |
| 391 | |
| 392 class _WebSocketPong { | |
| 393 final List<int> payload; | |
| 394 _WebSocketPong([this.payload = null]); | |
| 395 } | |
| 396 | |
| 397 class _WebSocketTransformerImpl implements WebSocketTransformer { | |
| 398 final StreamController<WebSocket> _controller = | |
| 399 new StreamController<WebSocket>(sync: true); | |
| 400 final Function _protocolSelector; | |
| 401 final CompressionOptions _compression; | |
| 402 | |
| 403 _WebSocketTransformerImpl(this._protocolSelector, this._compression); | |
| 404 | |
| 405 Stream<WebSocket> bind(Stream<HttpRequest> stream) { | |
| 406 stream.listen((request) { | |
| 407 _upgrade(request, _protocolSelector, _compression) | |
| 408 .then((WebSocket webSocket) => _controller.add(webSocket)) | |
| 409 .catchError(_controller.addError); | |
| 410 }, onDone: () { | |
| 411 _controller.close(); | |
| 412 }); | |
| 413 | |
| 414 return _controller.stream; | |
| 415 } | |
| 416 | |
| 417 static Future<WebSocket> _upgrade( | |
| 418 HttpRequest request, _protocolSelector, CompressionOptions compression) { | |
| 419 var response = request.response; | |
| 420 if (!_isUpgradeRequest(request)) { | |
| 421 // Send error response. | |
| 422 response | |
| 423 ..statusCode = HttpStatus.BAD_REQUEST | |
| 424 ..close(); | |
| 425 return new Future.error( | |
| 426 new WebSocketException("Invalid WebSocket upgrade request")); | |
| 427 } | |
| 428 | |
| 429 Future upgrade(String protocol) { | |
| 430 // Send the upgrade response. | |
| 431 response | |
| 432 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS | |
| 433 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") | |
| 434 ..headers.add(HttpHeaders.UPGRADE, "websocket"); | |
| 435 String key = request.headers.value("Sec-WebSocket-Key"); | |
| 436 _SHA1 sha1 = new _SHA1(); | |
| 437 sha1.add("$key$_webSocketGUID".codeUnits); | |
| 438 String accept = _CryptoUtils.bytesToBase64(sha1.close()); | |
| 439 response.headers.add("Sec-WebSocket-Accept", accept); | |
| 440 if (protocol != null) { | |
| 441 response.headers.add("Sec-WebSocket-Protocol", protocol); | |
| 442 } | |
| 443 | |
| 444 var deflate = _negotiateCompression(request, response, compression); | |
| 445 | |
| 446 response.headers.contentLength = 0; | |
| 447 return response.detachSocket().then((socket) => | |
| 448 new _WebSocketImpl._fromSocket( | |
| 449 socket, protocol, compression, true, deflate)); | |
| 450 } | |
| 451 | |
| 452 var protocols = request.headers['Sec-WebSocket-Protocol']; | |
| 453 if (protocols != null && _protocolSelector != null) { | |
| 454 // The suggested protocols can be spread over multiple lines, each | |
| 455 // consisting of multiple protocols. To unify all of them, first join | |
| 456 // the lists with ', ' and then tokenize. | |
| 457 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); | |
| 458 return new Future(() => _protocolSelector(protocols)).then((protocol) { | |
| 459 if (protocols.indexOf(protocol) < 0) { | |
| 460 throw new WebSocketException( | |
| 461 "Selected protocol is not in the list of available protocols"); | |
| 462 } | |
| 463 return protocol; | |
| 464 }).catchError((error) { | |
| 465 response | |
| 466 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR | |
| 467 ..close(); | |
| 468 throw error; | |
| 469 }).then(upgrade); | |
| 470 } else { | |
| 471 return upgrade(null); | |
| 472 } | |
| 473 } | |
| 474 | |
| 475 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request, | |
| 476 HttpResponse response, CompressionOptions compression) { | |
| 477 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); | |
| 478 | |
| 479 extensionHeader ??= ""; | |
| 480 | |
| 481 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); | |
| 482 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) { | |
| 483 var info = compression._createHeader(hv); | |
| 484 | |
| 485 response.headers.add("Sec-WebSocket-Extensions", info.headerValue); | |
| 486 var serverNoContextTakeover = | |
| 487 (hv.parameters.containsKey(_serverNoContextTakeover) && | |
| 488 compression.serverNoContextTakeover); | |
| 489 var clientNoContextTakeover = | |
| 490 (hv.parameters.containsKey(_clientNoContextTakeover) && | |
| 491 compression.clientNoContextTakeover); | |
| 492 var deflate = new _WebSocketPerMessageDeflate( | |
| 493 serverNoContextTakeover: serverNoContextTakeover, | |
| 494 clientNoContextTakeover: clientNoContextTakeover, | |
| 495 serverMaxWindowBits: info.maxWindowBits, | |
| 496 clientMaxWindowBits: info.maxWindowBits, | |
| 497 serverSide: true); | |
| 498 | |
| 499 return deflate; | |
| 500 } | |
| 501 | |
| 502 return null; | |
| 503 } | |
| 504 | |
| 505 static bool _isUpgradeRequest(HttpRequest request) { | |
| 506 if (request.method != "GET") { | |
| 507 return false; | |
| 508 } | |
| 509 if (request.headers[HttpHeaders.CONNECTION] == null) { | |
| 510 return false; | |
| 511 } | |
| 512 bool isUpgrade = false; | |
| 513 request.headers[HttpHeaders.CONNECTION].forEach((String value) { | |
| 514 if (value.toLowerCase() == "upgrade") isUpgrade = true; | |
| 515 }); | |
| 516 if (!isUpgrade) return false; | |
| 517 String upgrade = request.headers.value(HttpHeaders.UPGRADE); | |
| 518 if (upgrade == null || upgrade.toLowerCase() != "websocket") { | |
| 519 return false; | |
| 520 } | |
| 521 String version = request.headers.value("Sec-WebSocket-Version"); | |
| 522 if (version == null || version != "13") { | |
| 523 return false; | |
| 524 } | |
| 525 String key = request.headers.value("Sec-WebSocket-Key"); | |
| 526 if (key == null) { | |
| 527 return false; | |
| 528 } | |
| 529 return true; | |
| 530 } | |
| 531 } | |
| 532 | |
| 533 class _WebSocketPerMessageDeflate { | |
| 534 bool serverNoContextTakeover; | |
| 535 bool clientNoContextTakeover; | |
| 536 int clientMaxWindowBits; | |
| 537 int serverMaxWindowBits; | |
| 538 bool serverSide; | |
| 539 | |
| 540 _Filter decoder; | |
| 541 _Filter encoder; | |
| 542 | |
| 543 _WebSocketPerMessageDeflate( | |
| 544 {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, | |
| 545 this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, | |
| 546 this.serverNoContextTakeover: false, | |
| 547 this.clientNoContextTakeover: false, | |
| 548 this.serverSide: false}); | |
| 549 | |
| 550 void _ensureDecoder() { | |
| 551 if (decoder == null) { | |
| 552 decoder = _Filter._newZLibInflateFilter( | |
| 553 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true); | |
| 554 } | |
| 555 } | |
| 556 | |
| 557 void _ensureEncoder() { | |
| 558 if (encoder == null) { | |
| 559 encoder = _Filter._newZLibDeflateFilter( | |
| 560 false, | |
| 561 ZLibOption.DEFAULT_LEVEL, | |
| 562 serverSide ? serverMaxWindowBits : clientMaxWindowBits, | |
| 563 ZLibOption.DEFAULT_MEM_LEVEL, | |
| 564 ZLibOption.STRATEGY_DEFAULT, | |
| 565 null, | |
| 566 true); | |
| 567 } | |
| 568 } | |
| 569 | |
| 570 Uint8List processIncomingMessage(List<int> msg) { | |
| 571 _ensureDecoder(); | |
| 572 | |
| 573 var data = []; | |
| 574 data.addAll(msg); | |
| 575 data.addAll(const [0x00, 0x00, 0xff, 0xff]); | |
| 576 | |
| 577 decoder.process(data, 0, data.length); | |
| 578 var result = []; | |
| 579 var out; | |
| 580 | |
| 581 while ((out = decoder.processed()) != null) { | |
| 582 result.addAll(out); | |
| 583 } | |
| 584 | |
| 585 if ((serverSide && clientNoContextTakeover) || | |
| 586 (!serverSide && serverNoContextTakeover)) { | |
| 587 decoder = null; | |
| 588 } | |
| 589 | |
| 590 return new Uint8List.fromList(result); | |
| 591 } | |
| 592 | |
| 593 List<int> processOutgoingMessage(List<int> msg) { | |
| 594 _ensureEncoder(); | |
| 595 var result = []; | |
| 596 Uint8List buffer; | |
| 597 var out; | |
| 598 | |
| 599 if (msg is! Uint8List) { | |
| 600 for (var i = 0; i < msg.length; i++) { | |
| 601 if (msg[i] < 0 || 255 < msg[i]) { | |
| 602 throw new ArgumentError("List element is not a byte value " | |
| 603 "(value ${msg[i]} at index $i)"); | |
| 604 } | |
| 605 } | |
| 606 buffer = new Uint8List.fromList(msg); | |
| 607 } else { | |
| 608 buffer = msg; | |
| 609 } | |
| 610 | |
| 611 encoder.process(buffer, 0, buffer.length); | |
| 612 | |
| 613 while ((out = encoder.processed()) != null) { | |
| 614 result.addAll(out); | |
| 615 } | |
| 616 | |
| 617 if ((!serverSide && clientNoContextTakeover) || | |
| 618 (serverSide && serverNoContextTakeover)) { | |
| 619 encoder = null; | |
| 620 } | |
| 621 | |
| 622 if (result.length > 4) { | |
| 623 result = result.sublist(0, result.length - 4); | |
| 624 } | |
| 625 | |
| 626 return result; | |
| 627 } | |
| 628 } | |
| 629 | |
| 630 // TODO(ajohnsen): Make this transformer reusable. | |
| 631 class _WebSocketOutgoingTransformer | |
| 632 implements StreamTransformer<dynamic, List<int>>, EventSink { | |
| 633 final _WebSocketImpl webSocket; | |
| 634 EventSink<List<int>> _eventSink; | |
| 635 | |
| 636 _WebSocketPerMessageDeflate _deflateHelper; | |
| 637 | |
| 638 _WebSocketOutgoingTransformer(this.webSocket) { | |
| 639 _deflateHelper = webSocket._deflate; | |
| 640 } | |
| 641 | |
| 642 Stream<List<int>> bind(Stream stream) { | |
| 643 return new Stream.eventTransformed(stream, (eventSink) { | |
| 644 if (_eventSink != null) { | |
| 645 throw new StateError("WebSocket transformer already used"); | |
| 646 } | |
| 647 _eventSink = eventSink; | |
| 648 return this; | |
| 649 }); | |
| 650 } | |
| 651 | |
| 652 void add(message) { | |
| 653 if (message is _WebSocketPong) { | |
| 654 addFrame(_WebSocketOpcode.PONG, message.payload); | |
| 655 return; | |
| 656 } | |
| 657 if (message is _WebSocketPing) { | |
| 658 addFrame(_WebSocketOpcode.PING, message.payload); | |
| 659 return; | |
| 660 } | |
| 661 List<int> data; | |
| 662 int opcode; | |
| 663 if (message != null) { | |
| 664 if (message is String) { | |
| 665 opcode = _WebSocketOpcode.TEXT; | |
| 666 data = UTF8.encode(message); | |
| 667 } else { | |
| 668 if (message is List<int>) { | |
| 669 data = message; | |
| 670 opcode = _WebSocketOpcode.BINARY; | |
| 671 } else { | |
| 672 throw new ArgumentError(message); | |
| 673 } | |
| 674 } | |
| 675 | |
| 676 if (_deflateHelper != null) { | |
| 677 data = _deflateHelper.processOutgoingMessage(data); | |
| 678 } | |
| 679 } else { | |
| 680 opcode = _WebSocketOpcode.TEXT; | |
| 681 } | |
| 682 addFrame(opcode, data); | |
| 683 } | |
| 684 | |
| 685 void addError(Object error, [StackTrace stackTrace]) { | |
| 686 _eventSink.addError(error, stackTrace); | |
| 687 } | |
| 688 | |
| 689 void close() { | |
| 690 int code = webSocket._outCloseCode; | |
| 691 String reason = webSocket._outCloseReason; | |
| 692 List<int> data; | |
| 693 if (code != null) { | |
| 694 data = new List<int>(); | |
| 695 data.add((code >> 8) & 0xFF); | |
| 696 data.add(code & 0xFF); | |
| 697 if (reason != null) { | |
| 698 data.addAll(UTF8.encode(reason)); | |
| 699 } | |
| 700 } | |
| 701 addFrame(_WebSocketOpcode.CLOSE, data); | |
| 702 _eventSink.close(); | |
| 703 } | |
| 704 | |
| 705 void addFrame(int opcode, List<int> data) => createFrame( | |
| 706 opcode, | |
| 707 data, | |
| 708 webSocket._serverSide, | |
| 709 _deflateHelper != null && | |
| 710 (opcode == _WebSocketOpcode.TEXT || | |
| 711 opcode == _WebSocketOpcode.BINARY)).forEach((e) { | |
| 712 _eventSink.add(e); | |
| 713 }); | |
| 714 | |
| 715 static Iterable<List<int>> createFrame( | |
| 716 int opcode, List<int> data, bool serverSide, bool compressed) { | |
| 717 bool mask = !serverSide; // Masking not implemented for server. | |
| 718 int dataLength = data == null ? 0 : data.length; | |
| 719 // Determine the header size. | |
| 720 int headerSize = (mask) ? 6 : 2; | |
| 721 if (dataLength > 65535) { | |
| 722 headerSize += 8; | |
| 723 } else if (dataLength > 125) { | |
| 724 headerSize += 2; | |
| 725 } | |
| 726 Uint8List header = new Uint8List(headerSize); | |
| 727 int index = 0; | |
| 728 | |
| 729 // Set FIN and opcode. | |
| 730 var hoc = _WebSocketProtocolTransformer.FIN | |
| 731 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) | |
| 732 | (opcode & _WebSocketProtocolTransformer.OPCODE); | |
| 733 | |
| 734 header[index++] = hoc; | |
| 735 // Determine size and position of length field. | |
| 736 int lengthBytes = 1; | |
| 737 if (dataLength > 65535) { | |
| 738 header[index++] = 127; | |
| 739 lengthBytes = 8; | |
| 740 } else if (dataLength > 125) { | |
| 741 header[index++] = 126; | |
| 742 lengthBytes = 2; | |
| 743 } | |
| 744 // Write the length in network byte order into the header. | |
| 745 for (int i = 0; i < lengthBytes; i++) { | |
| 746 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | |
| 747 } | |
| 748 if (mask) { | |
| 749 header[1] |= 1 << 7; | |
| 750 var maskBytes = _IOCrypto.getRandomBytes(4); | |
| 751 header.setRange(index, index + 4, maskBytes); | |
| 752 index += 4; | |
| 753 if (data != null) { | |
| 754 Uint8List list; | |
| 755 // If this is a text message just do the masking inside the | |
| 756 // encoded data. | |
| 757 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | |
| 758 list = data; | |
| 759 } else { | |
| 760 if (data is Uint8List) { | |
| 761 list = new Uint8List.fromList(data); | |
| 762 } else { | |
| 763 list = new Uint8List(data.length); | |
| 764 for (int i = 0; i < data.length; i++) { | |
| 765 if (data[i] < 0 || 255 < data[i]) { | |
| 766 throw new ArgumentError("List element is not a byte value " | |
| 767 "(value ${data[i]} at index $i)"); | |
| 768 } | |
| 769 list[i] = data[i]; | |
| 770 } | |
| 771 } | |
| 772 } | |
| 773 const int BLOCK_SIZE = 16; | |
| 774 int blockCount = list.length ~/ BLOCK_SIZE; | |
| 775 if (blockCount > 0) { | |
| 776 // Create mask block. | |
| 777 int mask = 0; | |
| 778 for (int i = 3; i >= 0; i--) { | |
| 779 mask = (mask << 8) | maskBytes[i]; | |
| 780 } | |
| 781 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
| 782 Int32x4List blockBuffer = | |
| 783 new Int32x4List.view(list.buffer, 0, blockCount); | |
| 784 for (int i = 0; i < blockBuffer.length; i++) { | |
| 785 blockBuffer[i] ^= blockMask; | |
| 786 } | |
| 787 } | |
| 788 // Handle end. | |
| 789 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { | |
| 790 list[i] ^= maskBytes[i & 3]; | |
| 791 } | |
| 792 data = list; | |
| 793 } | |
| 794 } | |
| 795 assert(index == headerSize); | |
| 796 if (data == null) { | |
| 797 return [header]; | |
| 798 } else { | |
| 799 return [header, data]; | |
| 800 } | |
| 801 } | |
| 802 } | |
| 803 | |
| 804 class _WebSocketConsumer implements StreamConsumer { | |
| 805 final _WebSocketImpl webSocket; | |
| 806 final Socket socket; | |
| 807 StreamController _controller; | |
| 808 StreamSubscription _subscription; | |
| 809 bool _issuedPause = false; | |
| 810 bool _closed = false; | |
| 811 Completer _closeCompleter = new Completer(); | |
| 812 Completer _completer; | |
| 813 | |
| 814 _WebSocketConsumer(this.webSocket, this.socket); | |
| 815 | |
| 816 void _onListen() { | |
| 817 if (_subscription != null) { | |
| 818 _subscription.cancel(); | |
| 819 } | |
| 820 } | |
| 821 | |
| 822 void _onPause() { | |
| 823 if (_subscription != null) { | |
| 824 _subscription.pause(); | |
| 825 } else { | |
| 826 _issuedPause = true; | |
| 827 } | |
| 828 } | |
| 829 | |
| 830 void _onResume() { | |
| 831 if (_subscription != null) { | |
| 832 _subscription.resume(); | |
| 833 } else { | |
| 834 _issuedPause = false; | |
| 835 } | |
| 836 } | |
| 837 | |
| 838 void _cancel() { | |
| 839 if (_subscription != null) { | |
| 840 var subscription = _subscription; | |
| 841 _subscription = null; | |
| 842 subscription.cancel(); | |
| 843 } | |
| 844 } | |
| 845 | |
| 846 _ensureController() { | |
| 847 if (_controller != null) return; | |
| 848 _controller = new StreamController( | |
| 849 sync: true, | |
| 850 onPause: _onPause, | |
| 851 onResume: _onResume, | |
| 852 onCancel: _onListen); | |
| 853 var stream = _controller.stream | |
| 854 .transform(new _WebSocketOutgoingTransformer(webSocket)); | |
| 855 socket.addStream(stream).then((_) { | |
| 856 _done(); | |
| 857 _closeCompleter.complete(webSocket); | |
| 858 }, onError: (error, StackTrace stackTrace) { | |
| 859 _closed = true; | |
| 860 _cancel(); | |
| 861 if (error is ArgumentError) { | |
| 862 if (!_done(error, stackTrace)) { | |
| 863 _closeCompleter.completeError(error, stackTrace); | |
| 864 } | |
| 865 } else { | |
| 866 _done(); | |
| 867 _closeCompleter.complete(webSocket); | |
| 868 } | |
| 869 }); | |
| 870 } | |
| 871 | |
| 872 bool _done([error, StackTrace stackTrace]) { | |
| 873 if (_completer == null) return false; | |
| 874 if (error != null) { | |
| 875 _completer.completeError(error, stackTrace); | |
| 876 } else { | |
| 877 _completer.complete(webSocket); | |
| 878 } | |
| 879 _completer = null; | |
| 880 return true; | |
| 881 } | |
| 882 | |
| 883 Future addStream(var stream) { | |
| 884 if (_closed) { | |
| 885 stream.listen(null).cancel(); | |
| 886 return new Future.value(webSocket); | |
| 887 } | |
| 888 _ensureController(); | |
| 889 _completer = new Completer(); | |
| 890 _subscription = stream.listen((data) { | |
| 891 _controller.add(data); | |
| 892 }, onDone: _done, onError: _done, cancelOnError: true); | |
| 893 if (_issuedPause) { | |
| 894 _subscription.pause(); | |
| 895 _issuedPause = false; | |
| 896 } | |
| 897 return _completer.future; | |
| 898 } | |
| 899 | |
| 900 Future close() { | |
| 901 _ensureController(); | |
| 902 Future closeSocket() { | |
| 903 return socket.close().catchError((_) {}).then((_) => webSocket); | |
| 904 } | |
| 905 _controller.close(); | |
| 906 return _closeCompleter.future.then((_) => closeSocket()); | |
| 907 } | |
| 908 | |
| 909 void add(data) { | |
| 910 if (_closed) return; | |
| 911 _ensureController(); | |
| 912 _controller.add(data); | |
| 913 } | |
| 914 | |
| 915 void closeSocket() { | |
| 916 _closed = true; | |
| 917 _cancel(); | |
| 918 close(); | |
| 919 } | |
| 920 } | |
| 921 | |
| 922 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { | |
| 923 // Use default Map so we keep order. | |
| 924 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); | |
| 925 static const int DEFAULT_WINDOW_BITS = 15; | |
| 926 static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; | |
| 927 | |
| 928 final String protocol; | |
| 929 | |
| 930 StreamController _controller; | |
| 931 StreamSubscription _subscription; | |
| 932 StreamSink _sink; | |
| 933 | |
| 934 final _socket; | |
| 935 final bool _serverSide; | |
| 936 int _readyState = WebSocket.CONNECTING; | |
| 937 bool _writeClosed = false; | |
| 938 int _closeCode; | |
| 939 String _closeReason; | |
| 940 Duration _pingInterval; | |
| 941 Timer _pingTimer; | |
| 942 _WebSocketConsumer _consumer; | |
| 943 | |
| 944 int _outCloseCode; | |
| 945 String _outCloseReason; | |
| 946 Timer _closeTimer; | |
| 947 _WebSocketPerMessageDeflate _deflate; | |
| 948 | |
| 949 static final HttpClient _httpClient = new HttpClient(); | |
| 950 | |
| 951 static Future<WebSocket> connect( | |
| 952 String url, Iterable<String> protocols, Map<String, dynamic> headers, | |
| 953 {CompressionOptions compression: CompressionOptions.DEFAULT}) { | |
| 954 Uri uri = Uri.parse(url); | |
| 955 if (uri.scheme != "ws" && uri.scheme != "wss") { | |
| 956 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); | |
| 957 } | |
| 958 | |
| 959 Random random = new Random(); | |
| 960 // Generate 16 random bytes. | |
| 961 Uint8List nonceData = new Uint8List(16); | |
| 962 for (int i = 0; i < 16; i++) { | |
| 963 nonceData[i] = random.nextInt(256); | |
| 964 } | |
| 965 String nonce = _CryptoUtils.bytesToBase64(nonceData); | |
| 966 | |
| 967 uri = new Uri( | |
| 968 scheme: uri.scheme == "wss" ? "https" : "http", | |
| 969 userInfo: uri.userInfo, | |
| 970 host: uri.host, | |
| 971 port: uri.port, | |
| 972 path: uri.path, | |
| 973 query: uri.query, | |
| 974 fragment: uri.fragment); | |
| 975 return _httpClient.openUrl("GET", uri).then((request) { | |
| 976 if (uri.userInfo != null && !uri.userInfo.isEmpty) { | |
| 977 // If the URL contains user information use that for basic | |
| 978 // authorization. | |
| 979 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); | |
| 980 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | |
| 981 } | |
| 982 if (headers != null) { | |
| 983 headers.forEach((field, value) => request.headers.add(field, value)); | |
| 984 } | |
| 985 // Setup the initial handshake. | |
| 986 request.headers | |
| 987 ..set(HttpHeaders.CONNECTION, "Upgrade") | |
| 988 ..set(HttpHeaders.UPGRADE, "websocket") | |
| 989 ..set("Sec-WebSocket-Key", nonce) | |
| 990 ..set("Cache-Control", "no-cache") | |
| 991 ..set("Sec-WebSocket-Version", "13"); | |
| 992 if (protocols != null) { | |
| 993 request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); | |
| 994 } | |
| 995 | |
| 996 if (compression.enabled) { | |
| 997 request.headers | |
| 998 .add("Sec-WebSocket-Extensions", compression._createHeader()); | |
| 999 } | |
| 1000 | |
| 1001 return request.close(); | |
| 1002 }).then((response) { | |
| 1003 | |
| 1004 void error(String message) { | |
| 1005 // Flush data. | |
| 1006 response.detachSocket().then((socket) { | |
| 1007 socket.destroy(); | |
| 1008 }); | |
| 1009 throw new WebSocketException(message); | |
| 1010 } | |
| 1011 | |
| 1012 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || | |
| 1013 response.headers[HttpHeaders.CONNECTION] == null || | |
| 1014 !response.headers[HttpHeaders.CONNECTION] | |
| 1015 .any((value) => value.toLowerCase() == "upgrade") || | |
| 1016 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != | |
| 1017 "websocket") { | |
| 1018 error("Connection to '$uri' was not upgraded to websocket"); | |
| 1019 } | |
| 1020 String accept = response.headers.value("Sec-WebSocket-Accept"); | |
| 1021 if (accept == null) { | |
| 1022 error("Response did not contain a 'Sec-WebSocket-Accept' header"); | |
| 1023 } | |
| 1024 _SHA1 sha1 = new _SHA1(); | |
| 1025 sha1.add("$nonce$_webSocketGUID".codeUnits); | |
| 1026 List<int> expectedAccept = sha1.close(); | |
| 1027 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); | |
| 1028 if (expectedAccept.length != receivedAccept.length) { | |
| 1029 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); | |
| 1030 } | |
| 1031 for (int i = 0; i < expectedAccept.length; i++) { | |
| 1032 if (expectedAccept[i] != receivedAccept[i]) { | |
| 1033 error("Bad response 'Sec-WebSocket-Accept' header"); | |
| 1034 } | |
| 1035 } | |
| 1036 var protocol = response.headers.value('Sec-WebSocket-Protocol'); | |
| 1037 | |
| 1038 _WebSocketPerMessageDeflate deflate = | |
| 1039 negotiateClientCompression(response, compression); | |
| 1040 | |
| 1041 return response.detachSocket().then/*<WebSocket>*/((socket) => | |
| 1042 new _WebSocketImpl._fromSocket( | |
| 1043 socket, protocol, compression, false, deflate)); | |
| 1044 }); | |
| 1045 } | |
| 1046 | |
| 1047 static _WebSocketPerMessageDeflate negotiateClientCompression( | |
| 1048 HttpClientResponse response, CompressionOptions compression) { | |
| 1049 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); | |
| 1050 | |
| 1051 if (extensionHeader == null) { | |
| 1052 extensionHeader = ""; | |
| 1053 } | |
| 1054 | |
| 1055 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); | |
| 1056 | |
| 1057 if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) { | |
| 1058 var serverNoContextTakeover = | |
| 1059 hv.parameters.containsKey(_serverNoContextTakeover); | |
| 1060 var clientNoContextTakeover = | |
| 1061 hv.parameters.containsKey(_clientNoContextTakeover); | |
| 1062 | |
| 1063 int getWindowBits(String type) { | |
| 1064 var o = hv.parameters[type]; | |
| 1065 if (o == null) { | |
| 1066 return DEFAULT_WINDOW_BITS; | |
| 1067 } | |
| 1068 | |
| 1069 return int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS); | |
| 1070 } | |
| 1071 | |
| 1072 return new _WebSocketPerMessageDeflate( | |
| 1073 clientMaxWindowBits: getWindowBits(_clientMaxWindowBits), | |
| 1074 serverMaxWindowBits: getWindowBits(_serverMaxWindowBits), | |
| 1075 clientNoContextTakeover: clientNoContextTakeover, | |
| 1076 serverNoContextTakeover: serverNoContextTakeover); | |
| 1077 } | |
| 1078 | |
| 1079 return null; | |
| 1080 } | |
| 1081 | |
| 1082 _WebSocketImpl._fromSocket( | |
| 1083 this._socket, this.protocol, CompressionOptions compression, | |
| 1084 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) { | |
| 1085 _consumer = new _WebSocketConsumer(this, _socket); | |
| 1086 _sink = new _StreamSinkImpl(_consumer); | |
| 1087 _readyState = WebSocket.OPEN; | |
| 1088 _deflate = deflate; | |
| 1089 | |
| 1090 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); | |
| 1091 _subscription = _socket.transform(transformer).listen((data) { | |
| 1092 if (data is _WebSocketPing) { | |
| 1093 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | |
| 1094 } else if (data is _WebSocketPong) { | |
| 1095 // Simply set pingInterval, as it'll cancel any timers. | |
| 1096 pingInterval = _pingInterval; | |
| 1097 } else { | |
| 1098 _controller.add(data); | |
| 1099 } | |
| 1100 }, onError: (error, stackTrace) { | |
| 1101 if (_closeTimer != null) _closeTimer.cancel(); | |
| 1102 if (error is FormatException) { | |
| 1103 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | |
| 1104 } else { | |
| 1105 _close(WebSocketStatus.PROTOCOL_ERROR); | |
| 1106 } | |
| 1107 // An error happened, set the close code set above. | |
| 1108 _closeCode = _outCloseCode; | |
| 1109 _closeReason = _outCloseReason; | |
| 1110 _controller.close(); | |
| 1111 }, onDone: () { | |
| 1112 if (_closeTimer != null) _closeTimer.cancel(); | |
| 1113 if (_readyState == WebSocket.OPEN) { | |
| 1114 _readyState = WebSocket.CLOSING; | |
| 1115 if (!_isReservedStatusCode(transformer.closeCode)) { | |
| 1116 _close(transformer.closeCode, transformer.closeReason); | |
| 1117 } else { | |
| 1118 _close(); | |
| 1119 } | |
| 1120 _readyState = WebSocket.CLOSED; | |
| 1121 } | |
| 1122 // Protocol close, use close code from transformer. | |
| 1123 _closeCode = transformer.closeCode; | |
| 1124 _closeReason = transformer.closeReason; | |
| 1125 _controller.close(); | |
| 1126 }, cancelOnError: true); | |
| 1127 _subscription.pause(); | |
| 1128 _controller = new StreamController( | |
| 1129 sync: true, onListen: _subscription.resume, onCancel: () { | |
| 1130 _subscription.cancel(); | |
| 1131 _subscription = null; | |
| 1132 }, onPause: _subscription.pause, onResume: _subscription.resume); | |
| 1133 | |
| 1134 _webSockets[_serviceId] = this; | |
| 1135 try { | |
| 1136 _socket._owner = this; | |
| 1137 } catch (_) {} | |
| 1138 } | |
| 1139 | |
| 1140 StreamSubscription listen(void onData(message), | |
| 1141 {Function onError, void onDone(), bool cancelOnError}) { | |
| 1142 return _controller.stream.listen(onData, | |
| 1143 onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
| 1144 } | |
| 1145 | |
| 1146 Duration get pingInterval => _pingInterval; | |
| 1147 | |
| 1148 void set pingInterval(Duration interval) { | |
| 1149 if (_writeClosed) return; | |
| 1150 if (_pingTimer != null) _pingTimer.cancel(); | |
| 1151 _pingInterval = interval; | |
| 1152 | |
| 1153 if (_pingInterval == null) return; | |
| 1154 | |
| 1155 _pingTimer = new Timer(_pingInterval, () { | |
| 1156 if (_writeClosed) return; | |
| 1157 _consumer.add(new _WebSocketPing()); | |
| 1158 _pingTimer = new Timer(_pingInterval, () { | |
| 1159 // No pong received. | |
| 1160 _close(WebSocketStatus.GOING_AWAY); | |
| 1161 }); | |
| 1162 }); | |
| 1163 } | |
| 1164 | |
| 1165 int get readyState => _readyState; | |
| 1166 | |
| 1167 String get extensions => null; | |
| 1168 int get closeCode => _closeCode; | |
| 1169 String get closeReason => _closeReason; | |
| 1170 | |
| 1171 void add(data) { _sink.add(data); } | |
| 1172 void addError(error, [StackTrace stackTrace]) { | |
| 1173 _sink.addError(error, stackTrace); | |
| 1174 } | |
| 1175 Future addStream(Stream stream) => _sink.addStream(stream); | |
| 1176 Future get done => _sink.done; | |
| 1177 | |
| 1178 Future close([int code, String reason]) { | |
| 1179 if (_isReservedStatusCode(code)) { | |
| 1180 throw new WebSocketException("Reserved status code $code"); | |
| 1181 } | |
| 1182 if (_outCloseCode == null) { | |
| 1183 _outCloseCode = code; | |
| 1184 _outCloseReason = reason; | |
| 1185 } | |
| 1186 if (!_controller.isClosed) { | |
| 1187 // If a close has not yet been received from the other end then | |
| 1188 // 1) make sure to listen on the stream so the close frame will be | |
| 1189 // processed if received. | |
| 1190 // 2) set a timer terminate the connection if a close frame is | |
| 1191 // not received. | |
| 1192 if (!_controller.hasListener && _subscription != null) { | |
| 1193 _controller.stream.drain().catchError((_) => {}); | |
| 1194 } | |
| 1195 if (_closeTimer == null) { | |
| 1196 // When closing the web-socket, we no longer accept data. | |
| 1197 _closeTimer = new Timer(const Duration(seconds: 5), () { | |
| 1198 // Reuse code and reason from the local close. | |
| 1199 _closeCode = _outCloseCode; | |
| 1200 _closeReason = _outCloseReason; | |
| 1201 if (_subscription != null) _subscription.cancel(); | |
| 1202 _controller.close(); | |
| 1203 _webSockets.remove(_serviceId); | |
| 1204 }); | |
| 1205 } | |
| 1206 } | |
| 1207 return _sink.close(); | |
| 1208 } | |
| 1209 | |
| 1210 void _close([int code, String reason]) { | |
| 1211 if (_writeClosed) return; | |
| 1212 if (_outCloseCode == null) { | |
| 1213 _outCloseCode = code; | |
| 1214 _outCloseReason = reason; | |
| 1215 } | |
| 1216 _writeClosed = true; | |
| 1217 _consumer.closeSocket(); | |
| 1218 _webSockets.remove(_serviceId); | |
| 1219 } | |
| 1220 | |
| 1221 String get _serviceTypePath => 'io/websockets'; | |
| 1222 String get _serviceTypeName => 'WebSocket'; | |
| 1223 | |
| 1224 Map<String, dynamic> _toJSON(bool ref) { | |
| 1225 var name = '${_socket.address.host}:${_socket.port}'; | |
| 1226 var r = <String, dynamic>{ | |
| 1227 'id': _servicePath, | |
| 1228 'type': _serviceType(ref), | |
| 1229 'name': name, | |
| 1230 'user_name': name, | |
| 1231 }; | |
| 1232 if (ref) { | |
| 1233 return r; | |
| 1234 } | |
| 1235 try { | |
| 1236 r['socket'] = _socket._toJSON(true); | |
| 1237 } catch (_) { | |
| 1238 r['socket'] = { | |
| 1239 'id': _servicePath, | |
| 1240 'type': '@Socket', | |
| 1241 'name': 'UserSocket', | |
| 1242 'user_name': 'UserSocket', | |
| 1243 }; | |
| 1244 } | |
| 1245 return r; | |
| 1246 } | |
| 1247 | |
| 1248 static bool _isReservedStatusCode(int code) { | |
| 1249 return code != null && | |
| 1250 (code < WebSocketStatus.NORMAL_CLOSURE || | |
| 1251 code == WebSocketStatus.RESERVED_1004 || | |
| 1252 code == WebSocketStatus.NO_STATUS_RECEIVED || | |
| 1253 code == WebSocketStatus.ABNORMAL_CLOSURE || | |
| 1254 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | |
| 1255 code < WebSocketStatus.RESERVED_1015) || | |
| 1256 (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); | |
| 1257 } | |
| 1258 } | |
| OLD | NEW |