Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(304)

Side by Side Diff: lib/src/copy/web_socket_impl.dart

Issue 1993933003: Remove unused libraries. (Closed) Base URL: git@github.com:dart-lang/http_parser@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // The following code is copied from sdk/lib/io/websocket_impl.dart. The
6 // "dart:io" implementation isn't used directly to support non-"dart:io"
7 // applications.
8 //
9 // Because it's copied directly, only modifications necessary to support the
10 // desired public API and to remove "dart:io" dependencies have been made.
11 //
12 // This is up-to-date as of sdk revision
13 // 86227840d75d974feb238f8b3c59c038b99c05cf.
14 import 'dart:async';
15 import 'dart:convert';
16 import 'dart:math';
17 import 'dart:typed_data';
18
19 import '../web_socket.dart';
20 import 'bytes_builder.dart';
21 import 'io_sink.dart';
22 import 'web_socket.dart';
23
24 const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
25
26 final _random = new Random();
27
28 // Matches _WebSocketOpcode.
29 class _WebSocketMessageType {
30 static const int NONE = 0;
31 static const int TEXT = 1;
32 static const int BINARY = 2;
33 }
34
35
36 class _WebSocketOpcode {
37 static const int CONTINUATION = 0;
38 static const int TEXT = 1;
39 static const int BINARY = 2;
40 static const int RESERVED_3 = 3;
41 static const int RESERVED_4 = 4;
42 static const int RESERVED_5 = 5;
43 static const int RESERVED_6 = 6;
44 static const int RESERVED_7 = 7;
45 static const int CLOSE = 8;
46 static const int PING = 9;
47 static const int PONG = 10;
48 static const int RESERVED_B = 11;
49 static const int RESERVED_C = 12;
50 static const int RESERVED_D = 13;
51 static const int RESERVED_E = 14;
52 static const int RESERVED_F = 15;
53 }
54
55 /**
56 * The web socket protocol transformer handles the protocol byte stream
57 * which is supplied through the [:handleData:]. As the protocol is processed,
58 * it'll output frame data as either a List<int> or String.
59 *
60 * Important infomation about usage: Be sure you use cancelOnError, so the
61 * socket will be closed when the processer encounter an error. Not using it
62 * will lead to undefined behaviour.
63 */
64 // TODO(ajohnsen): make this transformer reusable?
65 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
66 static const int START = 0;
67 static const int LEN_FIRST = 1;
68 static const int LEN_REST = 2;
69 static const int MASK = 3;
70 static const int PAYLOAD = 4;
71 static const int CLOSED = 5;
72 static const int FAILURE = 6;
73
74 int _state = START;
75 bool _fin = false;
76 int _opcode = -1;
77 int _len = -1;
78 bool _masked = false;
79 int _remainingLenBytes = -1;
80 int _remainingMaskingKeyBytes = 4;
81 int _remainingPayloadBytes = -1;
82 int _unmaskingIndex = 0;
83 int _currentMessageType = _WebSocketMessageType.NONE;
84 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
85 String closeReason = "";
86
87 EventSink _eventSink;
88
89 final bool _serverSide;
90 final List _maskingBytes = new List(4);
91 final BytesBuilder _payload = new BytesBuilder(copy: false);
92
93 _WebSocketProtocolTransformer([this._serverSide = false]);
94
95 Stream bind(Stream stream) {
96 return new Stream.eventTransformed(
97 stream,
98 (EventSink eventSink) {
99 if (_eventSink != null) {
100 throw new StateError("WebSocket transformer already used.");
101 }
102 _eventSink = eventSink;
103 return this;
104 });
105 }
106
107 void addError(Object error, [StackTrace stackTrace]) =>
108 _eventSink.addError(error, stackTrace);
109
110 void close() => _eventSink.close();
111
112 /**
113 * Process data received from the underlying communication channel.
114 */
115 void add(Uint8List buffer) {
116 int count = buffer.length;
117 int index = 0;
118 int lastIndex = count;
119 if (_state == CLOSED) {
120 throw new CompatibleWebSocketException("Data on closed connection");
121 }
122 if (_state == FAILURE) {
123 throw new CompatibleWebSocketException("Data on failed connection");
124 }
125 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
126 int byte = buffer[index];
127 if (_state <= LEN_REST) {
128 if (_state == START) {
129 _fin = (byte & 0x80) != 0;
130 if ((byte & 0x70) != 0) {
131 // The RSV1, RSV2 bits RSV3 must be all zero.
132 throw new CompatibleWebSocketException("Protocol error");
133 }
134 _opcode = (byte & 0xF);
135 if (_opcode <= _WebSocketOpcode.BINARY) {
136 if (_opcode == _WebSocketOpcode.CONTINUATION) {
137 if (_currentMessageType == _WebSocketMessageType.NONE) {
138 throw new CompatibleWebSocketException("Protocol error");
139 }
140 } else {
141 assert(_opcode == _WebSocketOpcode.TEXT ||
142 _opcode == _WebSocketOpcode.BINARY);
143 if (_currentMessageType != _WebSocketMessageType.NONE) {
144 throw new CompatibleWebSocketException("Protocol error");
145 }
146 _currentMessageType = _opcode;
147 }
148 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
149 _opcode <= _WebSocketOpcode.PONG) {
150 // Control frames cannot be fragmented.
151 if (!_fin) throw new CompatibleWebSocketException("Protocol error");
152 } else {
153 throw new CompatibleWebSocketException("Protocol error");
154 }
155 _state = LEN_FIRST;
156 } else if (_state == LEN_FIRST) {
157 _masked = (byte & 0x80) != 0;
158 _len = byte & 0x7F;
159 if (_isControlFrame() && _len > 125) {
160 throw new CompatibleWebSocketException("Protocol error");
161 }
162 if (_len == 126) {
163 _len = 0;
164 _remainingLenBytes = 2;
165 _state = LEN_REST;
166 } else if (_len == 127) {
167 _len = 0;
168 _remainingLenBytes = 8;
169 _state = LEN_REST;
170 } else {
171 assert(_len < 126);
172 _lengthDone();
173 }
174 } else {
175 assert(_state == LEN_REST);
176 _len = _len << 8 | byte;
177 _remainingLenBytes--;
178 if (_remainingLenBytes == 0) {
179 _lengthDone();
180 }
181 }
182 } else {
183 if (_state == MASK) {
184 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte;
185 if (_remainingMaskingKeyBytes == 0) {
186 _maskDone();
187 }
188 } else {
189 assert(_state == PAYLOAD);
190 // The payload is not handled one byte at a time but in blocks.
191 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
192 _remainingPayloadBytes -= payloadLength;
193 // Unmask payload if masked.
194 if (_masked) {
195 _unmask(index, payloadLength, buffer);
196 }
197 // Control frame and data frame share _payloads.
198 _payload.add(
199 new Uint8List.view(buffer.buffer, index, payloadLength));
200 index += payloadLength;
201 if (_isControlFrame()) {
202 if (_remainingPayloadBytes == 0) _controlFrameEnd();
203 } else {
204 if (_currentMessageType != _WebSocketMessageType.TEXT &&
205 _currentMessageType != _WebSocketMessageType.BINARY) {
206 throw new CompatibleWebSocketException("Protocol error");
207 }
208 if (_remainingPayloadBytes == 0) _messageFrameEnd();
209 }
210
211 // Hack - as we always do index++ below.
212 index--;
213 }
214 }
215
216 // Move to the next byte.
217 index++;
218 }
219 }
220
221 void _unmask(int index, int length, Uint8List buffer) {
222 const int BLOCK_SIZE = 16;
223 // Skip Int32x4-version if message is small.
224 if (length >= BLOCK_SIZE) {
225 // Start by aligning to 16 bytes.
226 final int startOffset = BLOCK_SIZE - (index & 15);
227 final int end = index + startOffset;
228 for (int i = index; i < end; i++) {
229 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
230 }
231 index += startOffset;
232 length -= startOffset;
233 final int blockCount = length ~/ BLOCK_SIZE;
234 if (blockCount > 0) {
235 // Create mask block.
236 int mask = 0;
237 for (int i = 3; i >= 0; i--) {
238 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
239 }
240 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
241 Int32x4List blockBuffer = new Int32x4List.view(
242 buffer.buffer, index, blockCount);
243 for (int i = 0; i < blockBuffer.length; i++) {
244 blockBuffer[i] ^= blockMask;
245 }
246 final int bytes = blockCount * BLOCK_SIZE;
247 index += bytes;
248 length -= bytes;
249 }
250 }
251 // Handle end.
252 final int end = index + length;
253 for (int i = index; i < end; i++) {
254 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
255 }
256 }
257
258 void _lengthDone() {
259 if (_masked) {
260 if (!_serverSide) {
261 throw new CompatibleWebSocketException(
262 "Received masked frame from server");
263 }
264 _state = MASK;
265 } else {
266 if (_serverSide) {
267 throw new CompatibleWebSocketException(
268 "Received unmasked frame from client");
269 }
270 _remainingPayloadBytes = _len;
271 _startPayload();
272 }
273 }
274
275 void _maskDone() {
276 _remainingPayloadBytes = _len;
277 _startPayload();
278 }
279
280 void _startPayload() {
281 // If there is no actual payload perform perform callbacks without
282 // going through the PAYLOAD state.
283 if (_remainingPayloadBytes == 0) {
284 if (_isControlFrame()) {
285 switch (_opcode) {
286 case _WebSocketOpcode.CLOSE:
287 _state = CLOSED;
288 _eventSink.close();
289 break;
290 case _WebSocketOpcode.PING:
291 _eventSink.add(new _WebSocketPing());
292 break;
293 case _WebSocketOpcode.PONG:
294 _eventSink.add(new _WebSocketPong());
295 break;
296 }
297 _prepareForNextFrame();
298 } else {
299 _messageFrameEnd();
300 }
301 } else {
302 _state = PAYLOAD;
303 }
304 }
305
306 void _messageFrameEnd() {
307 if (_fin) {
308 switch (_currentMessageType) {
309 case _WebSocketMessageType.TEXT:
310 _eventSink.add(UTF8.decode(_payload.takeBytes()));
311 break;
312 case _WebSocketMessageType.BINARY:
313 _eventSink.add(_payload.takeBytes());
314 break;
315 }
316 _currentMessageType = _WebSocketMessageType.NONE;
317 }
318 _prepareForNextFrame();
319 }
320
321 void _controlFrameEnd() {
322 switch (_opcode) {
323 case _WebSocketOpcode.CLOSE:
324 closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
325 var payload = _payload.takeBytes();
326 if (payload.length > 0) {
327 if (payload.length == 1) {
328 throw new CompatibleWebSocketException("Protocol error");
329 }
330 closeCode = payload[0] << 8 | payload[1];
331 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
332 throw new CompatibleWebSocketException("Protocol error");
333 }
334 if (payload.length > 2) {
335 closeReason = UTF8.decode(payload.sublist(2));
336 }
337 }
338 _state = CLOSED;
339 _eventSink.close();
340 break;
341
342 case _WebSocketOpcode.PING:
343 _eventSink.add(new _WebSocketPing(_payload.takeBytes()));
344 break;
345
346 case _WebSocketOpcode.PONG:
347 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
348 break;
349 }
350 _prepareForNextFrame();
351 }
352
353 bool _isControlFrame() {
354 return _opcode == _WebSocketOpcode.CLOSE ||
355 _opcode == _WebSocketOpcode.PING ||
356 _opcode == _WebSocketOpcode.PONG;
357 }
358
359 void _prepareForNextFrame() {
360 if (_state != CLOSED && _state != FAILURE) _state = START;
361 _fin = false;
362 _opcode = -1;
363 _len = -1;
364 _remainingLenBytes = -1;
365 _remainingMaskingKeyBytes = 4;
366 _remainingPayloadBytes = -1;
367 _unmaskingIndex = 0;
368 }
369 }
370
371
372 class _WebSocketPing {
373 final List<int> payload;
374 _WebSocketPing([this.payload = null]);
375 }
376
377
378 class _WebSocketPong {
379 final List<int> payload;
380 _WebSocketPong([this.payload = null]);
381 }
382
383 // TODO(ajohnsen): Make this transformer reusable.
384 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
385 final WebSocketImpl webSocket;
386 EventSink _eventSink;
387
388 _WebSocketOutgoingTransformer(this.webSocket);
389
390 Stream bind(Stream stream) {
391 return new Stream.eventTransformed(
392 stream,
393 (EventSink eventSink) {
394 if (_eventSink != null) {
395 throw new StateError("WebSocket transformer already used");
396 }
397 _eventSink = eventSink;
398 return this;
399 });
400 }
401
402 void add(message) {
403 if (message is _WebSocketPong) {
404 addFrame(_WebSocketOpcode.PONG, message.payload);
405 return;
406 }
407 if (message is _WebSocketPing) {
408 addFrame(_WebSocketOpcode.PING, message.payload);
409 return;
410 }
411 List<int> data;
412 int opcode;
413 if (message != null) {
414 if (message is String) {
415 opcode = _WebSocketOpcode.TEXT;
416 data = UTF8.encode(message);
417 } else {
418 if (message is !List<int>) {
419 throw new ArgumentError(message);
420 }
421 opcode = _WebSocketOpcode.BINARY;
422 data = message;
423 }
424 } else {
425 opcode = _WebSocketOpcode.TEXT;
426 }
427 addFrame(opcode, data);
428 }
429
430 void addError(Object error, [StackTrace stackTrace]) =>
431 _eventSink.addError(error, stackTrace);
432
433 void close() {
434 int code = webSocket._outCloseCode;
435 String reason = webSocket._outCloseReason;
436 List<int> data;
437 if (code != null) {
438 data = new List<int>();
439 data.add((code >> 8) & 0xFF);
440 data.add(code & 0xFF);
441 if (reason != null) {
442 data.addAll(UTF8.encode(reason));
443 }
444 }
445 addFrame(_WebSocketOpcode.CLOSE, data);
446 _eventSink.close();
447 }
448
449 void addFrame(int opcode, List<int> data) =>
450 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
451
452 static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
453 bool mask = !serverSide; // Masking not implemented for server.
454 int dataLength = data == null ? 0 : data.length;
455 // Determine the header size.
456 int headerSize = (mask) ? 6 : 2;
457 if (dataLength > 65535) {
458 headerSize += 8;
459 } else if (dataLength > 125) {
460 headerSize += 2;
461 }
462 Uint8List header = new Uint8List(headerSize);
463 int index = 0;
464 // Set FIN and opcode.
465 header[index++] = 0x80 | opcode;
466 // Determine size and position of length field.
467 int lengthBytes = 1;
468 if (dataLength > 65535) {
469 header[index++] = 127;
470 lengthBytes = 8;
471 } else if (dataLength > 125) {
472 header[index++] = 126;
473 lengthBytes = 2;
474 }
475 // Write the length in network byte order into the header.
476 for (int i = 0; i < lengthBytes; i++) {
477 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
478 }
479 if (mask) {
480 header[1] |= 1 << 7;
481 var maskBytes = [_random.nextInt(256), _random.nextInt(256),
482 _random.nextInt(256), _random.nextInt(256)];
483 header.setRange(index, index + 4, maskBytes);
484 index += 4;
485 if (data != null) {
486 Uint8List list;
487 // If this is a text message just do the masking inside the
488 // encoded data.
489 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
490 list = data;
491 } else {
492 if (data is Uint8List) {
493 list = new Uint8List.fromList(data);
494 } else {
495 list = new Uint8List(data.length);
496 for (int i = 0; i < data.length; i++) {
497 if (data[i] < 0 || 255 < data[i]) {
498 throw new ArgumentError(
499 "List element is not a byte value "
500 "(value ${data[i]} at index $i)");
501 }
502 list[i] = data[i];
503 }
504 }
505 }
506 const int BLOCK_SIZE = 16;
507 int blockCount = list.length ~/ BLOCK_SIZE;
508 if (blockCount > 0) {
509 // Create mask block.
510 int mask = 0;
511 for (int i = 3; i >= 0; i--) {
512 mask = (mask << 8) | maskBytes[i];
513 }
514 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
515 Int32x4List blockBuffer = new Int32x4List.view(
516 list.buffer, 0, blockCount);
517 for (int i = 0; i < blockBuffer.length; i++) {
518 blockBuffer[i] ^= blockMask;
519 }
520 }
521 // Handle end.
522 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
523 list[i] ^= maskBytes[i & 3];
524 }
525 data = list;
526 }
527 }
528 assert(index == headerSize);
529 if (data == null) {
530 return [header];
531 } else {
532 return [header, data];
533 }
534 }
535 }
536
537
538 class _WebSocketConsumer implements StreamConsumer {
539 final WebSocketImpl webSocket;
540 final StreamSink<List<int>> sink;
541 StreamController _controller;
542 StreamSubscription _subscription;
543 bool _issuedPause = false;
544 bool _closed = false;
545 Completer _closeCompleter = new Completer();
546 Completer _completer;
547
548 _WebSocketConsumer(this.webSocket, this.sink);
549
550 void _onListen() {
551 if (_subscription != null) {
552 _subscription.cancel();
553 }
554 }
555
556 void _onPause() {
557 if (_subscription != null) {
558 _subscription.pause();
559 } else {
560 _issuedPause = true;
561 }
562 }
563
564 void _onResume() {
565 if (_subscription != null) {
566 _subscription.resume();
567 } else {
568 _issuedPause = false;
569 }
570 }
571
572 void _cancel() {
573 if (_subscription != null) {
574 var subscription = _subscription;
575 _subscription = null;
576 subscription.cancel();
577 }
578 }
579
580 _ensureController() {
581 if (_controller != null) return;
582 _controller = new StreamController(sync: true,
583 onPause: _onPause,
584 onResume: _onResume,
585 onCancel: _onListen);
586 var stream = _controller.stream.transform(
587 new _WebSocketOutgoingTransformer(webSocket));
588 sink.addStream(stream)
589 .then((_) {
590 _done();
591 _closeCompleter.complete(webSocket);
592 }, onError: (error, StackTrace stackTrace) {
593 _closed = true;
594 _cancel();
595 if (error is ArgumentError) {
596 if (!_done(error, stackTrace)) {
597 _closeCompleter.completeError(error, stackTrace);
598 }
599 } else {
600 _done();
601 _closeCompleter.complete(webSocket);
602 }
603 });
604 }
605
606 bool _done([error, StackTrace stackTrace]) {
607 if (_completer == null) return false;
608 if (error != null) {
609 _completer.completeError(error, stackTrace);
610 } else {
611 _completer.complete(webSocket);
612 }
613 _completer = null;
614 return true;
615 }
616
617 Future addStream(var stream) {
618 if (_closed) {
619 stream.listen(null).cancel();
620 return new Future.value(webSocket);
621 }
622 _ensureController();
623 _completer = new Completer();
624 _subscription = stream.listen(
625 (data) {
626 _controller.add(data);
627 },
628 onDone: _done,
629 onError: _done,
630 cancelOnError: true);
631 if (_issuedPause) {
632 _subscription.pause();
633 _issuedPause = false;
634 }
635 return _completer.future;
636 }
637
638 Future close() {
639 _ensureController();
640 Future closeSocket() {
641 return sink.close().catchError((_) {}).then((_) => webSocket);
642 }
643 _controller.close();
644 return _closeCompleter.future.then((_) => closeSocket());
645 }
646
647 void add(data) {
648 if (_closed) return;
649 _ensureController();
650 _controller.add(data);
651 }
652
653 void closeSocket() {
654 _closed = true;
655 _cancel();
656 close();
657 }
658 }
659
660
661 class WebSocketImpl extends Stream with _ServiceObject
662 implements CompatibleWebSocket {
663 // Use default Map so we keep order.
664 static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>();
665
666 final String protocol;
667
668 StreamController _controller;
669 StreamSubscription _subscription;
670 StreamSink _sink;
671
672 final bool _serverSide;
673 int _readyState = WebSocket.CONNECTING;
674 bool _writeClosed = false;
675 int _closeCode;
676 String _closeReason;
677 Duration _pingInterval;
678 Timer _pingTimer;
679 _WebSocketConsumer _consumer;
680
681 int _outCloseCode;
682 String _outCloseReason;
683 Timer _closeTimer;
684
685 WebSocketImpl.fromSocket(Stream<List<int>> stream,
686 StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) {
687 _consumer = new _WebSocketConsumer(this, sink);
688 _sink = new StreamSinkImpl(_consumer);
689 _readyState = WebSocket.OPEN;
690
691 var transformer = new _WebSocketProtocolTransformer(_serverSide);
692 _subscription = stream.transform(transformer).listen(
693 (data) {
694 if (data is _WebSocketPing) {
695 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
696 } else if (data is _WebSocketPong) {
697 // Simply set pingInterval, as it'll cancel any timers.
698 pingInterval = _pingInterval;
699 } else {
700 _controller.add(data);
701 }
702 },
703 onError: (error) {
704 if (_closeTimer != null) _closeTimer.cancel();
705 if (error is FormatException) {
706 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
707 } else {
708 _close(WebSocketStatus.PROTOCOL_ERROR);
709 }
710 // An error happened, set the close code set above.
711 _closeCode = _outCloseCode;
712 _closeReason = _outCloseReason;
713 _controller.close();
714 },
715 onDone: () {
716 if (_closeTimer != null) _closeTimer.cancel();
717 if (_readyState == WebSocket.OPEN) {
718 _readyState = WebSocket.CLOSING;
719 if (!_isReservedStatusCode(transformer.closeCode)) {
720 _close(transformer.closeCode);
721 } else {
722 _close();
723 }
724 _readyState = WebSocket.CLOSED;
725 }
726 // Protocol close, use close code from transformer.
727 _closeCode = transformer.closeCode;
728 _closeReason = transformer.closeReason;
729 _controller.close();
730 },
731 cancelOnError: true);
732 _subscription.pause();
733 _controller = new StreamController(sync: true,
734 onListen: () => _subscription.resume(),
735 onCancel: () {
736 _subscription.cancel();
737 _subscription = null;
738 },
739 onPause: _subscription.pause,
740 onResume: _subscription.resume);
741
742 _webSockets[_serviceId] = this;
743 }
744
745 StreamSubscription listen(void onData(message),
746 {Function onError,
747 void onDone(),
748 bool cancelOnError}) {
749 return _controller.stream.listen(onData,
750 onError: onError,
751 onDone: onDone,
752 cancelOnError: cancelOnError);
753 }
754
755 Duration get pingInterval => _pingInterval;
756
757 void set pingInterval(Duration interval) {
758 if (_writeClosed) return;
759 if (_pingTimer != null) _pingTimer.cancel();
760 _pingInterval = interval;
761
762 if (_pingInterval == null) return;
763
764 _pingTimer = new Timer(_pingInterval, () {
765 if (_writeClosed) return;
766 _consumer.add(new _WebSocketPing());
767 _pingTimer = new Timer(_pingInterval, () {
768 // No pong received.
769 _close(WebSocketStatus.GOING_AWAY);
770 });
771 });
772 }
773
774 int get readyState => _readyState;
775
776 String get extensions => null;
777 int get closeCode => _closeCode;
778 String get closeReason => _closeReason;
779
780 void add(data) => _sink.add(data);
781 void addError(error, [StackTrace stackTrace]) =>
782 _sink.addError(error, stackTrace);
783 Future addStream(Stream stream) => _sink.addStream(stream);
784 Future get done => _sink.done;
785
786 Future close([int code, String reason]) {
787 if (_isReservedStatusCode(code)) {
788 throw new CompatibleWebSocketException("Reserved status code $code");
789 }
790 if (_outCloseCode == null) {
791 _outCloseCode = code;
792 _outCloseReason = reason;
793 }
794 if (!_controller.isClosed) {
795 // If a close has not yet been received from the other end then
796 // 1) make sure to listen on the stream so the close frame will be
797 // processed if received.
798 // 2) set a timer terminate the connection if a close frame is
799 // not received.
800 if (!_controller.hasListener && _subscription != null) {
801 _controller.stream.drain().catchError((_) => {});
802 }
803 if (_closeTimer == null) {
804 // When closing the web-socket, we no longer accept data.
805 _closeTimer = new Timer(const Duration(seconds: 5), () {
806 // Reuse code and reason from the local close.
807 _closeCode = _outCloseCode;
808 _closeReason = _outCloseReason;
809 if (_subscription != null) _subscription.cancel();
810 _controller.close();
811 _webSockets.remove(_serviceId);
812 });
813 }
814 }
815 return _sink.close();
816 }
817
818 void _close([int code, String reason]) {
819 if (_writeClosed) return;
820 if (_outCloseCode == null) {
821 _outCloseCode = code;
822 _outCloseReason = reason;
823 }
824 _writeClosed = true;
825 _consumer.closeSocket();
826 _webSockets.remove(_serviceId);
827 }
828
829 // The _toJSON, _serviceTypePath, and _serviceTypeName methods
830 // have been deleted for http_parser. The methods were unused in WebSocket
831 // code and produced warnings.
832
833 static bool _isReservedStatusCode(int code) {
834 return code != null &&
835 (code < WebSocketStatus.NORMAL_CLOSURE ||
836 code == WebSocketStatus.RESERVED_1004 ||
837 code == WebSocketStatus.NO_STATUS_RECEIVED ||
838 code == WebSocketStatus.ABNORMAL_CLOSURE ||
839 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
840 code < WebSocketStatus.RESERVED_1015) ||
841 (code >= WebSocketStatus.RESERVED_1015 &&
842 code < 3000));
843 }
844 }
845
846 // The following code is from sdk/lib/io/service_object.dart.
847
848 int _nextServiceId = 1;
849
850 // TODO(ajohnsen): Use other way of getting a uniq id.
851 abstract class _ServiceObject {
852 int __serviceId = 0;
853 int get _serviceId {
854 if (__serviceId == 0) __serviceId = _nextServiceId++;
855 return __serviceId;
856 }
857
858 // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and
859 // _serviceType methods have been deleted for http_parser. The methods were
860 // unused in WebSocket code and produced warnings.
861 }
OLDNEW
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698