Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(356)

Side by Side Diff: pkg/http_parser/lib/src/web_socket.dart

Issue 250073002: Re-apply "Add a non-dart:io WebSocket implementation to http_parser." (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « pkg/http_parser/lib/src/bytes_builder.dart ('k') | pkg/http_parser/pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 library http_parser.web_socket;
6
7 import 'dart:async';
8 import 'dart:convert';
9 import 'dart:math';
10 import 'dart:typed_data';
11
12 import 'package:crypto/crypto.dart';
13
14 import 'bytes_builder.dart';
15
16 /// An implementation of the WebSocket protocol that's not specific to "dart:io"
17 /// or to any particular HTTP API.
18 ///
19 /// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket
20 /// handshake][]. This needs to be handled manually by the user of the code.
21 /// Once that's been done, [new CompatibleWebSocket] can be called with the
22 /// underlying socket and it will handle the remainder of the protocol.
23 ///
24 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
25 abstract class CompatibleWebSocket implements Stream, StreamSink {
26 /// The interval for sending ping signals.
27 ///
28 /// If a ping message is not answered by a pong message from the peer, the
29 /// `WebSocket` is assumed disconnected and the connection is closed with a
30 /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the
31 /// pong message must be received within [pingInterval].
32 ///
33 /// There are never two outstanding pings at any given time, and the next ping
34 /// timer starts when the pong is received.
35 ///
36 /// By default, the [pingInterval] is `null`, indicating that ping messages
37 /// are disabled.
38 Duration pingInterval;
39
40 /// The [close code][] set when the WebSocket connection is closed.
41 ///
42 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
43 ///
44 /// Before the connection has been closed, this will be `null`.
45 int get closeCode;
46
47 /// The [close reason][] set when the WebSocket connection is closed.
48 ///
49 /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
50 ///
51 /// Before the connection has been closed, this will be `null`.
52 String get closeReason;
53
54 /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
55 /// the [initial handshake].
56 ///
57 /// The return value should be sent back to the client in a
58 /// `Sec-WebSocket-Accept` header.
59 ///
60 /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
61 static String signKey(String key) {
62 var hash = new SHA1();
63 // We use [codeUnits] here rather than UTF-8-decoding the string because
64 // [key] is expected to be base64 encoded, and so will be pure ASCII.
65 hash.add((key + _webSocketGUID).codeUnits);
66 return CryptoUtils.bytesToBase64(hash.close());
67 }
68
69 /// Creates a new WebSocket handling messaging across an existing socket.
70 ///
71 /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][]
72 /// must have already been completed on the socket before this is called.
73 ///
74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io"
75 /// `Socket`), it will be used for both sending and receiving data. Otherwise,
76 /// it will be used for receiving data and [sink] will be used for sending it.
77 ///
78 /// If this is a WebSocket server, [serverSide] should be `true` (the
79 /// default); if it's a client, [serverSide] should be `false`.
80 ///
81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
82 factory CompatibleWebSocket(Stream<List<int>> stream,
83 {StreamSink<List<int>> sink, bool serverSide: true}) {
84 if (sink == null) {
85 if (stream is! StreamSink) {
86 throw new ArgumentError("If stream isn't also a StreamSink, sink must "
87 "be passed explicitly.");
88 }
89 sink = stream as StreamSink;
90 }
91
92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide);
93 }
94
95 /// Closes the web socket connection.
96 ///
97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
98 /// to the remote peer, respectively. If they are omitted, the peer will see
99 /// a "no status received" code with no reason.
100 ///
101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
103 Future close([int closeCode, String closeReason]);
104 }
105
106 /// An exception thrown by [CompatibleWebSocket].
107 class CompatibleWebSocketException implements Exception {
108 final String message;
109
110 CompatibleWebSocketException([this.message]);
111
112 String toString() => message == null
113 ? "CompatibleWebSocketException" :
114 "CompatibleWebSocketException: $message";
115 }
116
117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The
118 // "dart:io" implementation isn't used directly both to support non-"dart:io"
119 // applications, and because it's incompatible with non-"dart:io" HTTP requests
120 // (issue 18172).
121 //
122 // Because it's copied directly, only modifications necessary to support the
123 // desired public API and to remove "dart:io" dependencies have been made.
124
125 /**
126 * Web socket status codes used when closing a web socket connection.
127 */
128 abstract class _WebSocketStatus {
129 static const int NORMAL_CLOSURE = 1000;
130 static const int GOING_AWAY = 1001;
131 static const int PROTOCOL_ERROR = 1002;
132 static const int UNSUPPORTED_DATA = 1003;
133 static const int RESERVED_1004 = 1004;
134 static const int NO_STATUS_RECEIVED = 1005;
135 static const int ABNORMAL_CLOSURE = 1006;
136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
137 static const int POLICY_VIOLATION = 1008;
138 static const int MESSAGE_TOO_BIG = 1009;
139 static const int MISSING_MANDATORY_EXTENSION = 1010;
140 static const int INTERNAL_SERVER_ERROR = 1011;
141 static const int RESERVED_1015 = 1015;
142 }
143
144 abstract class _WebSocketState {
145 static const int CONNECTING = 0;
146 static const int OPEN = 1;
147 static const int CLOSING = 2;
148 static const int CLOSED = 3;
149 }
6 150
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 152
153 final _random = new Random();
154
9 // Matches _WebSocketOpcode. 155 // Matches _WebSocketOpcode.
10 class _WebSocketMessageType { 156 class _WebSocketMessageType {
11 static const int NONE = 0; 157 static const int NONE = 0;
12 static const int TEXT = 1; 158 static const int TEXT = 1;
13 static const int BINARY = 2; 159 static const int BINARY = 2;
14 } 160 }
15 161
16 162
17 class _WebSocketOpcode { 163 class _WebSocketOpcode {
18 static const int CONTINUATION = 0; 164 static const int CONTINUATION = 0;
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 int _state = START; 201 int _state = START;
56 bool _fin = false; 202 bool _fin = false;
57 int _opcode = -1; 203 int _opcode = -1;
58 int _len = -1; 204 int _len = -1;
59 bool _masked = false; 205 bool _masked = false;
60 int _remainingLenBytes = -1; 206 int _remainingLenBytes = -1;
61 int _remainingMaskingKeyBytes = 4; 207 int _remainingMaskingKeyBytes = 4;
62 int _remainingPayloadBytes = -1; 208 int _remainingPayloadBytes = -1;
63 int _unmaskingIndex = 0; 209 int _unmaskingIndex = 0;
64 int _currentMessageType = _WebSocketMessageType.NONE; 210 int _currentMessageType = _WebSocketMessageType.NONE;
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 211 int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
66 String closeReason = ""; 212 String closeReason = "";
67 213
68 EventSink _eventSink; 214 EventSink _eventSink;
69 215
70 final bool _serverSide; 216 final bool _serverSide;
71 final List _maskingBytes = new List(4); 217 final List _maskingBytes = new List(4);
72 final BytesBuilder _payload = new BytesBuilder(copy: false); 218 final BytesBuilder _payload = new BytesBuilder(copy: false);
73 219
74 _WebSocketProtocolTransformer([this._serverSide = false]); 220 _WebSocketProtocolTransformer([this._serverSide = false]);
75 221
(...skipping 15 matching lines...) Expand all
91 void close() => _eventSink.close(); 237 void close() => _eventSink.close();
92 238
93 /** 239 /**
94 * Process data received from the underlying communication channel. 240 * Process data received from the underlying communication channel.
95 */ 241 */
96 void add(Uint8List buffer) { 242 void add(Uint8List buffer) {
97 int count = buffer.length; 243 int count = buffer.length;
98 int index = 0; 244 int index = 0;
99 int lastIndex = count; 245 int lastIndex = count;
100 if (_state == CLOSED) { 246 if (_state == CLOSED) {
101 throw new WebSocketException("Data on closed connection"); 247 throw new CompatibleWebSocketException("Data on closed connection");
102 } 248 }
103 if (_state == FAILURE) { 249 if (_state == FAILURE) {
104 throw new WebSocketException("Data on failed connection"); 250 throw new CompatibleWebSocketException("Data on failed connection");
105 } 251 }
106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { 252 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
107 int byte = buffer[index]; 253 int byte = buffer[index];
108 if (_state <= LEN_REST) { 254 if (_state <= LEN_REST) {
109 if (_state == START) { 255 if (_state == START) {
110 _fin = (byte & 0x80) != 0; 256 _fin = (byte & 0x80) != 0;
111 if ((byte & 0x70) != 0) { 257 if ((byte & 0x70) != 0) {
112 // The RSV1, RSV2 bits RSV3 must be all zero. 258 // The RSV1, RSV2 bits RSV3 must be all zero.
113 throw new WebSocketException("Protocol error"); 259 throw new CompatibleWebSocketException("Protocol error");
114 } 260 }
115 _opcode = (byte & 0xF); 261 _opcode = (byte & 0xF);
116 if (_opcode <= _WebSocketOpcode.BINARY) { 262 if (_opcode <= _WebSocketOpcode.BINARY) {
117 if (_opcode == _WebSocketOpcode.CONTINUATION) { 263 if (_opcode == _WebSocketOpcode.CONTINUATION) {
118 if (_currentMessageType == _WebSocketMessageType.NONE) { 264 if (_currentMessageType == _WebSocketMessageType.NONE) {
119 throw new WebSocketException("Protocol error"); 265 throw new CompatibleWebSocketException("Protocol error");
120 } 266 }
121 } else { 267 } else {
122 assert(_opcode == _WebSocketOpcode.TEXT || 268 assert(_opcode == _WebSocketOpcode.TEXT ||
123 _opcode == _WebSocketOpcode.BINARY); 269 _opcode == _WebSocketOpcode.BINARY);
124 if (_currentMessageType != _WebSocketMessageType.NONE) { 270 if (_currentMessageType != _WebSocketMessageType.NONE) {
125 throw new WebSocketException("Protocol error"); 271 throw new CompatibleWebSocketException("Protocol error");
126 } 272 }
127 _currentMessageType = _opcode; 273 _currentMessageType = _opcode;
128 } 274 }
129 } else if (_opcode >= _WebSocketOpcode.CLOSE && 275 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
130 _opcode <= _WebSocketOpcode.PONG) { 276 _opcode <= _WebSocketOpcode.PONG) {
131 // Control frames cannot be fragmented. 277 // Control frames cannot be fragmented.
132 if (!_fin) throw new WebSocketException("Protocol error"); 278 if (!_fin) throw new CompatibleWebSocketException("Protocol error");
133 } else { 279 } else {
134 throw new WebSocketException("Protocol error"); 280 throw new CompatibleWebSocketException("Protocol error");
135 } 281 }
136 _state = LEN_FIRST; 282 _state = LEN_FIRST;
137 } else if (_state == LEN_FIRST) { 283 } else if (_state == LEN_FIRST) {
138 _masked = (byte & 0x80) != 0; 284 _masked = (byte & 0x80) != 0;
139 _len = byte & 0x7F; 285 _len = byte & 0x7F;
140 if (_isControlFrame() && _len > 125) { 286 if (_isControlFrame() && _len > 125) {
141 throw new WebSocketException("Protocol error"); 287 throw new CompatibleWebSocketException("Protocol error");
142 } 288 }
143 if (_len == 126) { 289 if (_len == 126) {
144 _len = 0; 290 _len = 0;
145 _remainingLenBytes = 2; 291 _remainingLenBytes = 2;
146 _state = LEN_REST; 292 _state = LEN_REST;
147 } else if (_len == 127) { 293 } else if (_len == 127) {
148 _len = 0; 294 _len = 0;
149 _remainingLenBytes = 8; 295 _remainingLenBytes = 8;
150 _state = LEN_REST; 296 _state = LEN_REST;
151 } else { 297 } else {
(...skipping 25 matching lines...) Expand all
177 } 323 }
178 // Control frame and data frame share _payloads. 324 // Control frame and data frame share _payloads.
179 _payload.add( 325 _payload.add(
180 new Uint8List.view(buffer.buffer, index, payloadLength)); 326 new Uint8List.view(buffer.buffer, index, payloadLength));
181 index += payloadLength; 327 index += payloadLength;
182 if (_isControlFrame()) { 328 if (_isControlFrame()) {
183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 329 if (_remainingPayloadBytes == 0) _controlFrameEnd();
184 } else { 330 } else {
185 if (_currentMessageType != _WebSocketMessageType.TEXT && 331 if (_currentMessageType != _WebSocketMessageType.TEXT &&
186 _currentMessageType != _WebSocketMessageType.BINARY) { 332 _currentMessageType != _WebSocketMessageType.BINARY) {
187 throw new WebSocketException("Protocol error"); 333 throw new CompatibleWebSocketException("Protocol error");
188 } 334 }
189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 335 if (_remainingPayloadBytes == 0) _messageFrameEnd();
190 } 336 }
191 337
192 // Hack - as we always do index++ below. 338 // Hack - as we always do index++ below.
193 index--; 339 index--;
194 } 340 }
195 } 341 }
196 342
197 // Move to the next byte. 343 // Move to the next byte.
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 // Handle end. 378 // Handle end.
233 final int end = index + length; 379 final int end = index + length;
234 for (int i = index; i < end; i++) { 380 for (int i = index; i < end; i++) {
235 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; 381 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
236 } 382 }
237 } 383 }
238 384
239 void _lengthDone() { 385 void _lengthDone() {
240 if (_masked) { 386 if (_masked) {
241 if (!_serverSide) { 387 if (!_serverSide) {
242 throw new WebSocketException("Received masked frame from server"); 388 throw new CompatibleWebSocketException(
389 "Received masked frame from server");
243 } 390 }
244 _state = MASK; 391 _state = MASK;
245 } else { 392 } else {
246 if (_serverSide) { 393 if (_serverSide) {
247 throw new WebSocketException("Received unmasked frame from client"); 394 throw new CompatibleWebSocketException(
395 "Received unmasked frame from client");
248 } 396 }
249 _remainingPayloadBytes = _len; 397 _remainingPayloadBytes = _len;
250 _startPayload(); 398 _startPayload();
251 } 399 }
252 } 400 }
253 401
254 void _maskDone() { 402 void _maskDone() {
255 _remainingPayloadBytes = _len; 403 _remainingPayloadBytes = _len;
256 _startPayload(); 404 _startPayload();
257 } 405 }
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
293 break; 441 break;
294 } 442 }
295 _currentMessageType = _WebSocketMessageType.NONE; 443 _currentMessageType = _WebSocketMessageType.NONE;
296 } 444 }
297 _prepareForNextFrame(); 445 _prepareForNextFrame();
298 } 446 }
299 447
300 void _controlFrameEnd() { 448 void _controlFrameEnd() {
301 switch (_opcode) { 449 switch (_opcode) {
302 case _WebSocketOpcode.CLOSE: 450 case _WebSocketOpcode.CLOSE:
303 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 451 closeCode = _WebSocketStatus.NO_STATUS_RECEIVED;
304 var payload = _payload.takeBytes(); 452 var payload = _payload.takeBytes();
305 if (payload.length > 0) { 453 if (payload.length > 0) {
306 if (payload.length == 1) { 454 if (payload.length == 1) {
307 throw new WebSocketException("Protocol error"); 455 throw new CompatibleWebSocketException("Protocol error");
308 } 456 }
309 closeCode = payload[0] << 8 | payload[1]; 457 closeCode = payload[0] << 8 | payload[1];
310 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { 458 if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) {
311 throw new WebSocketException("Protocol error"); 459 throw new CompatibleWebSocketException("Protocol error");
312 } 460 }
313 if (payload.length > 2) { 461 if (payload.length > 2) {
314 closeReason = UTF8.decode(payload.sublist(2)); 462 closeReason = UTF8.decode(payload.sublist(2));
315 } 463 }
316 } 464 }
317 _state = CLOSED; 465 _state = CLOSED;
318 _eventSink.close(); 466 _eventSink.close();
319 break; 467 break;
320 468
321 case _WebSocketOpcode.PING: 469 case _WebSocketOpcode.PING:
(...skipping 30 matching lines...) Expand all
352 final List<int> payload; 500 final List<int> payload;
353 _WebSocketPing([this.payload = null]); 501 _WebSocketPing([this.payload = null]);
354 } 502 }
355 503
356 504
357 class _WebSocketPong { 505 class _WebSocketPong {
358 final List<int> payload; 506 final List<int> payload;
359 _WebSocketPong([this.payload = null]); 507 _WebSocketPong([this.payload = null]);
360 } 508 }
361 509
362
363 class _WebSocketTransformerImpl implements WebSocketTransformer {
364 final StreamController<WebSocket> _controller =
365 new StreamController<WebSocket>(sync: true);
366 final Function _protocolSelector;
367
368 _WebSocketTransformerImpl(this._protocolSelector);
369
370 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
371 stream.listen((request) {
372 _upgrade(request, _protocolSelector)
373 .then((WebSocket webSocket) => _controller.add(webSocket))
374 .catchError(_controller.addError);
375 });
376
377 return _controller.stream;
378 }
379
380 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
381 var response = request.response;
382 if (!_isUpgradeRequest(request)) {
383 // Send error response.
384 response
385 ..statusCode = HttpStatus.BAD_REQUEST
386 ..close();
387 return new Future.error(
388 new WebSocketException("Invalid WebSocket upgrade request"));
389 }
390
391 Future upgrade(String protocol) {
392 // Send the upgrade response.
393 response
394 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
395 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
396 ..headers.add(HttpHeaders.UPGRADE, "websocket");
397 String key = request.headers.value("Sec-WebSocket-Key");
398 _SHA1 sha1 = new _SHA1();
399 sha1.add("$key$_webSocketGUID".codeUnits);
400 String accept = _CryptoUtils.bytesToBase64(sha1.close());
401 response.headers.add("Sec-WebSocket-Accept", accept);
402 if (protocol != null && protocol.isNotEmpty) {
403 response.headers.add("Sec-WebSocket-Protocol", protocol);
404 }
405 response.headers.contentLength = 0;
406 return response.detachSocket()
407 .then((socket) => new _WebSocketImpl._fromSocket(
408 socket, protocol, true));
409 }
410
411 var protocols = request.headers['Sec-WebSocket-Protocol'];
412 if (protocols != null && _protocolSelector != null) {
413 // The suggested protocols can be spread over multiple lines, each
414 // consisting of multiple protocols. To unify all of them, first join
415 // the lists with ', ' and then tokenize.
416 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
417 return new Future(() => _protocolSelector(protocols))
418 .then((protocol) {
419 if (protocols.indexOf(protocol) < 0) {
420 throw new WebSocketException(
421 "Selected protocol is not in the list of available protocols");
422 }
423 return protocol;
424 })
425 .catchError((error) {
426 response
427 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
428 ..close();
429 throw error;
430 })
431 .then(upgrade);
432 } else {
433 return upgrade(null);
434 }
435 }
436
437 static bool _isUpgradeRequest(HttpRequest request) {
438 if (request.method != "GET") {
439 return false;
440 }
441 if (request.headers[HttpHeaders.CONNECTION] == null) {
442 return false;
443 }
444 bool isUpgrade = false;
445 request.headers[HttpHeaders.CONNECTION].forEach((String value) {
446 if (value.toLowerCase() == "upgrade") isUpgrade = true;
447 });
448 if (!isUpgrade) return false;
449 String upgrade = request.headers.value(HttpHeaders.UPGRADE);
450 if (upgrade == null || upgrade.toLowerCase() != "websocket") {
451 return false;
452 }
453 String version = request.headers.value("Sec-WebSocket-Version");
454 if (version == null || version != "13") {
455 return false;
456 }
457 String key = request.headers.value("Sec-WebSocket-Key");
458 if (key == null) {
459 return false;
460 }
461 return true;
462 }
463 }
464
465
466 // TODO(ajohnsen): Make this transformer reusable. 510 // TODO(ajohnsen): Make this transformer reusable.
467 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 511 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
468 final _WebSocketImpl webSocket; 512 final _WebSocketImpl webSocket;
469 EventSink _eventSink; 513 EventSink _eventSink;
470 514
471 _WebSocketOutgoingTransformer(this.webSocket); 515 _WebSocketOutgoingTransformer(this.webSocket);
472 516
473 Stream bind(Stream stream) { 517 Stream bind(Stream stream) {
474 return new Stream.eventTransformed( 518 return new Stream.eventTransformed(
475 stream, 519 stream,
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
555 } else if (dataLength > 125) { 599 } else if (dataLength > 125) {
556 header[index++] = 126; 600 header[index++] = 126;
557 lengthBytes = 2; 601 lengthBytes = 2;
558 } 602 }
559 // Write the length in network byte order into the header. 603 // Write the length in network byte order into the header.
560 for (int i = 0; i < lengthBytes; i++) { 604 for (int i = 0; i < lengthBytes; i++) {
561 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; 605 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
562 } 606 }
563 if (mask) { 607 if (mask) {
564 header[1] |= 1 << 7; 608 header[1] |= 1 << 7;
565 var maskBytes = _IOCrypto.getRandomBytes(4); 609 var maskBytes = [_random.nextInt(256), _random.nextInt(256),
610 _random.nextInt(256), _random.nextInt(256)];
566 header.setRange(index, index + 4, maskBytes); 611 header.setRange(index, index + 4, maskBytes);
567 index += 4; 612 index += 4;
568 if (data != null) { 613 if (data != null) {
569 Uint8List list; 614 Uint8List list;
570 // If this is a text message just do the masking inside the 615 // If this is a text message just do the masking inside the
571 // encoded data. 616 // encoded data.
572 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 617 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
573 list = data; 618 list = data;
574 } else { 619 } else {
575 if (data is Uint8List) { 620 if (data is Uint8List) {
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
613 return [header]; 658 return [header];
614 } else { 659 } else {
615 return [header, data]; 660 return [header, data];
616 } 661 }
617 } 662 }
618 } 663 }
619 664
620 665
621 class _WebSocketConsumer implements StreamConsumer { 666 class _WebSocketConsumer implements StreamConsumer {
622 final _WebSocketImpl webSocket; 667 final _WebSocketImpl webSocket;
623 final Socket socket; 668 final StreamSink<List<int>> sink;
624 StreamController _controller; 669 StreamController _controller;
625 StreamSubscription _subscription; 670 StreamSubscription _subscription;
626 bool _issuedPause = false; 671 bool _issuedPause = false;
627 bool _closed = false; 672 bool _closed = false;
628 Completer _closeCompleter = new Completer(); 673 Completer _closeCompleter = new Completer();
629 Completer _completer; 674 Completer _completer;
630 675
631 _WebSocketConsumer(this.webSocket, this.socket); 676 _WebSocketConsumer(this.webSocket, this.sink);
632 677
633 void _onListen() { 678 void _onListen() {
634 if (_subscription != null) { 679 if (_subscription != null) {
635 _subscription.cancel(); 680 _subscription.cancel();
636 } 681 }
637 } 682 }
638 683
639 void _onPause() { 684 void _onPause() {
640 if (_subscription != null) { 685 if (_subscription != null) {
641 _subscription.pause(); 686 _subscription.pause();
(...skipping 19 matching lines...) Expand all
661 } 706 }
662 707
663 _ensureController() { 708 _ensureController() {
664 if (_controller != null) return; 709 if (_controller != null) return;
665 _controller = new StreamController(sync: true, 710 _controller = new StreamController(sync: true,
666 onPause: _onPause, 711 onPause: _onPause,
667 onResume: _onResume, 712 onResume: _onResume,
668 onCancel: _onListen); 713 onCancel: _onListen);
669 var stream = _controller.stream.transform( 714 var stream = _controller.stream.transform(
670 new _WebSocketOutgoingTransformer(webSocket)); 715 new _WebSocketOutgoingTransformer(webSocket));
671 socket.addStream(stream) 716 sink.addStream(stream)
672 .then((_) { 717 .then((_) {
673 _done(); 718 _done();
674 _closeCompleter.complete(webSocket); 719 _closeCompleter.complete(webSocket);
675 }, onError: (error, StackTrace stackTrace) { 720 }, onError: (error, StackTrace stackTrace) {
676 _closed = true; 721 _closed = true;
677 _cancel(); 722 _cancel();
678 if (error is ArgumentError) { 723 if (error is ArgumentError) {
679 if (!_done(error, stackTrace)) { 724 if (!_done(error, stackTrace)) {
680 _closeCompleter.completeError(error, stackTrace); 725 _closeCompleter.completeError(error, stackTrace);
681 } 726 }
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
714 if (_issuedPause) { 759 if (_issuedPause) {
715 _subscription.pause(); 760 _subscription.pause();
716 _issuedPause = false; 761 _issuedPause = false;
717 } 762 }
718 return _completer.future; 763 return _completer.future;
719 } 764 }
720 765
721 Future close() { 766 Future close() {
722 _ensureController(); 767 _ensureController();
723 Future closeSocket() { 768 Future closeSocket() {
724 return socket.close().catchError((_) {}).then((_) => webSocket); 769 return sink.close().catchError((_) {}).then((_) => webSocket);
725 } 770 }
726 _controller.close(); 771 _controller.close();
727 return _closeCompleter.future.then((_) => closeSocket()); 772 return _closeCompleter.future.then((_) => closeSocket());
728 } 773 }
729 774
730 void add(data) { 775 void add(data) {
731 if (_closed) return; 776 if (_closed) return;
732 _ensureController(); 777 _ensureController();
733 _controller.add(data); 778 _controller.add(data);
734 } 779 }
735 780
736 void closeSocket() { 781 void closeSocket() {
737 _closed = true; 782 _closed = true;
738 _cancel(); 783 _cancel();
739 close(); 784 close();
740 } 785 }
741 } 786 }
742 787
743 788
744 class _WebSocketImpl extends Stream implements WebSocket { 789 class _WebSocketImpl extends Stream implements CompatibleWebSocket {
745 final String protocol;
746
747 StreamController _controller; 790 StreamController _controller;
748 StreamSubscription _subscription; 791 StreamSubscription _subscription;
749 StreamSink _sink; 792 StreamController _sink;
750 793
751 final Socket _socket;
752 final bool _serverSide; 794 final bool _serverSide;
753 int _readyState = WebSocket.CONNECTING; 795 int _readyState = _WebSocketState.CONNECTING;
754 bool _writeClosed = false; 796 bool _writeClosed = false;
755 int _closeCode; 797 int _closeCode;
756 String _closeReason; 798 String _closeReason;
757 Duration _pingInterval; 799 Duration _pingInterval;
758 Timer _pingTimer; 800 Timer _pingTimer;
759 _WebSocketConsumer _consumer; 801 _WebSocketConsumer _consumer;
760 802
761 int _outCloseCode; 803 int _outCloseCode;
762 String _outCloseReason; 804 String _outCloseReason;
763 Timer _closeTimer; 805 Timer _closeTimer;
764 806
765 static final HttpClient _httpClient = new HttpClient(); 807 _WebSocketImpl._fromSocket(Stream<List<int>> stream,
766 808 StreamSink<List<int>> sink, [this._serverSide = false]) {
767 static Future<WebSocket> connect(String url, List<String> protocols) { 809 _consumer = new _WebSocketConsumer(this, sink);
768 Uri uri = Uri.parse(url); 810 _sink = new StreamController();
769 if (uri.scheme != "ws" && uri.scheme != "wss") { 811 _sink.stream.pipe(_consumer);
770 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 812 _readyState = _WebSocketState.OPEN;
771 }
772 if (uri.userInfo != "") {
773 throw new WebSocketException("Unsupported user info '${uri.userInfo}'");
774 }
775
776 Random random = new Random();
777 // Generate 16 random bytes.
778 Uint8List nonceData = new Uint8List(16);
779 for (int i = 0; i < 16; i++) {
780 nonceData[i] = random.nextInt(256);
781 }
782 String nonce = _CryptoUtils.bytesToBase64(nonceData);
783
784 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http",
785 userInfo: uri.userInfo,
786 host: uri.host,
787 port: uri.port,
788 path: uri.path,
789 query: uri.query,
790 fragment: uri.fragment);
791 return _httpClient.openUrl("GET", uri)
792 .then((request) {
793 // Setup the initial handshake.
794 request.headers
795 ..add(HttpHeaders.CONNECTION, "Upgrade")
796 ..set(HttpHeaders.UPGRADE, "websocket")
797 ..set("Sec-WebSocket-Key", nonce)
798 ..set("Cache-Control", "no-cache")
799 ..set("Sec-WebSocket-Version", "13");
800 if (protocols.isNotEmpty) {
801 request.headers.add("Sec-WebSocket-Protocol", protocols);
802 }
803 return request.close();
804 })
805 .then((response) {
806 void error(String message) {
807 // Flush data.
808 response.detachSocket().then((socket) {
809 socket.destroy();
810 });
811 throw new WebSocketException(message);
812 }
813 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
814 response.headers[HttpHeaders.CONNECTION] == null ||
815 !response.headers[HttpHeaders.CONNECTION].any(
816 (value) => value.toLowerCase() == "upgrade") ||
817 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
818 "websocket") {
819 error("Connection to '$uri' was not upgraded to websocket");
820 }
821 String accept = response.headers.value("Sec-WebSocket-Accept");
822 if (accept == null) {
823 error("Response did not contain a 'Sec-WebSocket-Accept' header");
824 }
825 _SHA1 sha1 = new _SHA1();
826 sha1.add("$nonce$_webSocketGUID".codeUnits);
827 List<int> expectedAccept = sha1.close();
828 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
829 if (expectedAccept.length != receivedAccept.length) {
830 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
831 }
832 for (int i = 0; i < expectedAccept.length; i++) {
833 if (expectedAccept[i] != receivedAccept[i]) {
834 error("Bad response 'Sec-WebSocket-Accept' header");
835 }
836 }
837 var protocol = response.headers.value('Sec-WebSocket-Protocol');
838 return response.detachSocket()
839 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
840 });
841 }
842
843 _WebSocketImpl._fromSocket(this._socket, this.protocol,
844 [this._serverSide = false]) {
845 _consumer = new _WebSocketConsumer(this, _socket);
846 _sink = new _StreamSinkImpl(_consumer);
847 _readyState = WebSocket.OPEN;
848 813
849 var transformer = new _WebSocketProtocolTransformer(_serverSide); 814 var transformer = new _WebSocketProtocolTransformer(_serverSide);
850 _subscription = _socket.transform(transformer).listen( 815 _subscription = stream.transform(transformer).listen(
851 (data) { 816 (data) {
852 if (data is _WebSocketPing) { 817 if (data is _WebSocketPing) {
853 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 818 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
854 } else if (data is _WebSocketPong) { 819 } else if (data is _WebSocketPong) {
855 // Simply set pingInterval, as it'll cancel any timers. 820 // Simply set pingInterval, as it'll cancel any timers.
856 pingInterval = _pingInterval; 821 pingInterval = _pingInterval;
857 } else { 822 } else {
858 _controller.add(data); 823 _controller.add(data);
859 } 824 }
860 }, 825 },
861 onError: (error) { 826 onError: (error) {
862 if (_closeTimer != null) _closeTimer.cancel(); 827 if (_closeTimer != null) _closeTimer.cancel();
863 if (error is FormatException) { 828 if (error is FormatException) {
864 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 829 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
865 } else { 830 } else {
866 _close(WebSocketStatus.PROTOCOL_ERROR); 831 _close(_WebSocketStatus.PROTOCOL_ERROR);
867 } 832 }
868 _controller.close(); 833 _controller.close();
869 }, 834 },
870 onDone: () { 835 onDone: () {
871 if (_closeTimer != null) _closeTimer.cancel(); 836 if (_closeTimer != null) _closeTimer.cancel();
872 if (_readyState == WebSocket.OPEN) { 837 if (_readyState == _WebSocketState.OPEN) {
873 _readyState = WebSocket.CLOSING; 838 _readyState = _WebSocketState.CLOSING;
874 if (!_isReservedStatusCode(transformer.closeCode)) { 839 if (!_isReservedStatusCode(transformer.closeCode)) {
875 _close(transformer.closeCode); 840 _close(transformer.closeCode);
876 } else { 841 } else {
877 _close(); 842 _close();
878 } 843 }
879 _readyState = WebSocket.CLOSED; 844 _readyState = _WebSocketState.CLOSED;
880 } 845 }
881 _closeCode = transformer.closeCode; 846 _closeCode = transformer.closeCode;
882 _closeReason = transformer.closeReason; 847 _closeReason = transformer.closeReason;
883 _controller.close(); 848 _controller.close();
884 }, 849 },
885 cancelOnError: true); 850 cancelOnError: true);
886 _subscription.pause(); 851 _subscription.pause();
887 _controller = new StreamController(sync: true, 852 _controller = new StreamController(sync: true,
888 onListen: _subscription.resume, 853 onListen: _subscription.resume,
889 onPause: _subscription.pause, 854 onPause: _subscription.pause,
(...skipping 17 matching lines...) Expand all
907 if (_pingTimer != null) _pingTimer.cancel(); 872 if (_pingTimer != null) _pingTimer.cancel();
908 _pingInterval = interval; 873 _pingInterval = interval;
909 874
910 if (_pingInterval == null) return; 875 if (_pingInterval == null) return;
911 876
912 _pingTimer = new Timer(_pingInterval, () { 877 _pingTimer = new Timer(_pingInterval, () {
913 if (_writeClosed) return; 878 if (_writeClosed) return;
914 _consumer.add(new _WebSocketPing()); 879 _consumer.add(new _WebSocketPing());
915 _pingTimer = new Timer(_pingInterval, () { 880 _pingTimer = new Timer(_pingInterval, () {
916 // No pong received. 881 // No pong received.
917 _close(WebSocketStatus.GOING_AWAY); 882 _close(_WebSocketStatus.GOING_AWAY);
918 }); 883 });
919 }); 884 });
920 } 885 }
921 886
922 int get readyState => _readyState;
923
924 String get extensions => null;
925 int get closeCode => _closeCode; 887 int get closeCode => _closeCode;
926 String get closeReason => _closeReason; 888 String get closeReason => _closeReason;
927 889
928 void add(data) => _sink.add(data); 890 void add(data) => _sink.add(data);
929 void addError(error, [StackTrace stackTrace]) => 891 void addError(error, [StackTrace stackTrace]) =>
930 _sink.addError(error, stackTrace); 892 _sink.addError(error, stackTrace);
931 Future addStream(Stream stream) => _sink.addStream(stream); 893 Future addStream(Stream stream) => _sink.addStream(stream);
932 Future get done => _sink.done; 894 Future get done => _sink.done;
933 895
934 Future close([int code, String reason]) { 896 Future close([int code, String reason]) {
935 if (_isReservedStatusCode(code)) { 897 if (_isReservedStatusCode(code)) {
936 throw new WebSocketException("Reserved status code $code"); 898 throw new CompatibleWebSocketException("Reserved status code $code");
937 } 899 }
938 if (_outCloseCode == null) { 900 if (_outCloseCode == null) {
939 _outCloseCode = code; 901 _outCloseCode = code;
940 _outCloseReason = reason; 902 _outCloseReason = reason;
941 } 903 }
942 if (_closeTimer == null && !_controller.isClosed) { 904 if (_closeTimer == null && !_controller.isClosed) {
943 // When closing the web-socket, we no longer accept data. 905 // When closing the web-socket, we no longer accept data.
944 _closeTimer = new Timer(const Duration(seconds: 5), () { 906 _closeTimer = new Timer(const Duration(seconds: 5), () {
945 _subscription.cancel(); 907 _subscription.cancel();
946 _controller.close(); 908 _controller.close();
947 }); 909 });
948 } 910 }
949 return _sink.close(); 911 return _sink.close();
950 } 912 }
951 913
952 void _close([int code, String reason]) { 914 void _close([int code, String reason]) {
953 if (_writeClosed) return; 915 if (_writeClosed) return;
954 if (_outCloseCode == null) { 916 if (_outCloseCode == null) {
955 _outCloseCode = code; 917 _outCloseCode = code;
956 _outCloseReason = reason; 918 _outCloseReason = reason;
957 } 919 }
958 _writeClosed = true; 920 _writeClosed = true;
959 _consumer.closeSocket(); 921 _consumer.closeSocket();
960 } 922 }
961 923
962 static bool _isReservedStatusCode(int code) { 924 static bool _isReservedStatusCode(int code) {
963 return code != null && 925 return code != null &&
964 (code < WebSocketStatus.NORMAL_CLOSURE || 926 (code < _WebSocketStatus.NORMAL_CLOSURE ||
965 code == WebSocketStatus.RESERVED_1004 || 927 code == _WebSocketStatus.RESERVED_1004 ||
966 code == WebSocketStatus.NO_STATUS_RECEIVED || 928 code == _WebSocketStatus.NO_STATUS_RECEIVED ||
967 code == WebSocketStatus.ABNORMAL_CLOSURE || 929 code == _WebSocketStatus.ABNORMAL_CLOSURE ||
968 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 930 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR &&
969 code < WebSocketStatus.RESERVED_1015) || 931 code < _WebSocketStatus.RESERVED_1015) ||
970 (code >= WebSocketStatus.RESERVED_1015 && 932 (code >= _WebSocketStatus.RESERVED_1015 &&
971 code < 3000)); 933 code < 3000));
972 } 934 }
973 } 935 }
936
OLDNEW
« no previous file with comments | « pkg/http_parser/lib/src/bytes_builder.dart ('k') | pkg/http_parser/pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698