OLD | NEW |
| (Empty) |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 // The following code is copied from sdk/lib/io/websocket_impl.dart. The | |
6 // "dart:io" implementation isn't used directly to support non-"dart:io" | |
7 // applications. | |
8 // | |
9 // Because it's copied directly, only modifications necessary to support the | |
10 // desired public API and to remove "dart:io" dependencies have been made. | |
11 // | |
12 // This is up-to-date as of sdk revision | |
13 // 86227840d75d974feb238f8b3c59c038b99c05cf. | |
14 import 'dart:async'; | |
15 import 'dart:convert'; | |
16 import 'dart:math'; | |
17 import 'dart:typed_data'; | |
18 | |
19 import '../web_socket.dart'; | |
20 import 'bytes_builder.dart'; | |
21 import 'io_sink.dart'; | |
22 import 'web_socket.dart'; | |
23 | |
24 const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | |
25 | |
26 final _random = new Random(); | |
27 | |
28 // Matches _WebSocketOpcode. | |
29 class _WebSocketMessageType { | |
30 static const int NONE = 0; | |
31 static const int TEXT = 1; | |
32 static const int BINARY = 2; | |
33 } | |
34 | |
35 | |
36 class _WebSocketOpcode { | |
37 static const int CONTINUATION = 0; | |
38 static const int TEXT = 1; | |
39 static const int BINARY = 2; | |
40 static const int RESERVED_3 = 3; | |
41 static const int RESERVED_4 = 4; | |
42 static const int RESERVED_5 = 5; | |
43 static const int RESERVED_6 = 6; | |
44 static const int RESERVED_7 = 7; | |
45 static const int CLOSE = 8; | |
46 static const int PING = 9; | |
47 static const int PONG = 10; | |
48 static const int RESERVED_B = 11; | |
49 static const int RESERVED_C = 12; | |
50 static const int RESERVED_D = 13; | |
51 static const int RESERVED_E = 14; | |
52 static const int RESERVED_F = 15; | |
53 } | |
54 | |
55 /** | |
56 * The web socket protocol transformer handles the protocol byte stream | |
57 * which is supplied through the [:handleData:]. As the protocol is processed, | |
58 * it'll output frame data as either a List<int> or String. | |
59 * | |
60 * Important infomation about usage: Be sure you use cancelOnError, so the | |
61 * socket will be closed when the processer encounter an error. Not using it | |
62 * will lead to undefined behaviour. | |
63 */ | |
64 // TODO(ajohnsen): make this transformer reusable? | |
65 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | |
66 static const int START = 0; | |
67 static const int LEN_FIRST = 1; | |
68 static const int LEN_REST = 2; | |
69 static const int MASK = 3; | |
70 static const int PAYLOAD = 4; | |
71 static const int CLOSED = 5; | |
72 static const int FAILURE = 6; | |
73 | |
74 int _state = START; | |
75 bool _fin = false; | |
76 int _opcode = -1; | |
77 int _len = -1; | |
78 bool _masked = false; | |
79 int _remainingLenBytes = -1; | |
80 int _remainingMaskingKeyBytes = 4; | |
81 int _remainingPayloadBytes = -1; | |
82 int _unmaskingIndex = 0; | |
83 int _currentMessageType = _WebSocketMessageType.NONE; | |
84 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | |
85 String closeReason = ""; | |
86 | |
87 EventSink _eventSink; | |
88 | |
89 final bool _serverSide; | |
90 final List _maskingBytes = new List(4); | |
91 final BytesBuilder _payload = new BytesBuilder(copy: false); | |
92 | |
93 _WebSocketProtocolTransformer([this._serverSide = false]); | |
94 | |
95 Stream bind(Stream stream) { | |
96 return new Stream.eventTransformed( | |
97 stream, | |
98 (EventSink eventSink) { | |
99 if (_eventSink != null) { | |
100 throw new StateError("WebSocket transformer already used."); | |
101 } | |
102 _eventSink = eventSink; | |
103 return this; | |
104 }); | |
105 } | |
106 | |
107 void addError(Object error, [StackTrace stackTrace]) => | |
108 _eventSink.addError(error, stackTrace); | |
109 | |
110 void close() => _eventSink.close(); | |
111 | |
112 /** | |
113 * Process data received from the underlying communication channel. | |
114 */ | |
115 void add(Uint8List buffer) { | |
116 int count = buffer.length; | |
117 int index = 0; | |
118 int lastIndex = count; | |
119 if (_state == CLOSED) { | |
120 throw new CompatibleWebSocketException("Data on closed connection"); | |
121 } | |
122 if (_state == FAILURE) { | |
123 throw new CompatibleWebSocketException("Data on failed connection"); | |
124 } | |
125 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { | |
126 int byte = buffer[index]; | |
127 if (_state <= LEN_REST) { | |
128 if (_state == START) { | |
129 _fin = (byte & 0x80) != 0; | |
130 if ((byte & 0x70) != 0) { | |
131 // The RSV1, RSV2 bits RSV3 must be all zero. | |
132 throw new CompatibleWebSocketException("Protocol error"); | |
133 } | |
134 _opcode = (byte & 0xF); | |
135 if (_opcode <= _WebSocketOpcode.BINARY) { | |
136 if (_opcode == _WebSocketOpcode.CONTINUATION) { | |
137 if (_currentMessageType == _WebSocketMessageType.NONE) { | |
138 throw new CompatibleWebSocketException("Protocol error"); | |
139 } | |
140 } else { | |
141 assert(_opcode == _WebSocketOpcode.TEXT || | |
142 _opcode == _WebSocketOpcode.BINARY); | |
143 if (_currentMessageType != _WebSocketMessageType.NONE) { | |
144 throw new CompatibleWebSocketException("Protocol error"); | |
145 } | |
146 _currentMessageType = _opcode; | |
147 } | |
148 } else if (_opcode >= _WebSocketOpcode.CLOSE && | |
149 _opcode <= _WebSocketOpcode.PONG) { | |
150 // Control frames cannot be fragmented. | |
151 if (!_fin) throw new CompatibleWebSocketException("Protocol error"); | |
152 } else { | |
153 throw new CompatibleWebSocketException("Protocol error"); | |
154 } | |
155 _state = LEN_FIRST; | |
156 } else if (_state == LEN_FIRST) { | |
157 _masked = (byte & 0x80) != 0; | |
158 _len = byte & 0x7F; | |
159 if (_isControlFrame() && _len > 125) { | |
160 throw new CompatibleWebSocketException("Protocol error"); | |
161 } | |
162 if (_len == 126) { | |
163 _len = 0; | |
164 _remainingLenBytes = 2; | |
165 _state = LEN_REST; | |
166 } else if (_len == 127) { | |
167 _len = 0; | |
168 _remainingLenBytes = 8; | |
169 _state = LEN_REST; | |
170 } else { | |
171 assert(_len < 126); | |
172 _lengthDone(); | |
173 } | |
174 } else { | |
175 assert(_state == LEN_REST); | |
176 _len = _len << 8 | byte; | |
177 _remainingLenBytes--; | |
178 if (_remainingLenBytes == 0) { | |
179 _lengthDone(); | |
180 } | |
181 } | |
182 } else { | |
183 if (_state == MASK) { | |
184 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; | |
185 if (_remainingMaskingKeyBytes == 0) { | |
186 _maskDone(); | |
187 } | |
188 } else { | |
189 assert(_state == PAYLOAD); | |
190 // The payload is not handled one byte at a time but in blocks. | |
191 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); | |
192 _remainingPayloadBytes -= payloadLength; | |
193 // Unmask payload if masked. | |
194 if (_masked) { | |
195 _unmask(index, payloadLength, buffer); | |
196 } | |
197 // Control frame and data frame share _payloads. | |
198 _payload.add( | |
199 new Uint8List.view(buffer.buffer, index, payloadLength)); | |
200 index += payloadLength; | |
201 if (_isControlFrame()) { | |
202 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | |
203 } else { | |
204 if (_currentMessageType != _WebSocketMessageType.TEXT && | |
205 _currentMessageType != _WebSocketMessageType.BINARY) { | |
206 throw new CompatibleWebSocketException("Protocol error"); | |
207 } | |
208 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | |
209 } | |
210 | |
211 // Hack - as we always do index++ below. | |
212 index--; | |
213 } | |
214 } | |
215 | |
216 // Move to the next byte. | |
217 index++; | |
218 } | |
219 } | |
220 | |
221 void _unmask(int index, int length, Uint8List buffer) { | |
222 const int BLOCK_SIZE = 16; | |
223 // Skip Int32x4-version if message is small. | |
224 if (length >= BLOCK_SIZE) { | |
225 // Start by aligning to 16 bytes. | |
226 final int startOffset = BLOCK_SIZE - (index & 15); | |
227 final int end = index + startOffset; | |
228 for (int i = index; i < end; i++) { | |
229 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
230 } | |
231 index += startOffset; | |
232 length -= startOffset; | |
233 final int blockCount = length ~/ BLOCK_SIZE; | |
234 if (blockCount > 0) { | |
235 // Create mask block. | |
236 int mask = 0; | |
237 for (int i = 3; i >= 0; i--) { | |
238 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | |
239 } | |
240 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
241 Int32x4List blockBuffer = new Int32x4List.view( | |
242 buffer.buffer, index, blockCount); | |
243 for (int i = 0; i < blockBuffer.length; i++) { | |
244 blockBuffer[i] ^= blockMask; | |
245 } | |
246 final int bytes = blockCount * BLOCK_SIZE; | |
247 index += bytes; | |
248 length -= bytes; | |
249 } | |
250 } | |
251 // Handle end. | |
252 final int end = index + length; | |
253 for (int i = index; i < end; i++) { | |
254 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
255 } | |
256 } | |
257 | |
258 void _lengthDone() { | |
259 if (_masked) { | |
260 if (!_serverSide) { | |
261 throw new CompatibleWebSocketException( | |
262 "Received masked frame from server"); | |
263 } | |
264 _state = MASK; | |
265 } else { | |
266 if (_serverSide) { | |
267 throw new CompatibleWebSocketException( | |
268 "Received unmasked frame from client"); | |
269 } | |
270 _remainingPayloadBytes = _len; | |
271 _startPayload(); | |
272 } | |
273 } | |
274 | |
275 void _maskDone() { | |
276 _remainingPayloadBytes = _len; | |
277 _startPayload(); | |
278 } | |
279 | |
280 void _startPayload() { | |
281 // If there is no actual payload perform perform callbacks without | |
282 // going through the PAYLOAD state. | |
283 if (_remainingPayloadBytes == 0) { | |
284 if (_isControlFrame()) { | |
285 switch (_opcode) { | |
286 case _WebSocketOpcode.CLOSE: | |
287 _state = CLOSED; | |
288 _eventSink.close(); | |
289 break; | |
290 case _WebSocketOpcode.PING: | |
291 _eventSink.add(new _WebSocketPing()); | |
292 break; | |
293 case _WebSocketOpcode.PONG: | |
294 _eventSink.add(new _WebSocketPong()); | |
295 break; | |
296 } | |
297 _prepareForNextFrame(); | |
298 } else { | |
299 _messageFrameEnd(); | |
300 } | |
301 } else { | |
302 _state = PAYLOAD; | |
303 } | |
304 } | |
305 | |
306 void _messageFrameEnd() { | |
307 if (_fin) { | |
308 switch (_currentMessageType) { | |
309 case _WebSocketMessageType.TEXT: | |
310 _eventSink.add(UTF8.decode(_payload.takeBytes())); | |
311 break; | |
312 case _WebSocketMessageType.BINARY: | |
313 _eventSink.add(_payload.takeBytes()); | |
314 break; | |
315 } | |
316 _currentMessageType = _WebSocketMessageType.NONE; | |
317 } | |
318 _prepareForNextFrame(); | |
319 } | |
320 | |
321 void _controlFrameEnd() { | |
322 switch (_opcode) { | |
323 case _WebSocketOpcode.CLOSE: | |
324 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | |
325 var payload = _payload.takeBytes(); | |
326 if (payload.length > 0) { | |
327 if (payload.length == 1) { | |
328 throw new CompatibleWebSocketException("Protocol error"); | |
329 } | |
330 closeCode = payload[0] << 8 | payload[1]; | |
331 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { | |
332 throw new CompatibleWebSocketException("Protocol error"); | |
333 } | |
334 if (payload.length > 2) { | |
335 closeReason = UTF8.decode(payload.sublist(2)); | |
336 } | |
337 } | |
338 _state = CLOSED; | |
339 _eventSink.close(); | |
340 break; | |
341 | |
342 case _WebSocketOpcode.PING: | |
343 _eventSink.add(new _WebSocketPing(_payload.takeBytes())); | |
344 break; | |
345 | |
346 case _WebSocketOpcode.PONG: | |
347 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); | |
348 break; | |
349 } | |
350 _prepareForNextFrame(); | |
351 } | |
352 | |
353 bool _isControlFrame() { | |
354 return _opcode == _WebSocketOpcode.CLOSE || | |
355 _opcode == _WebSocketOpcode.PING || | |
356 _opcode == _WebSocketOpcode.PONG; | |
357 } | |
358 | |
359 void _prepareForNextFrame() { | |
360 if (_state != CLOSED && _state != FAILURE) _state = START; | |
361 _fin = false; | |
362 _opcode = -1; | |
363 _len = -1; | |
364 _remainingLenBytes = -1; | |
365 _remainingMaskingKeyBytes = 4; | |
366 _remainingPayloadBytes = -1; | |
367 _unmaskingIndex = 0; | |
368 } | |
369 } | |
370 | |
371 | |
372 class _WebSocketPing { | |
373 final List<int> payload; | |
374 _WebSocketPing([this.payload = null]); | |
375 } | |
376 | |
377 | |
378 class _WebSocketPong { | |
379 final List<int> payload; | |
380 _WebSocketPong([this.payload = null]); | |
381 } | |
382 | |
383 // TODO(ajohnsen): Make this transformer reusable. | |
384 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | |
385 final WebSocketImpl webSocket; | |
386 EventSink _eventSink; | |
387 | |
388 _WebSocketOutgoingTransformer(this.webSocket); | |
389 | |
390 Stream bind(Stream stream) { | |
391 return new Stream.eventTransformed( | |
392 stream, | |
393 (EventSink eventSink) { | |
394 if (_eventSink != null) { | |
395 throw new StateError("WebSocket transformer already used"); | |
396 } | |
397 _eventSink = eventSink; | |
398 return this; | |
399 }); | |
400 } | |
401 | |
402 void add(message) { | |
403 if (message is _WebSocketPong) { | |
404 addFrame(_WebSocketOpcode.PONG, message.payload); | |
405 return; | |
406 } | |
407 if (message is _WebSocketPing) { | |
408 addFrame(_WebSocketOpcode.PING, message.payload); | |
409 return; | |
410 } | |
411 List<int> data; | |
412 int opcode; | |
413 if (message != null) { | |
414 if (message is String) { | |
415 opcode = _WebSocketOpcode.TEXT; | |
416 data = UTF8.encode(message); | |
417 } else { | |
418 if (message is !List<int>) { | |
419 throw new ArgumentError(message); | |
420 } | |
421 opcode = _WebSocketOpcode.BINARY; | |
422 data = message; | |
423 } | |
424 } else { | |
425 opcode = _WebSocketOpcode.TEXT; | |
426 } | |
427 addFrame(opcode, data); | |
428 } | |
429 | |
430 void addError(Object error, [StackTrace stackTrace]) => | |
431 _eventSink.addError(error, stackTrace); | |
432 | |
433 void close() { | |
434 int code = webSocket._outCloseCode; | |
435 String reason = webSocket._outCloseReason; | |
436 List<int> data; | |
437 if (code != null) { | |
438 data = new List<int>(); | |
439 data.add((code >> 8) & 0xFF); | |
440 data.add(code & 0xFF); | |
441 if (reason != null) { | |
442 data.addAll(UTF8.encode(reason)); | |
443 } | |
444 } | |
445 addFrame(_WebSocketOpcode.CLOSE, data); | |
446 _eventSink.close(); | |
447 } | |
448 | |
449 void addFrame(int opcode, List<int> data) => | |
450 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); | |
451 | |
452 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { | |
453 bool mask = !serverSide; // Masking not implemented for server. | |
454 int dataLength = data == null ? 0 : data.length; | |
455 // Determine the header size. | |
456 int headerSize = (mask) ? 6 : 2; | |
457 if (dataLength > 65535) { | |
458 headerSize += 8; | |
459 } else if (dataLength > 125) { | |
460 headerSize += 2; | |
461 } | |
462 Uint8List header = new Uint8List(headerSize); | |
463 int index = 0; | |
464 // Set FIN and opcode. | |
465 header[index++] = 0x80 | opcode; | |
466 // Determine size and position of length field. | |
467 int lengthBytes = 1; | |
468 if (dataLength > 65535) { | |
469 header[index++] = 127; | |
470 lengthBytes = 8; | |
471 } else if (dataLength > 125) { | |
472 header[index++] = 126; | |
473 lengthBytes = 2; | |
474 } | |
475 // Write the length in network byte order into the header. | |
476 for (int i = 0; i < lengthBytes; i++) { | |
477 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | |
478 } | |
479 if (mask) { | |
480 header[1] |= 1 << 7; | |
481 var maskBytes = [_random.nextInt(256), _random.nextInt(256), | |
482 _random.nextInt(256), _random.nextInt(256)]; | |
483 header.setRange(index, index + 4, maskBytes); | |
484 index += 4; | |
485 if (data != null) { | |
486 Uint8List list; | |
487 // If this is a text message just do the masking inside the | |
488 // encoded data. | |
489 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | |
490 list = data; | |
491 } else { | |
492 if (data is Uint8List) { | |
493 list = new Uint8List.fromList(data); | |
494 } else { | |
495 list = new Uint8List(data.length); | |
496 for (int i = 0; i < data.length; i++) { | |
497 if (data[i] < 0 || 255 < data[i]) { | |
498 throw new ArgumentError( | |
499 "List element is not a byte value " | |
500 "(value ${data[i]} at index $i)"); | |
501 } | |
502 list[i] = data[i]; | |
503 } | |
504 } | |
505 } | |
506 const int BLOCK_SIZE = 16; | |
507 int blockCount = list.length ~/ BLOCK_SIZE; | |
508 if (blockCount > 0) { | |
509 // Create mask block. | |
510 int mask = 0; | |
511 for (int i = 3; i >= 0; i--) { | |
512 mask = (mask << 8) | maskBytes[i]; | |
513 } | |
514 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
515 Int32x4List blockBuffer = new Int32x4List.view( | |
516 list.buffer, 0, blockCount); | |
517 for (int i = 0; i < blockBuffer.length; i++) { | |
518 blockBuffer[i] ^= blockMask; | |
519 } | |
520 } | |
521 // Handle end. | |
522 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { | |
523 list[i] ^= maskBytes[i & 3]; | |
524 } | |
525 data = list; | |
526 } | |
527 } | |
528 assert(index == headerSize); | |
529 if (data == null) { | |
530 return [header]; | |
531 } else { | |
532 return [header, data]; | |
533 } | |
534 } | |
535 } | |
536 | |
537 | |
538 class _WebSocketConsumer implements StreamConsumer { | |
539 final WebSocketImpl webSocket; | |
540 final StreamSink<List<int>> sink; | |
541 StreamController _controller; | |
542 StreamSubscription _subscription; | |
543 bool _issuedPause = false; | |
544 bool _closed = false; | |
545 Completer _closeCompleter = new Completer(); | |
546 Completer _completer; | |
547 | |
548 _WebSocketConsumer(this.webSocket, this.sink); | |
549 | |
550 void _onListen() { | |
551 if (_subscription != null) { | |
552 _subscription.cancel(); | |
553 } | |
554 } | |
555 | |
556 void _onPause() { | |
557 if (_subscription != null) { | |
558 _subscription.pause(); | |
559 } else { | |
560 _issuedPause = true; | |
561 } | |
562 } | |
563 | |
564 void _onResume() { | |
565 if (_subscription != null) { | |
566 _subscription.resume(); | |
567 } else { | |
568 _issuedPause = false; | |
569 } | |
570 } | |
571 | |
572 void _cancel() { | |
573 if (_subscription != null) { | |
574 var subscription = _subscription; | |
575 _subscription = null; | |
576 subscription.cancel(); | |
577 } | |
578 } | |
579 | |
580 _ensureController() { | |
581 if (_controller != null) return; | |
582 _controller = new StreamController(sync: true, | |
583 onPause: _onPause, | |
584 onResume: _onResume, | |
585 onCancel: _onListen); | |
586 var stream = _controller.stream.transform( | |
587 new _WebSocketOutgoingTransformer(webSocket)); | |
588 sink.addStream(stream) | |
589 .then((_) { | |
590 _done(); | |
591 _closeCompleter.complete(webSocket); | |
592 }, onError: (error, StackTrace stackTrace) { | |
593 _closed = true; | |
594 _cancel(); | |
595 if (error is ArgumentError) { | |
596 if (!_done(error, stackTrace)) { | |
597 _closeCompleter.completeError(error, stackTrace); | |
598 } | |
599 } else { | |
600 _done(); | |
601 _closeCompleter.complete(webSocket); | |
602 } | |
603 }); | |
604 } | |
605 | |
606 bool _done([error, StackTrace stackTrace]) { | |
607 if (_completer == null) return false; | |
608 if (error != null) { | |
609 _completer.completeError(error, stackTrace); | |
610 } else { | |
611 _completer.complete(webSocket); | |
612 } | |
613 _completer = null; | |
614 return true; | |
615 } | |
616 | |
617 Future addStream(var stream) { | |
618 if (_closed) { | |
619 stream.listen(null).cancel(); | |
620 return new Future.value(webSocket); | |
621 } | |
622 _ensureController(); | |
623 _completer = new Completer(); | |
624 _subscription = stream.listen( | |
625 (data) { | |
626 _controller.add(data); | |
627 }, | |
628 onDone: _done, | |
629 onError: _done, | |
630 cancelOnError: true); | |
631 if (_issuedPause) { | |
632 _subscription.pause(); | |
633 _issuedPause = false; | |
634 } | |
635 return _completer.future; | |
636 } | |
637 | |
638 Future close() { | |
639 _ensureController(); | |
640 Future closeSocket() { | |
641 return sink.close().catchError((_) {}).then((_) => webSocket); | |
642 } | |
643 _controller.close(); | |
644 return _closeCompleter.future.then((_) => closeSocket()); | |
645 } | |
646 | |
647 void add(data) { | |
648 if (_closed) return; | |
649 _ensureController(); | |
650 _controller.add(data); | |
651 } | |
652 | |
653 void closeSocket() { | |
654 _closed = true; | |
655 _cancel(); | |
656 close(); | |
657 } | |
658 } | |
659 | |
660 | |
661 class WebSocketImpl extends Stream with _ServiceObject | |
662 implements CompatibleWebSocket { | |
663 // Use default Map so we keep order. | |
664 static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>(); | |
665 | |
666 final String protocol; | |
667 | |
668 StreamController _controller; | |
669 StreamSubscription _subscription; | |
670 StreamSink _sink; | |
671 | |
672 final bool _serverSide; | |
673 int _readyState = WebSocket.CONNECTING; | |
674 bool _writeClosed = false; | |
675 int _closeCode; | |
676 String _closeReason; | |
677 Duration _pingInterval; | |
678 Timer _pingTimer; | |
679 _WebSocketConsumer _consumer; | |
680 | |
681 int _outCloseCode; | |
682 String _outCloseReason; | |
683 Timer _closeTimer; | |
684 | |
685 WebSocketImpl.fromSocket(Stream<List<int>> stream, | |
686 StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) { | |
687 _consumer = new _WebSocketConsumer(this, sink); | |
688 _sink = new StreamSinkImpl(_consumer); | |
689 _readyState = WebSocket.OPEN; | |
690 | |
691 var transformer = new _WebSocketProtocolTransformer(_serverSide); | |
692 _subscription = stream.transform(transformer).listen( | |
693 (data) { | |
694 if (data is _WebSocketPing) { | |
695 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | |
696 } else if (data is _WebSocketPong) { | |
697 // Simply set pingInterval, as it'll cancel any timers. | |
698 pingInterval = _pingInterval; | |
699 } else { | |
700 _controller.add(data); | |
701 } | |
702 }, | |
703 onError: (error) { | |
704 if (_closeTimer != null) _closeTimer.cancel(); | |
705 if (error is FormatException) { | |
706 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | |
707 } else { | |
708 _close(WebSocketStatus.PROTOCOL_ERROR); | |
709 } | |
710 // An error happened, set the close code set above. | |
711 _closeCode = _outCloseCode; | |
712 _closeReason = _outCloseReason; | |
713 _controller.close(); | |
714 }, | |
715 onDone: () { | |
716 if (_closeTimer != null) _closeTimer.cancel(); | |
717 if (_readyState == WebSocket.OPEN) { | |
718 _readyState = WebSocket.CLOSING; | |
719 if (!_isReservedStatusCode(transformer.closeCode)) { | |
720 _close(transformer.closeCode); | |
721 } else { | |
722 _close(); | |
723 } | |
724 _readyState = WebSocket.CLOSED; | |
725 } | |
726 // Protocol close, use close code from transformer. | |
727 _closeCode = transformer.closeCode; | |
728 _closeReason = transformer.closeReason; | |
729 _controller.close(); | |
730 }, | |
731 cancelOnError: true); | |
732 _subscription.pause(); | |
733 _controller = new StreamController(sync: true, | |
734 onListen: () => _subscription.resume(), | |
735 onCancel: () { | |
736 _subscription.cancel(); | |
737 _subscription = null; | |
738 }, | |
739 onPause: _subscription.pause, | |
740 onResume: _subscription.resume); | |
741 | |
742 _webSockets[_serviceId] = this; | |
743 } | |
744 | |
745 StreamSubscription listen(void onData(message), | |
746 {Function onError, | |
747 void onDone(), | |
748 bool cancelOnError}) { | |
749 return _controller.stream.listen(onData, | |
750 onError: onError, | |
751 onDone: onDone, | |
752 cancelOnError: cancelOnError); | |
753 } | |
754 | |
755 Duration get pingInterval => _pingInterval; | |
756 | |
757 void set pingInterval(Duration interval) { | |
758 if (_writeClosed) return; | |
759 if (_pingTimer != null) _pingTimer.cancel(); | |
760 _pingInterval = interval; | |
761 | |
762 if (_pingInterval == null) return; | |
763 | |
764 _pingTimer = new Timer(_pingInterval, () { | |
765 if (_writeClosed) return; | |
766 _consumer.add(new _WebSocketPing()); | |
767 _pingTimer = new Timer(_pingInterval, () { | |
768 // No pong received. | |
769 _close(WebSocketStatus.GOING_AWAY); | |
770 }); | |
771 }); | |
772 } | |
773 | |
774 int get readyState => _readyState; | |
775 | |
776 String get extensions => null; | |
777 int get closeCode => _closeCode; | |
778 String get closeReason => _closeReason; | |
779 | |
780 void add(data) => _sink.add(data); | |
781 void addError(error, [StackTrace stackTrace]) => | |
782 _sink.addError(error, stackTrace); | |
783 Future addStream(Stream stream) => _sink.addStream(stream); | |
784 Future get done => _sink.done; | |
785 | |
786 Future close([int code, String reason]) { | |
787 if (_isReservedStatusCode(code)) { | |
788 throw new CompatibleWebSocketException("Reserved status code $code"); | |
789 } | |
790 if (_outCloseCode == null) { | |
791 _outCloseCode = code; | |
792 _outCloseReason = reason; | |
793 } | |
794 if (!_controller.isClosed) { | |
795 // If a close has not yet been received from the other end then | |
796 // 1) make sure to listen on the stream so the close frame will be | |
797 // processed if received. | |
798 // 2) set a timer terminate the connection if a close frame is | |
799 // not received. | |
800 if (!_controller.hasListener && _subscription != null) { | |
801 _controller.stream.drain().catchError((_) => {}); | |
802 } | |
803 if (_closeTimer == null) { | |
804 // When closing the web-socket, we no longer accept data. | |
805 _closeTimer = new Timer(const Duration(seconds: 5), () { | |
806 // Reuse code and reason from the local close. | |
807 _closeCode = _outCloseCode; | |
808 _closeReason = _outCloseReason; | |
809 if (_subscription != null) _subscription.cancel(); | |
810 _controller.close(); | |
811 _webSockets.remove(_serviceId); | |
812 }); | |
813 } | |
814 } | |
815 return _sink.close(); | |
816 } | |
817 | |
818 void _close([int code, String reason]) { | |
819 if (_writeClosed) return; | |
820 if (_outCloseCode == null) { | |
821 _outCloseCode = code; | |
822 _outCloseReason = reason; | |
823 } | |
824 _writeClosed = true; | |
825 _consumer.closeSocket(); | |
826 _webSockets.remove(_serviceId); | |
827 } | |
828 | |
829 // The _toJSON, _serviceTypePath, and _serviceTypeName methods | |
830 // have been deleted for http_parser. The methods were unused in WebSocket | |
831 // code and produced warnings. | |
832 | |
833 static bool _isReservedStatusCode(int code) { | |
834 return code != null && | |
835 (code < WebSocketStatus.NORMAL_CLOSURE || | |
836 code == WebSocketStatus.RESERVED_1004 || | |
837 code == WebSocketStatus.NO_STATUS_RECEIVED || | |
838 code == WebSocketStatus.ABNORMAL_CLOSURE || | |
839 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | |
840 code < WebSocketStatus.RESERVED_1015) || | |
841 (code >= WebSocketStatus.RESERVED_1015 && | |
842 code < 3000)); | |
843 } | |
844 } | |
845 | |
846 // The following code is from sdk/lib/io/service_object.dart. | |
847 | |
848 int _nextServiceId = 1; | |
849 | |
850 // TODO(ajohnsen): Use other way of getting a uniq id. | |
851 abstract class _ServiceObject { | |
852 int __serviceId = 0; | |
853 int get _serviceId { | |
854 if (__serviceId == 0) __serviceId = _nextServiceId++; | |
855 return __serviceId; | |
856 } | |
857 | |
858 // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and | |
859 // _serviceType methods have been deleted for http_parser. The methods were | |
860 // unused in WebSocket code and produced warnings. | |
861 } | |
OLD | NEW |