Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(139)

Side by Side Diff: lib/src/web_socket.dart

Issue 1225403008: Bring in latest dart:io WebSocket code. (Closed) Base URL: git@github.com:dart-lang/http_parser@master
Patch Set: pubspec + changelog Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/copy/web_socket_impl.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library http_parser.web_socket; 5 library http_parser.web_socket;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:convert';
9 import 'dart:math';
10 import 'dart:typed_data';
11 8
12 import 'package:crypto/crypto.dart'; 9 import 'package:crypto/crypto.dart';
13 10
14 import 'bytes_builder.dart'; 11 import 'copy/web_socket_impl.dart';
15 12
16 /// An implementation of the WebSocket protocol that's not specific to "dart:io" 13 /// An implementation of the WebSocket protocol that's not specific to "dart:io"
17 /// or to any particular HTTP API. 14 /// or to any particular HTTP API.
18 /// 15 ///
19 /// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket 16 /// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket
20 /// handshake][]. This needs to be handled manually by the user of the code. 17 /// handshake][]. This needs to be handled manually by the user of the code.
21 /// Once that's been done, [new CompatibleWebSocket] can be called with the 18 /// Once that's been done, [new CompatibleWebSocket] can be called with the
22 /// underlying socket and it will handle the remainder of the protocol. 19 /// underlying socket and it will handle the remainder of the protocol.
23 /// 20 ///
24 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 21 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
(...skipping 30 matching lines...) Expand all
55 /// the [initial handshake]. 52 /// the [initial handshake].
56 /// 53 ///
57 /// The return value should be sent back to the client in a 54 /// The return value should be sent back to the client in a
58 /// `Sec-WebSocket-Accept` header. 55 /// `Sec-WebSocket-Accept` header.
59 /// 56 ///
60 /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2 57 /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
61 static String signKey(String key) { 58 static String signKey(String key) {
62 var hash = new SHA1(); 59 var hash = new SHA1();
63 // We use [codeUnits] here rather than UTF-8-decoding the string because 60 // We use [codeUnits] here rather than UTF-8-decoding the string because
64 // [key] is expected to be base64 encoded, and so will be pure ASCII. 61 // [key] is expected to be base64 encoded, and so will be pure ASCII.
65 hash.add((key + _webSocketGUID).codeUnits); 62 hash.add((key + webSocketGUID).codeUnits);
66 return CryptoUtils.bytesToBase64(hash.close()); 63 return CryptoUtils.bytesToBase64(hash.close());
67 } 64 }
68 65
69 /// Creates a new WebSocket handling messaging across an existing socket. 66 /// Creates a new WebSocket handling messaging across an existing socket.
70 /// 67 ///
71 /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][] 68 /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][]
72 /// must have already been completed on the socket before this is called. 69 /// must have already been completed on the socket before this is called.
73 /// 70 ///
74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" 71 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io"
75 /// `Socket`), it will be used for both sending and receiving data. Otherwise, 72 /// `Socket`), it will be used for both sending and receiving data. Otherwise,
76 /// it will be used for receiving data and [sink] will be used for sending it. 73 /// it will be used for receiving data and [sink] will be used for sending it.
77 /// 74 ///
75 /// [protocol] should be the protocol negotiated by this handshake, if any.
76 ///
78 /// If this is a WebSocket server, [serverSide] should be `true` (the 77 /// If this is a WebSocket server, [serverSide] should be `true` (the
79 /// default); if it's a client, [serverSide] should be `false`. 78 /// default); if it's a client, [serverSide] should be `false`.
80 /// 79 ///
81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 80 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
82 factory CompatibleWebSocket(Stream<List<int>> stream, 81 factory CompatibleWebSocket(Stream<List<int>> stream,
83 {StreamSink<List<int>> sink, bool serverSide: true}) { 82 {StreamSink<List<int>> sink, String protocol, bool serverSide: true}) {
84 if (sink == null) { 83 if (sink == null) {
85 if (stream is! StreamSink) { 84 if (stream is! StreamSink) {
86 throw new ArgumentError("If stream isn't also a StreamSink, sink must " 85 throw new ArgumentError("If stream isn't also a StreamSink, sink must "
87 "be passed explicitly."); 86 "be passed explicitly.");
88 } 87 }
89 sink = stream as StreamSink; 88 sink = stream as StreamSink;
90 } 89 }
91 90
92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide); 91 return new WebSocketImpl.fromSocket(stream, sink, protocol, serverSide);
93 } 92 }
94 93
95 /// Closes the web socket connection. 94 /// Closes the web socket connection.
96 /// 95 ///
97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent 96 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
98 /// to the remote peer, respectively. If they are omitted, the peer will see 97 /// to the remote peer, respectively. If they are omitted, the peer will see
99 /// a "no status received" code with no reason. 98 /// a "no status received" code with no reason.
100 /// 99 ///
101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 100 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 101 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
103 Future close([int closeCode, String closeReason]); 102 Future close([int closeCode, String closeReason]);
104 } 103 }
105 104
106 /// An exception thrown by [CompatibleWebSocket]. 105 /// An exception thrown by [CompatibleWebSocket].
107 class CompatibleWebSocketException implements Exception { 106 class CompatibleWebSocketException implements Exception {
108 final String message; 107 final String message;
109 108
110 CompatibleWebSocketException([this.message]); 109 CompatibleWebSocketException([this.message]);
111 110
112 String toString() => message == null 111 String toString() => message == null
113 ? "CompatibleWebSocketException" : 112 ? "CompatibleWebSocketException" :
114 "CompatibleWebSocketException: $message"; 113 "CompatibleWebSocketException: $message";
115 } 114 }
116
117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The
118 // "dart:io" implementation isn't used directly both to support non-"dart:io"
119 // applications, and because it's incompatible with non-"dart:io" HTTP requests
120 // (issue 18172).
121 //
122 // Because it's copied directly, only modifications necessary to support the
123 // desired public API and to remove "dart:io" dependencies have been made.
124
125 /**
126 * Web socket status codes used when closing a web socket connection.
127 */
128 abstract class _WebSocketStatus {
129 static const int NORMAL_CLOSURE = 1000;
130 static const int GOING_AWAY = 1001;
131 static const int PROTOCOL_ERROR = 1002;
132 static const int UNSUPPORTED_DATA = 1003;
133 static const int RESERVED_1004 = 1004;
134 static const int NO_STATUS_RECEIVED = 1005;
135 static const int ABNORMAL_CLOSURE = 1006;
136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
137 static const int POLICY_VIOLATION = 1008;
138 static const int MESSAGE_TOO_BIG = 1009;
139 static const int MISSING_MANDATORY_EXTENSION = 1010;
140 static const int INTERNAL_SERVER_ERROR = 1011;
141 static const int RESERVED_1015 = 1015;
142 }
143
144 abstract class _WebSocketState {
145 static const int CONNECTING = 0;
146 static const int OPEN = 1;
147 static const int CLOSING = 2;
148 static const int CLOSED = 3;
149 }
150
151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
152
153 final _random = new Random();
154
155 // Matches _WebSocketOpcode.
156 class _WebSocketMessageType {
157 static const int NONE = 0;
158 static const int TEXT = 1;
159 static const int BINARY = 2;
160 }
161
162
163 class _WebSocketOpcode {
164 static const int CONTINUATION = 0;
165 static const int TEXT = 1;
166 static const int BINARY = 2;
167 static const int RESERVED_3 = 3;
168 static const int RESERVED_4 = 4;
169 static const int RESERVED_5 = 5;
170 static const int RESERVED_6 = 6;
171 static const int RESERVED_7 = 7;
172 static const int CLOSE = 8;
173 static const int PING = 9;
174 static const int PONG = 10;
175 static const int RESERVED_B = 11;
176 static const int RESERVED_C = 12;
177 static const int RESERVED_D = 13;
178 static const int RESERVED_E = 14;
179 static const int RESERVED_F = 15;
180 }
181
182 /**
183 * The web socket protocol transformer handles the protocol byte stream
184 * which is supplied through the [:handleData:]. As the protocol is processed,
185 * it'll output frame data as either a List<int> or String.
186 *
187 * Important infomation about usage: Be sure you use cancelOnError, so the
188 * socket will be closed when the processer encounter an error. Not using it
189 * will lead to undefined behaviour.
190 */
191 // TODO(ajohnsen): make this transformer reusable?
192 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
193 static const int START = 0;
194 static const int LEN_FIRST = 1;
195 static const int LEN_REST = 2;
196 static const int MASK = 3;
197 static const int PAYLOAD = 4;
198 static const int CLOSED = 5;
199 static const int FAILURE = 6;
200
201 int _state = START;
202 bool _fin = false;
203 int _opcode = -1;
204 int _len = -1;
205 bool _masked = false;
206 int _remainingLenBytes = -1;
207 int _remainingMaskingKeyBytes = 4;
208 int _remainingPayloadBytes = -1;
209 int _unmaskingIndex = 0;
210 int _currentMessageType = _WebSocketMessageType.NONE;
211 int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
212 String closeReason = "";
213
214 EventSink _eventSink;
215
216 final bool _serverSide;
217 final List _maskingBytes = new List(4);
218 final BytesBuilder _payload = new BytesBuilder(copy: false);
219
220 _WebSocketProtocolTransformer([this._serverSide = false]);
221
222 Stream bind(Stream stream) {
223 return new Stream.eventTransformed(
224 stream,
225 (EventSink eventSink) {
226 if (_eventSink != null) {
227 throw new StateError("WebSocket transformer already used.");
228 }
229 _eventSink = eventSink;
230 return this;
231 });
232 }
233
234 void addError(Object error, [StackTrace stackTrace]) =>
235 _eventSink.addError(error, stackTrace);
236
237 void close() => _eventSink.close();
238
239 /**
240 * Process data received from the underlying communication channel.
241 */
242 void add(Uint8List buffer) {
243 int count = buffer.length;
244 int index = 0;
245 int lastIndex = count;
246 if (_state == CLOSED) {
247 throw new CompatibleWebSocketException("Data on closed connection");
248 }
249 if (_state == FAILURE) {
250 throw new CompatibleWebSocketException("Data on failed connection");
251 }
252 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
253 int byte = buffer[index];
254 if (_state <= LEN_REST) {
255 if (_state == START) {
256 _fin = (byte & 0x80) != 0;
257 if ((byte & 0x70) != 0) {
258 // The RSV1, RSV2 bits RSV3 must be all zero.
259 throw new CompatibleWebSocketException("Protocol error");
260 }
261 _opcode = (byte & 0xF);
262 if (_opcode <= _WebSocketOpcode.BINARY) {
263 if (_opcode == _WebSocketOpcode.CONTINUATION) {
264 if (_currentMessageType == _WebSocketMessageType.NONE) {
265 throw new CompatibleWebSocketException("Protocol error");
266 }
267 } else {
268 assert(_opcode == _WebSocketOpcode.TEXT ||
269 _opcode == _WebSocketOpcode.BINARY);
270 if (_currentMessageType != _WebSocketMessageType.NONE) {
271 throw new CompatibleWebSocketException("Protocol error");
272 }
273 _currentMessageType = _opcode;
274 }
275 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
276 _opcode <= _WebSocketOpcode.PONG) {
277 // Control frames cannot be fragmented.
278 if (!_fin) throw new CompatibleWebSocketException("Protocol error");
279 } else {
280 throw new CompatibleWebSocketException("Protocol error");
281 }
282 _state = LEN_FIRST;
283 } else if (_state == LEN_FIRST) {
284 _masked = (byte & 0x80) != 0;
285 _len = byte & 0x7F;
286 if (_isControlFrame() && _len > 125) {
287 throw new CompatibleWebSocketException("Protocol error");
288 }
289 if (_len == 126) {
290 _len = 0;
291 _remainingLenBytes = 2;
292 _state = LEN_REST;
293 } else if (_len == 127) {
294 _len = 0;
295 _remainingLenBytes = 8;
296 _state = LEN_REST;
297 } else {
298 assert(_len < 126);
299 _lengthDone();
300 }
301 } else {
302 assert(_state == LEN_REST);
303 _len = _len << 8 | byte;
304 _remainingLenBytes--;
305 if (_remainingLenBytes == 0) {
306 _lengthDone();
307 }
308 }
309 } else {
310 if (_state == MASK) {
311 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte;
312 if (_remainingMaskingKeyBytes == 0) {
313 _maskDone();
314 }
315 } else {
316 assert(_state == PAYLOAD);
317 // The payload is not handled one byte at a time but in blocks.
318 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
319 _remainingPayloadBytes -= payloadLength;
320 // Unmask payload if masked.
321 if (_masked) {
322 _unmask(index, payloadLength, buffer);
323 }
324 // Control frame and data frame share _payloads.
325 _payload.add(
326 new Uint8List.view(buffer.buffer, index, payloadLength));
327 index += payloadLength;
328 if (_isControlFrame()) {
329 if (_remainingPayloadBytes == 0) _controlFrameEnd();
330 } else {
331 if (_currentMessageType != _WebSocketMessageType.TEXT &&
332 _currentMessageType != _WebSocketMessageType.BINARY) {
333 throw new CompatibleWebSocketException("Protocol error");
334 }
335 if (_remainingPayloadBytes == 0) _messageFrameEnd();
336 }
337
338 // Hack - as we always do index++ below.
339 index--;
340 }
341 }
342
343 // Move to the next byte.
344 index++;
345 }
346 }
347
348 void _unmask(int index, int length, Uint8List buffer) {
349 const int BLOCK_SIZE = 16;
350 // Skip Int32x4-version if message is small.
351 if (length >= BLOCK_SIZE) {
352 // Start by aligning to 16 bytes.
353 final int startOffset = BLOCK_SIZE - (index & 15);
354 final int end = index + startOffset;
355 for (int i = index; i < end; i++) {
356 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
357 }
358 index += startOffset;
359 length -= startOffset;
360 final int blockCount = length ~/ BLOCK_SIZE;
361 if (blockCount > 0) {
362 // Create mask block.
363 int mask = 0;
364 for (int i = 3; i >= 0; i--) {
365 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
366 }
367 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
368 Int32x4List blockBuffer = new Int32x4List.view(
369 buffer.buffer, index, blockCount);
370 for (int i = 0; i < blockBuffer.length; i++) {
371 blockBuffer[i] ^= blockMask;
372 }
373 final int bytes = blockCount * BLOCK_SIZE;
374 index += bytes;
375 length -= bytes;
376 }
377 }
378 // Handle end.
379 final int end = index + length;
380 for (int i = index; i < end; i++) {
381 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
382 }
383 }
384
385 void _lengthDone() {
386 if (_masked) {
387 if (!_serverSide) {
388 throw new CompatibleWebSocketException(
389 "Received masked frame from server");
390 }
391 _state = MASK;
392 } else {
393 if (_serverSide) {
394 throw new CompatibleWebSocketException(
395 "Received unmasked frame from client");
396 }
397 _remainingPayloadBytes = _len;
398 _startPayload();
399 }
400 }
401
402 void _maskDone() {
403 _remainingPayloadBytes = _len;
404 _startPayload();
405 }
406
407 void _startPayload() {
408 // If there is no actual payload perform perform callbacks without
409 // going through the PAYLOAD state.
410 if (_remainingPayloadBytes == 0) {
411 if (_isControlFrame()) {
412 switch (_opcode) {
413 case _WebSocketOpcode.CLOSE:
414 _state = CLOSED;
415 _eventSink.close();
416 break;
417 case _WebSocketOpcode.PING:
418 _eventSink.add(new _WebSocketPing());
419 break;
420 case _WebSocketOpcode.PONG:
421 _eventSink.add(new _WebSocketPong());
422 break;
423 }
424 _prepareForNextFrame();
425 } else {
426 _messageFrameEnd();
427 }
428 } else {
429 _state = PAYLOAD;
430 }
431 }
432
433 void _messageFrameEnd() {
434 if (_fin) {
435 switch (_currentMessageType) {
436 case _WebSocketMessageType.TEXT:
437 _eventSink.add(UTF8.decode(_payload.takeBytes()));
438 break;
439 case _WebSocketMessageType.BINARY:
440 _eventSink.add(_payload.takeBytes());
441 break;
442 }
443 _currentMessageType = _WebSocketMessageType.NONE;
444 }
445 _prepareForNextFrame();
446 }
447
448 void _controlFrameEnd() {
449 switch (_opcode) {
450 case _WebSocketOpcode.CLOSE:
451 closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
452 var payload = _payload.takeBytes();
453 if (payload.length > 0) {
454 if (payload.length == 1) {
455 throw new CompatibleWebSocketException("Protocol error");
456 }
457 closeCode = payload[0] << 8 | payload[1];
458 if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) {
459 throw new CompatibleWebSocketException("Protocol error");
460 }
461 if (payload.length > 2) {
462 closeReason = UTF8.decode(payload.sublist(2));
463 }
464 }
465 _state = CLOSED;
466 _eventSink.close();
467 break;
468
469 case _WebSocketOpcode.PING:
470 _eventSink.add(new _WebSocketPing(_payload.takeBytes()));
471 break;
472
473 case _WebSocketOpcode.PONG:
474 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
475 break;
476 }
477 _prepareForNextFrame();
478 }
479
480 bool _isControlFrame() {
481 return _opcode == _WebSocketOpcode.CLOSE ||
482 _opcode == _WebSocketOpcode.PING ||
483 _opcode == _WebSocketOpcode.PONG;
484 }
485
486 void _prepareForNextFrame() {
487 if (_state != CLOSED && _state != FAILURE) _state = START;
488 _fin = false;
489 _opcode = -1;
490 _len = -1;
491 _remainingLenBytes = -1;
492 _remainingMaskingKeyBytes = 4;
493 _remainingPayloadBytes = -1;
494 _unmaskingIndex = 0;
495 }
496 }
497
498
499 class _WebSocketPing {
500 final List<int> payload;
501 _WebSocketPing([this.payload = null]);
502 }
503
504
505 class _WebSocketPong {
506 final List<int> payload;
507 _WebSocketPong([this.payload = null]);
508 }
509
510 // TODO(ajohnsen): Make this transformer reusable.
511 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
512 final _WebSocketImpl webSocket;
513 EventSink _eventSink;
514
515 _WebSocketOutgoingTransformer(this.webSocket);
516
517 Stream bind(Stream stream) {
518 return new Stream.eventTransformed(
519 stream,
520 (EventSink eventSink) {
521 if (_eventSink != null) {
522 throw new StateError("WebSocket transformer already used");
523 }
524 _eventSink = eventSink;
525 return this;
526 });
527 }
528
529 void add(message) {
530 if (message is _WebSocketPong) {
531 addFrame(_WebSocketOpcode.PONG, message.payload);
532 return;
533 }
534 if (message is _WebSocketPing) {
535 addFrame(_WebSocketOpcode.PING, message.payload);
536 return;
537 }
538 List<int> data;
539 int opcode;
540 if (message != null) {
541 if (message is String) {
542 opcode = _WebSocketOpcode.TEXT;
543 data = UTF8.encode(message);
544 } else {
545 if (message is !List<int>) {
546 throw new ArgumentError(message);
547 }
548 opcode = _WebSocketOpcode.BINARY;
549 data = message;
550 }
551 } else {
552 opcode = _WebSocketOpcode.TEXT;
553 }
554 addFrame(opcode, data);
555 }
556
557 void addError(Object error, [StackTrace stackTrace]) =>
558 _eventSink.addError(error, stackTrace);
559
560 void close() {
561 int code = webSocket._outCloseCode;
562 String reason = webSocket._outCloseReason;
563 List<int> data;
564 if (code != null) {
565 data = new List<int>();
566 data.add((code >> 8) & 0xFF);
567 data.add(code & 0xFF);
568 if (reason != null) {
569 data.addAll(UTF8.encode(reason));
570 }
571 }
572 addFrame(_WebSocketOpcode.CLOSE, data);
573 _eventSink.close();
574 }
575
576 void addFrame(int opcode, List<int> data) =>
577 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
578
579 static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
580 bool mask = !serverSide; // Masking not implemented for server.
581 int dataLength = data == null ? 0 : data.length;
582 // Determine the header size.
583 int headerSize = (mask) ? 6 : 2;
584 if (dataLength > 65535) {
585 headerSize += 8;
586 } else if (dataLength > 125) {
587 headerSize += 2;
588 }
589 Uint8List header = new Uint8List(headerSize);
590 int index = 0;
591 // Set FIN and opcode.
592 header[index++] = 0x80 | opcode;
593 // Determine size and position of length field.
594 int lengthBytes = 1;
595 if (dataLength > 65535) {
596 header[index++] = 127;
597 lengthBytes = 8;
598 } else if (dataLength > 125) {
599 header[index++] = 126;
600 lengthBytes = 2;
601 }
602 // Write the length in network byte order into the header.
603 for (int i = 0; i < lengthBytes; i++) {
604 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
605 }
606 if (mask) {
607 header[1] |= 1 << 7;
608 var maskBytes = [_random.nextInt(256), _random.nextInt(256),
609 _random.nextInt(256), _random.nextInt(256)];
610 header.setRange(index, index + 4, maskBytes);
611 index += 4;
612 if (data != null) {
613 Uint8List list;
614 // If this is a text message just do the masking inside the
615 // encoded data.
616 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
617 list = data;
618 } else {
619 if (data is Uint8List) {
620 list = new Uint8List.fromList(data);
621 } else {
622 list = new Uint8List(data.length);
623 for (int i = 0; i < data.length; i++) {
624 if (data[i] < 0 || 255 < data[i]) {
625 throw new ArgumentError(
626 "List element is not a byte value "
627 "(value ${data[i]} at index $i)");
628 }
629 list[i] = data[i];
630 }
631 }
632 }
633 const int BLOCK_SIZE = 16;
634 int blockCount = list.length ~/ BLOCK_SIZE;
635 if (blockCount > 0) {
636 // Create mask block.
637 int mask = 0;
638 for (int i = 3; i >= 0; i--) {
639 mask = (mask << 8) | maskBytes[i];
640 }
641 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
642 Int32x4List blockBuffer = new Int32x4List.view(
643 list.buffer, 0, blockCount);
644 for (int i = 0; i < blockBuffer.length; i++) {
645 blockBuffer[i] ^= blockMask;
646 }
647 }
648 // Handle end.
649 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
650 list[i] ^= maskBytes[i & 3];
651 }
652 data = list;
653 }
654 }
655 assert(index == headerSize);
656 if (data == null) {
657 return [header];
658 } else {
659 return [header, data];
660 }
661 }
662 }
663
664
665 class _WebSocketConsumer implements StreamConsumer {
666 final _WebSocketImpl webSocket;
667 final StreamSink<List<int>> sink;
668 StreamController _controller;
669 StreamSubscription _subscription;
670 bool _issuedPause = false;
671 bool _closed = false;
672 Completer _closeCompleter = new Completer();
673 Completer _completer;
674
675 _WebSocketConsumer(this.webSocket, this.sink);
676
677 void _onListen() {
678 if (_subscription != null) {
679 _subscription.cancel();
680 }
681 }
682
683 void _onPause() {
684 if (_subscription != null) {
685 _subscription.pause();
686 } else {
687 _issuedPause = true;
688 }
689 }
690
691 void _onResume() {
692 if (_subscription != null) {
693 _subscription.resume();
694 } else {
695 _issuedPause = false;
696 }
697 }
698
699 void _cancel() {
700 if (_subscription != null) {
701 var subscription = _subscription;
702 _subscription = null;
703 subscription.cancel();
704 }
705 }
706
707 _ensureController() {
708 if (_controller != null) return;
709 _controller = new StreamController(sync: true,
710 onPause: _onPause,
711 onResume: _onResume,
712 onCancel: _onListen);
713 var stream = _controller.stream.transform(
714 new _WebSocketOutgoingTransformer(webSocket));
715 sink.addStream(stream)
716 .then((_) {
717 _done();
718 _closeCompleter.complete(webSocket);
719 }, onError: (error, StackTrace stackTrace) {
720 _closed = true;
721 _cancel();
722 if (error is ArgumentError) {
723 if (!_done(error, stackTrace)) {
724 _closeCompleter.completeError(error, stackTrace);
725 }
726 } else {
727 _done();
728 _closeCompleter.complete(webSocket);
729 }
730 });
731 }
732
733 bool _done([error, StackTrace stackTrace]) {
734 if (_completer == null) return false;
735 if (error != null) {
736 _completer.completeError(error, stackTrace);
737 } else {
738 _completer.complete(webSocket);
739 }
740 _completer = null;
741 return true;
742 }
743
744 Future addStream(var stream) {
745 if (_closed) {
746 stream.listen(null).cancel();
747 return new Future.value(webSocket);
748 }
749 _ensureController();
750 _completer = new Completer();
751 _subscription = stream.listen(
752 (data) {
753 _controller.add(data);
754 },
755 onDone: _done,
756 onError: _done,
757 cancelOnError: true);
758 if (_issuedPause) {
759 _subscription.pause();
760 _issuedPause = false;
761 }
762 return _completer.future;
763 }
764
765 Future close() {
766 _ensureController();
767 Future closeSocket() {
768 return sink.close().catchError((_) {}).then((_) => webSocket);
769 }
770 _controller.close();
771 return _closeCompleter.future.then((_) => closeSocket());
772 }
773
774 void add(data) {
775 if (_closed) return;
776 _ensureController();
777 _controller.add(data);
778 }
779
780 void closeSocket() {
781 _closed = true;
782 _cancel();
783 close();
784 }
785 }
786
787
788 class _WebSocketImpl extends Stream implements CompatibleWebSocket {
789 StreamController _controller;
790 StreamSubscription _subscription;
791 StreamController _sink;
792
793 final bool _serverSide;
794 int _readyState = _WebSocketState.CONNECTING;
795 bool _writeClosed = false;
796 int _closeCode;
797 String _closeReason;
798 Duration _pingInterval;
799 Timer _pingTimer;
800 _WebSocketConsumer _consumer;
801
802 int _outCloseCode;
803 String _outCloseReason;
804 Timer _closeTimer;
805
806 _WebSocketImpl._fromSocket(Stream<List<int>> stream,
807 StreamSink<List<int>> sink, [this._serverSide = false]) {
808 _consumer = new _WebSocketConsumer(this, sink);
809 _sink = new StreamController();
810 _sink.stream.pipe(_consumer);
811 _readyState = _WebSocketState.OPEN;
812
813 var transformer = new _WebSocketProtocolTransformer(_serverSide);
814 _subscription = stream.transform(transformer).listen(
815 (data) {
816 if (data is _WebSocketPing) {
817 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
818 } else if (data is _WebSocketPong) {
819 // Simply set pingInterval, as it'll cancel any timers.
820 pingInterval = _pingInterval;
821 } else {
822 _controller.add(data);
823 }
824 },
825 onError: (error) {
826 if (_closeTimer != null) _closeTimer.cancel();
827 if (error is FormatException) {
828 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
829 } else {
830 _close(_WebSocketStatus.PROTOCOL_ERROR);
831 }
832 _controller.close();
833 },
834 onDone: () {
835 if (_closeTimer != null) _closeTimer.cancel();
836 if (_readyState == _WebSocketState.OPEN) {
837 _readyState = _WebSocketState.CLOSING;
838 if (!_isReservedStatusCode(transformer.closeCode)) {
839 _close(transformer.closeCode);
840 } else {
841 _close();
842 }
843 _readyState = _WebSocketState.CLOSED;
844 }
845 _closeCode = transformer.closeCode;
846 _closeReason = transformer.closeReason;
847 _controller.close();
848 },
849 cancelOnError: true);
850 _subscription.pause();
851 _controller = new StreamController(sync: true,
852 onListen: _subscription.resume,
853 onPause: _subscription.pause,
854 onResume: _subscription.resume);
855 }
856
857 StreamSubscription listen(void onData(message),
858 {Function onError,
859 void onDone(),
860 bool cancelOnError}) {
861 return _controller.stream.listen(onData,
862 onError: onError,
863 onDone: onDone,
864 cancelOnError: cancelOnError);
865 }
866
867 Duration get pingInterval => _pingInterval;
868
869 void set pingInterval(Duration interval) {
870 if (_writeClosed) return;
871 if (_pingTimer != null) _pingTimer.cancel();
872 _pingInterval = interval;
873
874 if (_pingInterval == null) return;
875
876 _pingTimer = new Timer(_pingInterval, () {
877 if (_writeClosed) return;
878 _consumer.add(new _WebSocketPing());
879 _pingTimer = new Timer(_pingInterval, () {
880 // No pong received.
881 _close(_WebSocketStatus.GOING_AWAY);
882 });
883 });
884 }
885
886 int get closeCode => _closeCode;
887 String get closeReason => _closeReason;
888
889 void add(data) => _sink.add(data);
890 void addError(error, [StackTrace stackTrace]) =>
891 _sink.addError(error, stackTrace);
892 Future addStream(Stream stream) => _sink.addStream(stream);
893 Future get done => _sink.done;
894
895 Future close([int code, String reason]) {
896 if (_isReservedStatusCode(code)) {
897 throw new CompatibleWebSocketException("Reserved status code $code");
898 }
899 if (_outCloseCode == null) {
900 _outCloseCode = code;
901 _outCloseReason = reason;
902 }
903 if (_closeTimer == null && !_controller.isClosed) {
904 // When closing the web-socket, we no longer accept data.
905 _closeTimer = new Timer(const Duration(seconds: 5), () {
906 _subscription.cancel();
907 _controller.close();
908 });
909 }
910 return _sink.close();
911 }
912
913 void _close([int code, String reason]) {
914 if (_writeClosed) return;
915 if (_outCloseCode == null) {
916 _outCloseCode = code;
917 _outCloseReason = reason;
918 }
919 _writeClosed = true;
920 _consumer.closeSocket();
921 }
922
923 static bool _isReservedStatusCode(int code) {
924 return code != null &&
925 (code < _WebSocketStatus.NORMAL_CLOSURE ||
926 code == _WebSocketStatus.RESERVED_1004 ||
927 code == _WebSocketStatus.NO_STATUS_RECEIVED ||
928 code == _WebSocketStatus.ABNORMAL_CLOSURE ||
929 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR &&
930 code < _WebSocketStatus.RESERVED_1015) ||
931 (code >= _WebSocketStatus.RESERVED_1015 &&
932 code < 3000));
933 }
934 }
935
OLDNEW
« no previous file with comments | « lib/src/copy/web_socket_impl.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698