Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: lib/src/web_socket.dart

Issue 1046573002: pkg/http_parser: format code, prepare for +6 release (Closed) Base URL: https://github.com/dart-lang/http_parser.git@master
Patch Set: Created 5 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/media_type.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
nweiz 2015/03/30 22:23:47 I would prefer we not format this file differently
kevmoo 2015/03/30 22:42:36 Done.
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library http_parser.web_socket; 5 library http_parser.web_socket;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:convert'; 8 import 'dart:convert';
9 import 'dart:math'; 9 import 'dart:math';
10 import 'dart:typed_data'; 10 import 'dart:typed_data';
11 11
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 /// 73 ///
74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" 74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io"
75 /// `Socket`), it will be used for both sending and receiving data. Otherwise, 75 /// `Socket`), it will be used for both sending and receiving data. Otherwise,
76 /// it will be used for receiving data and [sink] will be used for sending it. 76 /// it will be used for receiving data and [sink] will be used for sending it.
77 /// 77 ///
78 /// If this is a WebSocket server, [serverSide] should be `true` (the 78 /// If this is a WebSocket server, [serverSide] should be `true` (the
79 /// default); if it's a client, [serverSide] should be `false`. 79 /// default); if it's a client, [serverSide] should be `false`.
80 /// 80 ///
81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
82 factory CompatibleWebSocket(Stream<List<int>> stream, 82 factory CompatibleWebSocket(Stream<List<int>> stream,
83 {StreamSink<List<int>> sink, bool serverSide: true}) { 83 {StreamSink<List<int>> sink, bool serverSide: true}) {
84 if (sink == null) { 84 if (sink == null) {
85 if (stream is! StreamSink) { 85 if (stream is! StreamSink) {
86 throw new ArgumentError("If stream isn't also a StreamSink, sink must " 86 throw new ArgumentError("If stream isn't also a StreamSink, sink must "
87 "be passed explicitly."); 87 "be passed explicitly.");
88 } 88 }
89 sink = stream as StreamSink; 89 sink = stream as StreamSink;
90 } 90 }
91 91
92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide); 92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide);
93 } 93 }
94 94
95 /// Closes the web socket connection. 95 /// Closes the web socket connection.
96 /// 96 ///
97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent 97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
98 /// to the remote peer, respectively. If they are omitted, the peer will see 98 /// to the remote peer, respectively. If they are omitted, the peer will see
99 /// a "no status received" code with no reason. 99 /// a "no status received" code with no reason.
100 /// 100 ///
101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
103 Future close([int closeCode, String closeReason]); 103 Future close([int closeCode, String closeReason]);
104 } 104 }
105 105
106 /// An exception thrown by [CompatibleWebSocket]. 106 /// An exception thrown by [CompatibleWebSocket].
107 class CompatibleWebSocketException implements Exception { 107 class CompatibleWebSocketException implements Exception {
108 final String message; 108 final String message;
109 109
110 CompatibleWebSocketException([this.message]); 110 CompatibleWebSocketException([this.message]);
111 111
112 String toString() => message == null 112 String toString() => message == null
113 ? "CompatibleWebSocketException" : 113 ? "CompatibleWebSocketException"
114 "CompatibleWebSocketException: $message"; 114 : "CompatibleWebSocketException: $message";
115 } 115 }
116 116
117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The 117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The
118 // "dart:io" implementation isn't used directly both to support non-"dart:io" 118 // "dart:io" implementation isn't used directly both to support non-"dart:io"
119 // applications, and because it's incompatible with non-"dart:io" HTTP requests 119 // applications, and because it's incompatible with non-"dart:io" HTTP requests
120 // (issue 18172). 120 // (issue 18172).
121 // 121 //
122 // Because it's copied directly, only modifications necessary to support the 122 // Because it's copied directly, only modifications necessary to support the
123 // desired public API and to remove "dart:io" dependencies have been made. 123 // desired public API and to remove "dart:io" dependencies have been made.
124 124
125 /** 125 /**
126 * Web socket status codes used when closing a web socket connection. 126 * Web socket status codes used when closing a web socket connection.
127 */ 127 */
128 abstract class _WebSocketStatus { 128 abstract class _WebSocketStatus {
129 static const int NORMAL_CLOSURE = 1000; 129 static const int NORMAL_CLOSURE = 1000;
130 static const int GOING_AWAY = 1001; 130 static const int GOING_AWAY = 1001;
131 static const int PROTOCOL_ERROR = 1002; 131 static const int PROTOCOL_ERROR = 1002;
132 static const int UNSUPPORTED_DATA = 1003; 132 static const int UNSUPPORTED_DATA = 1003;
133 static const int RESERVED_1004 = 1004; 133 static const int RESERVED_1004 = 1004;
134 static const int NO_STATUS_RECEIVED = 1005; 134 static const int NO_STATUS_RECEIVED = 1005;
135 static const int ABNORMAL_CLOSURE = 1006; 135 static const int ABNORMAL_CLOSURE = 1006;
136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007; 136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
137 static const int POLICY_VIOLATION = 1008; 137 static const int POLICY_VIOLATION = 1008;
138 static const int MESSAGE_TOO_BIG = 1009; 138 static const int MESSAGE_TOO_BIG = 1009;
139 static const int MISSING_MANDATORY_EXTENSION = 1010; 139 static const int MISSING_MANDATORY_EXTENSION = 1010;
140 static const int INTERNAL_SERVER_ERROR = 1011; 140 static const int INTERNAL_SERVER_ERROR = 1011;
141 static const int RESERVED_1015 = 1015; 141 static const int RESERVED_1015 = 1015;
142 } 142 }
143 143
144 abstract class _WebSocketState { 144 abstract class _WebSocketState {
145 static const int CONNECTING = 0; 145 static const int CONNECTING = 0;
146 static const int OPEN = 1; 146 static const int OPEN = 1;
147 static const int CLOSING = 2; 147 static const int CLOSING = 2;
148 static const int CLOSED = 3; 148 static const int CLOSED = 3;
149 } 149 }
150 150
151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
152 152
153 final _random = new Random(); 153 final _random = new Random();
154 154
155 // Matches _WebSocketOpcode. 155 // Matches _WebSocketOpcode.
156 class _WebSocketMessageType { 156 class _WebSocketMessageType {
157 static const int NONE = 0; 157 static const int NONE = 0;
158 static const int TEXT = 1; 158 static const int TEXT = 1;
159 static const int BINARY = 2; 159 static const int BINARY = 2;
160 } 160 }
161 161
162
163 class _WebSocketOpcode { 162 class _WebSocketOpcode {
164 static const int CONTINUATION = 0; 163 static const int CONTINUATION = 0;
165 static const int TEXT = 1; 164 static const int TEXT = 1;
166 static const int BINARY = 2; 165 static const int BINARY = 2;
167 static const int RESERVED_3 = 3; 166 static const int RESERVED_3 = 3;
168 static const int RESERVED_4 = 4; 167 static const int RESERVED_4 = 4;
169 static const int RESERVED_5 = 5; 168 static const int RESERVED_5 = 5;
170 static const int RESERVED_6 = 6; 169 static const int RESERVED_6 = 6;
171 static const int RESERVED_7 = 7; 170 static const int RESERVED_7 = 7;
172 static const int CLOSE = 8; 171 static const int CLOSE = 8;
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
213 212
214 EventSink _eventSink; 213 EventSink _eventSink;
215 214
216 final bool _serverSide; 215 final bool _serverSide;
217 final List _maskingBytes = new List(4); 216 final List _maskingBytes = new List(4);
218 final BytesBuilder _payload = new BytesBuilder(copy: false); 217 final BytesBuilder _payload = new BytesBuilder(copy: false);
219 218
220 _WebSocketProtocolTransformer([this._serverSide = false]); 219 _WebSocketProtocolTransformer([this._serverSide = false]);
221 220
222 Stream bind(Stream stream) { 221 Stream bind(Stream stream) {
223 return new Stream.eventTransformed( 222 return new Stream.eventTransformed(stream, (EventSink eventSink) {
224 stream, 223 if (_eventSink != null) {
225 (EventSink eventSink) { 224 throw new StateError("WebSocket transformer already used.");
226 if (_eventSink != null) { 225 }
227 throw new StateError("WebSocket transformer already used."); 226 _eventSink = eventSink;
228 } 227 return this;
229 _eventSink = eventSink; 228 });
230 return this;
231 });
232 } 229 }
233 230
234 void addError(Object error, [StackTrace stackTrace]) => 231 void addError(Object error, [StackTrace stackTrace]) =>
235 _eventSink.addError(error, stackTrace); 232 _eventSink.addError(error, stackTrace);
236 233
237 void close() => _eventSink.close(); 234 void close() => _eventSink.close();
238 235
239 /** 236 /**
240 * Process data received from the underlying communication channel. 237 * Process data received from the underlying communication channel.
241 */ 238 */
(...skipping 17 matching lines...) Expand all
259 throw new CompatibleWebSocketException("Protocol error"); 256 throw new CompatibleWebSocketException("Protocol error");
260 } 257 }
261 _opcode = (byte & 0xF); 258 _opcode = (byte & 0xF);
262 if (_opcode <= _WebSocketOpcode.BINARY) { 259 if (_opcode <= _WebSocketOpcode.BINARY) {
263 if (_opcode == _WebSocketOpcode.CONTINUATION) { 260 if (_opcode == _WebSocketOpcode.CONTINUATION) {
264 if (_currentMessageType == _WebSocketMessageType.NONE) { 261 if (_currentMessageType == _WebSocketMessageType.NONE) {
265 throw new CompatibleWebSocketException("Protocol error"); 262 throw new CompatibleWebSocketException("Protocol error");
266 } 263 }
267 } else { 264 } else {
268 assert(_opcode == _WebSocketOpcode.TEXT || 265 assert(_opcode == _WebSocketOpcode.TEXT ||
269 _opcode == _WebSocketOpcode.BINARY); 266 _opcode == _WebSocketOpcode.BINARY);
270 if (_currentMessageType != _WebSocketMessageType.NONE) { 267 if (_currentMessageType != _WebSocketMessageType.NONE) {
271 throw new CompatibleWebSocketException("Protocol error"); 268 throw new CompatibleWebSocketException("Protocol error");
272 } 269 }
273 _currentMessageType = _opcode; 270 _currentMessageType = _opcode;
274 } 271 }
275 } else if (_opcode >= _WebSocketOpcode.CLOSE && 272 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
276 _opcode <= _WebSocketOpcode.PONG) { 273 _opcode <= _WebSocketOpcode.PONG) {
277 // Control frames cannot be fragmented. 274 // Control frames cannot be fragmented.
278 if (!_fin) throw new CompatibleWebSocketException("Protocol error"); 275 if (!_fin) throw new CompatibleWebSocketException("Protocol error");
279 } else { 276 } else {
280 throw new CompatibleWebSocketException("Protocol error"); 277 throw new CompatibleWebSocketException("Protocol error");
281 } 278 }
282 _state = LEN_FIRST; 279 _state = LEN_FIRST;
283 } else if (_state == LEN_FIRST) { 280 } else if (_state == LEN_FIRST) {
284 _masked = (byte & 0x80) != 0; 281 _masked = (byte & 0x80) != 0;
285 _len = byte & 0x7F; 282 _len = byte & 0x7F;
286 if (_isControlFrame() && _len > 125) { 283 if (_isControlFrame() && _len > 125) {
(...skipping 28 matching lines...) Expand all
315 } else { 312 } else {
316 assert(_state == PAYLOAD); 313 assert(_state == PAYLOAD);
317 // The payload is not handled one byte at a time but in blocks. 314 // The payload is not handled one byte at a time but in blocks.
318 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); 315 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
319 _remainingPayloadBytes -= payloadLength; 316 _remainingPayloadBytes -= payloadLength;
320 // Unmask payload if masked. 317 // Unmask payload if masked.
321 if (_masked) { 318 if (_masked) {
322 _unmask(index, payloadLength, buffer); 319 _unmask(index, payloadLength, buffer);
323 } 320 }
324 // Control frame and data frame share _payloads. 321 // Control frame and data frame share _payloads.
325 _payload.add( 322 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
326 new Uint8List.view(buffer.buffer, index, payloadLength));
327 index += payloadLength; 323 index += payloadLength;
328 if (_isControlFrame()) { 324 if (_isControlFrame()) {
329 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 325 if (_remainingPayloadBytes == 0) _controlFrameEnd();
330 } else { 326 } else {
331 if (_currentMessageType != _WebSocketMessageType.TEXT && 327 if (_currentMessageType != _WebSocketMessageType.TEXT &&
332 _currentMessageType != _WebSocketMessageType.BINARY) { 328 _currentMessageType != _WebSocketMessageType.BINARY) {
333 throw new CompatibleWebSocketException("Protocol error"); 329 throw new CompatibleWebSocketException("Protocol error");
334 } 330 }
335 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 331 if (_remainingPayloadBytes == 0) _messageFrameEnd();
336 } 332 }
337 333
338 // Hack - as we always do index++ below. 334 // Hack - as we always do index++ below.
339 index--; 335 index--;
340 } 336 }
341 } 337 }
342 338
343 // Move to the next byte. 339 // Move to the next byte.
(...skipping 14 matching lines...) Expand all
358 index += startOffset; 354 index += startOffset;
359 length -= startOffset; 355 length -= startOffset;
360 final int blockCount = length ~/ BLOCK_SIZE; 356 final int blockCount = length ~/ BLOCK_SIZE;
361 if (blockCount > 0) { 357 if (blockCount > 0) {
362 // Create mask block. 358 // Create mask block.
363 int mask = 0; 359 int mask = 0;
364 for (int i = 3; i >= 0; i--) { 360 for (int i = 3; i >= 0; i--) {
365 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; 361 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
366 } 362 }
367 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 363 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
368 Int32x4List blockBuffer = new Int32x4List.view( 364 Int32x4List blockBuffer =
369 buffer.buffer, index, blockCount); 365 new Int32x4List.view(buffer.buffer, index, blockCount);
370 for (int i = 0; i < blockBuffer.length; i++) { 366 for (int i = 0; i < blockBuffer.length; i++) {
371 blockBuffer[i] ^= blockMask; 367 blockBuffer[i] ^= blockMask;
372 } 368 }
373 final int bytes = blockCount * BLOCK_SIZE; 369 final int bytes = blockCount * BLOCK_SIZE;
374 index += bytes; 370 index += bytes;
375 length -= bytes; 371 length -= bytes;
376 } 372 }
377 } 373 }
378 // Handle end. 374 // Handle end.
379 final int end = index + length; 375 final int end = index + length;
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
472 468
473 case _WebSocketOpcode.PONG: 469 case _WebSocketOpcode.PONG:
474 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); 470 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
475 break; 471 break;
476 } 472 }
477 _prepareForNextFrame(); 473 _prepareForNextFrame();
478 } 474 }
479 475
480 bool _isControlFrame() { 476 bool _isControlFrame() {
481 return _opcode == _WebSocketOpcode.CLOSE || 477 return _opcode == _WebSocketOpcode.CLOSE ||
482 _opcode == _WebSocketOpcode.PING || 478 _opcode == _WebSocketOpcode.PING ||
483 _opcode == _WebSocketOpcode.PONG; 479 _opcode == _WebSocketOpcode.PONG;
484 } 480 }
485 481
486 void _prepareForNextFrame() { 482 void _prepareForNextFrame() {
487 if (_state != CLOSED && _state != FAILURE) _state = START; 483 if (_state != CLOSED && _state != FAILURE) _state = START;
488 _fin = false; 484 _fin = false;
489 _opcode = -1; 485 _opcode = -1;
490 _len = -1; 486 _len = -1;
491 _remainingLenBytes = -1; 487 _remainingLenBytes = -1;
492 _remainingMaskingKeyBytes = 4; 488 _remainingMaskingKeyBytes = 4;
493 _remainingPayloadBytes = -1; 489 _remainingPayloadBytes = -1;
494 _unmaskingIndex = 0; 490 _unmaskingIndex = 0;
495 } 491 }
496 } 492 }
497 493
498
499 class _WebSocketPing { 494 class _WebSocketPing {
500 final List<int> payload; 495 final List<int> payload;
501 _WebSocketPing([this.payload = null]); 496 _WebSocketPing([this.payload = null]);
502 } 497 }
503 498
504
505 class _WebSocketPong { 499 class _WebSocketPong {
506 final List<int> payload; 500 final List<int> payload;
507 _WebSocketPong([this.payload = null]); 501 _WebSocketPong([this.payload = null]);
508 } 502 }
509 503
510 // TODO(ajohnsen): Make this transformer reusable. 504 // TODO(ajohnsen): Make this transformer reusable.
511 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 505 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
512 final _WebSocketImpl webSocket; 506 final _WebSocketImpl webSocket;
513 EventSink _eventSink; 507 EventSink _eventSink;
514 508
515 _WebSocketOutgoingTransformer(this.webSocket); 509 _WebSocketOutgoingTransformer(this.webSocket);
516 510
517 Stream bind(Stream stream) { 511 Stream bind(Stream stream) {
518 return new Stream.eventTransformed( 512 return new Stream.eventTransformed(stream, (EventSink eventSink) {
519 stream, 513 if (_eventSink != null) {
520 (EventSink eventSink) { 514 throw new StateError("WebSocket transformer already used");
521 if (_eventSink != null) { 515 }
522 throw new StateError("WebSocket transformer already used"); 516 _eventSink = eventSink;
523 } 517 return this;
524 _eventSink = eventSink; 518 });
525 return this;
526 });
527 } 519 }
528 520
529 void add(message) { 521 void add(message) {
530 if (message is _WebSocketPong) { 522 if (message is _WebSocketPong) {
531 addFrame(_WebSocketOpcode.PONG, message.payload); 523 addFrame(_WebSocketOpcode.PONG, message.payload);
532 return; 524 return;
533 } 525 }
534 if (message is _WebSocketPing) { 526 if (message is _WebSocketPing) {
535 addFrame(_WebSocketOpcode.PING, message.payload); 527 addFrame(_WebSocketOpcode.PING, message.payload);
536 return; 528 return;
537 } 529 }
538 List<int> data; 530 List<int> data;
539 int opcode; 531 int opcode;
540 if (message != null) { 532 if (message != null) {
541 if (message is String) { 533 if (message is String) {
542 opcode = _WebSocketOpcode.TEXT; 534 opcode = _WebSocketOpcode.TEXT;
543 data = UTF8.encode(message); 535 data = UTF8.encode(message);
544 } else { 536 } else {
545 if (message is !List<int>) { 537 if (message is! List<int>) {
546 throw new ArgumentError(message); 538 throw new ArgumentError(message);
547 } 539 }
548 opcode = _WebSocketOpcode.BINARY; 540 opcode = _WebSocketOpcode.BINARY;
549 data = message; 541 data = message;
550 } 542 }
551 } else { 543 } else {
552 opcode = _WebSocketOpcode.TEXT; 544 opcode = _WebSocketOpcode.TEXT;
553 } 545 }
554 addFrame(opcode, data); 546 addFrame(opcode, data);
555 } 547 }
(...skipping 14 matching lines...) Expand all
570 } 562 }
571 } 563 }
572 addFrame(_WebSocketOpcode.CLOSE, data); 564 addFrame(_WebSocketOpcode.CLOSE, data);
573 _eventSink.close(); 565 _eventSink.close();
574 } 566 }
575 567
576 void addFrame(int opcode, List<int> data) => 568 void addFrame(int opcode, List<int> data) =>
577 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); 569 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
578 570
579 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 571 static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
580 bool mask = !serverSide; // Masking not implemented for server. 572 bool mask = !serverSide; // Masking not implemented for server.
581 int dataLength = data == null ? 0 : data.length; 573 int dataLength = data == null ? 0 : data.length;
582 // Determine the header size. 574 // Determine the header size.
583 int headerSize = (mask) ? 6 : 2; 575 int headerSize = (mask) ? 6 : 2;
584 if (dataLength > 65535) { 576 if (dataLength > 65535) {
585 headerSize += 8; 577 headerSize += 8;
586 } else if (dataLength > 125) { 578 } else if (dataLength > 125) {
587 headerSize += 2; 579 headerSize += 2;
588 } 580 }
589 Uint8List header = new Uint8List(headerSize); 581 Uint8List header = new Uint8List(headerSize);
590 int index = 0; 582 int index = 0;
591 // Set FIN and opcode. 583 // Set FIN and opcode.
592 header[index++] = 0x80 | opcode; 584 header[index++] = 0x80 | opcode;
593 // Determine size and position of length field. 585 // Determine size and position of length field.
594 int lengthBytes = 1; 586 int lengthBytes = 1;
595 if (dataLength > 65535) { 587 if (dataLength > 65535) {
596 header[index++] = 127; 588 header[index++] = 127;
597 lengthBytes = 8; 589 lengthBytes = 8;
598 } else if (dataLength > 125) { 590 } else if (dataLength > 125) {
599 header[index++] = 126; 591 header[index++] = 126;
600 lengthBytes = 2; 592 lengthBytes = 2;
601 } 593 }
602 // Write the length in network byte order into the header. 594 // Write the length in network byte order into the header.
603 for (int i = 0; i < lengthBytes; i++) { 595 for (int i = 0; i < lengthBytes; i++) {
604 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; 596 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
605 } 597 }
606 if (mask) { 598 if (mask) {
607 header[1] |= 1 << 7; 599 header[1] |= 1 << 7;
608 var maskBytes = [_random.nextInt(256), _random.nextInt(256), 600 var maskBytes = [
609 _random.nextInt(256), _random.nextInt(256)]; 601 _random.nextInt(256),
602 _random.nextInt(256),
603 _random.nextInt(256),
604 _random.nextInt(256)
605 ];
610 header.setRange(index, index + 4, maskBytes); 606 header.setRange(index, index + 4, maskBytes);
611 index += 4; 607 index += 4;
612 if (data != null) { 608 if (data != null) {
613 Uint8List list; 609 Uint8List list;
614 // If this is a text message just do the masking inside the 610 // If this is a text message just do the masking inside the
615 // encoded data. 611 // encoded data.
616 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 612 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
617 list = data; 613 list = data;
618 } else { 614 } else {
619 if (data is Uint8List) { 615 if (data is Uint8List) {
620 list = new Uint8List.fromList(data); 616 list = new Uint8List.fromList(data);
621 } else { 617 } else {
622 list = new Uint8List(data.length); 618 list = new Uint8List(data.length);
623 for (int i = 0; i < data.length; i++) { 619 for (int i = 0; i < data.length; i++) {
624 if (data[i] < 0 || 255 < data[i]) { 620 if (data[i] < 0 || 255 < data[i]) {
625 throw new ArgumentError( 621 throw new ArgumentError("List element is not a byte value "
626 "List element is not a byte value "
627 "(value ${data[i]} at index $i)"); 622 "(value ${data[i]} at index $i)");
628 } 623 }
629 list[i] = data[i]; 624 list[i] = data[i];
630 } 625 }
631 } 626 }
632 } 627 }
633 const int BLOCK_SIZE = 16; 628 const int BLOCK_SIZE = 16;
634 int blockCount = list.length ~/ BLOCK_SIZE; 629 int blockCount = list.length ~/ BLOCK_SIZE;
635 if (blockCount > 0) { 630 if (blockCount > 0) {
636 // Create mask block. 631 // Create mask block.
637 int mask = 0; 632 int mask = 0;
638 for (int i = 3; i >= 0; i--) { 633 for (int i = 3; i >= 0; i--) {
639 mask = (mask << 8) | maskBytes[i]; 634 mask = (mask << 8) | maskBytes[i];
640 } 635 }
641 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 636 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
642 Int32x4List blockBuffer = new Int32x4List.view( 637 Int32x4List blockBuffer =
643 list.buffer, 0, blockCount); 638 new Int32x4List.view(list.buffer, 0, blockCount);
644 for (int i = 0; i < blockBuffer.length; i++) { 639 for (int i = 0; i < blockBuffer.length; i++) {
645 blockBuffer[i] ^= blockMask; 640 blockBuffer[i] ^= blockMask;
646 } 641 }
647 } 642 }
648 // Handle end. 643 // Handle end.
649 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { 644 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
650 list[i] ^= maskBytes[i & 3]; 645 list[i] ^= maskBytes[i & 3];
651 } 646 }
652 data = list; 647 data = list;
653 } 648 }
654 } 649 }
655 assert(index == headerSize); 650 assert(index == headerSize);
656 if (data == null) { 651 if (data == null) {
657 return [header]; 652 return [header];
658 } else { 653 } else {
659 return [header, data]; 654 return [header, data];
660 } 655 }
661 } 656 }
662 } 657 }
663 658
664
665 class _WebSocketConsumer implements StreamConsumer { 659 class _WebSocketConsumer implements StreamConsumer {
666 final _WebSocketImpl webSocket; 660 final _WebSocketImpl webSocket;
667 final StreamSink<List<int>> sink; 661 final StreamSink<List<int>> sink;
668 StreamController _controller; 662 StreamController _controller;
669 StreamSubscription _subscription; 663 StreamSubscription _subscription;
670 bool _issuedPause = false; 664 bool _issuedPause = false;
671 bool _closed = false; 665 bool _closed = false;
672 Completer _closeCompleter = new Completer(); 666 Completer _closeCompleter = new Completer();
673 Completer _completer; 667 Completer _completer;
674 668
(...skipping 24 matching lines...) Expand all
699 void _cancel() { 693 void _cancel() {
700 if (_subscription != null) { 694 if (_subscription != null) {
701 var subscription = _subscription; 695 var subscription = _subscription;
702 _subscription = null; 696 _subscription = null;
703 subscription.cancel(); 697 subscription.cancel();
704 } 698 }
705 } 699 }
706 700
707 _ensureController() { 701 _ensureController() {
708 if (_controller != null) return; 702 if (_controller != null) return;
709 _controller = new StreamController(sync: true, 703 _controller = new StreamController(
710 onPause: _onPause, 704 sync: true,
711 onResume: _onResume, 705 onPause: _onPause,
712 onCancel: _onListen); 706 onResume: _onResume,
713 var stream = _controller.stream.transform( 707 onCancel: _onListen);
714 new _WebSocketOutgoingTransformer(webSocket)); 708 var stream = _controller.stream
715 sink.addStream(stream) 709 .transform(new _WebSocketOutgoingTransformer(webSocket));
716 .then((_) { 710 sink.addStream(stream).then((_) {
717 _done(); 711 _done();
718 _closeCompleter.complete(webSocket); 712 _closeCompleter.complete(webSocket);
719 }, onError: (error, StackTrace stackTrace) { 713 }, onError: (error, StackTrace stackTrace) {
720 _closed = true; 714 _closed = true;
721 _cancel(); 715 _cancel();
722 if (error is ArgumentError) { 716 if (error is ArgumentError) {
723 if (!_done(error, stackTrace)) { 717 if (!_done(error, stackTrace)) {
724 _closeCompleter.completeError(error, stackTrace); 718 _closeCompleter.completeError(error, stackTrace);
725 } 719 }
726 } else { 720 } else {
727 _done(); 721 _done();
728 _closeCompleter.complete(webSocket); 722 _closeCompleter.complete(webSocket);
729 } 723 }
730 }); 724 });
731 } 725 }
732 726
733 bool _done([error, StackTrace stackTrace]) { 727 bool _done([error, StackTrace stackTrace]) {
734 if (_completer == null) return false; 728 if (_completer == null) return false;
735 if (error != null) { 729 if (error != null) {
736 _completer.completeError(error, stackTrace); 730 _completer.completeError(error, stackTrace);
737 } else { 731 } else {
738 _completer.complete(webSocket); 732 _completer.complete(webSocket);
739 } 733 }
740 _completer = null; 734 _completer = null;
741 return true; 735 return true;
742 } 736 }
743 737
744 Future addStream(var stream) { 738 Future addStream(var stream) {
745 if (_closed) { 739 if (_closed) {
746 stream.listen(null).cancel(); 740 stream.listen(null).cancel();
747 return new Future.value(webSocket); 741 return new Future.value(webSocket);
748 } 742 }
749 _ensureController(); 743 _ensureController();
750 _completer = new Completer(); 744 _completer = new Completer();
751 _subscription = stream.listen( 745 _subscription = stream.listen((data) {
752 (data) { 746 _controller.add(data);
753 _controller.add(data); 747 }, onDone: _done, onError: _done, cancelOnError: true);
754 },
755 onDone: _done,
756 onError: _done,
757 cancelOnError: true);
758 if (_issuedPause) { 748 if (_issuedPause) {
759 _subscription.pause(); 749 _subscription.pause();
760 _issuedPause = false; 750 _issuedPause = false;
761 } 751 }
762 return _completer.future; 752 return _completer.future;
763 } 753 }
764 754
765 Future close() { 755 Future close() {
766 _ensureController(); 756 _ensureController();
767 Future closeSocket() { 757 Future closeSocket() {
768 return sink.close().catchError((_) {}).then((_) => webSocket); 758 return sink.close().catchError((_) {}).then((_) => webSocket);
769 } 759 }
770 _controller.close(); 760 _controller.close();
771 return _closeCompleter.future.then((_) => closeSocket()); 761 return _closeCompleter.future.then((_) => closeSocket());
772 } 762 }
773 763
774 void add(data) { 764 void add(data) {
775 if (_closed) return; 765 if (_closed) return;
776 _ensureController(); 766 _ensureController();
777 _controller.add(data); 767 _controller.add(data);
778 } 768 }
779 769
780 void closeSocket() { 770 void closeSocket() {
781 _closed = true; 771 _closed = true;
782 _cancel(); 772 _cancel();
783 close(); 773 close();
784 } 774 }
785 } 775 }
786 776
787
788 class _WebSocketImpl extends Stream implements CompatibleWebSocket { 777 class _WebSocketImpl extends Stream implements CompatibleWebSocket {
789 StreamController _controller; 778 StreamController _controller;
790 StreamSubscription _subscription; 779 StreamSubscription _subscription;
791 StreamController _sink; 780 StreamController _sink;
792 781
793 final bool _serverSide; 782 final bool _serverSide;
794 int _readyState = _WebSocketState.CONNECTING; 783 int _readyState = _WebSocketState.CONNECTING;
795 bool _writeClosed = false; 784 bool _writeClosed = false;
796 int _closeCode; 785 int _closeCode;
797 String _closeReason; 786 String _closeReason;
798 Duration _pingInterval; 787 Duration _pingInterval;
799 Timer _pingTimer; 788 Timer _pingTimer;
800 _WebSocketConsumer _consumer; 789 _WebSocketConsumer _consumer;
801 790
802 int _outCloseCode; 791 int _outCloseCode;
803 String _outCloseReason; 792 String _outCloseReason;
804 Timer _closeTimer; 793 Timer _closeTimer;
805 794
806 _WebSocketImpl._fromSocket(Stream<List<int>> stream, 795 _WebSocketImpl._fromSocket(
807 StreamSink<List<int>> sink, [this._serverSide = false]) { 796 Stream<List<int>> stream, StreamSink<List<int>> sink,
797 [this._serverSide = false]) {
808 _consumer = new _WebSocketConsumer(this, sink); 798 _consumer = new _WebSocketConsumer(this, sink);
809 _sink = new StreamController(); 799 _sink = new StreamController();
810 _sink.stream.pipe(_consumer); 800 _sink.stream.pipe(_consumer);
811 _readyState = _WebSocketState.OPEN; 801 _readyState = _WebSocketState.OPEN;
812 802
813 var transformer = new _WebSocketProtocolTransformer(_serverSide); 803 var transformer = new _WebSocketProtocolTransformer(_serverSide);
814 _subscription = stream.transform(transformer).listen( 804 _subscription = stream.transform(transformer).listen((data) {
815 (data) { 805 if (data is _WebSocketPing) {
816 if (data is _WebSocketPing) { 806 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
817 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 807 } else if (data is _WebSocketPong) {
818 } else if (data is _WebSocketPong) { 808 // Simply set pingInterval, as it'll cancel any timers.
819 // Simply set pingInterval, as it'll cancel any timers. 809 pingInterval = _pingInterval;
820 pingInterval = _pingInterval; 810 } else {
821 } else { 811 _controller.add(data);
822 _controller.add(data); 812 }
823 } 813 }, onError: (error) {
824 }, 814 if (_closeTimer != null) _closeTimer.cancel();
825 onError: (error) { 815 if (error is FormatException) {
826 if (_closeTimer != null) _closeTimer.cancel(); 816 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
827 if (error is FormatException) { 817 } else {
828 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 818 _close(_WebSocketStatus.PROTOCOL_ERROR);
829 } else { 819 }
830 _close(_WebSocketStatus.PROTOCOL_ERROR); 820 _controller.close();
831 } 821 }, onDone: () {
832 _controller.close(); 822 if (_closeTimer != null) _closeTimer.cancel();
833 }, 823 if (_readyState == _WebSocketState.OPEN) {
834 onDone: () { 824 _readyState = _WebSocketState.CLOSING;
835 if (_closeTimer != null) _closeTimer.cancel(); 825 if (!_isReservedStatusCode(transformer.closeCode)) {
836 if (_readyState == _WebSocketState.OPEN) { 826 _close(transformer.closeCode);
837 _readyState = _WebSocketState.CLOSING; 827 } else {
838 if (!_isReservedStatusCode(transformer.closeCode)) { 828 _close();
839 _close(transformer.closeCode); 829 }
840 } else { 830 _readyState = _WebSocketState.CLOSED;
841 _close(); 831 }
842 } 832 _closeCode = transformer.closeCode;
843 _readyState = _WebSocketState.CLOSED; 833 _closeReason = transformer.closeReason;
844 } 834 _controller.close();
845 _closeCode = transformer.closeCode; 835 }, cancelOnError: true);
846 _closeReason = transformer.closeReason;
847 _controller.close();
848 },
849 cancelOnError: true);
850 _subscription.pause(); 836 _subscription.pause();
851 _controller = new StreamController(sync: true, 837 _controller = new StreamController(
852 onListen: _subscription.resume, 838 sync: true,
853 onPause: _subscription.pause, 839 onListen: _subscription.resume,
854 onResume: _subscription.resume); 840 onPause: _subscription.pause,
841 onResume: _subscription.resume);
855 } 842 }
856 843
857 StreamSubscription listen(void onData(message), 844 StreamSubscription listen(void onData(message),
858 {Function onError, 845 {Function onError, void onDone(), bool cancelOnError}) {
859 void onDone(),
860 bool cancelOnError}) {
861 return _controller.stream.listen(onData, 846 return _controller.stream.listen(onData,
862 onError: onError, 847 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
863 onDone: onDone,
864 cancelOnError: cancelOnError);
865 } 848 }
866 849
867 Duration get pingInterval => _pingInterval; 850 Duration get pingInterval => _pingInterval;
868 851
869 void set pingInterval(Duration interval) { 852 void set pingInterval(Duration interval) {
870 if (_writeClosed) return; 853 if (_writeClosed) return;
871 if (_pingTimer != null) _pingTimer.cancel(); 854 if (_pingTimer != null) _pingTimer.cancel();
872 _pingInterval = interval; 855 _pingInterval = interval;
873 856
874 if (_pingInterval == null) return; 857 if (_pingInterval == null) return;
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
915 if (_outCloseCode == null) { 898 if (_outCloseCode == null) {
916 _outCloseCode = code; 899 _outCloseCode = code;
917 _outCloseReason = reason; 900 _outCloseReason = reason;
918 } 901 }
919 _writeClosed = true; 902 _writeClosed = true;
920 _consumer.closeSocket(); 903 _consumer.closeSocket();
921 } 904 }
922 905
923 static bool _isReservedStatusCode(int code) { 906 static bool _isReservedStatusCode(int code) {
924 return code != null && 907 return code != null &&
925 (code < _WebSocketStatus.NORMAL_CLOSURE || 908 (code < _WebSocketStatus.NORMAL_CLOSURE ||
926 code == _WebSocketStatus.RESERVED_1004 || 909 code == _WebSocketStatus.RESERVED_1004 ||
927 code == _WebSocketStatus.NO_STATUS_RECEIVED || 910 code == _WebSocketStatus.NO_STATUS_RECEIVED ||
928 code == _WebSocketStatus.ABNORMAL_CLOSURE || 911 code == _WebSocketStatus.ABNORMAL_CLOSURE ||
929 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR && 912 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR &&
930 code < _WebSocketStatus.RESERVED_1015) || 913 code < _WebSocketStatus.RESERVED_1015) ||
931 (code >= _WebSocketStatus.RESERVED_1015 && 914 (code >= _WebSocketStatus.RESERVED_1015 && code < 3000));
932 code < 3000));
933 } 915 }
934 } 916 }
935
OLDNEW
« no previous file with comments | « lib/src/media_type.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698