Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(525)

Side by Side Diff: pkg/dev_compiler/tool/input_sdk/lib/io/websocket_impl.dart

Issue 2698353003: unfork DDC's copy of most SDK libraries (Closed)
Patch Set: revert core_patch Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 part of dart.io;
6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 const String _clientNoContextTakeover = "client_no_context_takeover";
9 const String _serverNoContextTakeover = "server_no_context_takeover";
10 const String _clientMaxWindowBits = "client_max_window_bits";
11 const String _serverMaxWindowBits = "server_max_window_bits";
12
13 // Matches _WebSocketOpcode.
14 class _WebSocketMessageType {
15 static const int NONE = 0;
16 static const int TEXT = 1;
17 static const int BINARY = 2;
18 }
19
20 class _WebSocketOpcode {
21 static const int CONTINUATION = 0;
22 static const int TEXT = 1;
23 static const int BINARY = 2;
24 static const int RESERVED_3 = 3;
25 static const int RESERVED_4 = 4;
26 static const int RESERVED_5 = 5;
27 static const int RESERVED_6 = 6;
28 static const int RESERVED_7 = 7;
29 static const int CLOSE = 8;
30 static const int PING = 9;
31 static const int PONG = 10;
32 static const int RESERVED_B = 11;
33 static const int RESERVED_C = 12;
34 static const int RESERVED_D = 13;
35 static const int RESERVED_E = 14;
36 static const int RESERVED_F = 15;
37 }
38
39 /**
40 * Stores the header and integer value derived from negotiation of
41 * client_max_window_bits and server_max_window_bits. headerValue will be
42 * set in the Websocket response headers.
43 */
44 class _CompressionMaxWindowBits {
45 String headerValue;
46 int maxWindowBits;
47 _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]);
48 String toString() => headerValue;
49 }
50
51 /**
52 * The web socket protocol transformer handles the protocol byte stream
53 * which is supplied through the [:handleData:]. As the protocol is processed,
54 * it'll output frame data as either a List<int> or String.
55 *
56 * Important information about usage: Be sure you use cancelOnError, so the
57 * socket will be closed when the processor encounter an error. Not using it
58 * will lead to undefined behaviour.
59 */
60 // TODO(ajohnsen): make this transformer reusable?
61 class _WebSocketProtocolTransformer
62 implements StreamTransformer<List<int>, dynamic>, EventSink<Uint8List> {
63 static const int START = 0;
64 static const int LEN_FIRST = 1;
65 static const int LEN_REST = 2;
66 static const int MASK = 3;
67 static const int PAYLOAD = 4;
68 static const int CLOSED = 5;
69 static const int FAILURE = 6;
70 static const int FIN = 0x80;
71 static const int RSV1 = 0x40;
72 static const int RSV2 = 0x20;
73 static const int RSV3 = 0x10;
74 static const int OPCODE = 0xF;
75
76 int _state = START;
77 bool _fin = false;
78 bool _compressed = false;
79 int _opcode = -1;
80 int _len = -1;
81 bool _masked = false;
82 int _remainingLenBytes = -1;
83 int _remainingMaskingKeyBytes = 4;
84 int _remainingPayloadBytes = -1;
85 int _unmaskingIndex = 0;
86 int _currentMessageType = _WebSocketMessageType.NONE;
87 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
88 String closeReason = "";
89
90 EventSink _eventSink;
91
92 final bool _serverSide;
93 final List _maskingBytes = new List(4);
94 final BytesBuilder _payload = new BytesBuilder(copy: false);
95
96 _WebSocketPerMessageDeflate _deflate;
97 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
98
99 Stream bind(Stream stream) {
100 return new Stream.eventTransformed(stream, (EventSink eventSink) {
101 if (_eventSink != null) {
102 throw new StateError("WebSocket transformer already used.");
103 }
104 _eventSink = eventSink;
105 return this;
106 });
107 }
108
109 void addError(Object error, [StackTrace stackTrace]) {
110 _eventSink.addError(error, stackTrace);
111 }
112
113 void close() { _eventSink.close(); }
114
115 /**
116 * Process data received from the underlying communication channel.
117 */
118 void add(List<int> bytes) {
119 var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes);
120 int index = 0;
121 int lastIndex = buffer.length;
122 if (_state == CLOSED) {
123 throw new WebSocketException("Data on closed connection");
124 }
125 if (_state == FAILURE) {
126 throw new WebSocketException("Data on failed connection");
127 }
128 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
129 int byte = buffer[index];
130 if (_state <= LEN_REST) {
131 if (_state == START) {
132 _fin = (byte & FIN) != 0;
133
134 if((byte & (RSV2 | RSV3)) != 0) {
135 // The RSV2, RSV3 bits must both be zero.
136 throw new WebSocketException("Protocol error");
137 }
138
139 _opcode = (byte & OPCODE);
140
141 if (_opcode != _WebSocketOpcode.CONTINUATION) {
142 if ((byte & RSV1) != 0) {
143 _compressed = true;
144 } else {
145 _compressed = false;
146 }
147 }
148
149 if (_opcode <= _WebSocketOpcode.BINARY) {
150 if (_opcode == _WebSocketOpcode.CONTINUATION) {
151 if (_currentMessageType == _WebSocketMessageType.NONE) {
152 throw new WebSocketException("Protocol error");
153 }
154 } else {
155 assert(_opcode == _WebSocketOpcode.TEXT ||
156 _opcode == _WebSocketOpcode.BINARY);
157 if (_currentMessageType != _WebSocketMessageType.NONE) {
158 throw new WebSocketException("Protocol error");
159 }
160 _currentMessageType = _opcode;
161 }
162 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
163 _opcode <= _WebSocketOpcode.PONG) {
164 // Control frames cannot be fragmented.
165 if (!_fin) throw new WebSocketException("Protocol error");
166 } else {
167 throw new WebSocketException("Protocol error");
168 }
169 _state = LEN_FIRST;
170 } else if (_state == LEN_FIRST) {
171 _masked = (byte & 0x80) != 0;
172 _len = byte & 0x7F;
173 if (_isControlFrame() && _len > 125) {
174 throw new WebSocketException("Protocol error");
175 }
176 if (_len == 126) {
177 _len = 0;
178 _remainingLenBytes = 2;
179 _state = LEN_REST;
180 } else if (_len == 127) {
181 _len = 0;
182 _remainingLenBytes = 8;
183 _state = LEN_REST;
184 } else {
185 assert(_len < 126);
186 _lengthDone();
187 }
188 } else {
189 assert(_state == LEN_REST);
190 _len = _len << 8 | byte;
191 _remainingLenBytes--;
192 if (_remainingLenBytes == 0) {
193 _lengthDone();
194 }
195 }
196 } else {
197 if (_state == MASK) {
198 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte;
199 if (_remainingMaskingKeyBytes == 0) {
200 _maskDone();
201 }
202 } else {
203 assert(_state == PAYLOAD);
204 // The payload is not handled one byte at a time but in blocks.
205 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
206 _remainingPayloadBytes -= payloadLength;
207 // Unmask payload if masked.
208 if (_masked) {
209 _unmask(index, payloadLength, buffer);
210 }
211 // Control frame and data frame share _payloads.
212 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
213 index += payloadLength;
214 if (_isControlFrame()) {
215 if (_remainingPayloadBytes == 0) _controlFrameEnd();
216 } else {
217 if (_currentMessageType != _WebSocketMessageType.TEXT &&
218 _currentMessageType != _WebSocketMessageType.BINARY) {
219 throw new WebSocketException("Protocol error");
220 }
221 if (_remainingPayloadBytes == 0) _messageFrameEnd();
222 }
223
224 // Hack - as we always do index++ below.
225 index--;
226 }
227 }
228
229 // Move to the next byte.
230 index++;
231 }
232 }
233
234 void _unmask(int index, int length, Uint8List buffer) {
235 const int BLOCK_SIZE = 16;
236 // Skip Int32x4-version if message is small.
237 if (length >= BLOCK_SIZE) {
238 // Start by aligning to 16 bytes.
239 final int startOffset = BLOCK_SIZE - (index & 15);
240 final int end = index + startOffset;
241 for (int i = index; i < end; i++) {
242 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
243 }
244 index += startOffset;
245 length -= startOffset;
246 final int blockCount = length ~/ BLOCK_SIZE;
247 if (blockCount > 0) {
248 // Create mask block.
249 int mask = 0;
250 for (int i = 3; i >= 0; i--) {
251 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
252 }
253 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
254 Int32x4List blockBuffer =
255 new Int32x4List.view(buffer.buffer, index, blockCount);
256 for (int i = 0; i < blockBuffer.length; i++) {
257 blockBuffer[i] ^= blockMask;
258 }
259 final int bytes = blockCount * BLOCK_SIZE;
260 index += bytes;
261 length -= bytes;
262 }
263 }
264 // Handle end.
265 final int end = index + length;
266 for (int i = index; i < end; i++) {
267 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
268 }
269 }
270
271 void _lengthDone() {
272 if (_masked) {
273 if (!_serverSide) {
274 throw new WebSocketException("Received masked frame from server");
275 }
276 _state = MASK;
277 } else {
278 if (_serverSide) {
279 throw new WebSocketException("Received unmasked frame from client");
280 }
281 _remainingPayloadBytes = _len;
282 _startPayload();
283 }
284 }
285
286 void _maskDone() {
287 _remainingPayloadBytes = _len;
288 _startPayload();
289 }
290
291 void _startPayload() {
292 // If there is no actual payload perform perform callbacks without
293 // going through the PAYLOAD state.
294 if (_remainingPayloadBytes == 0) {
295 if (_isControlFrame()) {
296 switch (_opcode) {
297 case _WebSocketOpcode.CLOSE:
298 _state = CLOSED;
299 _eventSink.close();
300 break;
301 case _WebSocketOpcode.PING:
302 _eventSink.add(new _WebSocketPing());
303 break;
304 case _WebSocketOpcode.PONG:
305 _eventSink.add(new _WebSocketPong());
306 break;
307 }
308 _prepareForNextFrame();
309 } else {
310 _messageFrameEnd();
311 }
312 } else {
313 _state = PAYLOAD;
314 }
315 }
316
317 void _messageFrameEnd() {
318 if (_fin) {
319 var bytes = _payload.takeBytes();
320 if (_deflate != null && _compressed) {
321 bytes = _deflate.processIncomingMessage(bytes);
322 }
323
324 switch (_currentMessageType) {
325 case _WebSocketMessageType.TEXT:
326 _eventSink.add(UTF8.decode(bytes));
327 break;
328 case _WebSocketMessageType.BINARY:
329 _eventSink.add(bytes);
330 break;
331 }
332 _currentMessageType = _WebSocketMessageType.NONE;
333 }
334 _prepareForNextFrame();
335 }
336
337 void _controlFrameEnd() {
338 switch (_opcode) {
339 case _WebSocketOpcode.CLOSE:
340 closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
341 var payload = _payload.takeBytes();
342 if (payload.length > 0) {
343 if (payload.length == 1) {
344 throw new WebSocketException("Protocol error");
345 }
346 closeCode = payload[0] << 8 | payload[1];
347 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
348 throw new WebSocketException("Protocol error");
349 }
350 if (payload.length > 2) {
351 closeReason = UTF8.decode(payload.sublist(2));
352 }
353 }
354 _state = CLOSED;
355 _eventSink.close();
356 break;
357
358 case _WebSocketOpcode.PING:
359 _eventSink.add(new _WebSocketPing(_payload.takeBytes()));
360 break;
361
362 case _WebSocketOpcode.PONG:
363 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
364 break;
365 }
366 _prepareForNextFrame();
367 }
368
369 bool _isControlFrame() {
370 return _opcode == _WebSocketOpcode.CLOSE ||
371 _opcode == _WebSocketOpcode.PING ||
372 _opcode == _WebSocketOpcode.PONG;
373 }
374
375 void _prepareForNextFrame() {
376 if (_state != CLOSED && _state != FAILURE) _state = START;
377 _fin = false;
378 _opcode = -1;
379 _len = -1;
380 _remainingLenBytes = -1;
381 _remainingMaskingKeyBytes = 4;
382 _remainingPayloadBytes = -1;
383 _unmaskingIndex = 0;
384 }
385 }
386
387 class _WebSocketPing {
388 final List<int> payload;
389 _WebSocketPing([this.payload = null]);
390 }
391
392 class _WebSocketPong {
393 final List<int> payload;
394 _WebSocketPong([this.payload = null]);
395 }
396
397 class _WebSocketTransformerImpl implements WebSocketTransformer {
398 final StreamController<WebSocket> _controller =
399 new StreamController<WebSocket>(sync: true);
400 final Function _protocolSelector;
401 final CompressionOptions _compression;
402
403 _WebSocketTransformerImpl(this._protocolSelector, this._compression);
404
405 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
406 stream.listen((request) {
407 _upgrade(request, _protocolSelector, _compression)
408 .then((WebSocket webSocket) => _controller.add(webSocket))
409 .catchError(_controller.addError);
410 }, onDone: () {
411 _controller.close();
412 });
413
414 return _controller.stream;
415 }
416
417 static Future<WebSocket> _upgrade(
418 HttpRequest request, _protocolSelector, CompressionOptions compression) {
419 var response = request.response;
420 if (!_isUpgradeRequest(request)) {
421 // Send error response.
422 response
423 ..statusCode = HttpStatus.BAD_REQUEST
424 ..close();
425 return new Future.error(
426 new WebSocketException("Invalid WebSocket upgrade request"));
427 }
428
429 Future upgrade(String protocol) {
430 // Send the upgrade response.
431 response
432 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
433 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
434 ..headers.add(HttpHeaders.UPGRADE, "websocket");
435 String key = request.headers.value("Sec-WebSocket-Key");
436 _SHA1 sha1 = new _SHA1();
437 sha1.add("$key$_webSocketGUID".codeUnits);
438 String accept = _CryptoUtils.bytesToBase64(sha1.close());
439 response.headers.add("Sec-WebSocket-Accept", accept);
440 if (protocol != null) {
441 response.headers.add("Sec-WebSocket-Protocol", protocol);
442 }
443
444 var deflate = _negotiateCompression(request, response, compression);
445
446 response.headers.contentLength = 0;
447 return response.detachSocket().then((socket) =>
448 new _WebSocketImpl._fromSocket(
449 socket, protocol, compression, true, deflate));
450 }
451
452 var protocols = request.headers['Sec-WebSocket-Protocol'];
453 if (protocols != null && _protocolSelector != null) {
454 // The suggested protocols can be spread over multiple lines, each
455 // consisting of multiple protocols. To unify all of them, first join
456 // the lists with ', ' and then tokenize.
457 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
458 return new Future(() => _protocolSelector(protocols)).then((protocol) {
459 if (protocols.indexOf(protocol) < 0) {
460 throw new WebSocketException(
461 "Selected protocol is not in the list of available protocols");
462 }
463 return protocol;
464 }).catchError((error) {
465 response
466 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
467 ..close();
468 throw error;
469 }).then(upgrade);
470 } else {
471 return upgrade(null);
472 }
473 }
474
475 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
476 HttpResponse response, CompressionOptions compression) {
477 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
478
479 extensionHeader ??= "";
480
481 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
482 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
483 var info = compression._createHeader(hv);
484
485 response.headers.add("Sec-WebSocket-Extensions", info.headerValue);
486 var serverNoContextTakeover =
487 (hv.parameters.containsKey(_serverNoContextTakeover) &&
488 compression.serverNoContextTakeover);
489 var clientNoContextTakeover =
490 (hv.parameters.containsKey(_clientNoContextTakeover) &&
491 compression.clientNoContextTakeover);
492 var deflate = new _WebSocketPerMessageDeflate(
493 serverNoContextTakeover: serverNoContextTakeover,
494 clientNoContextTakeover: clientNoContextTakeover,
495 serverMaxWindowBits: info.maxWindowBits,
496 clientMaxWindowBits: info.maxWindowBits,
497 serverSide: true);
498
499 return deflate;
500 }
501
502 return null;
503 }
504
505 static bool _isUpgradeRequest(HttpRequest request) {
506 if (request.method != "GET") {
507 return false;
508 }
509 if (request.headers[HttpHeaders.CONNECTION] == null) {
510 return false;
511 }
512 bool isUpgrade = false;
513 request.headers[HttpHeaders.CONNECTION].forEach((String value) {
514 if (value.toLowerCase() == "upgrade") isUpgrade = true;
515 });
516 if (!isUpgrade) return false;
517 String upgrade = request.headers.value(HttpHeaders.UPGRADE);
518 if (upgrade == null || upgrade.toLowerCase() != "websocket") {
519 return false;
520 }
521 String version = request.headers.value("Sec-WebSocket-Version");
522 if (version == null || version != "13") {
523 return false;
524 }
525 String key = request.headers.value("Sec-WebSocket-Key");
526 if (key == null) {
527 return false;
528 }
529 return true;
530 }
531 }
532
533 class _WebSocketPerMessageDeflate {
534 bool serverNoContextTakeover;
535 bool clientNoContextTakeover;
536 int clientMaxWindowBits;
537 int serverMaxWindowBits;
538 bool serverSide;
539
540 _Filter decoder;
541 _Filter encoder;
542
543 _WebSocketPerMessageDeflate(
544 {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
545 this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
546 this.serverNoContextTakeover: false,
547 this.clientNoContextTakeover: false,
548 this.serverSide: false});
549
550 void _ensureDecoder() {
551 if (decoder == null) {
552 decoder = _Filter._newZLibInflateFilter(
553 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
554 }
555 }
556
557 void _ensureEncoder() {
558 if (encoder == null) {
559 encoder = _Filter._newZLibDeflateFilter(
560 false,
561 ZLibOption.DEFAULT_LEVEL,
562 serverSide ? serverMaxWindowBits : clientMaxWindowBits,
563 ZLibOption.DEFAULT_MEM_LEVEL,
564 ZLibOption.STRATEGY_DEFAULT,
565 null,
566 true);
567 }
568 }
569
570 Uint8List processIncomingMessage(List<int> msg) {
571 _ensureDecoder();
572
573 var data = [];
574 data.addAll(msg);
575 data.addAll(const [0x00, 0x00, 0xff, 0xff]);
576
577 decoder.process(data, 0, data.length);
578 var result = [];
579 var out;
580
581 while ((out = decoder.processed()) != null) {
582 result.addAll(out);
583 }
584
585 if ((serverSide && clientNoContextTakeover) ||
586 (!serverSide && serverNoContextTakeover)) {
587 decoder = null;
588 }
589
590 return new Uint8List.fromList(result);
591 }
592
593 List<int> processOutgoingMessage(List<int> msg) {
594 _ensureEncoder();
595 var result = [];
596 Uint8List buffer;
597 var out;
598
599 if (msg is! Uint8List) {
600 for (var i = 0; i < msg.length; i++) {
601 if (msg[i] < 0 || 255 < msg[i]) {
602 throw new ArgumentError("List element is not a byte value "
603 "(value ${msg[i]} at index $i)");
604 }
605 }
606 buffer = new Uint8List.fromList(msg);
607 } else {
608 buffer = msg;
609 }
610
611 encoder.process(buffer, 0, buffer.length);
612
613 while ((out = encoder.processed()) != null) {
614 result.addAll(out);
615 }
616
617 if ((!serverSide && clientNoContextTakeover) ||
618 (serverSide && serverNoContextTakeover)) {
619 encoder = null;
620 }
621
622 if (result.length > 4) {
623 result = result.sublist(0, result.length - 4);
624 }
625
626 return result;
627 }
628 }
629
630 // TODO(ajohnsen): Make this transformer reusable.
631 class _WebSocketOutgoingTransformer
632 implements StreamTransformer<dynamic, List<int>>, EventSink {
633 final _WebSocketImpl webSocket;
634 EventSink<List<int>> _eventSink;
635
636 _WebSocketPerMessageDeflate _deflateHelper;
637
638 _WebSocketOutgoingTransformer(this.webSocket) {
639 _deflateHelper = webSocket._deflate;
640 }
641
642 Stream<List<int>> bind(Stream stream) {
643 return new Stream.eventTransformed(stream, (eventSink) {
644 if (_eventSink != null) {
645 throw new StateError("WebSocket transformer already used");
646 }
647 _eventSink = eventSink;
648 return this;
649 });
650 }
651
652 void add(message) {
653 if (message is _WebSocketPong) {
654 addFrame(_WebSocketOpcode.PONG, message.payload);
655 return;
656 }
657 if (message is _WebSocketPing) {
658 addFrame(_WebSocketOpcode.PING, message.payload);
659 return;
660 }
661 List<int> data;
662 int opcode;
663 if (message != null) {
664 if (message is String) {
665 opcode = _WebSocketOpcode.TEXT;
666 data = UTF8.encode(message);
667 } else {
668 if (message is List<int>) {
669 data = message;
670 opcode = _WebSocketOpcode.BINARY;
671 } else {
672 throw new ArgumentError(message);
673 }
674 }
675
676 if (_deflateHelper != null) {
677 data = _deflateHelper.processOutgoingMessage(data);
678 }
679 } else {
680 opcode = _WebSocketOpcode.TEXT;
681 }
682 addFrame(opcode, data);
683 }
684
685 void addError(Object error, [StackTrace stackTrace]) {
686 _eventSink.addError(error, stackTrace);
687 }
688
689 void close() {
690 int code = webSocket._outCloseCode;
691 String reason = webSocket._outCloseReason;
692 List<int> data;
693 if (code != null) {
694 data = new List<int>();
695 data.add((code >> 8) & 0xFF);
696 data.add(code & 0xFF);
697 if (reason != null) {
698 data.addAll(UTF8.encode(reason));
699 }
700 }
701 addFrame(_WebSocketOpcode.CLOSE, data);
702 _eventSink.close();
703 }
704
705 void addFrame(int opcode, List<int> data) => createFrame(
706 opcode,
707 data,
708 webSocket._serverSide,
709 _deflateHelper != null &&
710 (opcode == _WebSocketOpcode.TEXT ||
711 opcode == _WebSocketOpcode.BINARY)).forEach((e) {
712 _eventSink.add(e);
713 });
714
715 static Iterable<List<int>> createFrame(
716 int opcode, List<int> data, bool serverSide, bool compressed) {
717 bool mask = !serverSide; // Masking not implemented for server.
718 int dataLength = data == null ? 0 : data.length;
719 // Determine the header size.
720 int headerSize = (mask) ? 6 : 2;
721 if (dataLength > 65535) {
722 headerSize += 8;
723 } else if (dataLength > 125) {
724 headerSize += 2;
725 }
726 Uint8List header = new Uint8List(headerSize);
727 int index = 0;
728
729 // Set FIN and opcode.
730 var hoc = _WebSocketProtocolTransformer.FIN
731 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
732 | (opcode & _WebSocketProtocolTransformer.OPCODE);
733
734 header[index++] = hoc;
735 // Determine size and position of length field.
736 int lengthBytes = 1;
737 if (dataLength > 65535) {
738 header[index++] = 127;
739 lengthBytes = 8;
740 } else if (dataLength > 125) {
741 header[index++] = 126;
742 lengthBytes = 2;
743 }
744 // Write the length in network byte order into the header.
745 for (int i = 0; i < lengthBytes; i++) {
746 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
747 }
748 if (mask) {
749 header[1] |= 1 << 7;
750 var maskBytes = _IOCrypto.getRandomBytes(4);
751 header.setRange(index, index + 4, maskBytes);
752 index += 4;
753 if (data != null) {
754 Uint8List list;
755 // If this is a text message just do the masking inside the
756 // encoded data.
757 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
758 list = data;
759 } else {
760 if (data is Uint8List) {
761 list = new Uint8List.fromList(data);
762 } else {
763 list = new Uint8List(data.length);
764 for (int i = 0; i < data.length; i++) {
765 if (data[i] < 0 || 255 < data[i]) {
766 throw new ArgumentError("List element is not a byte value "
767 "(value ${data[i]} at index $i)");
768 }
769 list[i] = data[i];
770 }
771 }
772 }
773 const int BLOCK_SIZE = 16;
774 int blockCount = list.length ~/ BLOCK_SIZE;
775 if (blockCount > 0) {
776 // Create mask block.
777 int mask = 0;
778 for (int i = 3; i >= 0; i--) {
779 mask = (mask << 8) | maskBytes[i];
780 }
781 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
782 Int32x4List blockBuffer =
783 new Int32x4List.view(list.buffer, 0, blockCount);
784 for (int i = 0; i < blockBuffer.length; i++) {
785 blockBuffer[i] ^= blockMask;
786 }
787 }
788 // Handle end.
789 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
790 list[i] ^= maskBytes[i & 3];
791 }
792 data = list;
793 }
794 }
795 assert(index == headerSize);
796 if (data == null) {
797 return [header];
798 } else {
799 return [header, data];
800 }
801 }
802 }
803
804 class _WebSocketConsumer implements StreamConsumer {
805 final _WebSocketImpl webSocket;
806 final Socket socket;
807 StreamController _controller;
808 StreamSubscription _subscription;
809 bool _issuedPause = false;
810 bool _closed = false;
811 Completer _closeCompleter = new Completer();
812 Completer _completer;
813
814 _WebSocketConsumer(this.webSocket, this.socket);
815
816 void _onListen() {
817 if (_subscription != null) {
818 _subscription.cancel();
819 }
820 }
821
822 void _onPause() {
823 if (_subscription != null) {
824 _subscription.pause();
825 } else {
826 _issuedPause = true;
827 }
828 }
829
830 void _onResume() {
831 if (_subscription != null) {
832 _subscription.resume();
833 } else {
834 _issuedPause = false;
835 }
836 }
837
838 void _cancel() {
839 if (_subscription != null) {
840 var subscription = _subscription;
841 _subscription = null;
842 subscription.cancel();
843 }
844 }
845
846 _ensureController() {
847 if (_controller != null) return;
848 _controller = new StreamController(
849 sync: true,
850 onPause: _onPause,
851 onResume: _onResume,
852 onCancel: _onListen);
853 var stream = _controller.stream
854 .transform(new _WebSocketOutgoingTransformer(webSocket));
855 socket.addStream(stream).then((_) {
856 _done();
857 _closeCompleter.complete(webSocket);
858 }, onError: (error, StackTrace stackTrace) {
859 _closed = true;
860 _cancel();
861 if (error is ArgumentError) {
862 if (!_done(error, stackTrace)) {
863 _closeCompleter.completeError(error, stackTrace);
864 }
865 } else {
866 _done();
867 _closeCompleter.complete(webSocket);
868 }
869 });
870 }
871
872 bool _done([error, StackTrace stackTrace]) {
873 if (_completer == null) return false;
874 if (error != null) {
875 _completer.completeError(error, stackTrace);
876 } else {
877 _completer.complete(webSocket);
878 }
879 _completer = null;
880 return true;
881 }
882
883 Future addStream(var stream) {
884 if (_closed) {
885 stream.listen(null).cancel();
886 return new Future.value(webSocket);
887 }
888 _ensureController();
889 _completer = new Completer();
890 _subscription = stream.listen((data) {
891 _controller.add(data);
892 }, onDone: _done, onError: _done, cancelOnError: true);
893 if (_issuedPause) {
894 _subscription.pause();
895 _issuedPause = false;
896 }
897 return _completer.future;
898 }
899
900 Future close() {
901 _ensureController();
902 Future closeSocket() {
903 return socket.close().catchError((_) {}).then((_) => webSocket);
904 }
905 _controller.close();
906 return _closeCompleter.future.then((_) => closeSocket());
907 }
908
909 void add(data) {
910 if (_closed) return;
911 _ensureController();
912 _controller.add(data);
913 }
914
915 void closeSocket() {
916 _closed = true;
917 _cancel();
918 close();
919 }
920 }
921
922 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
923 // Use default Map so we keep order.
924 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
925 static const int DEFAULT_WINDOW_BITS = 15;
926 static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
927
928 final String protocol;
929
930 StreamController _controller;
931 StreamSubscription _subscription;
932 StreamSink _sink;
933
934 final _socket;
935 final bool _serverSide;
936 int _readyState = WebSocket.CONNECTING;
937 bool _writeClosed = false;
938 int _closeCode;
939 String _closeReason;
940 Duration _pingInterval;
941 Timer _pingTimer;
942 _WebSocketConsumer _consumer;
943
944 int _outCloseCode;
945 String _outCloseReason;
946 Timer _closeTimer;
947 _WebSocketPerMessageDeflate _deflate;
948
949 static final HttpClient _httpClient = new HttpClient();
950
951 static Future<WebSocket> connect(
952 String url, Iterable<String> protocols, Map<String, dynamic> headers,
953 {CompressionOptions compression: CompressionOptions.DEFAULT}) {
954 Uri uri = Uri.parse(url);
955 if (uri.scheme != "ws" && uri.scheme != "wss") {
956 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
957 }
958
959 Random random = new Random();
960 // Generate 16 random bytes.
961 Uint8List nonceData = new Uint8List(16);
962 for (int i = 0; i < 16; i++) {
963 nonceData[i] = random.nextInt(256);
964 }
965 String nonce = _CryptoUtils.bytesToBase64(nonceData);
966
967 uri = new Uri(
968 scheme: uri.scheme == "wss" ? "https" : "http",
969 userInfo: uri.userInfo,
970 host: uri.host,
971 port: uri.port,
972 path: uri.path,
973 query: uri.query,
974 fragment: uri.fragment);
975 return _httpClient.openUrl("GET", uri).then((request) {
976 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
977 // If the URL contains user information use that for basic
978 // authorization.
979 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
980 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
981 }
982 if (headers != null) {
983 headers.forEach((field, value) => request.headers.add(field, value));
984 }
985 // Setup the initial handshake.
986 request.headers
987 ..set(HttpHeaders.CONNECTION, "Upgrade")
988 ..set(HttpHeaders.UPGRADE, "websocket")
989 ..set("Sec-WebSocket-Key", nonce)
990 ..set("Cache-Control", "no-cache")
991 ..set("Sec-WebSocket-Version", "13");
992 if (protocols != null) {
993 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
994 }
995
996 if (compression.enabled) {
997 request.headers
998 .add("Sec-WebSocket-Extensions", compression._createHeader());
999 }
1000
1001 return request.close();
1002 }).then((response) {
1003
1004 void error(String message) {
1005 // Flush data.
1006 response.detachSocket().then((socket) {
1007 socket.destroy();
1008 });
1009 throw new WebSocketException(message);
1010 }
1011
1012 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
1013 response.headers[HttpHeaders.CONNECTION] == null ||
1014 !response.headers[HttpHeaders.CONNECTION]
1015 .any((value) => value.toLowerCase() == "upgrade") ||
1016 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
1017 "websocket") {
1018 error("Connection to '$uri' was not upgraded to websocket");
1019 }
1020 String accept = response.headers.value("Sec-WebSocket-Accept");
1021 if (accept == null) {
1022 error("Response did not contain a 'Sec-WebSocket-Accept' header");
1023 }
1024 _SHA1 sha1 = new _SHA1();
1025 sha1.add("$nonce$_webSocketGUID".codeUnits);
1026 List<int> expectedAccept = sha1.close();
1027 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
1028 if (expectedAccept.length != receivedAccept.length) {
1029 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
1030 }
1031 for (int i = 0; i < expectedAccept.length; i++) {
1032 if (expectedAccept[i] != receivedAccept[i]) {
1033 error("Bad response 'Sec-WebSocket-Accept' header");
1034 }
1035 }
1036 var protocol = response.headers.value('Sec-WebSocket-Protocol');
1037
1038 _WebSocketPerMessageDeflate deflate =
1039 negotiateClientCompression(response, compression);
1040
1041 return response.detachSocket().then/*<WebSocket>*/((socket) =>
1042 new _WebSocketImpl._fromSocket(
1043 socket, protocol, compression, false, deflate));
1044 });
1045 }
1046
1047 static _WebSocketPerMessageDeflate negotiateClientCompression(
1048 HttpClientResponse response, CompressionOptions compression) {
1049 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
1050
1051 if (extensionHeader == null) {
1052 extensionHeader = "";
1053 }
1054
1055 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
1056
1057 if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
1058 var serverNoContextTakeover =
1059 hv.parameters.containsKey(_serverNoContextTakeover);
1060 var clientNoContextTakeover =
1061 hv.parameters.containsKey(_clientNoContextTakeover);
1062
1063 int getWindowBits(String type) {
1064 var o = hv.parameters[type];
1065 if (o == null) {
1066 return DEFAULT_WINDOW_BITS;
1067 }
1068
1069 return int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
1070 }
1071
1072 return new _WebSocketPerMessageDeflate(
1073 clientMaxWindowBits: getWindowBits(_clientMaxWindowBits),
1074 serverMaxWindowBits: getWindowBits(_serverMaxWindowBits),
1075 clientNoContextTakeover: clientNoContextTakeover,
1076 serverNoContextTakeover: serverNoContextTakeover);
1077 }
1078
1079 return null;
1080 }
1081
1082 _WebSocketImpl._fromSocket(
1083 this._socket, this.protocol, CompressionOptions compression,
1084 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
1085 _consumer = new _WebSocketConsumer(this, _socket);
1086 _sink = new _StreamSinkImpl(_consumer);
1087 _readyState = WebSocket.OPEN;
1088 _deflate = deflate;
1089
1090 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
1091 _subscription = _socket.transform(transformer).listen((data) {
1092 if (data is _WebSocketPing) {
1093 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
1094 } else if (data is _WebSocketPong) {
1095 // Simply set pingInterval, as it'll cancel any timers.
1096 pingInterval = _pingInterval;
1097 } else {
1098 _controller.add(data);
1099 }
1100 }, onError: (error, stackTrace) {
1101 if (_closeTimer != null) _closeTimer.cancel();
1102 if (error is FormatException) {
1103 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
1104 } else {
1105 _close(WebSocketStatus.PROTOCOL_ERROR);
1106 }
1107 // An error happened, set the close code set above.
1108 _closeCode = _outCloseCode;
1109 _closeReason = _outCloseReason;
1110 _controller.close();
1111 }, onDone: () {
1112 if (_closeTimer != null) _closeTimer.cancel();
1113 if (_readyState == WebSocket.OPEN) {
1114 _readyState = WebSocket.CLOSING;
1115 if (!_isReservedStatusCode(transformer.closeCode)) {
1116 _close(transformer.closeCode, transformer.closeReason);
1117 } else {
1118 _close();
1119 }
1120 _readyState = WebSocket.CLOSED;
1121 }
1122 // Protocol close, use close code from transformer.
1123 _closeCode = transformer.closeCode;
1124 _closeReason = transformer.closeReason;
1125 _controller.close();
1126 }, cancelOnError: true);
1127 _subscription.pause();
1128 _controller = new StreamController(
1129 sync: true, onListen: _subscription.resume, onCancel: () {
1130 _subscription.cancel();
1131 _subscription = null;
1132 }, onPause: _subscription.pause, onResume: _subscription.resume);
1133
1134 _webSockets[_serviceId] = this;
1135 try {
1136 _socket._owner = this;
1137 } catch (_) {}
1138 }
1139
1140 StreamSubscription listen(void onData(message),
1141 {Function onError, void onDone(), bool cancelOnError}) {
1142 return _controller.stream.listen(onData,
1143 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
1144 }
1145
1146 Duration get pingInterval => _pingInterval;
1147
1148 void set pingInterval(Duration interval) {
1149 if (_writeClosed) return;
1150 if (_pingTimer != null) _pingTimer.cancel();
1151 _pingInterval = interval;
1152
1153 if (_pingInterval == null) return;
1154
1155 _pingTimer = new Timer(_pingInterval, () {
1156 if (_writeClosed) return;
1157 _consumer.add(new _WebSocketPing());
1158 _pingTimer = new Timer(_pingInterval, () {
1159 // No pong received.
1160 _close(WebSocketStatus.GOING_AWAY);
1161 });
1162 });
1163 }
1164
1165 int get readyState => _readyState;
1166
1167 String get extensions => null;
1168 int get closeCode => _closeCode;
1169 String get closeReason => _closeReason;
1170
1171 void add(data) { _sink.add(data); }
1172 void addError(error, [StackTrace stackTrace]) {
1173 _sink.addError(error, stackTrace);
1174 }
1175 Future addStream(Stream stream) => _sink.addStream(stream);
1176 Future get done => _sink.done;
1177
1178 Future close([int code, String reason]) {
1179 if (_isReservedStatusCode(code)) {
1180 throw new WebSocketException("Reserved status code $code");
1181 }
1182 if (_outCloseCode == null) {
1183 _outCloseCode = code;
1184 _outCloseReason = reason;
1185 }
1186 if (!_controller.isClosed) {
1187 // If a close has not yet been received from the other end then
1188 // 1) make sure to listen on the stream so the close frame will be
1189 // processed if received.
1190 // 2) set a timer terminate the connection if a close frame is
1191 // not received.
1192 if (!_controller.hasListener && _subscription != null) {
1193 _controller.stream.drain().catchError((_) => {});
1194 }
1195 if (_closeTimer == null) {
1196 // When closing the web-socket, we no longer accept data.
1197 _closeTimer = new Timer(const Duration(seconds: 5), () {
1198 // Reuse code and reason from the local close.
1199 _closeCode = _outCloseCode;
1200 _closeReason = _outCloseReason;
1201 if (_subscription != null) _subscription.cancel();
1202 _controller.close();
1203 _webSockets.remove(_serviceId);
1204 });
1205 }
1206 }
1207 return _sink.close();
1208 }
1209
1210 void _close([int code, String reason]) {
1211 if (_writeClosed) return;
1212 if (_outCloseCode == null) {
1213 _outCloseCode = code;
1214 _outCloseReason = reason;
1215 }
1216 _writeClosed = true;
1217 _consumer.closeSocket();
1218 _webSockets.remove(_serviceId);
1219 }
1220
1221 String get _serviceTypePath => 'io/websockets';
1222 String get _serviceTypeName => 'WebSocket';
1223
1224 Map<String, dynamic> _toJSON(bool ref) {
1225 var name = '${_socket.address.host}:${_socket.port}';
1226 var r = <String, dynamic>{
1227 'id': _servicePath,
1228 'type': _serviceType(ref),
1229 'name': name,
1230 'user_name': name,
1231 };
1232 if (ref) {
1233 return r;
1234 }
1235 try {
1236 r['socket'] = _socket._toJSON(true);
1237 } catch (_) {
1238 r['socket'] = {
1239 'id': _servicePath,
1240 'type': '@Socket',
1241 'name': 'UserSocket',
1242 'user_name': 'UserSocket',
1243 };
1244 }
1245 return r;
1246 }
1247
1248 static bool _isReservedStatusCode(int code) {
1249 return code != null &&
1250 (code < WebSocketStatus.NORMAL_CLOSURE ||
1251 code == WebSocketStatus.RESERVED_1004 ||
1252 code == WebSocketStatus.NO_STATUS_RECEIVED ||
1253 code == WebSocketStatus.ABNORMAL_CLOSURE ||
1254 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
1255 code < WebSocketStatus.RESERVED_1015) ||
1256 (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
1257 }
1258 }
OLDNEW
« no previous file with comments | « pkg/dev_compiler/tool/input_sdk/lib/io/websocket.dart ('k') | pkg/dev_compiler/tool/input_sdk/lib/isolate/capability.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698