| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 // The following code is copied from sdk/lib/io/websocket_impl.dart. The | |
| 6 // "dart:io" implementation isn't used directly to support non-"dart:io" | |
| 7 // applications. | |
| 8 // | |
| 9 // Because it's copied directly, only modifications necessary to support the | |
| 10 // desired public API and to remove "dart:io" dependencies have been made. | |
| 11 // | |
| 12 // This is up-to-date as of sdk revision | |
| 13 // 86227840d75d974feb238f8b3c59c038b99c05cf. | |
| 14 import 'dart:async'; | |
| 15 import 'dart:convert'; | |
| 16 import 'dart:math'; | |
| 17 import 'dart:typed_data'; | |
| 18 | |
| 19 import '../web_socket.dart'; | |
| 20 import 'bytes_builder.dart'; | |
| 21 import 'io_sink.dart'; | |
| 22 import 'web_socket.dart'; | |
| 23 | |
| 24 const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | |
| 25 | |
| 26 final _random = new Random(); | |
| 27 | |
| 28 // Matches _WebSocketOpcode. | |
| 29 class _WebSocketMessageType { | |
| 30 static const int NONE = 0; | |
| 31 static const int TEXT = 1; | |
| 32 static const int BINARY = 2; | |
| 33 } | |
| 34 | |
| 35 | |
| 36 class _WebSocketOpcode { | |
| 37 static const int CONTINUATION = 0; | |
| 38 static const int TEXT = 1; | |
| 39 static const int BINARY = 2; | |
| 40 static const int RESERVED_3 = 3; | |
| 41 static const int RESERVED_4 = 4; | |
| 42 static const int RESERVED_5 = 5; | |
| 43 static const int RESERVED_6 = 6; | |
| 44 static const int RESERVED_7 = 7; | |
| 45 static const int CLOSE = 8; | |
| 46 static const int PING = 9; | |
| 47 static const int PONG = 10; | |
| 48 static const int RESERVED_B = 11; | |
| 49 static const int RESERVED_C = 12; | |
| 50 static const int RESERVED_D = 13; | |
| 51 static const int RESERVED_E = 14; | |
| 52 static const int RESERVED_F = 15; | |
| 53 } | |
| 54 | |
| 55 /** | |
| 56 * The web socket protocol transformer handles the protocol byte stream | |
| 57 * which is supplied through the [:handleData:]. As the protocol is processed, | |
| 58 * it'll output frame data as either a List<int> or String. | |
| 59 * | |
| 60 * Important infomation about usage: Be sure you use cancelOnError, so the | |
| 61 * socket will be closed when the processer encounter an error. Not using it | |
| 62 * will lead to undefined behaviour. | |
| 63 */ | |
| 64 // TODO(ajohnsen): make this transformer reusable? | |
| 65 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | |
| 66 static const int START = 0; | |
| 67 static const int LEN_FIRST = 1; | |
| 68 static const int LEN_REST = 2; | |
| 69 static const int MASK = 3; | |
| 70 static const int PAYLOAD = 4; | |
| 71 static const int CLOSED = 5; | |
| 72 static const int FAILURE = 6; | |
| 73 | |
| 74 int _state = START; | |
| 75 bool _fin = false; | |
| 76 int _opcode = -1; | |
| 77 int _len = -1; | |
| 78 bool _masked = false; | |
| 79 int _remainingLenBytes = -1; | |
| 80 int _remainingMaskingKeyBytes = 4; | |
| 81 int _remainingPayloadBytes = -1; | |
| 82 int _unmaskingIndex = 0; | |
| 83 int _currentMessageType = _WebSocketMessageType.NONE; | |
| 84 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | |
| 85 String closeReason = ""; | |
| 86 | |
| 87 EventSink _eventSink; | |
| 88 | |
| 89 final bool _serverSide; | |
| 90 final List _maskingBytes = new List(4); | |
| 91 final BytesBuilder _payload = new BytesBuilder(copy: false); | |
| 92 | |
| 93 _WebSocketProtocolTransformer([this._serverSide = false]); | |
| 94 | |
| 95 Stream bind(Stream stream) { | |
| 96 return new Stream.eventTransformed( | |
| 97 stream, | |
| 98 (EventSink eventSink) { | |
| 99 if (_eventSink != null) { | |
| 100 throw new StateError("WebSocket transformer already used."); | |
| 101 } | |
| 102 _eventSink = eventSink; | |
| 103 return this; | |
| 104 }); | |
| 105 } | |
| 106 | |
| 107 void addError(Object error, [StackTrace stackTrace]) => | |
| 108 _eventSink.addError(error, stackTrace); | |
| 109 | |
| 110 void close() => _eventSink.close(); | |
| 111 | |
| 112 /** | |
| 113 * Process data received from the underlying communication channel. | |
| 114 */ | |
| 115 void add(Uint8List buffer) { | |
| 116 int count = buffer.length; | |
| 117 int index = 0; | |
| 118 int lastIndex = count; | |
| 119 if (_state == CLOSED) { | |
| 120 throw new CompatibleWebSocketException("Data on closed connection"); | |
| 121 } | |
| 122 if (_state == FAILURE) { | |
| 123 throw new CompatibleWebSocketException("Data on failed connection"); | |
| 124 } | |
| 125 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { | |
| 126 int byte = buffer[index]; | |
| 127 if (_state <= LEN_REST) { | |
| 128 if (_state == START) { | |
| 129 _fin = (byte & 0x80) != 0; | |
| 130 if ((byte & 0x70) != 0) { | |
| 131 // The RSV1, RSV2 bits RSV3 must be all zero. | |
| 132 throw new CompatibleWebSocketException("Protocol error"); | |
| 133 } | |
| 134 _opcode = (byte & 0xF); | |
| 135 if (_opcode <= _WebSocketOpcode.BINARY) { | |
| 136 if (_opcode == _WebSocketOpcode.CONTINUATION) { | |
| 137 if (_currentMessageType == _WebSocketMessageType.NONE) { | |
| 138 throw new CompatibleWebSocketException("Protocol error"); | |
| 139 } | |
| 140 } else { | |
| 141 assert(_opcode == _WebSocketOpcode.TEXT || | |
| 142 _opcode == _WebSocketOpcode.BINARY); | |
| 143 if (_currentMessageType != _WebSocketMessageType.NONE) { | |
| 144 throw new CompatibleWebSocketException("Protocol error"); | |
| 145 } | |
| 146 _currentMessageType = _opcode; | |
| 147 } | |
| 148 } else if (_opcode >= _WebSocketOpcode.CLOSE && | |
| 149 _opcode <= _WebSocketOpcode.PONG) { | |
| 150 // Control frames cannot be fragmented. | |
| 151 if (!_fin) throw new CompatibleWebSocketException("Protocol error"); | |
| 152 } else { | |
| 153 throw new CompatibleWebSocketException("Protocol error"); | |
| 154 } | |
| 155 _state = LEN_FIRST; | |
| 156 } else if (_state == LEN_FIRST) { | |
| 157 _masked = (byte & 0x80) != 0; | |
| 158 _len = byte & 0x7F; | |
| 159 if (_isControlFrame() && _len > 125) { | |
| 160 throw new CompatibleWebSocketException("Protocol error"); | |
| 161 } | |
| 162 if (_len == 126) { | |
| 163 _len = 0; | |
| 164 _remainingLenBytes = 2; | |
| 165 _state = LEN_REST; | |
| 166 } else if (_len == 127) { | |
| 167 _len = 0; | |
| 168 _remainingLenBytes = 8; | |
| 169 _state = LEN_REST; | |
| 170 } else { | |
| 171 assert(_len < 126); | |
| 172 _lengthDone(); | |
| 173 } | |
| 174 } else { | |
| 175 assert(_state == LEN_REST); | |
| 176 _len = _len << 8 | byte; | |
| 177 _remainingLenBytes--; | |
| 178 if (_remainingLenBytes == 0) { | |
| 179 _lengthDone(); | |
| 180 } | |
| 181 } | |
| 182 } else { | |
| 183 if (_state == MASK) { | |
| 184 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; | |
| 185 if (_remainingMaskingKeyBytes == 0) { | |
| 186 _maskDone(); | |
| 187 } | |
| 188 } else { | |
| 189 assert(_state == PAYLOAD); | |
| 190 // The payload is not handled one byte at a time but in blocks. | |
| 191 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); | |
| 192 _remainingPayloadBytes -= payloadLength; | |
| 193 // Unmask payload if masked. | |
| 194 if (_masked) { | |
| 195 _unmask(index, payloadLength, buffer); | |
| 196 } | |
| 197 // Control frame and data frame share _payloads. | |
| 198 _payload.add( | |
| 199 new Uint8List.view(buffer.buffer, index, payloadLength)); | |
| 200 index += payloadLength; | |
| 201 if (_isControlFrame()) { | |
| 202 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | |
| 203 } else { | |
| 204 if (_currentMessageType != _WebSocketMessageType.TEXT && | |
| 205 _currentMessageType != _WebSocketMessageType.BINARY) { | |
| 206 throw new CompatibleWebSocketException("Protocol error"); | |
| 207 } | |
| 208 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | |
| 209 } | |
| 210 | |
| 211 // Hack - as we always do index++ below. | |
| 212 index--; | |
| 213 } | |
| 214 } | |
| 215 | |
| 216 // Move to the next byte. | |
| 217 index++; | |
| 218 } | |
| 219 } | |
| 220 | |
| 221 void _unmask(int index, int length, Uint8List buffer) { | |
| 222 const int BLOCK_SIZE = 16; | |
| 223 // Skip Int32x4-version if message is small. | |
| 224 if (length >= BLOCK_SIZE) { | |
| 225 // Start by aligning to 16 bytes. | |
| 226 final int startOffset = BLOCK_SIZE - (index & 15); | |
| 227 final int end = index + startOffset; | |
| 228 for (int i = index; i < end; i++) { | |
| 229 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
| 230 } | |
| 231 index += startOffset; | |
| 232 length -= startOffset; | |
| 233 final int blockCount = length ~/ BLOCK_SIZE; | |
| 234 if (blockCount > 0) { | |
| 235 // Create mask block. | |
| 236 int mask = 0; | |
| 237 for (int i = 3; i >= 0; i--) { | |
| 238 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | |
| 239 } | |
| 240 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
| 241 Int32x4List blockBuffer = new Int32x4List.view( | |
| 242 buffer.buffer, index, blockCount); | |
| 243 for (int i = 0; i < blockBuffer.length; i++) { | |
| 244 blockBuffer[i] ^= blockMask; | |
| 245 } | |
| 246 final int bytes = blockCount * BLOCK_SIZE; | |
| 247 index += bytes; | |
| 248 length -= bytes; | |
| 249 } | |
| 250 } | |
| 251 // Handle end. | |
| 252 final int end = index + length; | |
| 253 for (int i = index; i < end; i++) { | |
| 254 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
| 255 } | |
| 256 } | |
| 257 | |
| 258 void _lengthDone() { | |
| 259 if (_masked) { | |
| 260 if (!_serverSide) { | |
| 261 throw new CompatibleWebSocketException( | |
| 262 "Received masked frame from server"); | |
| 263 } | |
| 264 _state = MASK; | |
| 265 } else { | |
| 266 if (_serverSide) { | |
| 267 throw new CompatibleWebSocketException( | |
| 268 "Received unmasked frame from client"); | |
| 269 } | |
| 270 _remainingPayloadBytes = _len; | |
| 271 _startPayload(); | |
| 272 } | |
| 273 } | |
| 274 | |
| 275 void _maskDone() { | |
| 276 _remainingPayloadBytes = _len; | |
| 277 _startPayload(); | |
| 278 } | |
| 279 | |
| 280 void _startPayload() { | |
| 281 // If there is no actual payload perform perform callbacks without | |
| 282 // going through the PAYLOAD state. | |
| 283 if (_remainingPayloadBytes == 0) { | |
| 284 if (_isControlFrame()) { | |
| 285 switch (_opcode) { | |
| 286 case _WebSocketOpcode.CLOSE: | |
| 287 _state = CLOSED; | |
| 288 _eventSink.close(); | |
| 289 break; | |
| 290 case _WebSocketOpcode.PING: | |
| 291 _eventSink.add(new _WebSocketPing()); | |
| 292 break; | |
| 293 case _WebSocketOpcode.PONG: | |
| 294 _eventSink.add(new _WebSocketPong()); | |
| 295 break; | |
| 296 } | |
| 297 _prepareForNextFrame(); | |
| 298 } else { | |
| 299 _messageFrameEnd(); | |
| 300 } | |
| 301 } else { | |
| 302 _state = PAYLOAD; | |
| 303 } | |
| 304 } | |
| 305 | |
| 306 void _messageFrameEnd() { | |
| 307 if (_fin) { | |
| 308 switch (_currentMessageType) { | |
| 309 case _WebSocketMessageType.TEXT: | |
| 310 _eventSink.add(UTF8.decode(_payload.takeBytes())); | |
| 311 break; | |
| 312 case _WebSocketMessageType.BINARY: | |
| 313 _eventSink.add(_payload.takeBytes()); | |
| 314 break; | |
| 315 } | |
| 316 _currentMessageType = _WebSocketMessageType.NONE; | |
| 317 } | |
| 318 _prepareForNextFrame(); | |
| 319 } | |
| 320 | |
| 321 void _controlFrameEnd() { | |
| 322 switch (_opcode) { | |
| 323 case _WebSocketOpcode.CLOSE: | |
| 324 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | |
| 325 var payload = _payload.takeBytes(); | |
| 326 if (payload.length > 0) { | |
| 327 if (payload.length == 1) { | |
| 328 throw new CompatibleWebSocketException("Protocol error"); | |
| 329 } | |
| 330 closeCode = payload[0] << 8 | payload[1]; | |
| 331 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { | |
| 332 throw new CompatibleWebSocketException("Protocol error"); | |
| 333 } | |
| 334 if (payload.length > 2) { | |
| 335 closeReason = UTF8.decode(payload.sublist(2)); | |
| 336 } | |
| 337 } | |
| 338 _state = CLOSED; | |
| 339 _eventSink.close(); | |
| 340 break; | |
| 341 | |
| 342 case _WebSocketOpcode.PING: | |
| 343 _eventSink.add(new _WebSocketPing(_payload.takeBytes())); | |
| 344 break; | |
| 345 | |
| 346 case _WebSocketOpcode.PONG: | |
| 347 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); | |
| 348 break; | |
| 349 } | |
| 350 _prepareForNextFrame(); | |
| 351 } | |
| 352 | |
| 353 bool _isControlFrame() { | |
| 354 return _opcode == _WebSocketOpcode.CLOSE || | |
| 355 _opcode == _WebSocketOpcode.PING || | |
| 356 _opcode == _WebSocketOpcode.PONG; | |
| 357 } | |
| 358 | |
| 359 void _prepareForNextFrame() { | |
| 360 if (_state != CLOSED && _state != FAILURE) _state = START; | |
| 361 _fin = false; | |
| 362 _opcode = -1; | |
| 363 _len = -1; | |
| 364 _remainingLenBytes = -1; | |
| 365 _remainingMaskingKeyBytes = 4; | |
| 366 _remainingPayloadBytes = -1; | |
| 367 _unmaskingIndex = 0; | |
| 368 } | |
| 369 } | |
| 370 | |
| 371 | |
| 372 class _WebSocketPing { | |
| 373 final List<int> payload; | |
| 374 _WebSocketPing([this.payload = null]); | |
| 375 } | |
| 376 | |
| 377 | |
| 378 class _WebSocketPong { | |
| 379 final List<int> payload; | |
| 380 _WebSocketPong([this.payload = null]); | |
| 381 } | |
| 382 | |
| 383 // TODO(ajohnsen): Make this transformer reusable. | |
| 384 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | |
| 385 final WebSocketImpl webSocket; | |
| 386 EventSink _eventSink; | |
| 387 | |
| 388 _WebSocketOutgoingTransformer(this.webSocket); | |
| 389 | |
| 390 Stream bind(Stream stream) { | |
| 391 return new Stream.eventTransformed( | |
| 392 stream, | |
| 393 (EventSink eventSink) { | |
| 394 if (_eventSink != null) { | |
| 395 throw new StateError("WebSocket transformer already used"); | |
| 396 } | |
| 397 _eventSink = eventSink; | |
| 398 return this; | |
| 399 }); | |
| 400 } | |
| 401 | |
| 402 void add(message) { | |
| 403 if (message is _WebSocketPong) { | |
| 404 addFrame(_WebSocketOpcode.PONG, message.payload); | |
| 405 return; | |
| 406 } | |
| 407 if (message is _WebSocketPing) { | |
| 408 addFrame(_WebSocketOpcode.PING, message.payload); | |
| 409 return; | |
| 410 } | |
| 411 List<int> data; | |
| 412 int opcode; | |
| 413 if (message != null) { | |
| 414 if (message is String) { | |
| 415 opcode = _WebSocketOpcode.TEXT; | |
| 416 data = UTF8.encode(message); | |
| 417 } else { | |
| 418 if (message is !List<int>) { | |
| 419 throw new ArgumentError(message); | |
| 420 } | |
| 421 opcode = _WebSocketOpcode.BINARY; | |
| 422 data = message; | |
| 423 } | |
| 424 } else { | |
| 425 opcode = _WebSocketOpcode.TEXT; | |
| 426 } | |
| 427 addFrame(opcode, data); | |
| 428 } | |
| 429 | |
| 430 void addError(Object error, [StackTrace stackTrace]) => | |
| 431 _eventSink.addError(error, stackTrace); | |
| 432 | |
| 433 void close() { | |
| 434 int code = webSocket._outCloseCode; | |
| 435 String reason = webSocket._outCloseReason; | |
| 436 List<int> data; | |
| 437 if (code != null) { | |
| 438 data = new List<int>(); | |
| 439 data.add((code >> 8) & 0xFF); | |
| 440 data.add(code & 0xFF); | |
| 441 if (reason != null) { | |
| 442 data.addAll(UTF8.encode(reason)); | |
| 443 } | |
| 444 } | |
| 445 addFrame(_WebSocketOpcode.CLOSE, data); | |
| 446 _eventSink.close(); | |
| 447 } | |
| 448 | |
| 449 void addFrame(int opcode, List<int> data) => | |
| 450 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); | |
| 451 | |
| 452 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { | |
| 453 bool mask = !serverSide; // Masking not implemented for server. | |
| 454 int dataLength = data == null ? 0 : data.length; | |
| 455 // Determine the header size. | |
| 456 int headerSize = (mask) ? 6 : 2; | |
| 457 if (dataLength > 65535) { | |
| 458 headerSize += 8; | |
| 459 } else if (dataLength > 125) { | |
| 460 headerSize += 2; | |
| 461 } | |
| 462 Uint8List header = new Uint8List(headerSize); | |
| 463 int index = 0; | |
| 464 // Set FIN and opcode. | |
| 465 header[index++] = 0x80 | opcode; | |
| 466 // Determine size and position of length field. | |
| 467 int lengthBytes = 1; | |
| 468 if (dataLength > 65535) { | |
| 469 header[index++] = 127; | |
| 470 lengthBytes = 8; | |
| 471 } else if (dataLength > 125) { | |
| 472 header[index++] = 126; | |
| 473 lengthBytes = 2; | |
| 474 } | |
| 475 // Write the length in network byte order into the header. | |
| 476 for (int i = 0; i < lengthBytes; i++) { | |
| 477 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | |
| 478 } | |
| 479 if (mask) { | |
| 480 header[1] |= 1 << 7; | |
| 481 var maskBytes = [_random.nextInt(256), _random.nextInt(256), | |
| 482 _random.nextInt(256), _random.nextInt(256)]; | |
| 483 header.setRange(index, index + 4, maskBytes); | |
| 484 index += 4; | |
| 485 if (data != null) { | |
| 486 Uint8List list; | |
| 487 // If this is a text message just do the masking inside the | |
| 488 // encoded data. | |
| 489 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | |
| 490 list = data; | |
| 491 } else { | |
| 492 if (data is Uint8List) { | |
| 493 list = new Uint8List.fromList(data); | |
| 494 } else { | |
| 495 list = new Uint8List(data.length); | |
| 496 for (int i = 0; i < data.length; i++) { | |
| 497 if (data[i] < 0 || 255 < data[i]) { | |
| 498 throw new ArgumentError( | |
| 499 "List element is not a byte value " | |
| 500 "(value ${data[i]} at index $i)"); | |
| 501 } | |
| 502 list[i] = data[i]; | |
| 503 } | |
| 504 } | |
| 505 } | |
| 506 const int BLOCK_SIZE = 16; | |
| 507 int blockCount = list.length ~/ BLOCK_SIZE; | |
| 508 if (blockCount > 0) { | |
| 509 // Create mask block. | |
| 510 int mask = 0; | |
| 511 for (int i = 3; i >= 0; i--) { | |
| 512 mask = (mask << 8) | maskBytes[i]; | |
| 513 } | |
| 514 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
| 515 Int32x4List blockBuffer = new Int32x4List.view( | |
| 516 list.buffer, 0, blockCount); | |
| 517 for (int i = 0; i < blockBuffer.length; i++) { | |
| 518 blockBuffer[i] ^= blockMask; | |
| 519 } | |
| 520 } | |
| 521 // Handle end. | |
| 522 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { | |
| 523 list[i] ^= maskBytes[i & 3]; | |
| 524 } | |
| 525 data = list; | |
| 526 } | |
| 527 } | |
| 528 assert(index == headerSize); | |
| 529 if (data == null) { | |
| 530 return [header]; | |
| 531 } else { | |
| 532 return [header, data]; | |
| 533 } | |
| 534 } | |
| 535 } | |
| 536 | |
| 537 | |
| 538 class _WebSocketConsumer implements StreamConsumer { | |
| 539 final WebSocketImpl webSocket; | |
| 540 final StreamSink<List<int>> sink; | |
| 541 StreamController _controller; | |
| 542 StreamSubscription _subscription; | |
| 543 bool _issuedPause = false; | |
| 544 bool _closed = false; | |
| 545 Completer _closeCompleter = new Completer(); | |
| 546 Completer _completer; | |
| 547 | |
| 548 _WebSocketConsumer(this.webSocket, this.sink); | |
| 549 | |
| 550 void _onListen() { | |
| 551 if (_subscription != null) { | |
| 552 _subscription.cancel(); | |
| 553 } | |
| 554 } | |
| 555 | |
| 556 void _onPause() { | |
| 557 if (_subscription != null) { | |
| 558 _subscription.pause(); | |
| 559 } else { | |
| 560 _issuedPause = true; | |
| 561 } | |
| 562 } | |
| 563 | |
| 564 void _onResume() { | |
| 565 if (_subscription != null) { | |
| 566 _subscription.resume(); | |
| 567 } else { | |
| 568 _issuedPause = false; | |
| 569 } | |
| 570 } | |
| 571 | |
| 572 void _cancel() { | |
| 573 if (_subscription != null) { | |
| 574 var subscription = _subscription; | |
| 575 _subscription = null; | |
| 576 subscription.cancel(); | |
| 577 } | |
| 578 } | |
| 579 | |
| 580 _ensureController() { | |
| 581 if (_controller != null) return; | |
| 582 _controller = new StreamController(sync: true, | |
| 583 onPause: _onPause, | |
| 584 onResume: _onResume, | |
| 585 onCancel: _onListen); | |
| 586 var stream = _controller.stream.transform( | |
| 587 new _WebSocketOutgoingTransformer(webSocket)); | |
| 588 sink.addStream(stream) | |
| 589 .then((_) { | |
| 590 _done(); | |
| 591 _closeCompleter.complete(webSocket); | |
| 592 }, onError: (error, StackTrace stackTrace) { | |
| 593 _closed = true; | |
| 594 _cancel(); | |
| 595 if (error is ArgumentError) { | |
| 596 if (!_done(error, stackTrace)) { | |
| 597 _closeCompleter.completeError(error, stackTrace); | |
| 598 } | |
| 599 } else { | |
| 600 _done(); | |
| 601 _closeCompleter.complete(webSocket); | |
| 602 } | |
| 603 }); | |
| 604 } | |
| 605 | |
| 606 bool _done([error, StackTrace stackTrace]) { | |
| 607 if (_completer == null) return false; | |
| 608 if (error != null) { | |
| 609 _completer.completeError(error, stackTrace); | |
| 610 } else { | |
| 611 _completer.complete(webSocket); | |
| 612 } | |
| 613 _completer = null; | |
| 614 return true; | |
| 615 } | |
| 616 | |
| 617 Future addStream(var stream) { | |
| 618 if (_closed) { | |
| 619 stream.listen(null).cancel(); | |
| 620 return new Future.value(webSocket); | |
| 621 } | |
| 622 _ensureController(); | |
| 623 _completer = new Completer(); | |
| 624 _subscription = stream.listen( | |
| 625 (data) { | |
| 626 _controller.add(data); | |
| 627 }, | |
| 628 onDone: _done, | |
| 629 onError: _done, | |
| 630 cancelOnError: true); | |
| 631 if (_issuedPause) { | |
| 632 _subscription.pause(); | |
| 633 _issuedPause = false; | |
| 634 } | |
| 635 return _completer.future; | |
| 636 } | |
| 637 | |
| 638 Future close() { | |
| 639 _ensureController(); | |
| 640 Future closeSocket() { | |
| 641 return sink.close().catchError((_) {}).then((_) => webSocket); | |
| 642 } | |
| 643 _controller.close(); | |
| 644 return _closeCompleter.future.then((_) => closeSocket()); | |
| 645 } | |
| 646 | |
| 647 void add(data) { | |
| 648 if (_closed) return; | |
| 649 _ensureController(); | |
| 650 _controller.add(data); | |
| 651 } | |
| 652 | |
| 653 void closeSocket() { | |
| 654 _closed = true; | |
| 655 _cancel(); | |
| 656 close(); | |
| 657 } | |
| 658 } | |
| 659 | |
| 660 | |
| 661 class WebSocketImpl extends Stream with _ServiceObject | |
| 662 implements CompatibleWebSocket { | |
| 663 // Use default Map so we keep order. | |
| 664 static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>(); | |
| 665 | |
| 666 final String protocol; | |
| 667 | |
| 668 StreamController _controller; | |
| 669 StreamSubscription _subscription; | |
| 670 StreamSink _sink; | |
| 671 | |
| 672 final bool _serverSide; | |
| 673 int _readyState = WebSocket.CONNECTING; | |
| 674 bool _writeClosed = false; | |
| 675 int _closeCode; | |
| 676 String _closeReason; | |
| 677 Duration _pingInterval; | |
| 678 Timer _pingTimer; | |
| 679 _WebSocketConsumer _consumer; | |
| 680 | |
| 681 int _outCloseCode; | |
| 682 String _outCloseReason; | |
| 683 Timer _closeTimer; | |
| 684 | |
| 685 WebSocketImpl.fromSocket(Stream<List<int>> stream, | |
| 686 StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) { | |
| 687 _consumer = new _WebSocketConsumer(this, sink); | |
| 688 _sink = new StreamSinkImpl(_consumer); | |
| 689 _readyState = WebSocket.OPEN; | |
| 690 | |
| 691 var transformer = new _WebSocketProtocolTransformer(_serverSide); | |
| 692 _subscription = stream.transform(transformer).listen( | |
| 693 (data) { | |
| 694 if (data is _WebSocketPing) { | |
| 695 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | |
| 696 } else if (data is _WebSocketPong) { | |
| 697 // Simply set pingInterval, as it'll cancel any timers. | |
| 698 pingInterval = _pingInterval; | |
| 699 } else { | |
| 700 _controller.add(data); | |
| 701 } | |
| 702 }, | |
| 703 onError: (error) { | |
| 704 if (_closeTimer != null) _closeTimer.cancel(); | |
| 705 if (error is FormatException) { | |
| 706 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | |
| 707 } else { | |
| 708 _close(WebSocketStatus.PROTOCOL_ERROR); | |
| 709 } | |
| 710 // An error happened, set the close code set above. | |
| 711 _closeCode = _outCloseCode; | |
| 712 _closeReason = _outCloseReason; | |
| 713 _controller.close(); | |
| 714 }, | |
| 715 onDone: () { | |
| 716 if (_closeTimer != null) _closeTimer.cancel(); | |
| 717 if (_readyState == WebSocket.OPEN) { | |
| 718 _readyState = WebSocket.CLOSING; | |
| 719 if (!_isReservedStatusCode(transformer.closeCode)) { | |
| 720 _close(transformer.closeCode); | |
| 721 } else { | |
| 722 _close(); | |
| 723 } | |
| 724 _readyState = WebSocket.CLOSED; | |
| 725 } | |
| 726 // Protocol close, use close code from transformer. | |
| 727 _closeCode = transformer.closeCode; | |
| 728 _closeReason = transformer.closeReason; | |
| 729 _controller.close(); | |
| 730 }, | |
| 731 cancelOnError: true); | |
| 732 _subscription.pause(); | |
| 733 _controller = new StreamController(sync: true, | |
| 734 onListen: () => _subscription.resume(), | |
| 735 onCancel: () { | |
| 736 _subscription.cancel(); | |
| 737 _subscription = null; | |
| 738 }, | |
| 739 onPause: _subscription.pause, | |
| 740 onResume: _subscription.resume); | |
| 741 | |
| 742 _webSockets[_serviceId] = this; | |
| 743 } | |
| 744 | |
| 745 StreamSubscription listen(void onData(message), | |
| 746 {Function onError, | |
| 747 void onDone(), | |
| 748 bool cancelOnError}) { | |
| 749 return _controller.stream.listen(onData, | |
| 750 onError: onError, | |
| 751 onDone: onDone, | |
| 752 cancelOnError: cancelOnError); | |
| 753 } | |
| 754 | |
| 755 Duration get pingInterval => _pingInterval; | |
| 756 | |
| 757 void set pingInterval(Duration interval) { | |
| 758 if (_writeClosed) return; | |
| 759 if (_pingTimer != null) _pingTimer.cancel(); | |
| 760 _pingInterval = interval; | |
| 761 | |
| 762 if (_pingInterval == null) return; | |
| 763 | |
| 764 _pingTimer = new Timer(_pingInterval, () { | |
| 765 if (_writeClosed) return; | |
| 766 _consumer.add(new _WebSocketPing()); | |
| 767 _pingTimer = new Timer(_pingInterval, () { | |
| 768 // No pong received. | |
| 769 _close(WebSocketStatus.GOING_AWAY); | |
| 770 }); | |
| 771 }); | |
| 772 } | |
| 773 | |
| 774 int get readyState => _readyState; | |
| 775 | |
| 776 String get extensions => null; | |
| 777 int get closeCode => _closeCode; | |
| 778 String get closeReason => _closeReason; | |
| 779 | |
| 780 void add(data) => _sink.add(data); | |
| 781 void addError(error, [StackTrace stackTrace]) => | |
| 782 _sink.addError(error, stackTrace); | |
| 783 Future addStream(Stream stream) => _sink.addStream(stream); | |
| 784 Future get done => _sink.done; | |
| 785 | |
| 786 Future close([int code, String reason]) { | |
| 787 if (_isReservedStatusCode(code)) { | |
| 788 throw new CompatibleWebSocketException("Reserved status code $code"); | |
| 789 } | |
| 790 if (_outCloseCode == null) { | |
| 791 _outCloseCode = code; | |
| 792 _outCloseReason = reason; | |
| 793 } | |
| 794 if (!_controller.isClosed) { | |
| 795 // If a close has not yet been received from the other end then | |
| 796 // 1) make sure to listen on the stream so the close frame will be | |
| 797 // processed if received. | |
| 798 // 2) set a timer terminate the connection if a close frame is | |
| 799 // not received. | |
| 800 if (!_controller.hasListener && _subscription != null) { | |
| 801 _controller.stream.drain().catchError((_) => {}); | |
| 802 } | |
| 803 if (_closeTimer == null) { | |
| 804 // When closing the web-socket, we no longer accept data. | |
| 805 _closeTimer = new Timer(const Duration(seconds: 5), () { | |
| 806 // Reuse code and reason from the local close. | |
| 807 _closeCode = _outCloseCode; | |
| 808 _closeReason = _outCloseReason; | |
| 809 if (_subscription != null) _subscription.cancel(); | |
| 810 _controller.close(); | |
| 811 _webSockets.remove(_serviceId); | |
| 812 }); | |
| 813 } | |
| 814 } | |
| 815 return _sink.close(); | |
| 816 } | |
| 817 | |
| 818 void _close([int code, String reason]) { | |
| 819 if (_writeClosed) return; | |
| 820 if (_outCloseCode == null) { | |
| 821 _outCloseCode = code; | |
| 822 _outCloseReason = reason; | |
| 823 } | |
| 824 _writeClosed = true; | |
| 825 _consumer.closeSocket(); | |
| 826 _webSockets.remove(_serviceId); | |
| 827 } | |
| 828 | |
| 829 // The _toJSON, _serviceTypePath, and _serviceTypeName methods | |
| 830 // have been deleted for http_parser. The methods were unused in WebSocket | |
| 831 // code and produced warnings. | |
| 832 | |
| 833 static bool _isReservedStatusCode(int code) { | |
| 834 return code != null && | |
| 835 (code < WebSocketStatus.NORMAL_CLOSURE || | |
| 836 code == WebSocketStatus.RESERVED_1004 || | |
| 837 code == WebSocketStatus.NO_STATUS_RECEIVED || | |
| 838 code == WebSocketStatus.ABNORMAL_CLOSURE || | |
| 839 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | |
| 840 code < WebSocketStatus.RESERVED_1015) || | |
| 841 (code >= WebSocketStatus.RESERVED_1015 && | |
| 842 code < 3000)); | |
| 843 } | |
| 844 } | |
| 845 | |
| 846 // The following code is from sdk/lib/io/service_object.dart. | |
| 847 | |
| 848 int _nextServiceId = 1; | |
| 849 | |
| 850 // TODO(ajohnsen): Use other way of getting a uniq id. | |
| 851 abstract class _ServiceObject { | |
| 852 int __serviceId = 0; | |
| 853 int get _serviceId { | |
| 854 if (__serviceId == 0) __serviceId = _nextServiceId++; | |
| 855 return __serviceId; | |
| 856 } | |
| 857 | |
| 858 // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and | |
| 859 // _serviceType methods have been deleted for http_parser. The methods were | |
| 860 // unused in WebSocket code and produced warnings. | |
| 861 } | |
| OLD | NEW |