OLD | NEW |
---|---|
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
nweiz
2015/03/30 22:23:47
I would prefer we not format this file differently
kevmoo
2015/03/30 22:42:36
Done.
| |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library http_parser.web_socket; | 5 library http_parser.web_socket; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:convert'; | 8 import 'dart:convert'; |
9 import 'dart:math'; | 9 import 'dart:math'; |
10 import 'dart:typed_data'; | 10 import 'dart:typed_data'; |
11 | 11 |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
73 /// | 73 /// |
74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" | 74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" |
75 /// `Socket`), it will be used for both sending and receiving data. Otherwise, | 75 /// `Socket`), it will be used for both sending and receiving data. Otherwise, |
76 /// it will be used for receiving data and [sink] will be used for sending it. | 76 /// it will be used for receiving data and [sink] will be used for sending it. |
77 /// | 77 /// |
78 /// If this is a WebSocket server, [serverSide] should be `true` (the | 78 /// If this is a WebSocket server, [serverSide] should be `true` (the |
79 /// default); if it's a client, [serverSide] should be `false`. | 79 /// default); if it's a client, [serverSide] should be `false`. |
80 /// | 80 /// |
81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 | 81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
82 factory CompatibleWebSocket(Stream<List<int>> stream, | 82 factory CompatibleWebSocket(Stream<List<int>> stream, |
83 {StreamSink<List<int>> sink, bool serverSide: true}) { | 83 {StreamSink<List<int>> sink, bool serverSide: true}) { |
84 if (sink == null) { | 84 if (sink == null) { |
85 if (stream is! StreamSink) { | 85 if (stream is! StreamSink) { |
86 throw new ArgumentError("If stream isn't also a StreamSink, sink must " | 86 throw new ArgumentError("If stream isn't also a StreamSink, sink must " |
87 "be passed explicitly."); | 87 "be passed explicitly."); |
88 } | 88 } |
89 sink = stream as StreamSink; | 89 sink = stream as StreamSink; |
90 } | 90 } |
91 | 91 |
92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide); | 92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide); |
93 } | 93 } |
94 | 94 |
95 /// Closes the web socket connection. | 95 /// Closes the web socket connection. |
96 /// | 96 /// |
97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent | 97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent |
98 /// to the remote peer, respectively. If they are omitted, the peer will see | 98 /// to the remote peer, respectively. If they are omitted, the peer will see |
99 /// a "no status received" code with no reason. | 99 /// a "no status received" code with no reason. |
100 /// | 100 /// |
101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 | 101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 |
102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 | 102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 |
103 Future close([int closeCode, String closeReason]); | 103 Future close([int closeCode, String closeReason]); |
104 } | 104 } |
105 | 105 |
106 /// An exception thrown by [CompatibleWebSocket]. | 106 /// An exception thrown by [CompatibleWebSocket]. |
107 class CompatibleWebSocketException implements Exception { | 107 class CompatibleWebSocketException implements Exception { |
108 final String message; | 108 final String message; |
109 | 109 |
110 CompatibleWebSocketException([this.message]); | 110 CompatibleWebSocketException([this.message]); |
111 | 111 |
112 String toString() => message == null | 112 String toString() => message == null |
113 ? "CompatibleWebSocketException" : | 113 ? "CompatibleWebSocketException" |
114 "CompatibleWebSocketException: $message"; | 114 : "CompatibleWebSocketException: $message"; |
115 } | 115 } |
116 | 116 |
117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The | 117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The |
118 // "dart:io" implementation isn't used directly both to support non-"dart:io" | 118 // "dart:io" implementation isn't used directly both to support non-"dart:io" |
119 // applications, and because it's incompatible with non-"dart:io" HTTP requests | 119 // applications, and because it's incompatible with non-"dart:io" HTTP requests |
120 // (issue 18172). | 120 // (issue 18172). |
121 // | 121 // |
122 // Because it's copied directly, only modifications necessary to support the | 122 // Because it's copied directly, only modifications necessary to support the |
123 // desired public API and to remove "dart:io" dependencies have been made. | 123 // desired public API and to remove "dart:io" dependencies have been made. |
124 | 124 |
125 /** | 125 /** |
126 * Web socket status codes used when closing a web socket connection. | 126 * Web socket status codes used when closing a web socket connection. |
127 */ | 127 */ |
128 abstract class _WebSocketStatus { | 128 abstract class _WebSocketStatus { |
129 static const int NORMAL_CLOSURE = 1000; | 129 static const int NORMAL_CLOSURE = 1000; |
130 static const int GOING_AWAY = 1001; | 130 static const int GOING_AWAY = 1001; |
131 static const int PROTOCOL_ERROR = 1002; | 131 static const int PROTOCOL_ERROR = 1002; |
132 static const int UNSUPPORTED_DATA = 1003; | 132 static const int UNSUPPORTED_DATA = 1003; |
133 static const int RESERVED_1004 = 1004; | 133 static const int RESERVED_1004 = 1004; |
134 static const int NO_STATUS_RECEIVED = 1005; | 134 static const int NO_STATUS_RECEIVED = 1005; |
135 static const int ABNORMAL_CLOSURE = 1006; | 135 static const int ABNORMAL_CLOSURE = 1006; |
136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007; | 136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007; |
137 static const int POLICY_VIOLATION = 1008; | 137 static const int POLICY_VIOLATION = 1008; |
138 static const int MESSAGE_TOO_BIG = 1009; | 138 static const int MESSAGE_TOO_BIG = 1009; |
139 static const int MISSING_MANDATORY_EXTENSION = 1010; | 139 static const int MISSING_MANDATORY_EXTENSION = 1010; |
140 static const int INTERNAL_SERVER_ERROR = 1011; | 140 static const int INTERNAL_SERVER_ERROR = 1011; |
141 static const int RESERVED_1015 = 1015; | 141 static const int RESERVED_1015 = 1015; |
142 } | 142 } |
143 | 143 |
144 abstract class _WebSocketState { | 144 abstract class _WebSocketState { |
145 static const int CONNECTING = 0; | 145 static const int CONNECTING = 0; |
146 static const int OPEN = 1; | 146 static const int OPEN = 1; |
147 static const int CLOSING = 2; | 147 static const int CLOSING = 2; |
148 static const int CLOSED = 3; | 148 static const int CLOSED = 3; |
149 } | 149 } |
150 | 150 |
151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
152 | 152 |
153 final _random = new Random(); | 153 final _random = new Random(); |
154 | 154 |
155 // Matches _WebSocketOpcode. | 155 // Matches _WebSocketOpcode. |
156 class _WebSocketMessageType { | 156 class _WebSocketMessageType { |
157 static const int NONE = 0; | 157 static const int NONE = 0; |
158 static const int TEXT = 1; | 158 static const int TEXT = 1; |
159 static const int BINARY = 2; | 159 static const int BINARY = 2; |
160 } | 160 } |
161 | 161 |
162 | |
163 class _WebSocketOpcode { | 162 class _WebSocketOpcode { |
164 static const int CONTINUATION = 0; | 163 static const int CONTINUATION = 0; |
165 static const int TEXT = 1; | 164 static const int TEXT = 1; |
166 static const int BINARY = 2; | 165 static const int BINARY = 2; |
167 static const int RESERVED_3 = 3; | 166 static const int RESERVED_3 = 3; |
168 static const int RESERVED_4 = 4; | 167 static const int RESERVED_4 = 4; |
169 static const int RESERVED_5 = 5; | 168 static const int RESERVED_5 = 5; |
170 static const int RESERVED_6 = 6; | 169 static const int RESERVED_6 = 6; |
171 static const int RESERVED_7 = 7; | 170 static const int RESERVED_7 = 7; |
172 static const int CLOSE = 8; | 171 static const int CLOSE = 8; |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
213 | 212 |
214 EventSink _eventSink; | 213 EventSink _eventSink; |
215 | 214 |
216 final bool _serverSide; | 215 final bool _serverSide; |
217 final List _maskingBytes = new List(4); | 216 final List _maskingBytes = new List(4); |
218 final BytesBuilder _payload = new BytesBuilder(copy: false); | 217 final BytesBuilder _payload = new BytesBuilder(copy: false); |
219 | 218 |
220 _WebSocketProtocolTransformer([this._serverSide = false]); | 219 _WebSocketProtocolTransformer([this._serverSide = false]); |
221 | 220 |
222 Stream bind(Stream stream) { | 221 Stream bind(Stream stream) { |
223 return new Stream.eventTransformed( | 222 return new Stream.eventTransformed(stream, (EventSink eventSink) { |
224 stream, | 223 if (_eventSink != null) { |
225 (EventSink eventSink) { | 224 throw new StateError("WebSocket transformer already used."); |
226 if (_eventSink != null) { | 225 } |
227 throw new StateError("WebSocket transformer already used."); | 226 _eventSink = eventSink; |
228 } | 227 return this; |
229 _eventSink = eventSink; | 228 }); |
230 return this; | |
231 }); | |
232 } | 229 } |
233 | 230 |
234 void addError(Object error, [StackTrace stackTrace]) => | 231 void addError(Object error, [StackTrace stackTrace]) => |
235 _eventSink.addError(error, stackTrace); | 232 _eventSink.addError(error, stackTrace); |
236 | 233 |
237 void close() => _eventSink.close(); | 234 void close() => _eventSink.close(); |
238 | 235 |
239 /** | 236 /** |
240 * Process data received from the underlying communication channel. | 237 * Process data received from the underlying communication channel. |
241 */ | 238 */ |
(...skipping 17 matching lines...) Expand all Loading... | |
259 throw new CompatibleWebSocketException("Protocol error"); | 256 throw new CompatibleWebSocketException("Protocol error"); |
260 } | 257 } |
261 _opcode = (byte & 0xF); | 258 _opcode = (byte & 0xF); |
262 if (_opcode <= _WebSocketOpcode.BINARY) { | 259 if (_opcode <= _WebSocketOpcode.BINARY) { |
263 if (_opcode == _WebSocketOpcode.CONTINUATION) { | 260 if (_opcode == _WebSocketOpcode.CONTINUATION) { |
264 if (_currentMessageType == _WebSocketMessageType.NONE) { | 261 if (_currentMessageType == _WebSocketMessageType.NONE) { |
265 throw new CompatibleWebSocketException("Protocol error"); | 262 throw new CompatibleWebSocketException("Protocol error"); |
266 } | 263 } |
267 } else { | 264 } else { |
268 assert(_opcode == _WebSocketOpcode.TEXT || | 265 assert(_opcode == _WebSocketOpcode.TEXT || |
269 _opcode == _WebSocketOpcode.BINARY); | 266 _opcode == _WebSocketOpcode.BINARY); |
270 if (_currentMessageType != _WebSocketMessageType.NONE) { | 267 if (_currentMessageType != _WebSocketMessageType.NONE) { |
271 throw new CompatibleWebSocketException("Protocol error"); | 268 throw new CompatibleWebSocketException("Protocol error"); |
272 } | 269 } |
273 _currentMessageType = _opcode; | 270 _currentMessageType = _opcode; |
274 } | 271 } |
275 } else if (_opcode >= _WebSocketOpcode.CLOSE && | 272 } else if (_opcode >= _WebSocketOpcode.CLOSE && |
276 _opcode <= _WebSocketOpcode.PONG) { | 273 _opcode <= _WebSocketOpcode.PONG) { |
277 // Control frames cannot be fragmented. | 274 // Control frames cannot be fragmented. |
278 if (!_fin) throw new CompatibleWebSocketException("Protocol error"); | 275 if (!_fin) throw new CompatibleWebSocketException("Protocol error"); |
279 } else { | 276 } else { |
280 throw new CompatibleWebSocketException("Protocol error"); | 277 throw new CompatibleWebSocketException("Protocol error"); |
281 } | 278 } |
282 _state = LEN_FIRST; | 279 _state = LEN_FIRST; |
283 } else if (_state == LEN_FIRST) { | 280 } else if (_state == LEN_FIRST) { |
284 _masked = (byte & 0x80) != 0; | 281 _masked = (byte & 0x80) != 0; |
285 _len = byte & 0x7F; | 282 _len = byte & 0x7F; |
286 if (_isControlFrame() && _len > 125) { | 283 if (_isControlFrame() && _len > 125) { |
(...skipping 28 matching lines...) Expand all Loading... | |
315 } else { | 312 } else { |
316 assert(_state == PAYLOAD); | 313 assert(_state == PAYLOAD); |
317 // The payload is not handled one byte at a time but in blocks. | 314 // The payload is not handled one byte at a time but in blocks. |
318 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); | 315 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); |
319 _remainingPayloadBytes -= payloadLength; | 316 _remainingPayloadBytes -= payloadLength; |
320 // Unmask payload if masked. | 317 // Unmask payload if masked. |
321 if (_masked) { | 318 if (_masked) { |
322 _unmask(index, payloadLength, buffer); | 319 _unmask(index, payloadLength, buffer); |
323 } | 320 } |
324 // Control frame and data frame share _payloads. | 321 // Control frame and data frame share _payloads. |
325 _payload.add( | 322 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); |
326 new Uint8List.view(buffer.buffer, index, payloadLength)); | |
327 index += payloadLength; | 323 index += payloadLength; |
328 if (_isControlFrame()) { | 324 if (_isControlFrame()) { |
329 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | 325 if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
330 } else { | 326 } else { |
331 if (_currentMessageType != _WebSocketMessageType.TEXT && | 327 if (_currentMessageType != _WebSocketMessageType.TEXT && |
332 _currentMessageType != _WebSocketMessageType.BINARY) { | 328 _currentMessageType != _WebSocketMessageType.BINARY) { |
333 throw new CompatibleWebSocketException("Protocol error"); | 329 throw new CompatibleWebSocketException("Protocol error"); |
334 } | 330 } |
335 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | 331 if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
336 } | 332 } |
337 | 333 |
338 // Hack - as we always do index++ below. | 334 // Hack - as we always do index++ below. |
339 index--; | 335 index--; |
340 } | 336 } |
341 } | 337 } |
342 | 338 |
343 // Move to the next byte. | 339 // Move to the next byte. |
(...skipping 14 matching lines...) Expand all Loading... | |
358 index += startOffset; | 354 index += startOffset; |
359 length -= startOffset; | 355 length -= startOffset; |
360 final int blockCount = length ~/ BLOCK_SIZE; | 356 final int blockCount = length ~/ BLOCK_SIZE; |
361 if (blockCount > 0) { | 357 if (blockCount > 0) { |
362 // Create mask block. | 358 // Create mask block. |
363 int mask = 0; | 359 int mask = 0; |
364 for (int i = 3; i >= 0; i--) { | 360 for (int i = 3; i >= 0; i--) { |
365 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | 361 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
366 } | 362 } |
367 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 363 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
368 Int32x4List blockBuffer = new Int32x4List.view( | 364 Int32x4List blockBuffer = |
369 buffer.buffer, index, blockCount); | 365 new Int32x4List.view(buffer.buffer, index, blockCount); |
370 for (int i = 0; i < blockBuffer.length; i++) { | 366 for (int i = 0; i < blockBuffer.length; i++) { |
371 blockBuffer[i] ^= blockMask; | 367 blockBuffer[i] ^= blockMask; |
372 } | 368 } |
373 final int bytes = blockCount * BLOCK_SIZE; | 369 final int bytes = blockCount * BLOCK_SIZE; |
374 index += bytes; | 370 index += bytes; |
375 length -= bytes; | 371 length -= bytes; |
376 } | 372 } |
377 } | 373 } |
378 // Handle end. | 374 // Handle end. |
379 final int end = index + length; | 375 final int end = index + length; |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
472 | 468 |
473 case _WebSocketOpcode.PONG: | 469 case _WebSocketOpcode.PONG: |
474 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); | 470 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); |
475 break; | 471 break; |
476 } | 472 } |
477 _prepareForNextFrame(); | 473 _prepareForNextFrame(); |
478 } | 474 } |
479 | 475 |
480 bool _isControlFrame() { | 476 bool _isControlFrame() { |
481 return _opcode == _WebSocketOpcode.CLOSE || | 477 return _opcode == _WebSocketOpcode.CLOSE || |
482 _opcode == _WebSocketOpcode.PING || | 478 _opcode == _WebSocketOpcode.PING || |
483 _opcode == _WebSocketOpcode.PONG; | 479 _opcode == _WebSocketOpcode.PONG; |
484 } | 480 } |
485 | 481 |
486 void _prepareForNextFrame() { | 482 void _prepareForNextFrame() { |
487 if (_state != CLOSED && _state != FAILURE) _state = START; | 483 if (_state != CLOSED && _state != FAILURE) _state = START; |
488 _fin = false; | 484 _fin = false; |
489 _opcode = -1; | 485 _opcode = -1; |
490 _len = -1; | 486 _len = -1; |
491 _remainingLenBytes = -1; | 487 _remainingLenBytes = -1; |
492 _remainingMaskingKeyBytes = 4; | 488 _remainingMaskingKeyBytes = 4; |
493 _remainingPayloadBytes = -1; | 489 _remainingPayloadBytes = -1; |
494 _unmaskingIndex = 0; | 490 _unmaskingIndex = 0; |
495 } | 491 } |
496 } | 492 } |
497 | 493 |
498 | |
499 class _WebSocketPing { | 494 class _WebSocketPing { |
500 final List<int> payload; | 495 final List<int> payload; |
501 _WebSocketPing([this.payload = null]); | 496 _WebSocketPing([this.payload = null]); |
502 } | 497 } |
503 | 498 |
504 | |
505 class _WebSocketPong { | 499 class _WebSocketPong { |
506 final List<int> payload; | 500 final List<int> payload; |
507 _WebSocketPong([this.payload = null]); | 501 _WebSocketPong([this.payload = null]); |
508 } | 502 } |
509 | 503 |
510 // TODO(ajohnsen): Make this transformer reusable. | 504 // TODO(ajohnsen): Make this transformer reusable. |
511 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 505 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
512 final _WebSocketImpl webSocket; | 506 final _WebSocketImpl webSocket; |
513 EventSink _eventSink; | 507 EventSink _eventSink; |
514 | 508 |
515 _WebSocketOutgoingTransformer(this.webSocket); | 509 _WebSocketOutgoingTransformer(this.webSocket); |
516 | 510 |
517 Stream bind(Stream stream) { | 511 Stream bind(Stream stream) { |
518 return new Stream.eventTransformed( | 512 return new Stream.eventTransformed(stream, (EventSink eventSink) { |
519 stream, | 513 if (_eventSink != null) { |
520 (EventSink eventSink) { | 514 throw new StateError("WebSocket transformer already used"); |
521 if (_eventSink != null) { | 515 } |
522 throw new StateError("WebSocket transformer already used"); | 516 _eventSink = eventSink; |
523 } | 517 return this; |
524 _eventSink = eventSink; | 518 }); |
525 return this; | |
526 }); | |
527 } | 519 } |
528 | 520 |
529 void add(message) { | 521 void add(message) { |
530 if (message is _WebSocketPong) { | 522 if (message is _WebSocketPong) { |
531 addFrame(_WebSocketOpcode.PONG, message.payload); | 523 addFrame(_WebSocketOpcode.PONG, message.payload); |
532 return; | 524 return; |
533 } | 525 } |
534 if (message is _WebSocketPing) { | 526 if (message is _WebSocketPing) { |
535 addFrame(_WebSocketOpcode.PING, message.payload); | 527 addFrame(_WebSocketOpcode.PING, message.payload); |
536 return; | 528 return; |
537 } | 529 } |
538 List<int> data; | 530 List<int> data; |
539 int opcode; | 531 int opcode; |
540 if (message != null) { | 532 if (message != null) { |
541 if (message is String) { | 533 if (message is String) { |
542 opcode = _WebSocketOpcode.TEXT; | 534 opcode = _WebSocketOpcode.TEXT; |
543 data = UTF8.encode(message); | 535 data = UTF8.encode(message); |
544 } else { | 536 } else { |
545 if (message is !List<int>) { | 537 if (message is! List<int>) { |
546 throw new ArgumentError(message); | 538 throw new ArgumentError(message); |
547 } | 539 } |
548 opcode = _WebSocketOpcode.BINARY; | 540 opcode = _WebSocketOpcode.BINARY; |
549 data = message; | 541 data = message; |
550 } | 542 } |
551 } else { | 543 } else { |
552 opcode = _WebSocketOpcode.TEXT; | 544 opcode = _WebSocketOpcode.TEXT; |
553 } | 545 } |
554 addFrame(opcode, data); | 546 addFrame(opcode, data); |
555 } | 547 } |
(...skipping 14 matching lines...) Expand all Loading... | |
570 } | 562 } |
571 } | 563 } |
572 addFrame(_WebSocketOpcode.CLOSE, data); | 564 addFrame(_WebSocketOpcode.CLOSE, data); |
573 _eventSink.close(); | 565 _eventSink.close(); |
574 } | 566 } |
575 | 567 |
576 void addFrame(int opcode, List<int> data) => | 568 void addFrame(int opcode, List<int> data) => |
577 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); | 569 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
578 | 570 |
579 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { | 571 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
580 bool mask = !serverSide; // Masking not implemented for server. | 572 bool mask = !serverSide; // Masking not implemented for server. |
581 int dataLength = data == null ? 0 : data.length; | 573 int dataLength = data == null ? 0 : data.length; |
582 // Determine the header size. | 574 // Determine the header size. |
583 int headerSize = (mask) ? 6 : 2; | 575 int headerSize = (mask) ? 6 : 2; |
584 if (dataLength > 65535) { | 576 if (dataLength > 65535) { |
585 headerSize += 8; | 577 headerSize += 8; |
586 } else if (dataLength > 125) { | 578 } else if (dataLength > 125) { |
587 headerSize += 2; | 579 headerSize += 2; |
588 } | 580 } |
589 Uint8List header = new Uint8List(headerSize); | 581 Uint8List header = new Uint8List(headerSize); |
590 int index = 0; | 582 int index = 0; |
591 // Set FIN and opcode. | 583 // Set FIN and opcode. |
592 header[index++] = 0x80 | opcode; | 584 header[index++] = 0x80 | opcode; |
593 // Determine size and position of length field. | 585 // Determine size and position of length field. |
594 int lengthBytes = 1; | 586 int lengthBytes = 1; |
595 if (dataLength > 65535) { | 587 if (dataLength > 65535) { |
596 header[index++] = 127; | 588 header[index++] = 127; |
597 lengthBytes = 8; | 589 lengthBytes = 8; |
598 } else if (dataLength > 125) { | 590 } else if (dataLength > 125) { |
599 header[index++] = 126; | 591 header[index++] = 126; |
600 lengthBytes = 2; | 592 lengthBytes = 2; |
601 } | 593 } |
602 // Write the length in network byte order into the header. | 594 // Write the length in network byte order into the header. |
603 for (int i = 0; i < lengthBytes; i++) { | 595 for (int i = 0; i < lengthBytes; i++) { |
604 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | 596 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; |
605 } | 597 } |
606 if (mask) { | 598 if (mask) { |
607 header[1] |= 1 << 7; | 599 header[1] |= 1 << 7; |
608 var maskBytes = [_random.nextInt(256), _random.nextInt(256), | 600 var maskBytes = [ |
609 _random.nextInt(256), _random.nextInt(256)]; | 601 _random.nextInt(256), |
602 _random.nextInt(256), | |
603 _random.nextInt(256), | |
604 _random.nextInt(256) | |
605 ]; | |
610 header.setRange(index, index + 4, maskBytes); | 606 header.setRange(index, index + 4, maskBytes); |
611 index += 4; | 607 index += 4; |
612 if (data != null) { | 608 if (data != null) { |
613 Uint8List list; | 609 Uint8List list; |
614 // If this is a text message just do the masking inside the | 610 // If this is a text message just do the masking inside the |
615 // encoded data. | 611 // encoded data. |
616 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | 612 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { |
617 list = data; | 613 list = data; |
618 } else { | 614 } else { |
619 if (data is Uint8List) { | 615 if (data is Uint8List) { |
620 list = new Uint8List.fromList(data); | 616 list = new Uint8List.fromList(data); |
621 } else { | 617 } else { |
622 list = new Uint8List(data.length); | 618 list = new Uint8List(data.length); |
623 for (int i = 0; i < data.length; i++) { | 619 for (int i = 0; i < data.length; i++) { |
624 if (data[i] < 0 || 255 < data[i]) { | 620 if (data[i] < 0 || 255 < data[i]) { |
625 throw new ArgumentError( | 621 throw new ArgumentError("List element is not a byte value " |
626 "List element is not a byte value " | |
627 "(value ${data[i]} at index $i)"); | 622 "(value ${data[i]} at index $i)"); |
628 } | 623 } |
629 list[i] = data[i]; | 624 list[i] = data[i]; |
630 } | 625 } |
631 } | 626 } |
632 } | 627 } |
633 const int BLOCK_SIZE = 16; | 628 const int BLOCK_SIZE = 16; |
634 int blockCount = list.length ~/ BLOCK_SIZE; | 629 int blockCount = list.length ~/ BLOCK_SIZE; |
635 if (blockCount > 0) { | 630 if (blockCount > 0) { |
636 // Create mask block. | 631 // Create mask block. |
637 int mask = 0; | 632 int mask = 0; |
638 for (int i = 3; i >= 0; i--) { | 633 for (int i = 3; i >= 0; i--) { |
639 mask = (mask << 8) | maskBytes[i]; | 634 mask = (mask << 8) | maskBytes[i]; |
640 } | 635 } |
641 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 636 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
642 Int32x4List blockBuffer = new Int32x4List.view( | 637 Int32x4List blockBuffer = |
643 list.buffer, 0, blockCount); | 638 new Int32x4List.view(list.buffer, 0, blockCount); |
644 for (int i = 0; i < blockBuffer.length; i++) { | 639 for (int i = 0; i < blockBuffer.length; i++) { |
645 blockBuffer[i] ^= blockMask; | 640 blockBuffer[i] ^= blockMask; |
646 } | 641 } |
647 } | 642 } |
648 // Handle end. | 643 // Handle end. |
649 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { | 644 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { |
650 list[i] ^= maskBytes[i & 3]; | 645 list[i] ^= maskBytes[i & 3]; |
651 } | 646 } |
652 data = list; | 647 data = list; |
653 } | 648 } |
654 } | 649 } |
655 assert(index == headerSize); | 650 assert(index == headerSize); |
656 if (data == null) { | 651 if (data == null) { |
657 return [header]; | 652 return [header]; |
658 } else { | 653 } else { |
659 return [header, data]; | 654 return [header, data]; |
660 } | 655 } |
661 } | 656 } |
662 } | 657 } |
663 | 658 |
664 | |
665 class _WebSocketConsumer implements StreamConsumer { | 659 class _WebSocketConsumer implements StreamConsumer { |
666 final _WebSocketImpl webSocket; | 660 final _WebSocketImpl webSocket; |
667 final StreamSink<List<int>> sink; | 661 final StreamSink<List<int>> sink; |
668 StreamController _controller; | 662 StreamController _controller; |
669 StreamSubscription _subscription; | 663 StreamSubscription _subscription; |
670 bool _issuedPause = false; | 664 bool _issuedPause = false; |
671 bool _closed = false; | 665 bool _closed = false; |
672 Completer _closeCompleter = new Completer(); | 666 Completer _closeCompleter = new Completer(); |
673 Completer _completer; | 667 Completer _completer; |
674 | 668 |
(...skipping 24 matching lines...) Expand all Loading... | |
699 void _cancel() { | 693 void _cancel() { |
700 if (_subscription != null) { | 694 if (_subscription != null) { |
701 var subscription = _subscription; | 695 var subscription = _subscription; |
702 _subscription = null; | 696 _subscription = null; |
703 subscription.cancel(); | 697 subscription.cancel(); |
704 } | 698 } |
705 } | 699 } |
706 | 700 |
707 _ensureController() { | 701 _ensureController() { |
708 if (_controller != null) return; | 702 if (_controller != null) return; |
709 _controller = new StreamController(sync: true, | 703 _controller = new StreamController( |
710 onPause: _onPause, | 704 sync: true, |
711 onResume: _onResume, | 705 onPause: _onPause, |
712 onCancel: _onListen); | 706 onResume: _onResume, |
713 var stream = _controller.stream.transform( | 707 onCancel: _onListen); |
714 new _WebSocketOutgoingTransformer(webSocket)); | 708 var stream = _controller.stream |
715 sink.addStream(stream) | 709 .transform(new _WebSocketOutgoingTransformer(webSocket)); |
716 .then((_) { | 710 sink.addStream(stream).then((_) { |
717 _done(); | 711 _done(); |
718 _closeCompleter.complete(webSocket); | 712 _closeCompleter.complete(webSocket); |
719 }, onError: (error, StackTrace stackTrace) { | 713 }, onError: (error, StackTrace stackTrace) { |
720 _closed = true; | 714 _closed = true; |
721 _cancel(); | 715 _cancel(); |
722 if (error is ArgumentError) { | 716 if (error is ArgumentError) { |
723 if (!_done(error, stackTrace)) { | 717 if (!_done(error, stackTrace)) { |
724 _closeCompleter.completeError(error, stackTrace); | 718 _closeCompleter.completeError(error, stackTrace); |
725 } | 719 } |
726 } else { | 720 } else { |
727 _done(); | 721 _done(); |
728 _closeCompleter.complete(webSocket); | 722 _closeCompleter.complete(webSocket); |
729 } | 723 } |
730 }); | 724 }); |
731 } | 725 } |
732 | 726 |
733 bool _done([error, StackTrace stackTrace]) { | 727 bool _done([error, StackTrace stackTrace]) { |
734 if (_completer == null) return false; | 728 if (_completer == null) return false; |
735 if (error != null) { | 729 if (error != null) { |
736 _completer.completeError(error, stackTrace); | 730 _completer.completeError(error, stackTrace); |
737 } else { | 731 } else { |
738 _completer.complete(webSocket); | 732 _completer.complete(webSocket); |
739 } | 733 } |
740 _completer = null; | 734 _completer = null; |
741 return true; | 735 return true; |
742 } | 736 } |
743 | 737 |
744 Future addStream(var stream) { | 738 Future addStream(var stream) { |
745 if (_closed) { | 739 if (_closed) { |
746 stream.listen(null).cancel(); | 740 stream.listen(null).cancel(); |
747 return new Future.value(webSocket); | 741 return new Future.value(webSocket); |
748 } | 742 } |
749 _ensureController(); | 743 _ensureController(); |
750 _completer = new Completer(); | 744 _completer = new Completer(); |
751 _subscription = stream.listen( | 745 _subscription = stream.listen((data) { |
752 (data) { | 746 _controller.add(data); |
753 _controller.add(data); | 747 }, onDone: _done, onError: _done, cancelOnError: true); |
754 }, | |
755 onDone: _done, | |
756 onError: _done, | |
757 cancelOnError: true); | |
758 if (_issuedPause) { | 748 if (_issuedPause) { |
759 _subscription.pause(); | 749 _subscription.pause(); |
760 _issuedPause = false; | 750 _issuedPause = false; |
761 } | 751 } |
762 return _completer.future; | 752 return _completer.future; |
763 } | 753 } |
764 | 754 |
765 Future close() { | 755 Future close() { |
766 _ensureController(); | 756 _ensureController(); |
767 Future closeSocket() { | 757 Future closeSocket() { |
768 return sink.close().catchError((_) {}).then((_) => webSocket); | 758 return sink.close().catchError((_) {}).then((_) => webSocket); |
769 } | 759 } |
770 _controller.close(); | 760 _controller.close(); |
771 return _closeCompleter.future.then((_) => closeSocket()); | 761 return _closeCompleter.future.then((_) => closeSocket()); |
772 } | 762 } |
773 | 763 |
774 void add(data) { | 764 void add(data) { |
775 if (_closed) return; | 765 if (_closed) return; |
776 _ensureController(); | 766 _ensureController(); |
777 _controller.add(data); | 767 _controller.add(data); |
778 } | 768 } |
779 | 769 |
780 void closeSocket() { | 770 void closeSocket() { |
781 _closed = true; | 771 _closed = true; |
782 _cancel(); | 772 _cancel(); |
783 close(); | 773 close(); |
784 } | 774 } |
785 } | 775 } |
786 | 776 |
787 | |
788 class _WebSocketImpl extends Stream implements CompatibleWebSocket { | 777 class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
789 StreamController _controller; | 778 StreamController _controller; |
790 StreamSubscription _subscription; | 779 StreamSubscription _subscription; |
791 StreamController _sink; | 780 StreamController _sink; |
792 | 781 |
793 final bool _serverSide; | 782 final bool _serverSide; |
794 int _readyState = _WebSocketState.CONNECTING; | 783 int _readyState = _WebSocketState.CONNECTING; |
795 bool _writeClosed = false; | 784 bool _writeClosed = false; |
796 int _closeCode; | 785 int _closeCode; |
797 String _closeReason; | 786 String _closeReason; |
798 Duration _pingInterval; | 787 Duration _pingInterval; |
799 Timer _pingTimer; | 788 Timer _pingTimer; |
800 _WebSocketConsumer _consumer; | 789 _WebSocketConsumer _consumer; |
801 | 790 |
802 int _outCloseCode; | 791 int _outCloseCode; |
803 String _outCloseReason; | 792 String _outCloseReason; |
804 Timer _closeTimer; | 793 Timer _closeTimer; |
805 | 794 |
806 _WebSocketImpl._fromSocket(Stream<List<int>> stream, | 795 _WebSocketImpl._fromSocket( |
807 StreamSink<List<int>> sink, [this._serverSide = false]) { | 796 Stream<List<int>> stream, StreamSink<List<int>> sink, |
797 [this._serverSide = false]) { | |
808 _consumer = new _WebSocketConsumer(this, sink); | 798 _consumer = new _WebSocketConsumer(this, sink); |
809 _sink = new StreamController(); | 799 _sink = new StreamController(); |
810 _sink.stream.pipe(_consumer); | 800 _sink.stream.pipe(_consumer); |
811 _readyState = _WebSocketState.OPEN; | 801 _readyState = _WebSocketState.OPEN; |
812 | 802 |
813 var transformer = new _WebSocketProtocolTransformer(_serverSide); | 803 var transformer = new _WebSocketProtocolTransformer(_serverSide); |
814 _subscription = stream.transform(transformer).listen( | 804 _subscription = stream.transform(transformer).listen((data) { |
815 (data) { | 805 if (data is _WebSocketPing) { |
816 if (data is _WebSocketPing) { | 806 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
817 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 807 } else if (data is _WebSocketPong) { |
818 } else if (data is _WebSocketPong) { | 808 // Simply set pingInterval, as it'll cancel any timers. |
819 // Simply set pingInterval, as it'll cancel any timers. | 809 pingInterval = _pingInterval; |
820 pingInterval = _pingInterval; | 810 } else { |
821 } else { | 811 _controller.add(data); |
822 _controller.add(data); | 812 } |
823 } | 813 }, onError: (error) { |
824 }, | 814 if (_closeTimer != null) _closeTimer.cancel(); |
825 onError: (error) { | 815 if (error is FormatException) { |
826 if (_closeTimer != null) _closeTimer.cancel(); | 816 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
827 if (error is FormatException) { | 817 } else { |
828 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 818 _close(_WebSocketStatus.PROTOCOL_ERROR); |
829 } else { | 819 } |
830 _close(_WebSocketStatus.PROTOCOL_ERROR); | 820 _controller.close(); |
831 } | 821 }, onDone: () { |
832 _controller.close(); | 822 if (_closeTimer != null) _closeTimer.cancel(); |
833 }, | 823 if (_readyState == _WebSocketState.OPEN) { |
834 onDone: () { | 824 _readyState = _WebSocketState.CLOSING; |
835 if (_closeTimer != null) _closeTimer.cancel(); | 825 if (!_isReservedStatusCode(transformer.closeCode)) { |
836 if (_readyState == _WebSocketState.OPEN) { | 826 _close(transformer.closeCode); |
837 _readyState = _WebSocketState.CLOSING; | 827 } else { |
838 if (!_isReservedStatusCode(transformer.closeCode)) { | 828 _close(); |
839 _close(transformer.closeCode); | 829 } |
840 } else { | 830 _readyState = _WebSocketState.CLOSED; |
841 _close(); | 831 } |
842 } | 832 _closeCode = transformer.closeCode; |
843 _readyState = _WebSocketState.CLOSED; | 833 _closeReason = transformer.closeReason; |
844 } | 834 _controller.close(); |
845 _closeCode = transformer.closeCode; | 835 }, cancelOnError: true); |
846 _closeReason = transformer.closeReason; | |
847 _controller.close(); | |
848 }, | |
849 cancelOnError: true); | |
850 _subscription.pause(); | 836 _subscription.pause(); |
851 _controller = new StreamController(sync: true, | 837 _controller = new StreamController( |
852 onListen: _subscription.resume, | 838 sync: true, |
853 onPause: _subscription.pause, | 839 onListen: _subscription.resume, |
854 onResume: _subscription.resume); | 840 onPause: _subscription.pause, |
841 onResume: _subscription.resume); | |
855 } | 842 } |
856 | 843 |
857 StreamSubscription listen(void onData(message), | 844 StreamSubscription listen(void onData(message), |
858 {Function onError, | 845 {Function onError, void onDone(), bool cancelOnError}) { |
859 void onDone(), | |
860 bool cancelOnError}) { | |
861 return _controller.stream.listen(onData, | 846 return _controller.stream.listen(onData, |
862 onError: onError, | 847 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
863 onDone: onDone, | |
864 cancelOnError: cancelOnError); | |
865 } | 848 } |
866 | 849 |
867 Duration get pingInterval => _pingInterval; | 850 Duration get pingInterval => _pingInterval; |
868 | 851 |
869 void set pingInterval(Duration interval) { | 852 void set pingInterval(Duration interval) { |
870 if (_writeClosed) return; | 853 if (_writeClosed) return; |
871 if (_pingTimer != null) _pingTimer.cancel(); | 854 if (_pingTimer != null) _pingTimer.cancel(); |
872 _pingInterval = interval; | 855 _pingInterval = interval; |
873 | 856 |
874 if (_pingInterval == null) return; | 857 if (_pingInterval == null) return; |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
915 if (_outCloseCode == null) { | 898 if (_outCloseCode == null) { |
916 _outCloseCode = code; | 899 _outCloseCode = code; |
917 _outCloseReason = reason; | 900 _outCloseReason = reason; |
918 } | 901 } |
919 _writeClosed = true; | 902 _writeClosed = true; |
920 _consumer.closeSocket(); | 903 _consumer.closeSocket(); |
921 } | 904 } |
922 | 905 |
923 static bool _isReservedStatusCode(int code) { | 906 static bool _isReservedStatusCode(int code) { |
924 return code != null && | 907 return code != null && |
925 (code < _WebSocketStatus.NORMAL_CLOSURE || | 908 (code < _WebSocketStatus.NORMAL_CLOSURE || |
926 code == _WebSocketStatus.RESERVED_1004 || | 909 code == _WebSocketStatus.RESERVED_1004 || |
927 code == _WebSocketStatus.NO_STATUS_RECEIVED || | 910 code == _WebSocketStatus.NO_STATUS_RECEIVED || |
928 code == _WebSocketStatus.ABNORMAL_CLOSURE || | 911 code == _WebSocketStatus.ABNORMAL_CLOSURE || |
929 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR && | 912 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR && |
930 code < _WebSocketStatus.RESERVED_1015) || | 913 code < _WebSocketStatus.RESERVED_1015) || |
931 (code >= _WebSocketStatus.RESERVED_1015 && | 914 (code >= _WebSocketStatus.RESERVED_1015 && code < 3000)); |
932 code < 3000)); | |
933 } | 915 } |
934 } | 916 } |
935 | |
OLD | NEW |