Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1222)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 1426043002: Revert "Web Socket compression - take two" (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/web_socket_compression_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 const String _clientNoContextTakeover = "client_no_context_takeover";
9 const String _serverNoContextTakeover = "server_no_context_takeover";
10 const String _clientMaxWindowBits = "client_max_window_bits";
11 const String _serverMaxWindowBits = "server_max_window_bits";
12 8
13 // Matches _WebSocketOpcode. 9 // Matches _WebSocketOpcode.
14 class _WebSocketMessageType { 10 class _WebSocketMessageType {
15 static const int NONE = 0; 11 static const int NONE = 0;
16 static const int TEXT = 1; 12 static const int TEXT = 1;
17 static const int BINARY = 2; 13 static const int BINARY = 2;
18 } 14 }
19 15
16
20 class _WebSocketOpcode { 17 class _WebSocketOpcode {
21 static const int CONTINUATION = 0; 18 static const int CONTINUATION = 0;
22 static const int TEXT = 1; 19 static const int TEXT = 1;
23 static const int BINARY = 2; 20 static const int BINARY = 2;
24 static const int RESERVED_3 = 3; 21 static const int RESERVED_3 = 3;
25 static const int RESERVED_4 = 4; 22 static const int RESERVED_4 = 4;
26 static const int RESERVED_5 = 5; 23 static const int RESERVED_5 = 5;
27 static const int RESERVED_6 = 6; 24 static const int RESERVED_6 = 6;
28 static const int RESERVED_7 = 7; 25 static const int RESERVED_7 = 7;
29 static const int CLOSE = 8; 26 static const int CLOSE = 8;
30 static const int PING = 9; 27 static const int PING = 9;
31 static const int PONG = 10; 28 static const int PONG = 10;
32 static const int RESERVED_B = 11; 29 static const int RESERVED_B = 11;
33 static const int RESERVED_C = 12; 30 static const int RESERVED_C = 12;
34 static const int RESERVED_D = 13; 31 static const int RESERVED_D = 13;
35 static const int RESERVED_E = 14; 32 static const int RESERVED_E = 14;
36 static const int RESERVED_F = 15; 33 static const int RESERVED_F = 15;
37 } 34 }
38 35
39 /** 36 /**
40 * The web socket protocol transformer handles the protocol byte stream 37 * The web socket protocol transformer handles the protocol byte stream
41 * which is supplied through the [:handleData:]. As the protocol is processed, 38 * which is supplied through the [:handleData:]. As the protocol is processed,
42 * it'll output frame data as either a List<int> or String. 39 * it'll output frame data as either a List<int> or String.
43 * 40 *
44 * Important information about usage: Be sure you use cancelOnError, so the 41 * Important infomation about usage: Be sure you use cancelOnError, so the
45 * socket will be closed when the processor encounter an error. Not using it 42 * socket will be closed when the processer encounter an error. Not using it
46 * will lead to undefined behaviour. 43 * will lead to undefined behaviour.
47 */ 44 */
48 // TODO(ajohnsen): make this transformer reusable? 45 // TODO(ajohnsen): make this transformer reusable?
49 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
50 static const int START = 0; 47 static const int START = 0;
51 static const int LEN_FIRST = 1; 48 static const int LEN_FIRST = 1;
52 static const int LEN_REST = 2; 49 static const int LEN_REST = 2;
53 static const int MASK = 3; 50 static const int MASK = 3;
54 static const int PAYLOAD = 4; 51 static const int PAYLOAD = 4;
55 static const int CLOSED = 5; 52 static const int CLOSED = 5;
56 static const int FAILURE = 6; 53 static const int FAILURE = 6;
57 static const int FIN = 0x80;
58 static const int RSV1 = 0x40;
59 static const int RSV2 = 0x20;
60 static const int RSV3 = 0x10;
61 static const int OPCODE = 0xF;
62 54
63 int _state = START; 55 int _state = START;
64 bool _fin = false; 56 bool _fin = false;
65 bool _compressed = false;
66 int _opcode = -1; 57 int _opcode = -1;
67 int _len = -1; 58 int _len = -1;
68 bool _masked = false; 59 bool _masked = false;
69 int _remainingLenBytes = -1; 60 int _remainingLenBytes = -1;
70 int _remainingMaskingKeyBytes = 4; 61 int _remainingMaskingKeyBytes = 4;
71 int _remainingPayloadBytes = -1; 62 int _remainingPayloadBytes = -1;
72 int _unmaskingIndex = 0; 63 int _unmaskingIndex = 0;
73 int _currentMessageType = _WebSocketMessageType.NONE; 64 int _currentMessageType = _WebSocketMessageType.NONE;
74 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
75 String closeReason = ""; 66 String closeReason = "";
76 67
77 EventSink _eventSink; 68 EventSink _eventSink;
78 69
79 final bool _serverSide; 70 final bool _serverSide;
80 final List _maskingBytes = new List(4); 71 final List _maskingBytes = new List(4);
81 final BytesBuilder _payload = new BytesBuilder(copy: false); 72 final BytesBuilder _payload = new BytesBuilder(copy: false);
82 73
83 _WebSocketPerMessageDeflate _deflate; 74 _WebSocketProtocolTransformer([this._serverSide = false]);
84 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
85 75
86 Stream bind(Stream stream) { 76 Stream bind(Stream stream) {
87 return new Stream.eventTransformed(stream, (EventSink eventSink) { 77 return new Stream.eventTransformed(
88 if (_eventSink != null) { 78 stream,
89 throw new StateError("WebSocket transformer already used."); 79 (EventSink eventSink) {
90 } 80 if (_eventSink != null) {
91 _eventSink = eventSink; 81 throw new StateError("WebSocket transformer already used.");
92 return this; 82 }
93 }); 83 _eventSink = eventSink;
84 return this;
85 });
94 } 86 }
95 87
96 void addError(Object error, [StackTrace stackTrace]) => 88 void addError(Object error, [StackTrace stackTrace]) =>
97 _eventSink.addError(error, stackTrace); 89 _eventSink.addError(error, stackTrace);
98 90
99 void close() => _eventSink.close(); 91 void close() => _eventSink.close();
100 92
101 /** 93 /**
102 * Process data received from the underlying communication channel. 94 * Process data received from the underlying communication channel.
103 */ 95 */
104 void add(Uint8List buffer) { 96 void add(Uint8List buffer) {
97 int count = buffer.length;
105 int index = 0; 98 int index = 0;
106 int lastIndex = buffer.length; 99 int lastIndex = count;
107 if (_state == CLOSED) { 100 if (_state == CLOSED) {
108 throw new WebSocketException("Data on closed connection"); 101 throw new WebSocketException("Data on closed connection");
109 } 102 }
110 if (_state == FAILURE) { 103 if (_state == FAILURE) {
111 throw new WebSocketException("Data on failed connection"); 104 throw new WebSocketException("Data on failed connection");
112 } 105 }
113 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { 106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
114 int byte = buffer[index]; 107 int byte = buffer[index];
115 if (_state <= LEN_REST) { 108 if (_state <= LEN_REST) {
116 if (_state == START) { 109 if (_state == START) {
117 _fin = (byte & FIN) != 0; 110 _fin = (byte & 0x80) != 0;
118 111 if ((byte & 0x70) != 0) {
119 if((byte & (RSV2 | RSV3)) != 0) { 112 // The RSV1, RSV2 bits RSV3 must be all zero.
120 // The RSV2, RSV3 bits must both be zero.
121 throw new WebSocketException("Protocol error"); 113 throw new WebSocketException("Protocol error");
122 } 114 }
123 115 _opcode = (byte & 0xF);
124 if ((byte & RSV1) != 0) {
125 _compressed = true;
126 } else {
127 _compressed = false;
128 }
129 _opcode = (byte & OPCODE);
130
131 if (_opcode <= _WebSocketOpcode.BINARY) { 116 if (_opcode <= _WebSocketOpcode.BINARY) {
132 if (_opcode == _WebSocketOpcode.CONTINUATION) { 117 if (_opcode == _WebSocketOpcode.CONTINUATION) {
133 if (_currentMessageType == _WebSocketMessageType.NONE) { 118 if (_currentMessageType == _WebSocketMessageType.NONE) {
134 throw new WebSocketException("Protocol error"); 119 throw new WebSocketException("Protocol error");
135 } 120 }
136 } else { 121 } else {
137 assert(_opcode == _WebSocketOpcode.TEXT || 122 assert(_opcode == _WebSocketOpcode.TEXT ||
138 _opcode == _WebSocketOpcode.BINARY); 123 _opcode == _WebSocketOpcode.BINARY);
139 if (_currentMessageType != _WebSocketMessageType.NONE) { 124 if (_currentMessageType != _WebSocketMessageType.NONE) {
140 throw new WebSocketException("Protocol error"); 125 throw new WebSocketException("Protocol error");
141 } 126 }
142 _currentMessageType = _opcode; 127 _currentMessageType = _opcode;
143 } 128 }
144 } else if (_opcode >= _WebSocketOpcode.CLOSE && 129 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
145 _opcode <= _WebSocketOpcode.PONG) { 130 _opcode <= _WebSocketOpcode.PONG) {
146 // Control frames cannot be fragmented. 131 // Control frames cannot be fragmented.
147 if (!_fin) throw new WebSocketException("Protocol error"); 132 if (!_fin) throw new WebSocketException("Protocol error");
148 } else { 133 } else {
149 throw new WebSocketException("Protocol error"); 134 throw new WebSocketException("Protocol error");
150 } 135 }
151 _state = LEN_FIRST; 136 _state = LEN_FIRST;
152 } else if (_state == LEN_FIRST) { 137 } else if (_state == LEN_FIRST) {
153 _masked = (byte & 0x80) != 0; 138 _masked = (byte & 0x80) != 0;
154 _len = byte & 0x7F; 139 _len = byte & 0x7F;
155 if (_isControlFrame() && _len > 125) { 140 if (_isControlFrame() && _len > 125) {
(...skipping 28 matching lines...) Expand all
184 } else { 169 } else {
185 assert(_state == PAYLOAD); 170 assert(_state == PAYLOAD);
186 // The payload is not handled one byte at a time but in blocks. 171 // The payload is not handled one byte at a time but in blocks.
187 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); 172 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
188 _remainingPayloadBytes -= payloadLength; 173 _remainingPayloadBytes -= payloadLength;
189 // Unmask payload if masked. 174 // Unmask payload if masked.
190 if (_masked) { 175 if (_masked) {
191 _unmask(index, payloadLength, buffer); 176 _unmask(index, payloadLength, buffer);
192 } 177 }
193 // Control frame and data frame share _payloads. 178 // Control frame and data frame share _payloads.
194 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); 179 _payload.add(
180 new Uint8List.view(buffer.buffer, index, payloadLength));
195 index += payloadLength; 181 index += payloadLength;
196 if (_isControlFrame()) { 182 if (_isControlFrame()) {
197 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 183 if (_remainingPayloadBytes == 0) _controlFrameEnd();
198 } else { 184 } else {
199 if (_currentMessageType != _WebSocketMessageType.TEXT && 185 if (_currentMessageType != _WebSocketMessageType.TEXT &&
200 _currentMessageType != _WebSocketMessageType.BINARY) { 186 _currentMessageType != _WebSocketMessageType.BINARY) {
201 throw new WebSocketException("Protocol error"); 187 throw new WebSocketException("Protocol error");
202 } 188 }
203 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 189 if (_remainingPayloadBytes == 0) _messageFrameEnd();
204 } 190 }
205 191
206 // Hack - as we always do index++ below. 192 // Hack - as we always do index++ below.
207 index--; 193 index--;
208 } 194 }
209 } 195 }
210 196
211 // Move to the next byte. 197 // Move to the next byte.
(...skipping 14 matching lines...) Expand all
226 index += startOffset; 212 index += startOffset;
227 length -= startOffset; 213 length -= startOffset;
228 final int blockCount = length ~/ BLOCK_SIZE; 214 final int blockCount = length ~/ BLOCK_SIZE;
229 if (blockCount > 0) { 215 if (blockCount > 0) {
230 // Create mask block. 216 // Create mask block.
231 int mask = 0; 217 int mask = 0;
232 for (int i = 3; i >= 0; i--) { 218 for (int i = 3; i >= 0; i--) {
233 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; 219 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
234 } 220 }
235 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 221 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
236 Int32x4List blockBuffer = 222 Int32x4List blockBuffer = new Int32x4List.view(
237 new Int32x4List.view(buffer.buffer, index, blockCount); 223 buffer.buffer, index, blockCount);
238 for (int i = 0; i < blockBuffer.length; i++) { 224 for (int i = 0; i < blockBuffer.length; i++) {
239 blockBuffer[i] ^= blockMask; 225 blockBuffer[i] ^= blockMask;
240 } 226 }
241 final int bytes = blockCount * BLOCK_SIZE; 227 final int bytes = blockCount * BLOCK_SIZE;
242 index += bytes; 228 index += bytes;
243 length -= bytes; 229 length -= bytes;
244 } 230 }
245 } 231 }
246 // Handle end. 232 // Handle end.
247 final int end = index + length; 233 final int end = index + length;
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
291 } else { 277 } else {
292 _messageFrameEnd(); 278 _messageFrameEnd();
293 } 279 }
294 } else { 280 } else {
295 _state = PAYLOAD; 281 _state = PAYLOAD;
296 } 282 }
297 } 283 }
298 284
299 void _messageFrameEnd() { 285 void _messageFrameEnd() {
300 if (_fin) { 286 if (_fin) {
301 var bytes = _payload.takeBytes();
302 if (_deflate != null && _compressed) {
303 bytes = _deflate.processIncomingMessage(bytes);
304 }
305
306 switch (_currentMessageType) { 287 switch (_currentMessageType) {
307 case _WebSocketMessageType.TEXT: 288 case _WebSocketMessageType.TEXT:
308 _eventSink.add(UTF8.decode(bytes)); 289 _eventSink.add(UTF8.decode(_payload.takeBytes()));
309 break; 290 break;
310 case _WebSocketMessageType.BINARY: 291 case _WebSocketMessageType.BINARY:
311 _eventSink.add(bytes); 292 _eventSink.add(_payload.takeBytes());
312 break; 293 break;
313 } 294 }
314 _currentMessageType = _WebSocketMessageType.NONE; 295 _currentMessageType = _WebSocketMessageType.NONE;
315 } 296 }
316 _prepareForNextFrame(); 297 _prepareForNextFrame();
317 } 298 }
318 299
319 void _controlFrameEnd() { 300 void _controlFrameEnd() {
320 switch (_opcode) { 301 switch (_opcode) {
321 case _WebSocketOpcode.CLOSE: 302 case _WebSocketOpcode.CLOSE:
(...skipping 21 matching lines...) Expand all
343 324
344 case _WebSocketOpcode.PONG: 325 case _WebSocketOpcode.PONG:
345 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); 326 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
346 break; 327 break;
347 } 328 }
348 _prepareForNextFrame(); 329 _prepareForNextFrame();
349 } 330 }
350 331
351 bool _isControlFrame() { 332 bool _isControlFrame() {
352 return _opcode == _WebSocketOpcode.CLOSE || 333 return _opcode == _WebSocketOpcode.CLOSE ||
353 _opcode == _WebSocketOpcode.PING || 334 _opcode == _WebSocketOpcode.PING ||
354 _opcode == _WebSocketOpcode.PONG; 335 _opcode == _WebSocketOpcode.PONG;
355 } 336 }
356 337
357 void _prepareForNextFrame() { 338 void _prepareForNextFrame() {
358 if (_state != CLOSED && _state != FAILURE) _state = START; 339 if (_state != CLOSED && _state != FAILURE) _state = START;
359 _fin = false; 340 _fin = false;
360 _opcode = -1; 341 _opcode = -1;
361 _len = -1; 342 _len = -1;
362 _remainingLenBytes = -1; 343 _remainingLenBytes = -1;
363 _remainingMaskingKeyBytes = 4; 344 _remainingMaskingKeyBytes = 4;
364 _remainingPayloadBytes = -1; 345 _remainingPayloadBytes = -1;
365 _unmaskingIndex = 0; 346 _unmaskingIndex = 0;
366 } 347 }
367 } 348 }
368 349
350
369 class _WebSocketPing { 351 class _WebSocketPing {
370 final List<int> payload; 352 final List<int> payload;
371 _WebSocketPing([this.payload = null]); 353 _WebSocketPing([this.payload = null]);
372 } 354 }
373 355
356
374 class _WebSocketPong { 357 class _WebSocketPong {
375 final List<int> payload; 358 final List<int> payload;
376 _WebSocketPong([this.payload = null]); 359 _WebSocketPong([this.payload = null]);
377 } 360 }
378 361
362
379 class _WebSocketTransformerImpl implements WebSocketTransformer { 363 class _WebSocketTransformerImpl implements WebSocketTransformer {
380 final StreamController<WebSocket> _controller = 364 final StreamController<WebSocket> _controller =
381 new StreamController<WebSocket>(sync: true); 365 new StreamController<WebSocket>(sync: true);
382 final Function _protocolSelector; 366 final Function _protocolSelector;
383 final CompressionOptions _compression;
384 367
385 _WebSocketTransformerImpl(this._protocolSelector, this._compression); 368 _WebSocketTransformerImpl(this._protocolSelector);
386 369
387 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 370 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
388 stream.listen((request) { 371 stream.listen((request) {
389 _upgrade(request, _protocolSelector, _compression) 372 _upgrade(request, _protocolSelector)
390 .then((WebSocket webSocket) => _controller.add(webSocket)) 373 .then((WebSocket webSocket) => _controller.add(webSocket))
391 .catchError(_controller.addError); 374 .catchError(_controller.addError);
392 }, onDone: () { 375 }, onDone: () {
393 _controller.close(); 376 _controller.close();
394 }); 377 });
395 378
396 return _controller.stream; 379 return _controller.stream;
397 } 380 }
398 381
399 static Future<WebSocket> _upgrade( 382 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) {
400 HttpRequest request, _protocolSelector, CompressionOptions compression) {
401 var response = request.response; 383 var response = request.response;
402 if (!_isUpgradeRequest(request)) { 384 if (!_isUpgradeRequest(request)) {
403 // Send error response. 385 // Send error response.
404 response 386 response
405 ..statusCode = HttpStatus.BAD_REQUEST 387 ..statusCode = HttpStatus.BAD_REQUEST
406 ..close(); 388 ..close();
407 return new Future.error( 389 return new Future.error(
408 new WebSocketException("Invalid WebSocket upgrade request")); 390 new WebSocketException("Invalid WebSocket upgrade request"));
409 } 391 }
410 392
411 Future upgrade(String protocol) { 393 Future upgrade(String protocol) {
412 // Send the upgrade response. 394 // Send the upgrade response.
413 response 395 response
414 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS 396 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
415 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") 397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
416 ..headers.add(HttpHeaders.UPGRADE, "websocket"); 398 ..headers.add(HttpHeaders.UPGRADE, "websocket");
417 String key = request.headers.value("Sec-WebSocket-Key"); 399 String key = request.headers.value("Sec-WebSocket-Key");
418 _SHA1 sha1 = new _SHA1(); 400 _SHA1 sha1 = new _SHA1();
419 sha1.add("$key$_webSocketGUID".codeUnits); 401 sha1.add("$key$_webSocketGUID".codeUnits);
420 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 402 String accept = _CryptoUtils.bytesToBase64(sha1.close());
421 response.headers.add("Sec-WebSocket-Accept", accept); 403 response.headers.add("Sec-WebSocket-Accept", accept);
422 if (protocol != null) { 404 if (protocol != null) {
423 response.headers.add("Sec-WebSocket-Protocol", protocol); 405 response.headers.add("Sec-WebSocket-Protocol", protocol);
424 } 406 }
425
426 var deflate = _negotiateCompression(request, response, compression);
427
428 response.headers.contentLength = 0; 407 response.headers.contentLength = 0;
429 return response.detachSocket().then((socket) => 408 return response.detachSocket()
430 new _WebSocketImpl._fromSocket( 409 .then((socket) => new _WebSocketImpl._fromSocket(
431 socket, protocol, compression, true, deflate)); 410 socket, protocol, true));
432 } 411 }
433 412
434 var protocols = request.headers['Sec-WebSocket-Protocol']; 413 var protocols = request.headers['Sec-WebSocket-Protocol'];
435 if (protocols != null && _protocolSelector != null) { 414 if (protocols != null && _protocolSelector != null) {
436 // The suggested protocols can be spread over multiple lines, each 415 // The suggested protocols can be spread over multiple lines, each
437 // consisting of multiple protocols. To unify all of them, first join 416 // consisting of multiple protocols. To unify all of them, first join
438 // the lists with ', ' and then tokenize. 417 // the lists with ', ' and then tokenize.
439 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); 418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
440 return new Future(() => _protocolSelector(protocols)).then((protocol) { 419 return new Future(() => _protocolSelector(protocols))
441 if (protocols.indexOf(protocol) < 0) { 420 .then((protocol) {
442 throw new WebSocketException( 421 if (protocols.indexOf(protocol) < 0) {
443 "Selected protocol is not in the list of available protocols"); 422 throw new WebSocketException(
444 } 423 "Selected protocol is not in the list of available protocols");
445 return protocol; 424 }
446 }).catchError((error) { 425 return protocol;
447 response 426 })
448 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR 427 .catchError((error) {
449 ..close(); 428 response
450 throw error; 429 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
451 }).then(upgrade); 430 ..close();
431 throw error;
432 })
433 .then(upgrade);
452 } else { 434 } else {
453 return upgrade(null); 435 return upgrade(null);
454 } 436 }
455 } 437 }
456 438
457 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
458 HttpResponse response, CompressionOptions compression) {
459 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
460
461 if (extensionHeader == null) {
462 extensionHeader = "";
463 }
464
465 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
466 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
467 var info = compression._createHeader(hv);
468
469 response.headers.add("Sec-WebSocket-Extensions", info[0]);
470 var serverNoContextTakeover =
471 hv.parameters.containsKey(_serverNoContextTakeover);
472 var clientNoContextTakeover =
473 hv.parameters.containsKey(_clientNoContextTakeover);
474 var deflate = new _WebSocketPerMessageDeflate(
475 serverNoContextTakeover: serverNoContextTakeover,
476 clientNoContextTakeover: clientNoContextTakeover,
477 serverMaxWindowBits: info[1],
478 clientMaxWindowBits: info[1],
479 serverSide: true);
480
481 return deflate;
482 }
483
484 return null;
485 }
486
487 static bool _isUpgradeRequest(HttpRequest request) { 439 static bool _isUpgradeRequest(HttpRequest request) {
488 if (request.method != "GET") { 440 if (request.method != "GET") {
489 return false; 441 return false;
490 } 442 }
491 if (request.headers[HttpHeaders.CONNECTION] == null) { 443 if (request.headers[HttpHeaders.CONNECTION] == null) {
492 return false; 444 return false;
493 } 445 }
494 bool isUpgrade = false; 446 bool isUpgrade = false;
495 request.headers[HttpHeaders.CONNECTION].forEach((String value) { 447 request.headers[HttpHeaders.CONNECTION].forEach((String value) {
496 if (value.toLowerCase() == "upgrade") isUpgrade = true; 448 if (value.toLowerCase() == "upgrade") isUpgrade = true;
497 }); 449 });
498 if (!isUpgrade) return false; 450 if (!isUpgrade) return false;
499 String upgrade = request.headers.value(HttpHeaders.UPGRADE); 451 String upgrade = request.headers.value(HttpHeaders.UPGRADE);
500 if (upgrade == null || upgrade.toLowerCase() != "websocket") { 452 if (upgrade == null || upgrade.toLowerCase() != "websocket") {
501 return false; 453 return false;
502 } 454 }
503 String version = request.headers.value("Sec-WebSocket-Version"); 455 String version = request.headers.value("Sec-WebSocket-Version");
504 if (version == null || version != "13") { 456 if (version == null || version != "13") {
505 return false; 457 return false;
506 } 458 }
507 String key = request.headers.value("Sec-WebSocket-Key"); 459 String key = request.headers.value("Sec-WebSocket-Key");
508 if (key == null) { 460 if (key == null) {
509 return false; 461 return false;
510 } 462 }
511 return true; 463 return true;
512 } 464 }
513 } 465 }
514 466
515 class _WebSocketPerMessageDeflate {
516 bool serverNoContextTakeover;
517 bool clientNoContextTakeover;
518 int clientMaxWindowBits;
519 int serverMaxWindowBits;
520 bool serverSide;
521
522 _Filter decoder;
523 _Filter encoder;
524
525 _WebSocketPerMessageDeflate(
526 {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
527 this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
528 this.serverNoContextTakeover: false,
529 this.clientNoContextTakeover: false,
530 this.serverSide: false});
531
532 void _ensureDecoder() {
533 if (decoder == null) {
534 decoder = _Filter._newZLibInflateFilter(
535 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
536 }
537 }
538
539 void _ensureEncoder() {
540 if (encoder == null) {
541 encoder = _Filter._newZLibDeflateFilter(
542 false,
543 ZLibOption.DEFAULT_LEVEL,
544 serverSide ? serverMaxWindowBits : clientMaxWindowBits,
545 ZLibOption.DEFAULT_MEM_LEVEL,
546 ZLibOption.STRATEGY_DEFAULT,
547 null,
548 true);
549 }
550 }
551
552 List<int> processIncomingMessage(List<int> msg) {
553 _ensureDecoder();
554
555 var data = [];
556 data.addAll(msg);
557 data.addAll(const [0x00, 0x00, 0xff, 0xff]);
558
559 decoder.process(data, 0, data.length);
560 var reuse =
561 !(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
562 var result = [];
563 var out;
564
565 while ((out = decoder.processed(flush: reuse)) != null) {
566 result.addAll(out);
567 }
568
569 decoder.processed(flush: reuse);
570
571 if (!reuse) {
572 decoder.end();
573 decoder = null;
574 }
575 return result;
576 }
577
578 List<int> processOutgoingMessage(List<int> msg) {
579 _ensureEncoder();
580 var reuse =
581 !(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
582 var result = [];
583 Uint8List buffer;
584 var out;
585
586 if (msg is! Uint8List) {
587 for (var i = 0; i < msg.length; i++) {
588 if (msg[i] < 0 || 255 < msg[i]) {
589 throw new ArgumentError("List element is not a byte value "
590 "(value ${msg[i]} at index $i)");
591 }
592 }
593 buffer = new Uint8List.fromList(msg);
594 } else {
595 buffer = msg;
596 }
597
598 encoder.process(buffer, 0, buffer.length);
599
600 while ((out = encoder.processed(flush: reuse)) != null) {
601 result.addAll(out);
602 }
603
604 if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
605 encoder.end();
606 encoder = null;
607 }
608
609 if (result.length > 4) {
610 result = result.sublist(0, result.length - 4);
611 }
612
613 return result;
614 }
615 }
616 467
617 // TODO(ajohnsen): Make this transformer reusable. 468 // TODO(ajohnsen): Make this transformer reusable.
618 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
619 final _WebSocketImpl webSocket; 470 final _WebSocketImpl webSocket;
620 EventSink _eventSink; 471 EventSink _eventSink;
621 472
622 _WebSocketPerMessageDeflate _deflateHelper; 473 _WebSocketOutgoingTransformer(this.webSocket);
623
624 _WebSocketOutgoingTransformer(this.webSocket) {
625 _deflateHelper = webSocket._deflate;
626 }
627 474
628 Stream bind(Stream stream) { 475 Stream bind(Stream stream) {
629 return new Stream.eventTransformed(stream, (EventSink eventSink) { 476 return new Stream.eventTransformed(
630 if (_eventSink != null) { 477 stream,
631 throw new StateError("WebSocket transformer already used"); 478 (EventSink eventSink) {
632 } 479 if (_eventSink != null) {
633 _eventSink = eventSink; 480 throw new StateError("WebSocket transformer already used");
634 return this; 481 }
635 }); 482 _eventSink = eventSink;
483 return this;
484 });
636 } 485 }
637 486
638 void add(message) { 487 void add(message) {
639 if (message is _WebSocketPong) { 488 if (message is _WebSocketPong) {
640 addFrame(_WebSocketOpcode.PONG, message.payload); 489 addFrame(_WebSocketOpcode.PONG, message.payload);
641 return; 490 return;
642 } 491 }
643 if (message is _WebSocketPing) { 492 if (message is _WebSocketPing) {
644 addFrame(_WebSocketOpcode.PING, message.payload); 493 addFrame(_WebSocketOpcode.PING, message.payload);
645 return; 494 return;
646 } 495 }
647 List<int> data; 496 List<int> data;
648 int opcode; 497 int opcode;
649 if (message != null) { 498 if (message != null) {
650 if (message is String) { 499 if (message is String) {
651 opcode = _WebSocketOpcode.TEXT; 500 opcode = _WebSocketOpcode.TEXT;
652 data = UTF8.encode(message); 501 data = UTF8.encode(message);
653 } else { 502 } else {
654 if (message is! List<int>) { 503 if (message is !List<int>) {
655 throw new ArgumentError(message); 504 throw new ArgumentError(message);
656 } 505 }
657 opcode = _WebSocketOpcode.BINARY; 506 opcode = _WebSocketOpcode.BINARY;
658 data = message; 507 data = message;
659 } 508 }
660
661 if (_deflateHelper != null) {
662 data = _deflateHelper.processOutgoingMessage(data);
663 }
664 } else { 509 } else {
665 opcode = _WebSocketOpcode.TEXT; 510 opcode = _WebSocketOpcode.TEXT;
666 } 511 }
667 addFrame(opcode, data); 512 addFrame(opcode, data);
668 } 513 }
669 514
670 void addError(Object error, [StackTrace stackTrace]) => 515 void addError(Object error, [StackTrace stackTrace]) =>
671 _eventSink.addError(error, stackTrace); 516 _eventSink.addError(error, stackTrace);
672 517
673 void close() { 518 void close() {
674 int code = webSocket._outCloseCode; 519 int code = webSocket._outCloseCode;
675 String reason = webSocket._outCloseReason; 520 String reason = webSocket._outCloseReason;
676 List<int> data; 521 List<int> data;
677 if (code != null) { 522 if (code != null) {
678 data = new List<int>(); 523 data = new List<int>();
679 data.add((code >> 8) & 0xFF); 524 data.add((code >> 8) & 0xFF);
680 data.add(code & 0xFF); 525 data.add(code & 0xFF);
681 if (reason != null) { 526 if (reason != null) {
682 data.addAll(UTF8.encode(reason)); 527 data.addAll(UTF8.encode(reason));
683 } 528 }
684 } 529 }
685 addFrame(_WebSocketOpcode.CLOSE, data); 530 addFrame(_WebSocketOpcode.CLOSE, data);
686 _eventSink.close(); 531 _eventSink.close();
687 } 532 }
688 533
689 void addFrame(int opcode, List<int> data) => createFrame( 534 void addFrame(int opcode, List<int> data) =>
690 opcode, 535 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
691 data,
692 webSocket._serverSide,
693 _deflateHelper != null &&
694 (opcode == _WebSocketOpcode.TEXT ||
695 opcode == _WebSocketOpcode.BINARY)).forEach((e) {
696 _eventSink.add(e);
697 });
698 536
699 static Iterable createFrame( 537 static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
700 int opcode, List<int> data, bool serverSide, bool compressed) { 538 bool mask = !serverSide; // Masking not implemented for server.
701 bool mask = !serverSide; // Masking not implemented for server.
702 int dataLength = data == null ? 0 : data.length; 539 int dataLength = data == null ? 0 : data.length;
703 // Determine the header size. 540 // Determine the header size.
704 int headerSize = (mask) ? 6 : 2; 541 int headerSize = (mask) ? 6 : 2;
705 if (dataLength > 65535) { 542 if (dataLength > 65535) {
706 headerSize += 8; 543 headerSize += 8;
707 } else if (dataLength > 125) { 544 } else if (dataLength > 125) {
708 headerSize += 2; 545 headerSize += 2;
709 } 546 }
710 Uint8List header = new Uint8List(headerSize); 547 Uint8List header = new Uint8List(headerSize);
711 int index = 0; 548 int index = 0;
712
713 // Set FIN and opcode. 549 // Set FIN and opcode.
714 var hoc = _WebSocketProtocolTransformer.FIN 550 header[index++] = 0x80 | opcode;
715 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
716 | (opcode & _WebSocketProtocolTransformer.OPCODE);
717
718 header[index++] = hoc;
719 // Determine size and position of length field. 551 // Determine size and position of length field.
720 int lengthBytes = 1; 552 int lengthBytes = 1;
553 int firstLengthByte = 1;
721 if (dataLength > 65535) { 554 if (dataLength > 65535) {
722 header[index++] = 127; 555 header[index++] = 127;
723 lengthBytes = 8; 556 lengthBytes = 8;
724 } else if (dataLength > 125) { 557 } else if (dataLength > 125) {
725 header[index++] = 126; 558 header[index++] = 126;
726 lengthBytes = 2; 559 lengthBytes = 2;
727 } 560 }
728 // Write the length in network byte order into the header. 561 // Write the length in network byte order into the header.
729 for (int i = 0; i < lengthBytes; i++) { 562 for (int i = 0; i < lengthBytes; i++) {
730 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; 563 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
731 } 564 }
732 if (mask) { 565 if (mask) {
733 header[1] |= 1 << 7; 566 header[1] |= 1 << 7;
734 var maskBytes = _IOCrypto.getRandomBytes(4); 567 var maskBytes = _IOCrypto.getRandomBytes(4);
735 header.setRange(index, index + 4, maskBytes); 568 header.setRange(index, index + 4, maskBytes);
736 index += 4; 569 index += 4;
737 if (data != null) { 570 if (data != null) {
738 Uint8List list; 571 Uint8List list;
739 // If this is a text message just do the masking inside the 572 // If this is a text message just do the masking inside the
740 // encoded data. 573 // encoded data.
741 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 574 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
742 list = data; 575 list = data;
743 } else { 576 } else {
744 if (data is Uint8List) { 577 if (data is Uint8List) {
745 list = new Uint8List.fromList(data); 578 list = new Uint8List.fromList(data);
746 } else { 579 } else {
747 list = new Uint8List(data.length); 580 list = new Uint8List(data.length);
748 for (int i = 0; i < data.length; i++) { 581 for (int i = 0; i < data.length; i++) {
749 if (data[i] < 0 || 255 < data[i]) { 582 if (data[i] < 0 || 255 < data[i]) {
750 throw new ArgumentError("List element is not a byte value " 583 throw new ArgumentError(
584 "List element is not a byte value "
751 "(value ${data[i]} at index $i)"); 585 "(value ${data[i]} at index $i)");
752 } 586 }
753 list[i] = data[i]; 587 list[i] = data[i];
754 } 588 }
755 } 589 }
756 } 590 }
757 const int BLOCK_SIZE = 16; 591 const int BLOCK_SIZE = 16;
758 int blockCount = list.length ~/ BLOCK_SIZE; 592 int blockCount = list.length ~/ BLOCK_SIZE;
759 if (blockCount > 0) { 593 if (blockCount > 0) {
760 // Create mask block. 594 // Create mask block.
761 int mask = 0; 595 int mask = 0;
762 for (int i = 3; i >= 0; i--) { 596 for (int i = 3; i >= 0; i--) {
763 mask = (mask << 8) | maskBytes[i]; 597 mask = (mask << 8) | maskBytes[i];
764 } 598 }
765 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 599 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
766 Int32x4List blockBuffer = 600 Int32x4List blockBuffer = new Int32x4List.view(
767 new Int32x4List.view(list.buffer, 0, blockCount); 601 list.buffer, 0, blockCount);
768 for (int i = 0; i < blockBuffer.length; i++) { 602 for (int i = 0; i < blockBuffer.length; i++) {
769 blockBuffer[i] ^= blockMask; 603 blockBuffer[i] ^= blockMask;
770 } 604 }
771 } 605 }
772 // Handle end. 606 // Handle end.
773 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { 607 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
774 list[i] ^= maskBytes[i & 3]; 608 list[i] ^= maskBytes[i & 3];
775 } 609 }
776 data = list; 610 data = list;
777 } 611 }
778 } 612 }
779 assert(index == headerSize); 613 assert(index == headerSize);
780 if (data == null) { 614 if (data == null) {
781 return [header]; 615 return [header];
782 } else { 616 } else {
783 return [header, data]; 617 return [header, data];
784 } 618 }
785 } 619 }
786 } 620 }
787 621
622
788 class _WebSocketConsumer implements StreamConsumer { 623 class _WebSocketConsumer implements StreamConsumer {
789 final _WebSocketImpl webSocket; 624 final _WebSocketImpl webSocket;
790 final Socket socket; 625 final Socket socket;
791 StreamController _controller; 626 StreamController _controller;
792 StreamSubscription _subscription; 627 StreamSubscription _subscription;
793 bool _issuedPause = false; 628 bool _issuedPause = false;
794 bool _closed = false; 629 bool _closed = false;
795 Completer _closeCompleter = new Completer(); 630 Completer _closeCompleter = new Completer();
796 Completer _completer; 631 Completer _completer;
797 632
(...skipping 24 matching lines...) Expand all
822 void _cancel() { 657 void _cancel() {
823 if (_subscription != null) { 658 if (_subscription != null) {
824 var subscription = _subscription; 659 var subscription = _subscription;
825 _subscription = null; 660 _subscription = null;
826 subscription.cancel(); 661 subscription.cancel();
827 } 662 }
828 } 663 }
829 664
830 _ensureController() { 665 _ensureController() {
831 if (_controller != null) return; 666 if (_controller != null) return;
832 _controller = new StreamController( 667 _controller = new StreamController(sync: true,
833 sync: true, 668 onPause: _onPause,
834 onPause: _onPause, 669 onResume: _onResume,
835 onResume: _onResume, 670 onCancel: _onListen);
836 onCancel: _onListen); 671 var stream = _controller.stream.transform(
837 var stream = _controller.stream 672 new _WebSocketOutgoingTransformer(webSocket));
838 .transform(new _WebSocketOutgoingTransformer(webSocket)); 673 socket.addStream(stream)
839 socket.addStream(stream).then((_) { 674 .then((_) {
840 _done(); 675 _done();
841 _closeCompleter.complete(webSocket); 676 _closeCompleter.complete(webSocket);
842 }, onError: (error, StackTrace stackTrace) { 677 }, onError: (error, StackTrace stackTrace) {
843 _closed = true; 678 _closed = true;
844 _cancel(); 679 _cancel();
845 if (error is ArgumentError) { 680 if (error is ArgumentError) {
846 if (!_done(error, stackTrace)) { 681 if (!_done(error, stackTrace)) {
847 _closeCompleter.completeError(error, stackTrace); 682 _closeCompleter.completeError(error, stackTrace);
848 } 683 }
849 } else { 684 } else {
850 _done(); 685 _done();
851 _closeCompleter.complete(webSocket); 686 _closeCompleter.complete(webSocket);
852 } 687 }
853 }); 688 });
854 } 689 }
855 690
856 bool _done([error, StackTrace stackTrace]) { 691 bool _done([error, StackTrace stackTrace]) {
857 if (_completer == null) return false; 692 if (_completer == null) return false;
858 if (error != null) { 693 if (error != null) {
859 _completer.completeError(error, stackTrace); 694 _completer.completeError(error, stackTrace);
860 } else { 695 } else {
861 _completer.complete(webSocket); 696 _completer.complete(webSocket);
862 } 697 }
863 _completer = null; 698 _completer = null;
864 return true; 699 return true;
865 } 700 }
866 701
867 Future addStream(var stream) { 702 Future addStream(var stream) {
868 if (_closed) { 703 if (_closed) {
869 stream.listen(null).cancel(); 704 stream.listen(null).cancel();
870 return new Future.value(webSocket); 705 return new Future.value(webSocket);
871 } 706 }
872 _ensureController(); 707 _ensureController();
873 _completer = new Completer(); 708 _completer = new Completer();
874 _subscription = stream.listen((data) { 709 _subscription = stream.listen(
875 _controller.add(data); 710 (data) {
876 }, onDone: _done, onError: _done, cancelOnError: true); 711 _controller.add(data);
712 },
713 onDone: _done,
714 onError: _done,
715 cancelOnError: true);
877 if (_issuedPause) { 716 if (_issuedPause) {
878 _subscription.pause(); 717 _subscription.pause();
879 _issuedPause = false; 718 _issuedPause = false;
880 } 719 }
881 return _completer.future; 720 return _completer.future;
882 } 721 }
883 722
884 Future close() { 723 Future close() {
885 _ensureController(); 724 _ensureController();
886 Future closeSocket() { 725 Future closeSocket() {
887 return socket.close().catchError((_) {}).then((_) => webSocket); 726 return socket.close().catchError((_) {}).then((_) => webSocket);
888 } 727 }
889 _controller.close(); 728 _controller.close();
890 return _closeCompleter.future.then((_) => closeSocket()); 729 return _closeCompleter.future.then((_) => closeSocket());
891 } 730 }
892 731
893 void add(data) { 732 void add(data) {
894 if (_closed) return; 733 if (_closed) return;
895 _ensureController(); 734 _ensureController();
896 _controller.add(data); 735 _controller.add(data);
897 } 736 }
898 737
899 void closeSocket() { 738 void closeSocket() {
900 _closed = true; 739 _closed = true;
901 _cancel(); 740 _cancel();
902 close(); 741 close();
903 } 742 }
904 } 743 }
905 744
745
906 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { 746 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
907 // Use default Map so we keep order. 747 // Use default Map so we keep order.
908 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); 748 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
909 static const int DEFAULT_WINDOW_BITS = 15;
910 static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
911 749
912 final String protocol; 750 final String protocol;
913 751
914 StreamController _controller; 752 StreamController _controller;
915 StreamSubscription _subscription; 753 StreamSubscription _subscription;
916 StreamSink _sink; 754 StreamSink _sink;
917 755
918 final _socket; 756 final _socket;
919 final bool _serverSide; 757 final bool _serverSide;
920 int _readyState = WebSocket.CONNECTING; 758 int _readyState = WebSocket.CONNECTING;
921 bool _writeClosed = false; 759 bool _writeClosed = false;
922 int _closeCode; 760 int _closeCode;
923 String _closeReason; 761 String _closeReason;
924 Duration _pingInterval; 762 Duration _pingInterval;
925 Timer _pingTimer; 763 Timer _pingTimer;
926 _WebSocketConsumer _consumer; 764 _WebSocketConsumer _consumer;
927 765
928 int _outCloseCode; 766 int _outCloseCode;
929 String _outCloseReason; 767 String _outCloseReason;
930 Timer _closeTimer; 768 Timer _closeTimer;
931 _WebSocketPerMessageDeflate _deflate;
932 769
933 static final HttpClient _httpClient = new HttpClient(); 770 static final HttpClient _httpClient = new HttpClient();
934 771
935 static Future<WebSocket> connect( 772 static Future<WebSocket> connect(
936 String url, Iterable<String> protocols, Map<String, dynamic> headers, 773 String url, Iterable<String> protocols, Map<String, dynamic> headers) {
937 {CompressionOptions compression: CompressionOptions.DEFAULT}) {
938 Uri uri = Uri.parse(url); 774 Uri uri = Uri.parse(url);
939 if (uri.scheme != "ws" && uri.scheme != "wss") { 775 if (uri.scheme != "ws" && uri.scheme != "wss") {
940 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
941 } 777 }
942 778
943 Random random = new Random(); 779 Random random = new Random();
944 // Generate 16 random bytes. 780 // Generate 16 random bytes.
945 Uint8List nonceData = new Uint8List(16); 781 Uint8List nonceData = new Uint8List(16);
946 for (int i = 0; i < 16; i++) { 782 for (int i = 0; i < 16; i++) {
947 nonceData[i] = random.nextInt(256); 783 nonceData[i] = random.nextInt(256);
948 } 784 }
949 String nonce = _CryptoUtils.bytesToBase64(nonceData); 785 String nonce = _CryptoUtils.bytesToBase64(nonceData);
950 786
951 uri = new Uri( 787 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http",
952 scheme: uri.scheme == "wss" ? "https" : "http", 788 userInfo: uri.userInfo,
953 userInfo: uri.userInfo, 789 host: uri.host,
954 host: uri.host, 790 port: uri.port,
955 port: uri.port, 791 path: uri.path,
956 path: uri.path, 792 query: uri.query,
957 query: uri.query, 793 fragment: uri.fragment);
958 fragment: uri.fragment); 794 return _httpClient.openUrl("GET", uri)
959 return _httpClient.openUrl("GET", uri).then((request) { 795 .then((request) {
960 if (uri.userInfo != null && !uri.userInfo.isEmpty) { 796 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
961 // If the URL contains user information use that for basic 797 // If the URL contains user information use that for basic
962 // authorization. 798 // authorization.
963 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); 799 String auth =
964 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); 800 _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
965 } 801 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
966 if (headers != null) { 802 }
967 headers.forEach((field, value) => request.headers.add(field, value)); 803 if (headers != null) {
968 } 804 headers.forEach((field, value) => request.headers.add(field, value));
969 // Setup the initial handshake. 805 }
970 request.headers 806 // Setup the initial handshake.
971 ..set(HttpHeaders.CONNECTION, "Upgrade")
972 ..set(HttpHeaders.UPGRADE, "websocket")
973 ..set("Sec-WebSocket-Key", nonce)
974 ..set("Cache-Control", "no-cache")
975 ..set("Sec-WebSocket-Version", "13");
976 if (protocols != null) {
977 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
978 }
979
980 if (compression.enabled) {
981 request.headers 807 request.headers
982 .add("Sec-WebSocket-Extensions", compression._createHeader()); 808 ..set(HttpHeaders.CONNECTION, "Upgrade")
983 } 809 ..set(HttpHeaders.UPGRADE, "websocket")
984 810 ..set("Sec-WebSocket-Key", nonce)
985 return request.close(); 811 ..set("Cache-Control", "no-cache")
986 }).then((response) { 812 ..set("Sec-WebSocket-Version", "13");
987 void error(String message) { 813 if (protocols != null) {
988 // Flush data. 814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
989 response.detachSocket().then((socket) {
990 socket.destroy();
991 });
992 throw new WebSocketException(message);
993 }
994 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
995 response.headers[HttpHeaders.CONNECTION] == null ||
996 !response.headers[HttpHeaders.CONNECTION]
997 .any((value) => value.toLowerCase() == "upgrade") ||
998 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
999 "websocket") {
1000 error("Connection to '$uri' was not upgraded to websocket");
1001 }
1002 String accept = response.headers.value("Sec-WebSocket-Accept");
1003 if (accept == null) {
1004 error("Response did not contain a 'Sec-WebSocket-Accept' header");
1005 }
1006 _SHA1 sha1 = new _SHA1();
1007 sha1.add("$nonce$_webSocketGUID".codeUnits);
1008 List<int> expectedAccept = sha1.close();
1009 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
1010 if (expectedAccept.length != receivedAccept.length) {
1011 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
1012 }
1013 for (int i = 0; i < expectedAccept.length; i++) {
1014 if (expectedAccept[i] != receivedAccept[i]) {
1015 error("Bad response 'Sec-WebSocket-Accept' header");
1016 } 815 }
1017 } 816 return request.close();
1018 var protocol = response.headers.value('Sec-WebSocket-Protocol'); 817 })
1019 818 .then((response) {
1020 _WebSocketPerMessageDeflate deflate = 819 void error(String message) {
1021 negotiateClientCompression(response, compression); 820 // Flush data.
1022 821 response.detachSocket().then((socket) {
1023 return response.detachSocket().then((socket) => 822 socket.destroy();
1024 new _WebSocketImpl._fromSocket( 823 });
1025 socket, protocol, compression, false, deflate)); 824 throw new WebSocketException(message);
1026 }); 825 }
826 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
827 response.headers[HttpHeaders.CONNECTION] == null ||
828 !response.headers[HttpHeaders.CONNECTION].any(
829 (value) => value.toLowerCase() == "upgrade") ||
830 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
831 "websocket") {
832 error("Connection to '$uri' was not upgraded to websocket");
833 }
834 String accept = response.headers.value("Sec-WebSocket-Accept");
835 if (accept == null) {
836 error("Response did not contain a 'Sec-WebSocket-Accept' header");
837 }
838 _SHA1 sha1 = new _SHA1();
839 sha1.add("$nonce$_webSocketGUID".codeUnits);
840 List<int> expectedAccept = sha1.close();
841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
842 if (expectedAccept.length != receivedAccept.length) {
843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
844 }
845 for (int i = 0; i < expectedAccept.length; i++) {
846 if (expectedAccept[i] != receivedAccept[i]) {
847 error("Bad response 'Sec-WebSocket-Accept' header");
848 }
849 }
850 var protocol = response.headers.value('Sec-WebSocket-Protocol');
851 return response.detachSocket()
852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
853 });
1027 } 854 }
1028 855
1029 static _WebSocketPerMessageDeflate negotiateClientCompression( 856 _WebSocketImpl._fromSocket(this._socket, this.protocol,
1030 HttpClientResponse response, CompressionOptions compression) { 857 [this._serverSide = false]) {
1031 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
1032
1033 if (extensionHeader == null) {
1034 extensionHeader = "";
1035 }
1036
1037 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
1038
1039 if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
1040 var serverNoContextTakeover =
1041 hv.parameters.containsKey(_serverNoContextTakeover);
1042 var clientNoContextTakeover =
1043 hv.parameters.containsKey(_clientNoContextTakeover);
1044
1045 int getWindowBits(String type) {
1046 var o = hv.parameters[type];
1047 if (o == null) {
1048 return DEFAULT_WINDOW_BITS;
1049 }
1050
1051 o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
1052 return o;
1053 }
1054
1055 return new _WebSocketPerMessageDeflate(
1056 clientMaxWindowBits: getWindowBits(_clientMaxWindowBits),
1057 serverMaxWindowBits: getWindowBits(_serverMaxWindowBits),
1058 clientNoContextTakeover: clientNoContextTakeover,
1059 serverNoContextTakeover: serverNoContextTakeover);
1060 }
1061
1062 return null;
1063 }
1064
1065 _WebSocketImpl._fromSocket(
1066 this._socket, this.protocol, CompressionOptions compression,
1067 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
1068 _consumer = new _WebSocketConsumer(this, _socket); 858 _consumer = new _WebSocketConsumer(this, _socket);
1069 _sink = new _StreamSinkImpl(_consumer); 859 _sink = new _StreamSinkImpl(_consumer);
1070 _readyState = WebSocket.OPEN; 860 _readyState = WebSocket.OPEN;
1071 _deflate = deflate;
1072 861
1073 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); 862 var transformer = new _WebSocketProtocolTransformer(_serverSide);
1074 _subscription = _socket.transform(transformer).listen((data) { 863 _subscription = _socket.transform(transformer).listen(
1075 if (data is _WebSocketPing) { 864 (data) {
1076 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 865 if (data is _WebSocketPing) {
1077 } else if (data is _WebSocketPong) { 866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
1078 // Simply set pingInterval, as it'll cancel any timers. 867 } else if (data is _WebSocketPong) {
1079 pingInterval = _pingInterval; 868 // Simply set pingInterval, as it'll cancel any timers.
1080 } else { 869 pingInterval = _pingInterval;
1081 _controller.add(data); 870 } else {
1082 } 871 _controller.add(data);
1083 }, onError: (error, stackTrace) { 872 }
1084 if (_closeTimer != null) _closeTimer.cancel(); 873 },
1085 if (error is FormatException) { 874 onError: (error) {
1086 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 875 if (_closeTimer != null) _closeTimer.cancel();
1087 } else { 876 if (error is FormatException) {
1088 _close(WebSocketStatus.PROTOCOL_ERROR); 877 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
1089 } 878 } else {
1090 // An error happened, set the close code set above. 879 _close(WebSocketStatus.PROTOCOL_ERROR);
1091 _closeCode = _outCloseCode; 880 }
1092 _closeReason = _outCloseReason; 881 // An error happened, set the close code set above.
1093 _controller.close(); 882 _closeCode = _outCloseCode;
1094 }, onDone: () { 883 _closeReason = _outCloseReason;
1095 if (_closeTimer != null) _closeTimer.cancel(); 884 _controller.close();
1096 if (_readyState == WebSocket.OPEN) { 885 },
1097 _readyState = WebSocket.CLOSING; 886 onDone: () {
1098 if (!_isReservedStatusCode(transformer.closeCode)) { 887 if (_closeTimer != null) _closeTimer.cancel();
1099 _close(transformer.closeCode, transformer.closeReason); 888 if (_readyState == WebSocket.OPEN) {
1100 } else { 889 _readyState = WebSocket.CLOSING;
1101 _close(); 890 if (!_isReservedStatusCode(transformer.closeCode)) {
1102 } 891 _close(transformer.closeCode, transformer.closeReason);
1103 _readyState = WebSocket.CLOSED; 892 } else {
1104 } 893 _close();
1105 // Protocol close, use close code from transformer. 894 }
1106 _closeCode = transformer.closeCode; 895 _readyState = WebSocket.CLOSED;
1107 _closeReason = transformer.closeReason; 896 }
1108 _controller.close(); 897 // Protocol close, use close code from transformer.
1109 }, cancelOnError: true); 898 _closeCode = transformer.closeCode;
899 _closeReason = transformer.closeReason;
900 _controller.close();
901 },
902 cancelOnError: true);
1110 _subscription.pause(); 903 _subscription.pause();
1111 _controller = new StreamController( 904 _controller = new StreamController(sync: true,
1112 sync: true, onListen: _subscription.resume, onCancel: () { 905 onListen: _subscription.resume,
1113 _subscription.cancel(); 906 onCancel: () {
1114 _subscription = null; 907 _subscription.cancel();
1115 }, onPause: _subscription.pause, onResume: _subscription.resume); 908 _subscription = null;
909 },
910 onPause: _subscription.pause,
911 onResume: _subscription.resume);
1116 912
1117 _webSockets[_serviceId] = this; 913 _webSockets[_serviceId] = this;
1118 try { 914 try { _socket._owner = this; } catch (_) {}
1119 _socket._owner = this;
1120 } catch (_) {}
1121 } 915 }
1122 916
1123 StreamSubscription listen(void onData(message), 917 StreamSubscription listen(void onData(message),
1124 {Function onError, void onDone(), bool cancelOnError}) { 918 {Function onError,
919 void onDone(),
920 bool cancelOnError}) {
1125 return _controller.stream.listen(onData, 921 return _controller.stream.listen(onData,
1126 onError: onError, onDone: onDone, cancelOnError: cancelOnError); 922 onError: onError,
923 onDone: onDone,
924 cancelOnError: cancelOnError);
1127 } 925 }
1128 926
1129 Duration get pingInterval => _pingInterval; 927 Duration get pingInterval => _pingInterval;
1130 928
1131 void set pingInterval(Duration interval) { 929 void set pingInterval(Duration interval) {
1132 if (_writeClosed) return; 930 if (_writeClosed) return;
1133 if (_pingTimer != null) _pingTimer.cancel(); 931 if (_pingTimer != null) _pingTimer.cancel();
1134 _pingInterval = interval; 932 _pingInterval = interval;
1135 933
1136 if (_pingInterval == null) return; 934 if (_pingInterval == null) return;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
1222 'type': '@Socket', 1020 'type': '@Socket',
1223 'name': 'UserSocket', 1021 'name': 'UserSocket',
1224 'user_name': 'UserSocket', 1022 'user_name': 'UserSocket',
1225 }; 1023 };
1226 } 1024 }
1227 return r; 1025 return r;
1228 } 1026 }
1229 1027
1230 static bool _isReservedStatusCode(int code) { 1028 static bool _isReservedStatusCode(int code) {
1231 return code != null && 1029 return code != null &&
1232 (code < WebSocketStatus.NORMAL_CLOSURE || 1030 (code < WebSocketStatus.NORMAL_CLOSURE ||
1233 code == WebSocketStatus.RESERVED_1004 || 1031 code == WebSocketStatus.RESERVED_1004 ||
1234 code == WebSocketStatus.NO_STATUS_RECEIVED || 1032 code == WebSocketStatus.NO_STATUS_RECEIVED ||
1235 code == WebSocketStatus.ABNORMAL_CLOSURE || 1033 code == WebSocketStatus.ABNORMAL_CLOSURE ||
1236 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 1034 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
1237 code < WebSocketStatus.RESERVED_1015) || 1035 code < WebSocketStatus.RESERVED_1015) ||
1238 (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); 1036 (code >= WebSocketStatus.RESERVED_1015 &&
1037 code < 3000));
1239 } 1038 }
1240 } 1039 }
OLDNEW
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/web_socket_compression_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698