Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 1390353005: Web Socket compression - take two (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Fix observatory issues Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/web_socket_compression_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 const String _clientNoContextTakeover = "client_no_context_takeover";
9 const String _serverNoContextTakeover = "server_no_context_takeover";
10 const String _clientMaxWindowBits = "client_max_window_bits";
11 const String _serverMaxWindowBits = "server_max_window_bits";
8 12
9 // Matches _WebSocketOpcode. 13 // Matches _WebSocketOpcode.
10 class _WebSocketMessageType { 14 class _WebSocketMessageType {
11 static const int NONE = 0; 15 static const int NONE = 0;
12 static const int TEXT = 1; 16 static const int TEXT = 1;
13 static const int BINARY = 2; 17 static const int BINARY = 2;
14 } 18 }
15 19
16
17 class _WebSocketOpcode { 20 class _WebSocketOpcode {
18 static const int CONTINUATION = 0; 21 static const int CONTINUATION = 0;
19 static const int TEXT = 1; 22 static const int TEXT = 1;
20 static const int BINARY = 2; 23 static const int BINARY = 2;
21 static const int RESERVED_3 = 3; 24 static const int RESERVED_3 = 3;
22 static const int RESERVED_4 = 4; 25 static const int RESERVED_4 = 4;
23 static const int RESERVED_5 = 5; 26 static const int RESERVED_5 = 5;
24 static const int RESERVED_6 = 6; 27 static const int RESERVED_6 = 6;
25 static const int RESERVED_7 = 7; 28 static const int RESERVED_7 = 7;
26 static const int CLOSE = 8; 29 static const int CLOSE = 8;
27 static const int PING = 9; 30 static const int PING = 9;
28 static const int PONG = 10; 31 static const int PONG = 10;
29 static const int RESERVED_B = 11; 32 static const int RESERVED_B = 11;
30 static const int RESERVED_C = 12; 33 static const int RESERVED_C = 12;
31 static const int RESERVED_D = 13; 34 static const int RESERVED_D = 13;
32 static const int RESERVED_E = 14; 35 static const int RESERVED_E = 14;
33 static const int RESERVED_F = 15; 36 static const int RESERVED_F = 15;
34 } 37 }
35 38
36 /** 39 /**
37 * The web socket protocol transformer handles the protocol byte stream 40 * The web socket protocol transformer handles the protocol byte stream
38 * which is supplied through the [:handleData:]. As the protocol is processed, 41 * which is supplied through the [:handleData:]. As the protocol is processed,
39 * it'll output frame data as either a List<int> or String. 42 * it'll output frame data as either a List<int> or String.
40 * 43 *
41 * Important infomation about usage: Be sure you use cancelOnError, so the 44 * Important information about usage: Be sure you use cancelOnError, so the
42 * socket will be closed when the processer encounter an error. Not using it 45 * socket will be closed when the processor encounter an error. Not using it
43 * will lead to undefined behaviour. 46 * will lead to undefined behaviour.
44 */ 47 */
45 // TODO(ajohnsen): make this transformer reusable? 48 // TODO(ajohnsen): make this transformer reusable?
46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 49 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
47 static const int START = 0; 50 static const int START = 0;
48 static const int LEN_FIRST = 1; 51 static const int LEN_FIRST = 1;
49 static const int LEN_REST = 2; 52 static const int LEN_REST = 2;
50 static const int MASK = 3; 53 static const int MASK = 3;
51 static const int PAYLOAD = 4; 54 static const int PAYLOAD = 4;
52 static const int CLOSED = 5; 55 static const int CLOSED = 5;
53 static const int FAILURE = 6; 56 static const int FAILURE = 6;
57 static const int FIN = 0x80;
58 static const int RSV1 = 0x40;
59 static const int RSV2 = 0x20;
60 static const int RSV3 = 0x10;
61 static const int OPCODE = 0xF;
54 62
55 int _state = START; 63 int _state = START;
56 bool _fin = false; 64 bool _fin = false;
65 bool _compressed = false;
57 int _opcode = -1; 66 int _opcode = -1;
58 int _len = -1; 67 int _len = -1;
59 bool _masked = false; 68 bool _masked = false;
60 int _remainingLenBytes = -1; 69 int _remainingLenBytes = -1;
61 int _remainingMaskingKeyBytes = 4; 70 int _remainingMaskingKeyBytes = 4;
62 int _remainingPayloadBytes = -1; 71 int _remainingPayloadBytes = -1;
63 int _unmaskingIndex = 0; 72 int _unmaskingIndex = 0;
64 int _currentMessageType = _WebSocketMessageType.NONE; 73 int _currentMessageType = _WebSocketMessageType.NONE;
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 74 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
66 String closeReason = ""; 75 String closeReason = "";
67 76
68 EventSink _eventSink; 77 EventSink _eventSink;
69 78
70 final bool _serverSide; 79 final bool _serverSide;
71 final List _maskingBytes = new List(4); 80 final List _maskingBytes = new List(4);
72 final BytesBuilder _payload = new BytesBuilder(copy: false); 81 final BytesBuilder _payload = new BytesBuilder(copy: false);
73 82
74 _WebSocketProtocolTransformer([this._serverSide = false]); 83 _WebSocketPerMessageDeflate _deflate;
84 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
75 85
76 Stream bind(Stream stream) { 86 Stream bind(Stream stream) {
77 return new Stream.eventTransformed( 87 return new Stream.eventTransformed(stream, (EventSink eventSink) {
78 stream, 88 if (_eventSink != null) {
79 (EventSink eventSink) { 89 throw new StateError("WebSocket transformer already used.");
80 if (_eventSink != null) { 90 }
81 throw new StateError("WebSocket transformer already used."); 91 _eventSink = eventSink;
82 } 92 return this;
83 _eventSink = eventSink; 93 });
84 return this;
85 });
86 } 94 }
87 95
88 void addError(Object error, [StackTrace stackTrace]) => 96 void addError(Object error, [StackTrace stackTrace]) =>
89 _eventSink.addError(error, stackTrace); 97 _eventSink.addError(error, stackTrace);
90 98
91 void close() => _eventSink.close(); 99 void close() => _eventSink.close();
92 100
93 /** 101 /**
94 * Process data received from the underlying communication channel. 102 * Process data received from the underlying communication channel.
95 */ 103 */
96 void add(Uint8List buffer) { 104 void add(Uint8List buffer) {
97 int count = buffer.length;
98 int index = 0; 105 int index = 0;
99 int lastIndex = count; 106 int lastIndex = buffer.length;
100 if (_state == CLOSED) { 107 if (_state == CLOSED) {
101 throw new WebSocketException("Data on closed connection"); 108 throw new WebSocketException("Data on closed connection");
102 } 109 }
103 if (_state == FAILURE) { 110 if (_state == FAILURE) {
104 throw new WebSocketException("Data on failed connection"); 111 throw new WebSocketException("Data on failed connection");
105 } 112 }
106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { 113 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
107 int byte = buffer[index]; 114 int byte = buffer[index];
108 if (_state <= LEN_REST) { 115 if (_state <= LEN_REST) {
109 if (_state == START) { 116 if (_state == START) {
110 _fin = (byte & 0x80) != 0; 117 _fin = (byte & FIN) != 0;
111 if ((byte & 0x70) != 0) { 118
112 // The RSV1, RSV2 bits RSV3 must be all zero. 119 if((byte & (RSV2 | RSV3)) != 0) {
120 // The RSV2, RSV3 bits must both be zero.
113 throw new WebSocketException("Protocol error"); 121 throw new WebSocketException("Protocol error");
114 } 122 }
115 _opcode = (byte & 0xF); 123
124 if ((byte & RSV1) != 0) {
125 _compressed = true;
126 } else {
127 _compressed = false;
128 }
129 _opcode = (byte & OPCODE);
130
116 if (_opcode <= _WebSocketOpcode.BINARY) { 131 if (_opcode <= _WebSocketOpcode.BINARY) {
117 if (_opcode == _WebSocketOpcode.CONTINUATION) { 132 if (_opcode == _WebSocketOpcode.CONTINUATION) {
118 if (_currentMessageType == _WebSocketMessageType.NONE) { 133 if (_currentMessageType == _WebSocketMessageType.NONE) {
119 throw new WebSocketException("Protocol error"); 134 throw new WebSocketException("Protocol error");
120 } 135 }
121 } else { 136 } else {
122 assert(_opcode == _WebSocketOpcode.TEXT || 137 assert(_opcode == _WebSocketOpcode.TEXT ||
123 _opcode == _WebSocketOpcode.BINARY); 138 _opcode == _WebSocketOpcode.BINARY);
124 if (_currentMessageType != _WebSocketMessageType.NONE) { 139 if (_currentMessageType != _WebSocketMessageType.NONE) {
125 throw new WebSocketException("Protocol error"); 140 throw new WebSocketException("Protocol error");
126 } 141 }
127 _currentMessageType = _opcode; 142 _currentMessageType = _opcode;
128 } 143 }
129 } else if (_opcode >= _WebSocketOpcode.CLOSE && 144 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
130 _opcode <= _WebSocketOpcode.PONG) { 145 _opcode <= _WebSocketOpcode.PONG) {
131 // Control frames cannot be fragmented. 146 // Control frames cannot be fragmented.
132 if (!_fin) throw new WebSocketException("Protocol error"); 147 if (!_fin) throw new WebSocketException("Protocol error");
133 } else { 148 } else {
134 throw new WebSocketException("Protocol error"); 149 throw new WebSocketException("Protocol error");
135 } 150 }
136 _state = LEN_FIRST; 151 _state = LEN_FIRST;
137 } else if (_state == LEN_FIRST) { 152 } else if (_state == LEN_FIRST) {
138 _masked = (byte & 0x80) != 0; 153 _masked = (byte & 0x80) != 0;
139 _len = byte & 0x7F; 154 _len = byte & 0x7F;
140 if (_isControlFrame() && _len > 125) { 155 if (_isControlFrame() && _len > 125) {
(...skipping 28 matching lines...) Expand all
169 } else { 184 } else {
170 assert(_state == PAYLOAD); 185 assert(_state == PAYLOAD);
171 // The payload is not handled one byte at a time but in blocks. 186 // The payload is not handled one byte at a time but in blocks.
172 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); 187 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
173 _remainingPayloadBytes -= payloadLength; 188 _remainingPayloadBytes -= payloadLength;
174 // Unmask payload if masked. 189 // Unmask payload if masked.
175 if (_masked) { 190 if (_masked) {
176 _unmask(index, payloadLength, buffer); 191 _unmask(index, payloadLength, buffer);
177 } 192 }
178 // Control frame and data frame share _payloads. 193 // Control frame and data frame share _payloads.
179 _payload.add( 194 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
180 new Uint8List.view(buffer.buffer, index, payloadLength));
181 index += payloadLength; 195 index += payloadLength;
182 if (_isControlFrame()) { 196 if (_isControlFrame()) {
183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 197 if (_remainingPayloadBytes == 0) _controlFrameEnd();
184 } else { 198 } else {
185 if (_currentMessageType != _WebSocketMessageType.TEXT && 199 if (_currentMessageType != _WebSocketMessageType.TEXT &&
186 _currentMessageType != _WebSocketMessageType.BINARY) { 200 _currentMessageType != _WebSocketMessageType.BINARY) {
187 throw new WebSocketException("Protocol error"); 201 throw new WebSocketException("Protocol error");
188 } 202 }
189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 203 if (_remainingPayloadBytes == 0) _messageFrameEnd();
190 } 204 }
191 205
192 // Hack - as we always do index++ below. 206 // Hack - as we always do index++ below.
193 index--; 207 index--;
194 } 208 }
195 } 209 }
196 210
197 // Move to the next byte. 211 // Move to the next byte.
(...skipping 14 matching lines...) Expand all
212 index += startOffset; 226 index += startOffset;
213 length -= startOffset; 227 length -= startOffset;
214 final int blockCount = length ~/ BLOCK_SIZE; 228 final int blockCount = length ~/ BLOCK_SIZE;
215 if (blockCount > 0) { 229 if (blockCount > 0) {
216 // Create mask block. 230 // Create mask block.
217 int mask = 0; 231 int mask = 0;
218 for (int i = 3; i >= 0; i--) { 232 for (int i = 3; i >= 0; i--) {
219 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; 233 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
220 } 234 }
221 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 235 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
222 Int32x4List blockBuffer = new Int32x4List.view( 236 Int32x4List blockBuffer =
223 buffer.buffer, index, blockCount); 237 new Int32x4List.view(buffer.buffer, index, blockCount);
224 for (int i = 0; i < blockBuffer.length; i++) { 238 for (int i = 0; i < blockBuffer.length; i++) {
225 blockBuffer[i] ^= blockMask; 239 blockBuffer[i] ^= blockMask;
226 } 240 }
227 final int bytes = blockCount * BLOCK_SIZE; 241 final int bytes = blockCount * BLOCK_SIZE;
228 index += bytes; 242 index += bytes;
229 length -= bytes; 243 length -= bytes;
230 } 244 }
231 } 245 }
232 // Handle end. 246 // Handle end.
233 final int end = index + length; 247 final int end = index + length;
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
277 } else { 291 } else {
278 _messageFrameEnd(); 292 _messageFrameEnd();
279 } 293 }
280 } else { 294 } else {
281 _state = PAYLOAD; 295 _state = PAYLOAD;
282 } 296 }
283 } 297 }
284 298
285 void _messageFrameEnd() { 299 void _messageFrameEnd() {
286 if (_fin) { 300 if (_fin) {
301 var bytes = _payload.takeBytes();
302 if (_deflate != null && _compressed) {
303 bytes = _deflate.processIncomingMessage(bytes);
304 }
305
287 switch (_currentMessageType) { 306 switch (_currentMessageType) {
288 case _WebSocketMessageType.TEXT: 307 case _WebSocketMessageType.TEXT:
289 _eventSink.add(UTF8.decode(_payload.takeBytes())); 308 _eventSink.add(UTF8.decode(bytes));
290 break; 309 break;
291 case _WebSocketMessageType.BINARY: 310 case _WebSocketMessageType.BINARY:
292 _eventSink.add(_payload.takeBytes()); 311 _eventSink.add(bytes);
293 break; 312 break;
294 } 313 }
295 _currentMessageType = _WebSocketMessageType.NONE; 314 _currentMessageType = _WebSocketMessageType.NONE;
296 } 315 }
297 _prepareForNextFrame(); 316 _prepareForNextFrame();
298 } 317 }
299 318
300 void _controlFrameEnd() { 319 void _controlFrameEnd() {
301 switch (_opcode) { 320 switch (_opcode) {
302 case _WebSocketOpcode.CLOSE: 321 case _WebSocketOpcode.CLOSE:
(...skipping 21 matching lines...) Expand all
324 343
325 case _WebSocketOpcode.PONG: 344 case _WebSocketOpcode.PONG:
326 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); 345 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
327 break; 346 break;
328 } 347 }
329 _prepareForNextFrame(); 348 _prepareForNextFrame();
330 } 349 }
331 350
332 bool _isControlFrame() { 351 bool _isControlFrame() {
333 return _opcode == _WebSocketOpcode.CLOSE || 352 return _opcode == _WebSocketOpcode.CLOSE ||
334 _opcode == _WebSocketOpcode.PING || 353 _opcode == _WebSocketOpcode.PING ||
335 _opcode == _WebSocketOpcode.PONG; 354 _opcode == _WebSocketOpcode.PONG;
336 } 355 }
337 356
338 void _prepareForNextFrame() { 357 void _prepareForNextFrame() {
339 if (_state != CLOSED && _state != FAILURE) _state = START; 358 if (_state != CLOSED && _state != FAILURE) _state = START;
340 _fin = false; 359 _fin = false;
341 _opcode = -1; 360 _opcode = -1;
342 _len = -1; 361 _len = -1;
343 _remainingLenBytes = -1; 362 _remainingLenBytes = -1;
344 _remainingMaskingKeyBytes = 4; 363 _remainingMaskingKeyBytes = 4;
345 _remainingPayloadBytes = -1; 364 _remainingPayloadBytes = -1;
346 _unmaskingIndex = 0; 365 _unmaskingIndex = 0;
347 } 366 }
348 } 367 }
349 368
350
351 class _WebSocketPing { 369 class _WebSocketPing {
352 final List<int> payload; 370 final List<int> payload;
353 _WebSocketPing([this.payload = null]); 371 _WebSocketPing([this.payload = null]);
354 } 372 }
355 373
356
357 class _WebSocketPong { 374 class _WebSocketPong {
358 final List<int> payload; 375 final List<int> payload;
359 _WebSocketPong([this.payload = null]); 376 _WebSocketPong([this.payload = null]);
360 } 377 }
361 378
362
363 class _WebSocketTransformerImpl implements WebSocketTransformer { 379 class _WebSocketTransformerImpl implements WebSocketTransformer {
364 final StreamController<WebSocket> _controller = 380 final StreamController<WebSocket> _controller =
365 new StreamController<WebSocket>(sync: true); 381 new StreamController<WebSocket>(sync: true);
366 final Function _protocolSelector; 382 final Function _protocolSelector;
383 final CompressionOptions _compression;
367 384
368 _WebSocketTransformerImpl(this._protocolSelector); 385 _WebSocketTransformerImpl(this._protocolSelector, this._compression);
369 386
370 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 387 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
371 stream.listen((request) { 388 stream.listen((request) {
372 _upgrade(request, _protocolSelector) 389 _upgrade(request, _protocolSelector, _compression)
373 .then((WebSocket webSocket) => _controller.add(webSocket)) 390 .then((WebSocket webSocket) => _controller.add(webSocket))
374 .catchError(_controller.addError); 391 .catchError(_controller.addError);
375 }, onDone: () { 392 }, onDone: () {
376 _controller.close(); 393 _controller.close();
377 }); 394 });
378 395
379 return _controller.stream; 396 return _controller.stream;
380 } 397 }
381 398
382 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { 399 static Future<WebSocket> _upgrade(
400 HttpRequest request, _protocolSelector, CompressionOptions compression) {
383 var response = request.response; 401 var response = request.response;
384 if (!_isUpgradeRequest(request)) { 402 if (!_isUpgradeRequest(request)) {
385 // Send error response. 403 // Send error response.
386 response 404 response
387 ..statusCode = HttpStatus.BAD_REQUEST 405 ..statusCode = HttpStatus.BAD_REQUEST
388 ..close(); 406 ..close();
389 return new Future.error( 407 return new Future.error(
390 new WebSocketException("Invalid WebSocket upgrade request")); 408 new WebSocketException("Invalid WebSocket upgrade request"));
391 } 409 }
392 410
393 Future upgrade(String protocol) { 411 Future upgrade(String protocol) {
394 // Send the upgrade response. 412 // Send the upgrade response.
395 response 413 response
396 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS 414 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") 415 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
398 ..headers.add(HttpHeaders.UPGRADE, "websocket"); 416 ..headers.add(HttpHeaders.UPGRADE, "websocket");
399 String key = request.headers.value("Sec-WebSocket-Key"); 417 String key = request.headers.value("Sec-WebSocket-Key");
400 _SHA1 sha1 = new _SHA1(); 418 _SHA1 sha1 = new _SHA1();
401 sha1.add("$key$_webSocketGUID".codeUnits); 419 sha1.add("$key$_webSocketGUID".codeUnits);
402 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 420 String accept = _CryptoUtils.bytesToBase64(sha1.close());
403 response.headers.add("Sec-WebSocket-Accept", accept); 421 response.headers.add("Sec-WebSocket-Accept", accept);
404 if (protocol != null) { 422 if (protocol != null) {
405 response.headers.add("Sec-WebSocket-Protocol", protocol); 423 response.headers.add("Sec-WebSocket-Protocol", protocol);
406 } 424 }
425
426 var deflate = _negotiateCompression(request, response, compression);
427
407 response.headers.contentLength = 0; 428 response.headers.contentLength = 0;
408 return response.detachSocket() 429 return response.detachSocket().then((socket) =>
409 .then((socket) => new _WebSocketImpl._fromSocket( 430 new _WebSocketImpl._fromSocket(
410 socket, protocol, true)); 431 socket, protocol, compression, true, deflate));
411 } 432 }
412 433
413 var protocols = request.headers['Sec-WebSocket-Protocol']; 434 var protocols = request.headers['Sec-WebSocket-Protocol'];
414 if (protocols != null && _protocolSelector != null) { 435 if (protocols != null && _protocolSelector != null) {
415 // The suggested protocols can be spread over multiple lines, each 436 // The suggested protocols can be spread over multiple lines, each
416 // consisting of multiple protocols. To unify all of them, first join 437 // consisting of multiple protocols. To unify all of them, first join
417 // the lists with ', ' and then tokenize. 438 // the lists with ', ' and then tokenize.
418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); 439 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
419 return new Future(() => _protocolSelector(protocols)) 440 return new Future(() => _protocolSelector(protocols)).then((protocol) {
420 .then((protocol) { 441 if (protocols.indexOf(protocol) < 0) {
421 if (protocols.indexOf(protocol) < 0) { 442 throw new WebSocketException(
422 throw new WebSocketException( 443 "Selected protocol is not in the list of available protocols");
423 "Selected protocol is not in the list of available protocols"); 444 }
424 } 445 return protocol;
425 return protocol; 446 }).catchError((error) {
426 }) 447 response
427 .catchError((error) { 448 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
428 response 449 ..close();
429 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR 450 throw error;
430 ..close(); 451 }).then(upgrade);
431 throw error;
432 })
433 .then(upgrade);
434 } else { 452 } else {
435 return upgrade(null); 453 return upgrade(null);
436 } 454 }
437 } 455 }
438 456
457 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
458 HttpResponse response, CompressionOptions compression) {
459 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
460
461 if (extensionHeader == null) {
462 extensionHeader = "";
463 }
464
465 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
466 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
467 var info = compression._createHeader(hv);
468
469 response.headers.add("Sec-WebSocket-Extensions", info[0]);
470 var serverNoContextTakeover =
471 hv.parameters.containsKey(_serverNoContextTakeover);
472 var clientNoContextTakeover =
473 hv.parameters.containsKey(_clientNoContextTakeover);
474 var deflate = new _WebSocketPerMessageDeflate(
475 serverNoContextTakeover: serverNoContextTakeover,
476 clientNoContextTakeover: clientNoContextTakeover,
477 serverMaxWindowBits: info[1],
478 clientMaxWindowBits: info[1],
479 serverSide: true);
480
481 return deflate;
482 }
483
484 return null;
485 }
486
439 static bool _isUpgradeRequest(HttpRequest request) { 487 static bool _isUpgradeRequest(HttpRequest request) {
440 if (request.method != "GET") { 488 if (request.method != "GET") {
441 return false; 489 return false;
442 } 490 }
443 if (request.headers[HttpHeaders.CONNECTION] == null) { 491 if (request.headers[HttpHeaders.CONNECTION] == null) {
444 return false; 492 return false;
445 } 493 }
446 bool isUpgrade = false; 494 bool isUpgrade = false;
447 request.headers[HttpHeaders.CONNECTION].forEach((String value) { 495 request.headers[HttpHeaders.CONNECTION].forEach((String value) {
448 if (value.toLowerCase() == "upgrade") isUpgrade = true; 496 if (value.toLowerCase() == "upgrade") isUpgrade = true;
449 }); 497 });
450 if (!isUpgrade) return false; 498 if (!isUpgrade) return false;
451 String upgrade = request.headers.value(HttpHeaders.UPGRADE); 499 String upgrade = request.headers.value(HttpHeaders.UPGRADE);
452 if (upgrade == null || upgrade.toLowerCase() != "websocket") { 500 if (upgrade == null || upgrade.toLowerCase() != "websocket") {
453 return false; 501 return false;
454 } 502 }
455 String version = request.headers.value("Sec-WebSocket-Version"); 503 String version = request.headers.value("Sec-WebSocket-Version");
456 if (version == null || version != "13") { 504 if (version == null || version != "13") {
457 return false; 505 return false;
458 } 506 }
459 String key = request.headers.value("Sec-WebSocket-Key"); 507 String key = request.headers.value("Sec-WebSocket-Key");
460 if (key == null) { 508 if (key == null) {
461 return false; 509 return false;
462 } 510 }
463 return true; 511 return true;
464 } 512 }
465 } 513 }
466 514
515 class _WebSocketPerMessageDeflate {
516 bool serverNoContextTakeover;
517 bool clientNoContextTakeover;
518 int clientMaxWindowBits;
519 int serverMaxWindowBits;
520 bool serverSide;
521
522 _Filter decoder;
523 _Filter encoder;
524
525 _WebSocketPerMessageDeflate(
526 {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
527 this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
528 this.serverNoContextTakeover: false,
529 this.clientNoContextTakeover: false,
530 this.serverSide: false});
531
532 void _ensureDecoder() {
533 if (decoder == null) {
534 decoder = _Filter._newZLibInflateFilter(
535 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
536 }
537 }
538
539 void _ensureEncoder() {
540 if (encoder == null) {
541 encoder = _Filter._newZLibDeflateFilter(
542 false,
543 ZLibOption.DEFAULT_LEVEL,
544 serverSide ? serverMaxWindowBits : clientMaxWindowBits,
545 ZLibOption.DEFAULT_MEM_LEVEL,
546 ZLibOption.STRATEGY_DEFAULT,
547 null,
548 true);
549 }
550 }
551
552 Uint8List processIncomingMessage(List<int> msg) {
553 _ensureDecoder();
554
555 var data = [];
556 data.addAll(msg);
557 data.addAll(const [0x00, 0x00, 0xff, 0xff]);
558
559 decoder.process(data, 0, data.length);
560 var reuse =
561 !(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
562 var result = [];
563 var out;
564
565 while ((out = decoder.processed(flush: reuse)) != null) {
566 result.addAll(out);
567 }
568
569 decoder.processed(flush: reuse);
570
571 if (!reuse) {
572 decoder.end();
573 decoder = null;
574 }
575 return new Uint8List.fromList(result);
576 }
577
578 List<int> processOutgoingMessage(List<int> msg) {
579 _ensureEncoder();
580 var reuse =
581 !(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
582 var result = [];
583 Uint8List buffer;
584 var out;
585
586 if (msg is! Uint8List) {
587 for (var i = 0; i < msg.length; i++) {
588 if (msg[i] < 0 || 255 < msg[i]) {
589 throw new ArgumentError("List element is not a byte value "
590 "(value ${msg[i]} at index $i)");
591 }
592 }
593 buffer = new Uint8List.fromList(msg);
594 } else {
595 buffer = msg;
596 }
597
598 encoder.process(buffer, 0, buffer.length);
599
600 while ((out = encoder.processed(flush: reuse)) != null) {
601 result.addAll(out);
602 }
603
604 if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
605 encoder.end();
606 encoder = null;
607 }
608
609 if (result.length > 4) {
610 result = result.sublist(0, result.length - 4);
611 }
612
613 return result;
614 }
615 }
467 616
468 // TODO(ajohnsen): Make this transformer reusable. 617 // TODO(ajohnsen): Make this transformer reusable.
469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 618 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
470 final _WebSocketImpl webSocket; 619 final _WebSocketImpl webSocket;
471 EventSink _eventSink; 620 EventSink _eventSink;
472 621
473 _WebSocketOutgoingTransformer(this.webSocket); 622 _WebSocketPerMessageDeflate _deflateHelper;
623
624 _WebSocketOutgoingTransformer(this.webSocket) {
625 _deflateHelper = webSocket._deflate;
626 }
474 627
475 Stream bind(Stream stream) { 628 Stream bind(Stream stream) {
476 return new Stream.eventTransformed( 629 return new Stream.eventTransformed(stream, (EventSink eventSink) {
477 stream, 630 if (_eventSink != null) {
478 (EventSink eventSink) { 631 throw new StateError("WebSocket transformer already used");
479 if (_eventSink != null) { 632 }
480 throw new StateError("WebSocket transformer already used"); 633 _eventSink = eventSink;
481 } 634 return this;
482 _eventSink = eventSink; 635 });
483 return this;
484 });
485 } 636 }
486 637
487 void add(message) { 638 void add(message) {
488 if (message is _WebSocketPong) { 639 if (message is _WebSocketPong) {
489 addFrame(_WebSocketOpcode.PONG, message.payload); 640 addFrame(_WebSocketOpcode.PONG, message.payload);
490 return; 641 return;
491 } 642 }
492 if (message is _WebSocketPing) { 643 if (message is _WebSocketPing) {
493 addFrame(_WebSocketOpcode.PING, message.payload); 644 addFrame(_WebSocketOpcode.PING, message.payload);
494 return; 645 return;
495 } 646 }
496 List<int> data; 647 List<int> data;
497 int opcode; 648 int opcode;
498 if (message != null) { 649 if (message != null) {
499 if (message is String) { 650 if (message is String) {
500 opcode = _WebSocketOpcode.TEXT; 651 opcode = _WebSocketOpcode.TEXT;
501 data = UTF8.encode(message); 652 data = UTF8.encode(message);
502 } else { 653 } else {
503 if (message is !List<int>) { 654 if (message is! List<int>) {
504 throw new ArgumentError(message); 655 throw new ArgumentError(message);
505 } 656 }
506 opcode = _WebSocketOpcode.BINARY; 657 opcode = _WebSocketOpcode.BINARY;
507 data = message; 658 data = message;
508 } 659 }
660
661 if (_deflateHelper != null) {
662 data = _deflateHelper.processOutgoingMessage(data);
663 }
509 } else { 664 } else {
510 opcode = _WebSocketOpcode.TEXT; 665 opcode = _WebSocketOpcode.TEXT;
511 } 666 }
512 addFrame(opcode, data); 667 addFrame(opcode, data);
513 } 668 }
514 669
515 void addError(Object error, [StackTrace stackTrace]) => 670 void addError(Object error, [StackTrace stackTrace]) =>
516 _eventSink.addError(error, stackTrace); 671 _eventSink.addError(error, stackTrace);
517 672
518 void close() { 673 void close() {
519 int code = webSocket._outCloseCode; 674 int code = webSocket._outCloseCode;
520 String reason = webSocket._outCloseReason; 675 String reason = webSocket._outCloseReason;
521 List<int> data; 676 List<int> data;
522 if (code != null) { 677 if (code != null) {
523 data = new List<int>(); 678 data = new List<int>();
524 data.add((code >> 8) & 0xFF); 679 data.add((code >> 8) & 0xFF);
525 data.add(code & 0xFF); 680 data.add(code & 0xFF);
526 if (reason != null) { 681 if (reason != null) {
527 data.addAll(UTF8.encode(reason)); 682 data.addAll(UTF8.encode(reason));
528 } 683 }
529 } 684 }
530 addFrame(_WebSocketOpcode.CLOSE, data); 685 addFrame(_WebSocketOpcode.CLOSE, data);
531 _eventSink.close(); 686 _eventSink.close();
532 } 687 }
533 688
534 void addFrame(int opcode, List<int> data) => 689 void addFrame(int opcode, List<int> data) => createFrame(
535 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); 690 opcode,
691 data,
692 webSocket._serverSide,
693 _deflateHelper != null &&
694 (opcode == _WebSocketOpcode.TEXT ||
695 opcode == _WebSocketOpcode.BINARY)).forEach((e) {
696 _eventSink.add(e);
697 });
536 698
537 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 699 static Iterable createFrame(
538 bool mask = !serverSide; // Masking not implemented for server. 700 int opcode, List<int> data, bool serverSide, bool compressed) {
701 bool mask = !serverSide; // Masking not implemented for server.
539 int dataLength = data == null ? 0 : data.length; 702 int dataLength = data == null ? 0 : data.length;
540 // Determine the header size. 703 // Determine the header size.
541 int headerSize = (mask) ? 6 : 2; 704 int headerSize = (mask) ? 6 : 2;
542 if (dataLength > 65535) { 705 if (dataLength > 65535) {
543 headerSize += 8; 706 headerSize += 8;
544 } else if (dataLength > 125) { 707 } else if (dataLength > 125) {
545 headerSize += 2; 708 headerSize += 2;
546 } 709 }
547 Uint8List header = new Uint8List(headerSize); 710 Uint8List header = new Uint8List(headerSize);
548 int index = 0; 711 int index = 0;
712
549 // Set FIN and opcode. 713 // Set FIN and opcode.
550 header[index++] = 0x80 | opcode; 714 var hoc = _WebSocketProtocolTransformer.FIN
715 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
716 | (opcode & _WebSocketProtocolTransformer.OPCODE);
717
718 header[index++] = hoc;
551 // Determine size and position of length field. 719 // Determine size and position of length field.
552 int lengthBytes = 1; 720 int lengthBytes = 1;
553 int firstLengthByte = 1;
554 if (dataLength > 65535) { 721 if (dataLength > 65535) {
555 header[index++] = 127; 722 header[index++] = 127;
556 lengthBytes = 8; 723 lengthBytes = 8;
557 } else if (dataLength > 125) { 724 } else if (dataLength > 125) {
558 header[index++] = 126; 725 header[index++] = 126;
559 lengthBytes = 2; 726 lengthBytes = 2;
560 } 727 }
561 // Write the length in network byte order into the header. 728 // Write the length in network byte order into the header.
562 for (int i = 0; i < lengthBytes; i++) { 729 for (int i = 0; i < lengthBytes; i++) {
563 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; 730 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
564 } 731 }
565 if (mask) { 732 if (mask) {
566 header[1] |= 1 << 7; 733 header[1] |= 1 << 7;
567 var maskBytes = _IOCrypto.getRandomBytes(4); 734 var maskBytes = _IOCrypto.getRandomBytes(4);
568 header.setRange(index, index + 4, maskBytes); 735 header.setRange(index, index + 4, maskBytes);
569 index += 4; 736 index += 4;
570 if (data != null) { 737 if (data != null) {
571 Uint8List list; 738 Uint8List list;
572 // If this is a text message just do the masking inside the 739 // If this is a text message just do the masking inside the
573 // encoded data. 740 // encoded data.
574 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 741 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
575 list = data; 742 list = data;
576 } else { 743 } else {
577 if (data is Uint8List) { 744 if (data is Uint8List) {
578 list = new Uint8List.fromList(data); 745 list = new Uint8List.fromList(data);
579 } else { 746 } else {
580 list = new Uint8List(data.length); 747 list = new Uint8List(data.length);
581 for (int i = 0; i < data.length; i++) { 748 for (int i = 0; i < data.length; i++) {
582 if (data[i] < 0 || 255 < data[i]) { 749 if (data[i] < 0 || 255 < data[i]) {
583 throw new ArgumentError( 750 throw new ArgumentError("List element is not a byte value "
584 "List element is not a byte value "
585 "(value ${data[i]} at index $i)"); 751 "(value ${data[i]} at index $i)");
586 } 752 }
587 list[i] = data[i]; 753 list[i] = data[i];
588 } 754 }
589 } 755 }
590 } 756 }
591 const int BLOCK_SIZE = 16; 757 const int BLOCK_SIZE = 16;
592 int blockCount = list.length ~/ BLOCK_SIZE; 758 int blockCount = list.length ~/ BLOCK_SIZE;
593 if (blockCount > 0) { 759 if (blockCount > 0) {
594 // Create mask block. 760 // Create mask block.
595 int mask = 0; 761 int mask = 0;
596 for (int i = 3; i >= 0; i--) { 762 for (int i = 3; i >= 0; i--) {
597 mask = (mask << 8) | maskBytes[i]; 763 mask = (mask << 8) | maskBytes[i];
598 } 764 }
599 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 765 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
600 Int32x4List blockBuffer = new Int32x4List.view( 766 Int32x4List blockBuffer =
601 list.buffer, 0, blockCount); 767 new Int32x4List.view(list.buffer, 0, blockCount);
602 for (int i = 0; i < blockBuffer.length; i++) { 768 for (int i = 0; i < blockBuffer.length; i++) {
603 blockBuffer[i] ^= blockMask; 769 blockBuffer[i] ^= blockMask;
604 } 770 }
605 } 771 }
606 // Handle end. 772 // Handle end.
607 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { 773 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
608 list[i] ^= maskBytes[i & 3]; 774 list[i] ^= maskBytes[i & 3];
609 } 775 }
610 data = list; 776 data = list;
611 } 777 }
612 } 778 }
613 assert(index == headerSize); 779 assert(index == headerSize);
614 if (data == null) { 780 if (data == null) {
615 return [header]; 781 return [header];
616 } else { 782 } else {
617 return [header, data]; 783 return [header, data];
618 } 784 }
619 } 785 }
620 } 786 }
621 787
622
623 class _WebSocketConsumer implements StreamConsumer { 788 class _WebSocketConsumer implements StreamConsumer {
624 final _WebSocketImpl webSocket; 789 final _WebSocketImpl webSocket;
625 final Socket socket; 790 final Socket socket;
626 StreamController _controller; 791 StreamController _controller;
627 StreamSubscription _subscription; 792 StreamSubscription _subscription;
628 bool _issuedPause = false; 793 bool _issuedPause = false;
629 bool _closed = false; 794 bool _closed = false;
630 Completer _closeCompleter = new Completer(); 795 Completer _closeCompleter = new Completer();
631 Completer _completer; 796 Completer _completer;
632 797
(...skipping 24 matching lines...) Expand all
657 void _cancel() { 822 void _cancel() {
658 if (_subscription != null) { 823 if (_subscription != null) {
659 var subscription = _subscription; 824 var subscription = _subscription;
660 _subscription = null; 825 _subscription = null;
661 subscription.cancel(); 826 subscription.cancel();
662 } 827 }
663 } 828 }
664 829
665 _ensureController() { 830 _ensureController() {
666 if (_controller != null) return; 831 if (_controller != null) return;
667 _controller = new StreamController(sync: true, 832 _controller = new StreamController(
668 onPause: _onPause, 833 sync: true,
669 onResume: _onResume, 834 onPause: _onPause,
670 onCancel: _onListen); 835 onResume: _onResume,
671 var stream = _controller.stream.transform( 836 onCancel: _onListen);
672 new _WebSocketOutgoingTransformer(webSocket)); 837 var stream = _controller.stream
673 socket.addStream(stream) 838 .transform(new _WebSocketOutgoingTransformer(webSocket));
674 .then((_) { 839 socket.addStream(stream).then((_) {
675 _done(); 840 _done();
676 _closeCompleter.complete(webSocket); 841 _closeCompleter.complete(webSocket);
677 }, onError: (error, StackTrace stackTrace) { 842 }, onError: (error, StackTrace stackTrace) {
678 _closed = true; 843 _closed = true;
679 _cancel(); 844 _cancel();
680 if (error is ArgumentError) { 845 if (error is ArgumentError) {
681 if (!_done(error, stackTrace)) { 846 if (!_done(error, stackTrace)) {
682 _closeCompleter.completeError(error, stackTrace); 847 _closeCompleter.completeError(error, stackTrace);
683 } 848 }
684 } else { 849 } else {
685 _done(); 850 _done();
686 _closeCompleter.complete(webSocket); 851 _closeCompleter.complete(webSocket);
687 } 852 }
688 }); 853 });
689 } 854 }
690 855
691 bool _done([error, StackTrace stackTrace]) { 856 bool _done([error, StackTrace stackTrace]) {
692 if (_completer == null) return false; 857 if (_completer == null) return false;
693 if (error != null) { 858 if (error != null) {
694 _completer.completeError(error, stackTrace); 859 _completer.completeError(error, stackTrace);
695 } else { 860 } else {
696 _completer.complete(webSocket); 861 _completer.complete(webSocket);
697 } 862 }
698 _completer = null; 863 _completer = null;
699 return true; 864 return true;
700 } 865 }
701 866
702 Future addStream(var stream) { 867 Future addStream(var stream) {
703 if (_closed) { 868 if (_closed) {
704 stream.listen(null).cancel(); 869 stream.listen(null).cancel();
705 return new Future.value(webSocket); 870 return new Future.value(webSocket);
706 } 871 }
707 _ensureController(); 872 _ensureController();
708 _completer = new Completer(); 873 _completer = new Completer();
709 _subscription = stream.listen( 874 _subscription = stream.listen((data) {
710 (data) { 875 _controller.add(data);
711 _controller.add(data); 876 }, onDone: _done, onError: _done, cancelOnError: true);
712 },
713 onDone: _done,
714 onError: _done,
715 cancelOnError: true);
716 if (_issuedPause) { 877 if (_issuedPause) {
717 _subscription.pause(); 878 _subscription.pause();
718 _issuedPause = false; 879 _issuedPause = false;
719 } 880 }
720 return _completer.future; 881 return _completer.future;
721 } 882 }
722 883
723 Future close() { 884 Future close() {
724 _ensureController(); 885 _ensureController();
725 Future closeSocket() { 886 Future closeSocket() {
726 return socket.close().catchError((_) {}).then((_) => webSocket); 887 return socket.close().catchError((_) {}).then((_) => webSocket);
727 } 888 }
728 _controller.close(); 889 _controller.close();
729 return _closeCompleter.future.then((_) => closeSocket()); 890 return _closeCompleter.future.then((_) => closeSocket());
730 } 891 }
731 892
732 void add(data) { 893 void add(data) {
733 if (_closed) return; 894 if (_closed) return;
734 _ensureController(); 895 _ensureController();
735 _controller.add(data); 896 _controller.add(data);
736 } 897 }
737 898
738 void closeSocket() { 899 void closeSocket() {
739 _closed = true; 900 _closed = true;
740 _cancel(); 901 _cancel();
741 close(); 902 close();
742 } 903 }
743 } 904 }
744 905
745
746 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { 906 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
747 // Use default Map so we keep order. 907 // Use default Map so we keep order.
748 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); 908 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
909 static const int DEFAULT_WINDOW_BITS = 15;
910 static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
749 911
750 final String protocol; 912 final String protocol;
751 913
752 StreamController _controller; 914 StreamController _controller;
753 StreamSubscription _subscription; 915 StreamSubscription _subscription;
754 StreamSink _sink; 916 StreamSink _sink;
755 917
756 final _socket; 918 final _socket;
757 final bool _serverSide; 919 final bool _serverSide;
758 int _readyState = WebSocket.CONNECTING; 920 int _readyState = WebSocket.CONNECTING;
759 bool _writeClosed = false; 921 bool _writeClosed = false;
760 int _closeCode; 922 int _closeCode;
761 String _closeReason; 923 String _closeReason;
762 Duration _pingInterval; 924 Duration _pingInterval;
763 Timer _pingTimer; 925 Timer _pingTimer;
764 _WebSocketConsumer _consumer; 926 _WebSocketConsumer _consumer;
765 927
766 int _outCloseCode; 928 int _outCloseCode;
767 String _outCloseReason; 929 String _outCloseReason;
768 Timer _closeTimer; 930 Timer _closeTimer;
931 _WebSocketPerMessageDeflate _deflate;
769 932
770 static final HttpClient _httpClient = new HttpClient(); 933 static final HttpClient _httpClient = new HttpClient();
771 934
772 static Future<WebSocket> connect( 935 static Future<WebSocket> connect(
773 String url, Iterable<String> protocols, Map<String, dynamic> headers) { 936 String url, Iterable<String> protocols, Map<String, dynamic> headers,
937 {CompressionOptions compression: CompressionOptions.DEFAULT}) {
774 Uri uri = Uri.parse(url); 938 Uri uri = Uri.parse(url);
775 if (uri.scheme != "ws" && uri.scheme != "wss") { 939 if (uri.scheme != "ws" && uri.scheme != "wss") {
776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 940 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
777 } 941 }
778 942
779 Random random = new Random(); 943 Random random = new Random();
780 // Generate 16 random bytes. 944 // Generate 16 random bytes.
781 Uint8List nonceData = new Uint8List(16); 945 Uint8List nonceData = new Uint8List(16);
782 for (int i = 0; i < 16; i++) { 946 for (int i = 0; i < 16; i++) {
783 nonceData[i] = random.nextInt(256); 947 nonceData[i] = random.nextInt(256);
784 } 948 }
785 String nonce = _CryptoUtils.bytesToBase64(nonceData); 949 String nonce = _CryptoUtils.bytesToBase64(nonceData);
786 950
787 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", 951 uri = new Uri(
788 userInfo: uri.userInfo, 952 scheme: uri.scheme == "wss" ? "https" : "http",
789 host: uri.host, 953 userInfo: uri.userInfo,
790 port: uri.port, 954 host: uri.host,
791 path: uri.path, 955 port: uri.port,
792 query: uri.query, 956 path: uri.path,
793 fragment: uri.fragment); 957 query: uri.query,
794 return _httpClient.openUrl("GET", uri) 958 fragment: uri.fragment);
795 .then((request) { 959 return _httpClient.openUrl("GET", uri).then((request) {
796 if (uri.userInfo != null && !uri.userInfo.isEmpty) { 960 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
797 // If the URL contains user information use that for basic 961 // If the URL contains user information use that for basic
798 // authorization. 962 // authorization.
799 String auth = 963 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
800 _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); 964 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
801 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); 965 }
966 if (headers != null) {
967 headers.forEach((field, value) => request.headers.add(field, value));
968 }
969 // Setup the initial handshake.
970 request.headers
971 ..set(HttpHeaders.CONNECTION, "Upgrade")
972 ..set(HttpHeaders.UPGRADE, "websocket")
973 ..set("Sec-WebSocket-Key", nonce)
974 ..set("Cache-Control", "no-cache")
975 ..set("Sec-WebSocket-Version", "13");
976 if (protocols != null) {
977 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
978 }
979
980 if (compression.enabled) {
981 request.headers
982 .add("Sec-WebSocket-Extensions", compression._createHeader());
983 }
984
985 return request.close();
986 }).then((response) {
987 void error(String message) {
988 // Flush data.
989 response.detachSocket().then((socket) {
990 socket.destroy();
991 });
992 throw new WebSocketException(message);
993 }
994 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
995 response.headers[HttpHeaders.CONNECTION] == null ||
996 !response.headers[HttpHeaders.CONNECTION]
997 .any((value) => value.toLowerCase() == "upgrade") ||
998 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
999 "websocket") {
1000 error("Connection to '$uri' was not upgraded to websocket");
1001 }
1002 String accept = response.headers.value("Sec-WebSocket-Accept");
1003 if (accept == null) {
1004 error("Response did not contain a 'Sec-WebSocket-Accept' header");
1005 }
1006 _SHA1 sha1 = new _SHA1();
1007 sha1.add("$nonce$_webSocketGUID".codeUnits);
1008 List<int> expectedAccept = sha1.close();
1009 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
1010 if (expectedAccept.length != receivedAccept.length) {
1011 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
1012 }
1013 for (int i = 0; i < expectedAccept.length; i++) {
1014 if (expectedAccept[i] != receivedAccept[i]) {
1015 error("Bad response 'Sec-WebSocket-Accept' header");
802 } 1016 }
803 if (headers != null) { 1017 }
804 headers.forEach((field, value) => request.headers.add(field, value)); 1018 var protocol = response.headers.value('Sec-WebSocket-Protocol');
805 } 1019
806 // Setup the initial handshake. 1020 _WebSocketPerMessageDeflate deflate =
807 request.headers 1021 negotiateClientCompression(response, compression);
808 ..set(HttpHeaders.CONNECTION, "Upgrade") 1022
809 ..set(HttpHeaders.UPGRADE, "websocket") 1023 return response.detachSocket().then((socket) =>
810 ..set("Sec-WebSocket-Key", nonce) 1024 new _WebSocketImpl._fromSocket(
811 ..set("Cache-Control", "no-cache") 1025 socket, protocol, compression, false, deflate));
812 ..set("Sec-WebSocket-Version", "13"); 1026 });
813 if (protocols != null) {
814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
815 }
816 return request.close();
817 })
818 .then((response) {
819 void error(String message) {
820 // Flush data.
821 response.detachSocket().then((socket) {
822 socket.destroy();
823 });
824 throw new WebSocketException(message);
825 }
826 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
827 response.headers[HttpHeaders.CONNECTION] == null ||
828 !response.headers[HttpHeaders.CONNECTION].any(
829 (value) => value.toLowerCase() == "upgrade") ||
830 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
831 "websocket") {
832 error("Connection to '$uri' was not upgraded to websocket");
833 }
834 String accept = response.headers.value("Sec-WebSocket-Accept");
835 if (accept == null) {
836 error("Response did not contain a 'Sec-WebSocket-Accept' header");
837 }
838 _SHA1 sha1 = new _SHA1();
839 sha1.add("$nonce$_webSocketGUID".codeUnits);
840 List<int> expectedAccept = sha1.close();
841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
842 if (expectedAccept.length != receivedAccept.length) {
843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
844 }
845 for (int i = 0; i < expectedAccept.length; i++) {
846 if (expectedAccept[i] != receivedAccept[i]) {
847 error("Bad response 'Sec-WebSocket-Accept' header");
848 }
849 }
850 var protocol = response.headers.value('Sec-WebSocket-Protocol');
851 return response.detachSocket()
852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
853 });
854 } 1027 }
855 1028
856 _WebSocketImpl._fromSocket(this._socket, this.protocol, 1029 static _WebSocketPerMessageDeflate negotiateClientCompression(
857 [this._serverSide = false]) { 1030 HttpClientResponse response, CompressionOptions compression) {
1031 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
1032
1033 if (extensionHeader == null) {
1034 extensionHeader = "";
1035 }
1036
1037 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ',');
1038
1039 if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
1040 var serverNoContextTakeover =
1041 hv.parameters.containsKey(_serverNoContextTakeover);
1042 var clientNoContextTakeover =
1043 hv.parameters.containsKey(_clientNoContextTakeover);
1044
1045 int getWindowBits(String type) {
1046 var o = hv.parameters[type];
1047 if (o == null) {
1048 return DEFAULT_WINDOW_BITS;
1049 }
1050
1051 o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
1052 return o;
1053 }
1054
1055 return new _WebSocketPerMessageDeflate(
1056 clientMaxWindowBits: getWindowBits(_clientMaxWindowBits),
1057 serverMaxWindowBits: getWindowBits(_serverMaxWindowBits),
1058 clientNoContextTakeover: clientNoContextTakeover,
1059 serverNoContextTakeover: serverNoContextTakeover);
1060 }
1061
1062 return null;
1063 }
1064
1065 _WebSocketImpl._fromSocket(
1066 this._socket, this.protocol, CompressionOptions compression,
1067 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
858 _consumer = new _WebSocketConsumer(this, _socket); 1068 _consumer = new _WebSocketConsumer(this, _socket);
859 _sink = new _StreamSinkImpl(_consumer); 1069 _sink = new _StreamSinkImpl(_consumer);
860 _readyState = WebSocket.OPEN; 1070 _readyState = WebSocket.OPEN;
1071 _deflate = deflate;
861 1072
862 var transformer = new _WebSocketProtocolTransformer(_serverSide); 1073 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
863 _subscription = _socket.transform(transformer).listen( 1074 _subscription = _socket.transform(transformer).listen((data) {
864 (data) { 1075 if (data is _WebSocketPing) {
865 if (data is _WebSocketPing) { 1076 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 1077 } else if (data is _WebSocketPong) {
867 } else if (data is _WebSocketPong) { 1078 // Simply set pingInterval, as it'll cancel any timers.
868 // Simply set pingInterval, as it'll cancel any timers. 1079 pingInterval = _pingInterval;
869 pingInterval = _pingInterval; 1080 } else {
870 } else { 1081 _controller.add(data);
871 _controller.add(data); 1082 }
872 } 1083 }, onError: (error, stackTrace) {
873 }, 1084 if (_closeTimer != null) _closeTimer.cancel();
874 onError: (error) { 1085 if (error is FormatException) {
875 if (_closeTimer != null) _closeTimer.cancel(); 1086 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
876 if (error is FormatException) { 1087 } else {
877 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 1088 _close(WebSocketStatus.PROTOCOL_ERROR);
878 } else { 1089 }
879 _close(WebSocketStatus.PROTOCOL_ERROR); 1090 // An error happened, set the close code set above.
880 } 1091 _closeCode = _outCloseCode;
881 // An error happened, set the close code set above. 1092 _closeReason = _outCloseReason;
882 _closeCode = _outCloseCode; 1093 _controller.close();
883 _closeReason = _outCloseReason; 1094 }, onDone: () {
884 _controller.close(); 1095 if (_closeTimer != null) _closeTimer.cancel();
885 }, 1096 if (_readyState == WebSocket.OPEN) {
886 onDone: () { 1097 _readyState = WebSocket.CLOSING;
887 if (_closeTimer != null) _closeTimer.cancel(); 1098 if (!_isReservedStatusCode(transformer.closeCode)) {
888 if (_readyState == WebSocket.OPEN) { 1099 _close(transformer.closeCode, transformer.closeReason);
889 _readyState = WebSocket.CLOSING; 1100 } else {
890 if (!_isReservedStatusCode(transformer.closeCode)) { 1101 _close();
891 _close(transformer.closeCode, transformer.closeReason); 1102 }
892 } else { 1103 _readyState = WebSocket.CLOSED;
893 _close(); 1104 }
894 } 1105 // Protocol close, use close code from transformer.
895 _readyState = WebSocket.CLOSED; 1106 _closeCode = transformer.closeCode;
896 } 1107 _closeReason = transformer.closeReason;
897 // Protocol close, use close code from transformer. 1108 _controller.close();
898 _closeCode = transformer.closeCode; 1109 }, cancelOnError: true);
899 _closeReason = transformer.closeReason;
900 _controller.close();
901 },
902 cancelOnError: true);
903 _subscription.pause(); 1110 _subscription.pause();
904 _controller = new StreamController(sync: true, 1111 _controller = new StreamController(
905 onListen: _subscription.resume, 1112 sync: true, onListen: _subscription.resume, onCancel: () {
906 onCancel: () { 1113 _subscription.cancel();
907 _subscription.cancel(); 1114 _subscription = null;
908 _subscription = null; 1115 }, onPause: _subscription.pause, onResume: _subscription.resume);
909 },
910 onPause: _subscription.pause,
911 onResume: _subscription.resume);
912 1116
913 _webSockets[_serviceId] = this; 1117 _webSockets[_serviceId] = this;
914 try { _socket._owner = this; } catch (_) {} 1118 try {
1119 _socket._owner = this;
1120 } catch (_) {}
915 } 1121 }
916 1122
917 StreamSubscription listen(void onData(message), 1123 StreamSubscription listen(void onData(message),
918 {Function onError, 1124 {Function onError, void onDone(), bool cancelOnError}) {
919 void onDone(),
920 bool cancelOnError}) {
921 return _controller.stream.listen(onData, 1125 return _controller.stream.listen(onData,
922 onError: onError, 1126 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
923 onDone: onDone,
924 cancelOnError: cancelOnError);
925 } 1127 }
926 1128
927 Duration get pingInterval => _pingInterval; 1129 Duration get pingInterval => _pingInterval;
928 1130
929 void set pingInterval(Duration interval) { 1131 void set pingInterval(Duration interval) {
930 if (_writeClosed) return; 1132 if (_writeClosed) return;
931 if (_pingTimer != null) _pingTimer.cancel(); 1133 if (_pingTimer != null) _pingTimer.cancel();
932 _pingInterval = interval; 1134 _pingInterval = interval;
933 1135
934 if (_pingInterval == null) return; 1136 if (_pingInterval == null) return;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
1020 'type': '@Socket', 1222 'type': '@Socket',
1021 'name': 'UserSocket', 1223 'name': 'UserSocket',
1022 'user_name': 'UserSocket', 1224 'user_name': 'UserSocket',
1023 }; 1225 };
1024 } 1226 }
1025 return r; 1227 return r;
1026 } 1228 }
1027 1229
1028 static bool _isReservedStatusCode(int code) { 1230 static bool _isReservedStatusCode(int code) {
1029 return code != null && 1231 return code != null &&
1030 (code < WebSocketStatus.NORMAL_CLOSURE || 1232 (code < WebSocketStatus.NORMAL_CLOSURE ||
1031 code == WebSocketStatus.RESERVED_1004 || 1233 code == WebSocketStatus.RESERVED_1004 ||
1032 code == WebSocketStatus.NO_STATUS_RECEIVED || 1234 code == WebSocketStatus.NO_STATUS_RECEIVED ||
1033 code == WebSocketStatus.ABNORMAL_CLOSURE || 1235 code == WebSocketStatus.ABNORMAL_CLOSURE ||
1034 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 1236 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
1035 code < WebSocketStatus.RESERVED_1015) || 1237 code < WebSocketStatus.RESERVED_1015) ||
1036 (code >= WebSocketStatus.RESERVED_1015 && 1238 (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
1037 code < 3000));
1038 } 1239 }
1039 } 1240 }
OLDNEW
« no previous file with comments | « sdk/lib/io/websocket.dart ('k') | tests/standalone/io/web_socket_compression_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698