OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 part of dart.io; |
| 6 |
| 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
| 8 const String _clientNoContextTakeover = "client_no_context_takeover"; |
| 9 const String _serverNoContextTakeover = "server_no_context_takeover"; |
| 10 const String _clientMaxWindowBits = "client_max_window_bits"; |
| 11 const String _serverMaxWindowBits = "server_max_window_bits"; |
| 12 |
| 13 // Matches _WebSocketOpcode. |
| 14 class _WebSocketMessageType { |
| 15 static const int NONE = 0; |
| 16 static const int TEXT = 1; |
| 17 static const int BINARY = 2; |
| 18 } |
| 19 |
| 20 class _WebSocketOpcode { |
| 21 static const int CONTINUATION = 0; |
| 22 static const int TEXT = 1; |
| 23 static const int BINARY = 2; |
| 24 static const int RESERVED_3 = 3; |
| 25 static const int RESERVED_4 = 4; |
| 26 static const int RESERVED_5 = 5; |
| 27 static const int RESERVED_6 = 6; |
| 28 static const int RESERVED_7 = 7; |
| 29 static const int CLOSE = 8; |
| 30 static const int PING = 9; |
| 31 static const int PONG = 10; |
| 32 static const int RESERVED_B = 11; |
| 33 static const int RESERVED_C = 12; |
| 34 static const int RESERVED_D = 13; |
| 35 static const int RESERVED_E = 14; |
| 36 static const int RESERVED_F = 15; |
| 37 } |
| 38 |
| 39 /** |
| 40 * Stores the header and integer value derived from negotiation of |
| 41 * client_max_window_bits and server_max_window_bits. headerValue will be |
| 42 * set in the Websocket response headers. |
| 43 */ |
| 44 class _CompressionMaxWindowBits { |
| 45 String headerValue; |
| 46 int maxWindowBits; |
| 47 _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]); |
| 48 String toString() => headerValue; |
| 49 } |
| 50 |
| 51 /** |
| 52 * The web socket protocol transformer handles the protocol byte stream |
| 53 * which is supplied through the [:handleData:]. As the protocol is processed, |
| 54 * it'll output frame data as either a List<int> or String. |
| 55 * |
| 56 * Important information about usage: Be sure you use cancelOnError, so the |
| 57 * socket will be closed when the processor encounter an error. Not using it |
| 58 * will lead to undefined behaviour. |
| 59 */ |
| 60 // TODO(ajohnsen): make this transformer reusable? |
| 61 class _WebSocketProtocolTransformer |
| 62 implements StreamTransformer<List<int>, dynamic>, EventSink<Uint8List> { |
| 63 static const int START = 0; |
| 64 static const int LEN_FIRST = 1; |
| 65 static const int LEN_REST = 2; |
| 66 static const int MASK = 3; |
| 67 static const int PAYLOAD = 4; |
| 68 static const int CLOSED = 5; |
| 69 static const int FAILURE = 6; |
| 70 static const int FIN = 0x80; |
| 71 static const int RSV1 = 0x40; |
| 72 static const int RSV2 = 0x20; |
| 73 static const int RSV3 = 0x10; |
| 74 static const int OPCODE = 0xF; |
| 75 |
| 76 int _state = START; |
| 77 bool _fin = false; |
| 78 bool _compressed = false; |
| 79 int _opcode = -1; |
| 80 int _len = -1; |
| 81 bool _masked = false; |
| 82 int _remainingLenBytes = -1; |
| 83 int _remainingMaskingKeyBytes = 4; |
| 84 int _remainingPayloadBytes = -1; |
| 85 int _unmaskingIndex = 0; |
| 86 int _currentMessageType = _WebSocketMessageType.NONE; |
| 87 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
| 88 String closeReason = ""; |
| 89 |
| 90 EventSink _eventSink; |
| 91 |
| 92 final bool _serverSide; |
| 93 final List _maskingBytes = new List(4); |
| 94 final BytesBuilder _payload = new BytesBuilder(copy: false); |
| 95 |
| 96 _WebSocketPerMessageDeflate _deflate; |
| 97 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); |
| 98 |
| 99 Stream bind(Stream stream) { |
| 100 return new Stream.eventTransformed(stream, (EventSink eventSink) { |
| 101 if (_eventSink != null) { |
| 102 throw new StateError("WebSocket transformer already used."); |
| 103 } |
| 104 _eventSink = eventSink; |
| 105 return this; |
| 106 }); |
| 107 } |
| 108 |
| 109 void addError(Object error, [StackTrace stackTrace]) { |
| 110 _eventSink.addError(error, stackTrace); |
| 111 } |
| 112 |
| 113 void close() { _eventSink.close(); } |
| 114 |
| 115 /** |
| 116 * Process data received from the underlying communication channel. |
| 117 */ |
| 118 void add(List<int> bytes) { |
| 119 var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes); |
| 120 int index = 0; |
| 121 int lastIndex = buffer.length; |
| 122 if (_state == CLOSED) { |
| 123 throw new WebSocketException("Data on closed connection"); |
| 124 } |
| 125 if (_state == FAILURE) { |
| 126 throw new WebSocketException("Data on failed connection"); |
| 127 } |
| 128 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { |
| 129 int byte = buffer[index]; |
| 130 if (_state <= LEN_REST) { |
| 131 if (_state == START) { |
| 132 _fin = (byte & FIN) != 0; |
| 133 |
| 134 if((byte & (RSV2 | RSV3)) != 0) { |
| 135 // The RSV2, RSV3 bits must both be zero. |
| 136 throw new WebSocketException("Protocol error"); |
| 137 } |
| 138 |
| 139 _opcode = (byte & OPCODE); |
| 140 |
| 141 if (_opcode != _WebSocketOpcode.CONTINUATION) { |
| 142 if ((byte & RSV1) != 0) { |
| 143 _compressed = true; |
| 144 } else { |
| 145 _compressed = false; |
| 146 } |
| 147 } |
| 148 |
| 149 if (_opcode <= _WebSocketOpcode.BINARY) { |
| 150 if (_opcode == _WebSocketOpcode.CONTINUATION) { |
| 151 if (_currentMessageType == _WebSocketMessageType.NONE) { |
| 152 throw new WebSocketException("Protocol error"); |
| 153 } |
| 154 } else { |
| 155 assert(_opcode == _WebSocketOpcode.TEXT || |
| 156 _opcode == _WebSocketOpcode.BINARY); |
| 157 if (_currentMessageType != _WebSocketMessageType.NONE) { |
| 158 throw new WebSocketException("Protocol error"); |
| 159 } |
| 160 _currentMessageType = _opcode; |
| 161 } |
| 162 } else if (_opcode >= _WebSocketOpcode.CLOSE && |
| 163 _opcode <= _WebSocketOpcode.PONG) { |
| 164 // Control frames cannot be fragmented. |
| 165 if (!_fin) throw new WebSocketException("Protocol error"); |
| 166 } else { |
| 167 throw new WebSocketException("Protocol error"); |
| 168 } |
| 169 _state = LEN_FIRST; |
| 170 } else if (_state == LEN_FIRST) { |
| 171 _masked = (byte & 0x80) != 0; |
| 172 _len = byte & 0x7F; |
| 173 if (_isControlFrame() && _len > 125) { |
| 174 throw new WebSocketException("Protocol error"); |
| 175 } |
| 176 if (_len == 126) { |
| 177 _len = 0; |
| 178 _remainingLenBytes = 2; |
| 179 _state = LEN_REST; |
| 180 } else if (_len == 127) { |
| 181 _len = 0; |
| 182 _remainingLenBytes = 8; |
| 183 _state = LEN_REST; |
| 184 } else { |
| 185 assert(_len < 126); |
| 186 _lengthDone(); |
| 187 } |
| 188 } else { |
| 189 assert(_state == LEN_REST); |
| 190 _len = _len << 8 | byte; |
| 191 _remainingLenBytes--; |
| 192 if (_remainingLenBytes == 0) { |
| 193 _lengthDone(); |
| 194 } |
| 195 } |
| 196 } else { |
| 197 if (_state == MASK) { |
| 198 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; |
| 199 if (_remainingMaskingKeyBytes == 0) { |
| 200 _maskDone(); |
| 201 } |
| 202 } else { |
| 203 assert(_state == PAYLOAD); |
| 204 // The payload is not handled one byte at a time but in blocks. |
| 205 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); |
| 206 _remainingPayloadBytes -= payloadLength; |
| 207 // Unmask payload if masked. |
| 208 if (_masked) { |
| 209 _unmask(index, payloadLength, buffer); |
| 210 } |
| 211 // Control frame and data frame share _payloads. |
| 212 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); |
| 213 index += payloadLength; |
| 214 if (_isControlFrame()) { |
| 215 if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
| 216 } else { |
| 217 if (_currentMessageType != _WebSocketMessageType.TEXT && |
| 218 _currentMessageType != _WebSocketMessageType.BINARY) { |
| 219 throw new WebSocketException("Protocol error"); |
| 220 } |
| 221 if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
| 222 } |
| 223 |
| 224 // Hack - as we always do index++ below. |
| 225 index--; |
| 226 } |
| 227 } |
| 228 |
| 229 // Move to the next byte. |
| 230 index++; |
| 231 } |
| 232 } |
| 233 |
| 234 void _unmask(int index, int length, Uint8List buffer) { |
| 235 const int BLOCK_SIZE = 16; |
| 236 // Skip Int32x4-version if message is small. |
| 237 if (length >= BLOCK_SIZE) { |
| 238 // Start by aligning to 16 bytes. |
| 239 final int startOffset = BLOCK_SIZE - (index & 15); |
| 240 final int end = index + startOffset; |
| 241 for (int i = index; i < end; i++) { |
| 242 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; |
| 243 } |
| 244 index += startOffset; |
| 245 length -= startOffset; |
| 246 final int blockCount = length ~/ BLOCK_SIZE; |
| 247 if (blockCount > 0) { |
| 248 // Create mask block. |
| 249 int mask = 0; |
| 250 for (int i = 3; i >= 0; i--) { |
| 251 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
| 252 } |
| 253 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
| 254 Int32x4List blockBuffer = |
| 255 new Int32x4List.view(buffer.buffer, index, blockCount); |
| 256 for (int i = 0; i < blockBuffer.length; i++) { |
| 257 blockBuffer[i] ^= blockMask; |
| 258 } |
| 259 final int bytes = blockCount * BLOCK_SIZE; |
| 260 index += bytes; |
| 261 length -= bytes; |
| 262 } |
| 263 } |
| 264 // Handle end. |
| 265 final int end = index + length; |
| 266 for (int i = index; i < end; i++) { |
| 267 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; |
| 268 } |
| 269 } |
| 270 |
| 271 void _lengthDone() { |
| 272 if (_masked) { |
| 273 if (!_serverSide) { |
| 274 throw new WebSocketException("Received masked frame from server"); |
| 275 } |
| 276 _state = MASK; |
| 277 } else { |
| 278 if (_serverSide) { |
| 279 throw new WebSocketException("Received unmasked frame from client"); |
| 280 } |
| 281 _remainingPayloadBytes = _len; |
| 282 _startPayload(); |
| 283 } |
| 284 } |
| 285 |
| 286 void _maskDone() { |
| 287 _remainingPayloadBytes = _len; |
| 288 _startPayload(); |
| 289 } |
| 290 |
| 291 void _startPayload() { |
| 292 // If there is no actual payload perform perform callbacks without |
| 293 // going through the PAYLOAD state. |
| 294 if (_remainingPayloadBytes == 0) { |
| 295 if (_isControlFrame()) { |
| 296 switch (_opcode) { |
| 297 case _WebSocketOpcode.CLOSE: |
| 298 _state = CLOSED; |
| 299 _eventSink.close(); |
| 300 break; |
| 301 case _WebSocketOpcode.PING: |
| 302 _eventSink.add(new _WebSocketPing()); |
| 303 break; |
| 304 case _WebSocketOpcode.PONG: |
| 305 _eventSink.add(new _WebSocketPong()); |
| 306 break; |
| 307 } |
| 308 _prepareForNextFrame(); |
| 309 } else { |
| 310 _messageFrameEnd(); |
| 311 } |
| 312 } else { |
| 313 _state = PAYLOAD; |
| 314 } |
| 315 } |
| 316 |
| 317 void _messageFrameEnd() { |
| 318 if (_fin) { |
| 319 var bytes = _payload.takeBytes(); |
| 320 if (_deflate != null && _compressed) { |
| 321 bytes = _deflate.processIncomingMessage(bytes); |
| 322 } |
| 323 |
| 324 switch (_currentMessageType) { |
| 325 case _WebSocketMessageType.TEXT: |
| 326 _eventSink.add(UTF8.decode(bytes)); |
| 327 break; |
| 328 case _WebSocketMessageType.BINARY: |
| 329 _eventSink.add(bytes); |
| 330 break; |
| 331 } |
| 332 _currentMessageType = _WebSocketMessageType.NONE; |
| 333 } |
| 334 _prepareForNextFrame(); |
| 335 } |
| 336 |
| 337 void _controlFrameEnd() { |
| 338 switch (_opcode) { |
| 339 case _WebSocketOpcode.CLOSE: |
| 340 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
| 341 var payload = _payload.takeBytes(); |
| 342 if (payload.length > 0) { |
| 343 if (payload.length == 1) { |
| 344 throw new WebSocketException("Protocol error"); |
| 345 } |
| 346 closeCode = payload[0] << 8 | payload[1]; |
| 347 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { |
| 348 throw new WebSocketException("Protocol error"); |
| 349 } |
| 350 if (payload.length > 2) { |
| 351 closeReason = UTF8.decode(payload.sublist(2)); |
| 352 } |
| 353 } |
| 354 _state = CLOSED; |
| 355 _eventSink.close(); |
| 356 break; |
| 357 |
| 358 case _WebSocketOpcode.PING: |
| 359 _eventSink.add(new _WebSocketPing(_payload.takeBytes())); |
| 360 break; |
| 361 |
| 362 case _WebSocketOpcode.PONG: |
| 363 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); |
| 364 break; |
| 365 } |
| 366 _prepareForNextFrame(); |
| 367 } |
| 368 |
| 369 bool _isControlFrame() { |
| 370 return _opcode == _WebSocketOpcode.CLOSE || |
| 371 _opcode == _WebSocketOpcode.PING || |
| 372 _opcode == _WebSocketOpcode.PONG; |
| 373 } |
| 374 |
| 375 void _prepareForNextFrame() { |
| 376 if (_state != CLOSED && _state != FAILURE) _state = START; |
| 377 _fin = false; |
| 378 _opcode = -1; |
| 379 _len = -1; |
| 380 _remainingLenBytes = -1; |
| 381 _remainingMaskingKeyBytes = 4; |
| 382 _remainingPayloadBytes = -1; |
| 383 _unmaskingIndex = 0; |
| 384 } |
| 385 } |
| 386 |
| 387 class _WebSocketPing { |
| 388 final List<int> payload; |
| 389 _WebSocketPing([this.payload = null]); |
| 390 } |
| 391 |
| 392 class _WebSocketPong { |
| 393 final List<int> payload; |
| 394 _WebSocketPong([this.payload = null]); |
| 395 } |
| 396 |
| 397 class _WebSocketTransformerImpl implements WebSocketTransformer { |
| 398 final StreamController<WebSocket> _controller = |
| 399 new StreamController<WebSocket>(sync: true); |
| 400 final Function _protocolSelector; |
| 401 final CompressionOptions _compression; |
| 402 |
| 403 _WebSocketTransformerImpl(this._protocolSelector, this._compression); |
| 404 |
| 405 Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
| 406 stream.listen((request) { |
| 407 _upgrade(request, _protocolSelector, _compression) |
| 408 .then((WebSocket webSocket) => _controller.add(webSocket)) |
| 409 .catchError(_controller.addError); |
| 410 }, onDone: () { |
| 411 _controller.close(); |
| 412 }); |
| 413 |
| 414 return _controller.stream; |
| 415 } |
| 416 |
| 417 static Future<WebSocket> _upgrade( |
| 418 HttpRequest request, _protocolSelector, CompressionOptions compression) { |
| 419 var response = request.response; |
| 420 if (!_isUpgradeRequest(request)) { |
| 421 // Send error response. |
| 422 response |
| 423 ..statusCode = HttpStatus.BAD_REQUEST |
| 424 ..close(); |
| 425 return new Future.error( |
| 426 new WebSocketException("Invalid WebSocket upgrade request")); |
| 427 } |
| 428 |
| 429 Future upgrade(String protocol) { |
| 430 // Send the upgrade response. |
| 431 response |
| 432 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
| 433 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
| 434 ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
| 435 String key = request.headers.value("Sec-WebSocket-Key"); |
| 436 _SHA1 sha1 = new _SHA1(); |
| 437 sha1.add("$key$_webSocketGUID".codeUnits); |
| 438 String accept = _CryptoUtils.bytesToBase64(sha1.close()); |
| 439 response.headers.add("Sec-WebSocket-Accept", accept); |
| 440 if (protocol != null) { |
| 441 response.headers.add("Sec-WebSocket-Protocol", protocol); |
| 442 } |
| 443 |
| 444 var deflate = _negotiateCompression(request, response, compression); |
| 445 |
| 446 response.headers.contentLength = 0; |
| 447 return response.detachSocket().then((socket) => |
| 448 new _WebSocketImpl._fromSocket( |
| 449 socket, protocol, compression, true, deflate)); |
| 450 } |
| 451 |
| 452 var protocols = request.headers['Sec-WebSocket-Protocol']; |
| 453 if (protocols != null && _protocolSelector != null) { |
| 454 // The suggested protocols can be spread over multiple lines, each |
| 455 // consisting of multiple protocols. To unify all of them, first join |
| 456 // the lists with ', ' and then tokenize. |
| 457 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); |
| 458 return new Future(() => _protocolSelector(protocols)).then((protocol) { |
| 459 if (protocols.indexOf(protocol) < 0) { |
| 460 throw new WebSocketException( |
| 461 "Selected protocol is not in the list of available protocols"); |
| 462 } |
| 463 return protocol; |
| 464 }).catchError((error) { |
| 465 response |
| 466 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
| 467 ..close(); |
| 468 throw error; |
| 469 }).then(upgrade); |
| 470 } else { |
| 471 return upgrade(null); |
| 472 } |
| 473 } |
| 474 |
| 475 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request, |
| 476 HttpResponse response, CompressionOptions compression) { |
| 477 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); |
| 478 |
| 479 extensionHeader ??= ""; |
| 480 |
| 481 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); |
| 482 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) { |
| 483 var info = compression._createHeader(hv); |
| 484 |
| 485 response.headers.add("Sec-WebSocket-Extensions", info.headerValue); |
| 486 var serverNoContextTakeover = |
| 487 (hv.parameters.containsKey(_serverNoContextTakeover) && |
| 488 compression.serverNoContextTakeover); |
| 489 var clientNoContextTakeover = |
| 490 (hv.parameters.containsKey(_clientNoContextTakeover) && |
| 491 compression.clientNoContextTakeover); |
| 492 var deflate = new _WebSocketPerMessageDeflate( |
| 493 serverNoContextTakeover: serverNoContextTakeover, |
| 494 clientNoContextTakeover: clientNoContextTakeover, |
| 495 serverMaxWindowBits: info.maxWindowBits, |
| 496 clientMaxWindowBits: info.maxWindowBits, |
| 497 serverSide: true); |
| 498 |
| 499 return deflate; |
| 500 } |
| 501 |
| 502 return null; |
| 503 } |
| 504 |
| 505 static bool _isUpgradeRequest(HttpRequest request) { |
| 506 if (request.method != "GET") { |
| 507 return false; |
| 508 } |
| 509 if (request.headers[HttpHeaders.CONNECTION] == null) { |
| 510 return false; |
| 511 } |
| 512 bool isUpgrade = false; |
| 513 request.headers[HttpHeaders.CONNECTION].forEach((String value) { |
| 514 if (value.toLowerCase() == "upgrade") isUpgrade = true; |
| 515 }); |
| 516 if (!isUpgrade) return false; |
| 517 String upgrade = request.headers.value(HttpHeaders.UPGRADE); |
| 518 if (upgrade == null || upgrade.toLowerCase() != "websocket") { |
| 519 return false; |
| 520 } |
| 521 String version = request.headers.value("Sec-WebSocket-Version"); |
| 522 if (version == null || version != "13") { |
| 523 return false; |
| 524 } |
| 525 String key = request.headers.value("Sec-WebSocket-Key"); |
| 526 if (key == null) { |
| 527 return false; |
| 528 } |
| 529 return true; |
| 530 } |
| 531 } |
| 532 |
| 533 class _WebSocketPerMessageDeflate { |
| 534 bool serverNoContextTakeover; |
| 535 bool clientNoContextTakeover; |
| 536 int clientMaxWindowBits; |
| 537 int serverMaxWindowBits; |
| 538 bool serverSide; |
| 539 |
| 540 _Filter decoder; |
| 541 _Filter encoder; |
| 542 |
| 543 _WebSocketPerMessageDeflate( |
| 544 {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, |
| 545 this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, |
| 546 this.serverNoContextTakeover: false, |
| 547 this.clientNoContextTakeover: false, |
| 548 this.serverSide: false}); |
| 549 |
| 550 void _ensureDecoder() { |
| 551 if (decoder == null) { |
| 552 decoder = _Filter._newZLibInflateFilter( |
| 553 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true); |
| 554 } |
| 555 } |
| 556 |
| 557 void _ensureEncoder() { |
| 558 if (encoder == null) { |
| 559 encoder = _Filter._newZLibDeflateFilter( |
| 560 false, |
| 561 ZLibOption.DEFAULT_LEVEL, |
| 562 serverSide ? serverMaxWindowBits : clientMaxWindowBits, |
| 563 ZLibOption.DEFAULT_MEM_LEVEL, |
| 564 ZLibOption.STRATEGY_DEFAULT, |
| 565 null, |
| 566 true); |
| 567 } |
| 568 } |
| 569 |
| 570 Uint8List processIncomingMessage(List<int> msg) { |
| 571 _ensureDecoder(); |
| 572 |
| 573 var data = []; |
| 574 data.addAll(msg); |
| 575 data.addAll(const [0x00, 0x00, 0xff, 0xff]); |
| 576 |
| 577 decoder.process(data, 0, data.length); |
| 578 var result = []; |
| 579 var out; |
| 580 |
| 581 while ((out = decoder.processed()) != null) { |
| 582 result.addAll(out); |
| 583 } |
| 584 |
| 585 if ((serverSide && clientNoContextTakeover) || |
| 586 (!serverSide && serverNoContextTakeover)) { |
| 587 decoder = null; |
| 588 } |
| 589 |
| 590 return new Uint8List.fromList(result); |
| 591 } |
| 592 |
| 593 List<int> processOutgoingMessage(List<int> msg) { |
| 594 _ensureEncoder(); |
| 595 var result = []; |
| 596 Uint8List buffer; |
| 597 var out; |
| 598 |
| 599 if (msg is! Uint8List) { |
| 600 for (var i = 0; i < msg.length; i++) { |
| 601 if (msg[i] < 0 || 255 < msg[i]) { |
| 602 throw new ArgumentError("List element is not a byte value " |
| 603 "(value ${msg[i]} at index $i)"); |
| 604 } |
| 605 } |
| 606 buffer = new Uint8List.fromList(msg); |
| 607 } else { |
| 608 buffer = msg; |
| 609 } |
| 610 |
| 611 encoder.process(buffer, 0, buffer.length); |
| 612 |
| 613 while ((out = encoder.processed()) != null) { |
| 614 result.addAll(out); |
| 615 } |
| 616 |
| 617 if ((!serverSide && clientNoContextTakeover) || |
| 618 (serverSide && serverNoContextTakeover)) { |
| 619 encoder = null; |
| 620 } |
| 621 |
| 622 if (result.length > 4) { |
| 623 result = result.sublist(0, result.length - 4); |
| 624 } |
| 625 |
| 626 return result; |
| 627 } |
| 628 } |
| 629 |
| 630 // TODO(ajohnsen): Make this transformer reusable. |
| 631 class _WebSocketOutgoingTransformer |
| 632 implements StreamTransformer<dynamic, List<int>>, EventSink { |
| 633 final _WebSocketImpl webSocket; |
| 634 EventSink<List<int>> _eventSink; |
| 635 |
| 636 _WebSocketPerMessageDeflate _deflateHelper; |
| 637 |
| 638 _WebSocketOutgoingTransformer(this.webSocket) { |
| 639 _deflateHelper = webSocket._deflate; |
| 640 } |
| 641 |
| 642 Stream<List<int>> bind(Stream stream) { |
| 643 return new Stream.eventTransformed(stream, (eventSink) { |
| 644 if (_eventSink != null) { |
| 645 throw new StateError("WebSocket transformer already used"); |
| 646 } |
| 647 _eventSink = eventSink; |
| 648 return this; |
| 649 }); |
| 650 } |
| 651 |
| 652 void add(message) { |
| 653 if (message is _WebSocketPong) { |
| 654 addFrame(_WebSocketOpcode.PONG, message.payload); |
| 655 return; |
| 656 } |
| 657 if (message is _WebSocketPing) { |
| 658 addFrame(_WebSocketOpcode.PING, message.payload); |
| 659 return; |
| 660 } |
| 661 List<int> data; |
| 662 int opcode; |
| 663 if (message != null) { |
| 664 if (message is String) { |
| 665 opcode = _WebSocketOpcode.TEXT; |
| 666 data = UTF8.encode(message); |
| 667 } else { |
| 668 if (message is List<int>) { |
| 669 data = message; |
| 670 opcode = _WebSocketOpcode.BINARY; |
| 671 } else { |
| 672 throw new ArgumentError(message); |
| 673 } |
| 674 } |
| 675 |
| 676 if (_deflateHelper != null) { |
| 677 data = _deflateHelper.processOutgoingMessage(data); |
| 678 } |
| 679 } else { |
| 680 opcode = _WebSocketOpcode.TEXT; |
| 681 } |
| 682 addFrame(opcode, data); |
| 683 } |
| 684 |
| 685 void addError(Object error, [StackTrace stackTrace]) { |
| 686 _eventSink.addError(error, stackTrace); |
| 687 } |
| 688 |
| 689 void close() { |
| 690 int code = webSocket._outCloseCode; |
| 691 String reason = webSocket._outCloseReason; |
| 692 List<int> data; |
| 693 if (code != null) { |
| 694 data = new List<int>(); |
| 695 data.add((code >> 8) & 0xFF); |
| 696 data.add(code & 0xFF); |
| 697 if (reason != null) { |
| 698 data.addAll(UTF8.encode(reason)); |
| 699 } |
| 700 } |
| 701 addFrame(_WebSocketOpcode.CLOSE, data); |
| 702 _eventSink.close(); |
| 703 } |
| 704 |
| 705 void addFrame(int opcode, List<int> data) => createFrame( |
| 706 opcode, |
| 707 data, |
| 708 webSocket._serverSide, |
| 709 _deflateHelper != null && |
| 710 (opcode == _WebSocketOpcode.TEXT || |
| 711 opcode == _WebSocketOpcode.BINARY)).forEach((e) { |
| 712 _eventSink.add(e); |
| 713 }); |
| 714 |
| 715 static Iterable<List<int>> createFrame( |
| 716 int opcode, List<int> data, bool serverSide, bool compressed) { |
| 717 bool mask = !serverSide; // Masking not implemented for server. |
| 718 int dataLength = data == null ? 0 : data.length; |
| 719 // Determine the header size. |
| 720 int headerSize = (mask) ? 6 : 2; |
| 721 if (dataLength > 65535) { |
| 722 headerSize += 8; |
| 723 } else if (dataLength > 125) { |
| 724 headerSize += 2; |
| 725 } |
| 726 Uint8List header = new Uint8List(headerSize); |
| 727 int index = 0; |
| 728 |
| 729 // Set FIN and opcode. |
| 730 var hoc = _WebSocketProtocolTransformer.FIN |
| 731 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) |
| 732 | (opcode & _WebSocketProtocolTransformer.OPCODE); |
| 733 |
| 734 header[index++] = hoc; |
| 735 // Determine size and position of length field. |
| 736 int lengthBytes = 1; |
| 737 if (dataLength > 65535) { |
| 738 header[index++] = 127; |
| 739 lengthBytes = 8; |
| 740 } else if (dataLength > 125) { |
| 741 header[index++] = 126; |
| 742 lengthBytes = 2; |
| 743 } |
| 744 // Write the length in network byte order into the header. |
| 745 for (int i = 0; i < lengthBytes; i++) { |
| 746 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; |
| 747 } |
| 748 if (mask) { |
| 749 header[1] |= 1 << 7; |
| 750 var maskBytes = _IOCrypto.getRandomBytes(4); |
| 751 header.setRange(index, index + 4, maskBytes); |
| 752 index += 4; |
| 753 if (data != null) { |
| 754 Uint8List list; |
| 755 // If this is a text message just do the masking inside the |
| 756 // encoded data. |
| 757 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { |
| 758 list = data; |
| 759 } else { |
| 760 if (data is Uint8List) { |
| 761 list = new Uint8List.fromList(data); |
| 762 } else { |
| 763 list = new Uint8List(data.length); |
| 764 for (int i = 0; i < data.length; i++) { |
| 765 if (data[i] < 0 || 255 < data[i]) { |
| 766 throw new ArgumentError("List element is not a byte value " |
| 767 "(value ${data[i]} at index $i)"); |
| 768 } |
| 769 list[i] = data[i]; |
| 770 } |
| 771 } |
| 772 } |
| 773 const int BLOCK_SIZE = 16; |
| 774 int blockCount = list.length ~/ BLOCK_SIZE; |
| 775 if (blockCount > 0) { |
| 776 // Create mask block. |
| 777 int mask = 0; |
| 778 for (int i = 3; i >= 0; i--) { |
| 779 mask = (mask << 8) | maskBytes[i]; |
| 780 } |
| 781 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
| 782 Int32x4List blockBuffer = |
| 783 new Int32x4List.view(list.buffer, 0, blockCount); |
| 784 for (int i = 0; i < blockBuffer.length; i++) { |
| 785 blockBuffer[i] ^= blockMask; |
| 786 } |
| 787 } |
| 788 // Handle end. |
| 789 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { |
| 790 list[i] ^= maskBytes[i & 3]; |
| 791 } |
| 792 data = list; |
| 793 } |
| 794 } |
| 795 assert(index == headerSize); |
| 796 if (data == null) { |
| 797 return [header]; |
| 798 } else { |
| 799 return [header, data]; |
| 800 } |
| 801 } |
| 802 } |
| 803 |
| 804 class _WebSocketConsumer implements StreamConsumer { |
| 805 final _WebSocketImpl webSocket; |
| 806 final Socket socket; |
| 807 StreamController _controller; |
| 808 StreamSubscription _subscription; |
| 809 bool _issuedPause = false; |
| 810 bool _closed = false; |
| 811 Completer _closeCompleter = new Completer(); |
| 812 Completer _completer; |
| 813 |
| 814 _WebSocketConsumer(this.webSocket, this.socket); |
| 815 |
| 816 void _onListen() { |
| 817 if (_subscription != null) { |
| 818 _subscription.cancel(); |
| 819 } |
| 820 } |
| 821 |
| 822 void _onPause() { |
| 823 if (_subscription != null) { |
| 824 _subscription.pause(); |
| 825 } else { |
| 826 _issuedPause = true; |
| 827 } |
| 828 } |
| 829 |
| 830 void _onResume() { |
| 831 if (_subscription != null) { |
| 832 _subscription.resume(); |
| 833 } else { |
| 834 _issuedPause = false; |
| 835 } |
| 836 } |
| 837 |
| 838 void _cancel() { |
| 839 if (_subscription != null) { |
| 840 var subscription = _subscription; |
| 841 _subscription = null; |
| 842 subscription.cancel(); |
| 843 } |
| 844 } |
| 845 |
| 846 _ensureController() { |
| 847 if (_controller != null) return; |
| 848 _controller = new StreamController( |
| 849 sync: true, |
| 850 onPause: _onPause, |
| 851 onResume: _onResume, |
| 852 onCancel: _onListen); |
| 853 var stream = _controller.stream |
| 854 .transform(new _WebSocketOutgoingTransformer(webSocket)); |
| 855 socket.addStream(stream).then((_) { |
| 856 _done(); |
| 857 _closeCompleter.complete(webSocket); |
| 858 }, onError: (error, StackTrace stackTrace) { |
| 859 _closed = true; |
| 860 _cancel(); |
| 861 if (error is ArgumentError) { |
| 862 if (!_done(error, stackTrace)) { |
| 863 _closeCompleter.completeError(error, stackTrace); |
| 864 } |
| 865 } else { |
| 866 _done(); |
| 867 _closeCompleter.complete(webSocket); |
| 868 } |
| 869 }); |
| 870 } |
| 871 |
| 872 bool _done([error, StackTrace stackTrace]) { |
| 873 if (_completer == null) return false; |
| 874 if (error != null) { |
| 875 _completer.completeError(error, stackTrace); |
| 876 } else { |
| 877 _completer.complete(webSocket); |
| 878 } |
| 879 _completer = null; |
| 880 return true; |
| 881 } |
| 882 |
| 883 Future addStream(var stream) { |
| 884 if (_closed) { |
| 885 stream.listen(null).cancel(); |
| 886 return new Future.value(webSocket); |
| 887 } |
| 888 _ensureController(); |
| 889 _completer = new Completer(); |
| 890 _subscription = stream.listen((data) { |
| 891 _controller.add(data); |
| 892 }, onDone: _done, onError: _done, cancelOnError: true); |
| 893 if (_issuedPause) { |
| 894 _subscription.pause(); |
| 895 _issuedPause = false; |
| 896 } |
| 897 return _completer.future; |
| 898 } |
| 899 |
| 900 Future close() { |
| 901 _ensureController(); |
| 902 Future closeSocket() { |
| 903 return socket.close().catchError((_) {}).then((_) => webSocket); |
| 904 } |
| 905 _controller.close(); |
| 906 return _closeCompleter.future.then((_) => closeSocket()); |
| 907 } |
| 908 |
| 909 void add(data) { |
| 910 if (_closed) return; |
| 911 _ensureController(); |
| 912 _controller.add(data); |
| 913 } |
| 914 |
| 915 void closeSocket() { |
| 916 _closed = true; |
| 917 _cancel(); |
| 918 close(); |
| 919 } |
| 920 } |
| 921 |
| 922 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| 923 // Use default Map so we keep order. |
| 924 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); |
| 925 static const int DEFAULT_WINDOW_BITS = 15; |
| 926 static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; |
| 927 |
| 928 final String protocol; |
| 929 |
| 930 StreamController _controller; |
| 931 StreamSubscription _subscription; |
| 932 StreamSink _sink; |
| 933 |
| 934 final _socket; |
| 935 final bool _serverSide; |
| 936 int _readyState = WebSocket.CONNECTING; |
| 937 bool _writeClosed = false; |
| 938 int _closeCode; |
| 939 String _closeReason; |
| 940 Duration _pingInterval; |
| 941 Timer _pingTimer; |
| 942 _WebSocketConsumer _consumer; |
| 943 |
| 944 int _outCloseCode; |
| 945 String _outCloseReason; |
| 946 Timer _closeTimer; |
| 947 _WebSocketPerMessageDeflate _deflate; |
| 948 |
| 949 static final HttpClient _httpClient = new HttpClient(); |
| 950 |
| 951 static Future<WebSocket> connect( |
| 952 String url, Iterable<String> protocols, Map<String, dynamic> headers, |
| 953 {CompressionOptions compression: CompressionOptions.DEFAULT}) { |
| 954 Uri uri = Uri.parse(url); |
| 955 if (uri.scheme != "ws" && uri.scheme != "wss") { |
| 956 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); |
| 957 } |
| 958 |
| 959 Random random = new Random(); |
| 960 // Generate 16 random bytes. |
| 961 Uint8List nonceData = new Uint8List(16); |
| 962 for (int i = 0; i < 16; i++) { |
| 963 nonceData[i] = random.nextInt(256); |
| 964 } |
| 965 String nonce = _CryptoUtils.bytesToBase64(nonceData); |
| 966 |
| 967 uri = new Uri( |
| 968 scheme: uri.scheme == "wss" ? "https" : "http", |
| 969 userInfo: uri.userInfo, |
| 970 host: uri.host, |
| 971 port: uri.port, |
| 972 path: uri.path, |
| 973 query: uri.query, |
| 974 fragment: uri.fragment); |
| 975 return _httpClient.openUrl("GET", uri).then((request) { |
| 976 if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
| 977 // If the URL contains user information use that for basic |
| 978 // authorization. |
| 979 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); |
| 980 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
| 981 } |
| 982 if (headers != null) { |
| 983 headers.forEach((field, value) => request.headers.add(field, value)); |
| 984 } |
| 985 // Setup the initial handshake. |
| 986 request.headers |
| 987 ..set(HttpHeaders.CONNECTION, "Upgrade") |
| 988 ..set(HttpHeaders.UPGRADE, "websocket") |
| 989 ..set("Sec-WebSocket-Key", nonce) |
| 990 ..set("Cache-Control", "no-cache") |
| 991 ..set("Sec-WebSocket-Version", "13"); |
| 992 if (protocols != null) { |
| 993 request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
| 994 } |
| 995 |
| 996 if (compression.enabled) { |
| 997 request.headers |
| 998 .add("Sec-WebSocket-Extensions", compression._createHeader()); |
| 999 } |
| 1000 |
| 1001 return request.close(); |
| 1002 }).then((response) { |
| 1003 |
| 1004 void error(String message) { |
| 1005 // Flush data. |
| 1006 response.detachSocket().then((socket) { |
| 1007 socket.destroy(); |
| 1008 }); |
| 1009 throw new WebSocketException(message); |
| 1010 } |
| 1011 |
| 1012 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || |
| 1013 response.headers[HttpHeaders.CONNECTION] == null || |
| 1014 !response.headers[HttpHeaders.CONNECTION] |
| 1015 .any((value) => value.toLowerCase() == "upgrade") || |
| 1016 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != |
| 1017 "websocket") { |
| 1018 error("Connection to '$uri' was not upgraded to websocket"); |
| 1019 } |
| 1020 String accept = response.headers.value("Sec-WebSocket-Accept"); |
| 1021 if (accept == null) { |
| 1022 error("Response did not contain a 'Sec-WebSocket-Accept' header"); |
| 1023 } |
| 1024 _SHA1 sha1 = new _SHA1(); |
| 1025 sha1.add("$nonce$_webSocketGUID".codeUnits); |
| 1026 List<int> expectedAccept = sha1.close(); |
| 1027 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); |
| 1028 if (expectedAccept.length != receivedAccept.length) { |
| 1029 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); |
| 1030 } |
| 1031 for (int i = 0; i < expectedAccept.length; i++) { |
| 1032 if (expectedAccept[i] != receivedAccept[i]) { |
| 1033 error("Bad response 'Sec-WebSocket-Accept' header"); |
| 1034 } |
| 1035 } |
| 1036 var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
| 1037 |
| 1038 _WebSocketPerMessageDeflate deflate = |
| 1039 negotiateClientCompression(response, compression); |
| 1040 |
| 1041 return response.detachSocket().then/*<WebSocket>*/((socket) => |
| 1042 new _WebSocketImpl._fromSocket( |
| 1043 socket, protocol, compression, false, deflate)); |
| 1044 }); |
| 1045 } |
| 1046 |
| 1047 static _WebSocketPerMessageDeflate negotiateClientCompression( |
| 1048 HttpClientResponse response, CompressionOptions compression) { |
| 1049 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); |
| 1050 |
| 1051 if (extensionHeader == null) { |
| 1052 extensionHeader = ""; |
| 1053 } |
| 1054 |
| 1055 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); |
| 1056 |
| 1057 if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) { |
| 1058 var serverNoContextTakeover = |
| 1059 hv.parameters.containsKey(_serverNoContextTakeover); |
| 1060 var clientNoContextTakeover = |
| 1061 hv.parameters.containsKey(_clientNoContextTakeover); |
| 1062 |
| 1063 int getWindowBits(String type) { |
| 1064 var o = hv.parameters[type]; |
| 1065 if (o == null) { |
| 1066 return DEFAULT_WINDOW_BITS; |
| 1067 } |
| 1068 |
| 1069 return int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS); |
| 1070 } |
| 1071 |
| 1072 return new _WebSocketPerMessageDeflate( |
| 1073 clientMaxWindowBits: getWindowBits(_clientMaxWindowBits), |
| 1074 serverMaxWindowBits: getWindowBits(_serverMaxWindowBits), |
| 1075 clientNoContextTakeover: clientNoContextTakeover, |
| 1076 serverNoContextTakeover: serverNoContextTakeover); |
| 1077 } |
| 1078 |
| 1079 return null; |
| 1080 } |
| 1081 |
| 1082 _WebSocketImpl._fromSocket( |
| 1083 this._socket, this.protocol, CompressionOptions compression, |
| 1084 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) { |
| 1085 _consumer = new _WebSocketConsumer(this, _socket); |
| 1086 _sink = new _StreamSinkImpl(_consumer); |
| 1087 _readyState = WebSocket.OPEN; |
| 1088 _deflate = deflate; |
| 1089 |
| 1090 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); |
| 1091 _subscription = _socket.transform(transformer).listen((data) { |
| 1092 if (data is _WebSocketPing) { |
| 1093 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
| 1094 } else if (data is _WebSocketPong) { |
| 1095 // Simply set pingInterval, as it'll cancel any timers. |
| 1096 pingInterval = _pingInterval; |
| 1097 } else { |
| 1098 _controller.add(data); |
| 1099 } |
| 1100 }, onError: (error, stackTrace) { |
| 1101 if (_closeTimer != null) _closeTimer.cancel(); |
| 1102 if (error is FormatException) { |
| 1103 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
| 1104 } else { |
| 1105 _close(WebSocketStatus.PROTOCOL_ERROR); |
| 1106 } |
| 1107 // An error happened, set the close code set above. |
| 1108 _closeCode = _outCloseCode; |
| 1109 _closeReason = _outCloseReason; |
| 1110 _controller.close(); |
| 1111 }, onDone: () { |
| 1112 if (_closeTimer != null) _closeTimer.cancel(); |
| 1113 if (_readyState == WebSocket.OPEN) { |
| 1114 _readyState = WebSocket.CLOSING; |
| 1115 if (!_isReservedStatusCode(transformer.closeCode)) { |
| 1116 _close(transformer.closeCode, transformer.closeReason); |
| 1117 } else { |
| 1118 _close(); |
| 1119 } |
| 1120 _readyState = WebSocket.CLOSED; |
| 1121 } |
| 1122 // Protocol close, use close code from transformer. |
| 1123 _closeCode = transformer.closeCode; |
| 1124 _closeReason = transformer.closeReason; |
| 1125 _controller.close(); |
| 1126 }, cancelOnError: true); |
| 1127 _subscription.pause(); |
| 1128 _controller = new StreamController( |
| 1129 sync: true, onListen: _subscription.resume, onCancel: () { |
| 1130 _subscription.cancel(); |
| 1131 _subscription = null; |
| 1132 }, onPause: _subscription.pause, onResume: _subscription.resume); |
| 1133 |
| 1134 _webSockets[_serviceId] = this; |
| 1135 try { |
| 1136 _socket._owner = this; |
| 1137 } catch (_) {} |
| 1138 } |
| 1139 |
| 1140 StreamSubscription listen(void onData(message), |
| 1141 {Function onError, void onDone(), bool cancelOnError}) { |
| 1142 return _controller.stream.listen(onData, |
| 1143 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| 1144 } |
| 1145 |
| 1146 Duration get pingInterval => _pingInterval; |
| 1147 |
| 1148 void set pingInterval(Duration interval) { |
| 1149 if (_writeClosed) return; |
| 1150 if (_pingTimer != null) _pingTimer.cancel(); |
| 1151 _pingInterval = interval; |
| 1152 |
| 1153 if (_pingInterval == null) return; |
| 1154 |
| 1155 _pingTimer = new Timer(_pingInterval, () { |
| 1156 if (_writeClosed) return; |
| 1157 _consumer.add(new _WebSocketPing()); |
| 1158 _pingTimer = new Timer(_pingInterval, () { |
| 1159 // No pong received. |
| 1160 _close(WebSocketStatus.GOING_AWAY); |
| 1161 }); |
| 1162 }); |
| 1163 } |
| 1164 |
| 1165 int get readyState => _readyState; |
| 1166 |
| 1167 String get extensions => null; |
| 1168 int get closeCode => _closeCode; |
| 1169 String get closeReason => _closeReason; |
| 1170 |
| 1171 void add(data) { _sink.add(data); } |
| 1172 void addError(error, [StackTrace stackTrace]) { |
| 1173 _sink.addError(error, stackTrace); |
| 1174 } |
| 1175 Future addStream(Stream stream) => _sink.addStream(stream); |
| 1176 Future get done => _sink.done; |
| 1177 |
| 1178 Future close([int code, String reason]) { |
| 1179 if (_isReservedStatusCode(code)) { |
| 1180 throw new WebSocketException("Reserved status code $code"); |
| 1181 } |
| 1182 if (_outCloseCode == null) { |
| 1183 _outCloseCode = code; |
| 1184 _outCloseReason = reason; |
| 1185 } |
| 1186 if (!_controller.isClosed) { |
| 1187 // If a close has not yet been received from the other end then |
| 1188 // 1) make sure to listen on the stream so the close frame will be |
| 1189 // processed if received. |
| 1190 // 2) set a timer terminate the connection if a close frame is |
| 1191 // not received. |
| 1192 if (!_controller.hasListener && _subscription != null) { |
| 1193 _controller.stream.drain().catchError((_) => {}); |
| 1194 } |
| 1195 if (_closeTimer == null) { |
| 1196 // When closing the web-socket, we no longer accept data. |
| 1197 _closeTimer = new Timer(const Duration(seconds: 5), () { |
| 1198 // Reuse code and reason from the local close. |
| 1199 _closeCode = _outCloseCode; |
| 1200 _closeReason = _outCloseReason; |
| 1201 if (_subscription != null) _subscription.cancel(); |
| 1202 _controller.close(); |
| 1203 _webSockets.remove(_serviceId); |
| 1204 }); |
| 1205 } |
| 1206 } |
| 1207 return _sink.close(); |
| 1208 } |
| 1209 |
| 1210 void _close([int code, String reason]) { |
| 1211 if (_writeClosed) return; |
| 1212 if (_outCloseCode == null) { |
| 1213 _outCloseCode = code; |
| 1214 _outCloseReason = reason; |
| 1215 } |
| 1216 _writeClosed = true; |
| 1217 _consumer.closeSocket(); |
| 1218 _webSockets.remove(_serviceId); |
| 1219 } |
| 1220 |
| 1221 String get _serviceTypePath => 'io/websockets'; |
| 1222 String get _serviceTypeName => 'WebSocket'; |
| 1223 |
| 1224 Map<String, dynamic> _toJSON(bool ref) { |
| 1225 var name = '${_socket.address.host}:${_socket.port}'; |
| 1226 var r = <String, dynamic>{ |
| 1227 'id': _servicePath, |
| 1228 'type': _serviceType(ref), |
| 1229 'name': name, |
| 1230 'user_name': name, |
| 1231 }; |
| 1232 if (ref) { |
| 1233 return r; |
| 1234 } |
| 1235 try { |
| 1236 r['socket'] = _socket._toJSON(true); |
| 1237 } catch (_) { |
| 1238 r['socket'] = { |
| 1239 'id': _servicePath, |
| 1240 'type': '@Socket', |
| 1241 'name': 'UserSocket', |
| 1242 'user_name': 'UserSocket', |
| 1243 }; |
| 1244 } |
| 1245 return r; |
| 1246 } |
| 1247 |
| 1248 static bool _isReservedStatusCode(int code) { |
| 1249 return code != null && |
| 1250 (code < WebSocketStatus.NORMAL_CLOSURE || |
| 1251 code == WebSocketStatus.RESERVED_1004 || |
| 1252 code == WebSocketStatus.NO_STATUS_RECEIVED || |
| 1253 code == WebSocketStatus.ABNORMAL_CLOSURE || |
| 1254 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
| 1255 code < WebSocketStatus.RESERVED_1015) || |
| 1256 (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); |
| 1257 } |
| 1258 } |
OLD | NEW |