Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 1390353005: Web Socket compression - take two (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Add custom compression header test Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 const String _clientNoContextTakeover = "client_no_context_takeover";
9 const String _serverNoContextTakeover = "server_no_context_takeover";
10 const String _clientMaxWindowBits = "client_max_window_bits";
11 const String _serverMaxWindowBits = "server_max_window_bits";
8 12
9 // Matches _WebSocketOpcode. 13 // Matches _WebSocketOpcode.
10 class _WebSocketMessageType { 14 class _WebSocketMessageType {
11 static const int NONE = 0; 15 static const int NONE = 0;
12 static const int TEXT = 1; 16 static const int TEXT = 1;
13 static const int BINARY = 2; 17 static const int BINARY = 2;
14 } 18 }
15 19
16
17 class _WebSocketOpcode { 20 class _WebSocketOpcode {
18 static const int CONTINUATION = 0; 21 static const int CONTINUATION = 0;
19 static const int TEXT = 1; 22 static const int TEXT = 1;
20 static const int BINARY = 2; 23 static const int BINARY = 2;
21 static const int RESERVED_3 = 3; 24 static const int RESERVED_3 = 3;
22 static const int RESERVED_4 = 4; 25 static const int RESERVED_4 = 4;
23 static const int RESERVED_5 = 5; 26 static const int RESERVED_5 = 5;
24 static const int RESERVED_6 = 6; 27 static const int RESERVED_6 = 6;
25 static const int RESERVED_7 = 7; 28 static const int RESERVED_7 = 7;
26 static const int CLOSE = 8; 29 static const int CLOSE = 8;
27 static const int PING = 9; 30 static const int PING = 9;
28 static const int PONG = 10; 31 static const int PONG = 10;
29 static const int RESERVED_B = 11; 32 static const int RESERVED_B = 11;
30 static const int RESERVED_C = 12; 33 static const int RESERVED_C = 12;
31 static const int RESERVED_D = 13; 34 static const int RESERVED_D = 13;
32 static const int RESERVED_E = 14; 35 static const int RESERVED_E = 14;
33 static const int RESERVED_F = 15; 36 static const int RESERVED_F = 15;
34 } 37 }
35 38
36 /** 39 /**
37 * The web socket protocol transformer handles the protocol byte stream 40 * The web socket protocol transformer handles the protocol byte stream
38 * which is supplied through the [:handleData:]. As the protocol is processed, 41 * which is supplied through the [:handleData:]. As the protocol is processed,
39 * it'll output frame data as either a List<int> or String. 42 * it'll output frame data as either a List<int> or String.
40 * 43 *
41 * Important infomation about usage: Be sure you use cancelOnError, so the 44 * Important information about usage: Be sure you use cancelOnError, so the
42 * socket will be closed when the processer encounter an error. Not using it 45 * socket will be closed when the processor encounter an error. Not using it
43 * will lead to undefined behaviour. 46 * will lead to undefined behaviour.
44 */ 47 */
45 // TODO(ajohnsen): make this transformer reusable? 48 // TODO(ajohnsen): make this transformer reusable?
46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 49 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
47 static const int START = 0; 50 static const int START = 0;
48 static const int LEN_FIRST = 1; 51 static const int LEN_FIRST = 1;
49 static const int LEN_REST = 2; 52 static const int LEN_REST = 2;
50 static const int MASK = 3; 53 static const int MASK = 3;
51 static const int PAYLOAD = 4; 54 static const int PAYLOAD = 4;
52 static const int CLOSED = 5; 55 static const int CLOSED = 5;
53 static const int FAILURE = 6; 56 static const int FAILURE = 6;
57 static const int FIN = 0x80;
58 static const int RSV1 = 0x40;
59 static const int RSV2 = 0x20;
60 static const int RSV3 = 0x10;
61 static const int OPCODE = 0xF;
54 62
55 int _state = START; 63 int _state = START;
56 bool _fin = false; 64 bool _fin = false;
65 bool _compressed = false;
57 int _opcode = -1; 66 int _opcode = -1;
58 int _len = -1; 67 int _len = -1;
59 bool _masked = false; 68 bool _masked = false;
60 int _remainingLenBytes = -1; 69 int _remainingLenBytes = -1;
61 int _remainingMaskingKeyBytes = 4; 70 int _remainingMaskingKeyBytes = 4;
62 int _remainingPayloadBytes = -1; 71 int _remainingPayloadBytes = -1;
63 int _unmaskingIndex = 0; 72 int _unmaskingIndex = 0;
64 int _currentMessageType = _WebSocketMessageType.NONE; 73 int _currentMessageType = _WebSocketMessageType.NONE;
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 74 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
66 String closeReason = ""; 75 String closeReason = "";
67 76
68 EventSink _eventSink; 77 EventSink _eventSink;
69 78
70 final bool _serverSide; 79 final bool _serverSide;
71 final List _maskingBytes = new List(4); 80 final List _maskingBytes = new List(4);
72 final BytesBuilder _payload = new BytesBuilder(copy: false); 81 final BytesBuilder _payload = new BytesBuilder(copy: false);
73 82
74 _WebSocketProtocolTransformer([this._serverSide = false]); 83 _WebSocketPerMessageDeflate _deflate;
84 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
75 85
76 Stream bind(Stream stream) { 86 Stream bind(Stream stream) {
77 return new Stream.eventTransformed( 87 return new Stream.eventTransformed(stream, (EventSink eventSink) {
78 stream, 88 if (_eventSink != null) {
79 (EventSink eventSink) { 89 throw new StateError("WebSocket transformer already used.");
80 if (_eventSink != null) { 90 }
81 throw new StateError("WebSocket transformer already used."); 91 _eventSink = eventSink;
82 } 92 return this;
83 _eventSink = eventSink; 93 });
84 return this;
85 });
86 } 94 }
87 95
88 void addError(Object error, [StackTrace stackTrace]) => 96 void addError(Object error, [StackTrace stackTrace]) =>
89 _eventSink.addError(error, stackTrace); 97 _eventSink.addError(error, stackTrace);
90 98
91 void close() => _eventSink.close(); 99 void close() => _eventSink.close();
92 100
93 /** 101 /**
94 * Process data received from the underlying communication channel. 102 * Process data received from the underlying communication channel.
95 */ 103 */
96 void add(Uint8List buffer) { 104 void add(Uint8List buffer) {
97 int count = buffer.length;
98 int index = 0; 105 int index = 0;
99 int lastIndex = count; 106 int lastIndex = buffer.length;
100 if (_state == CLOSED) { 107 if (_state == CLOSED) {
101 throw new WebSocketException("Data on closed connection"); 108 throw new WebSocketException("Data on closed connection");
102 } 109 }
103 if (_state == FAILURE) { 110 if (_state == FAILURE) {
104 throw new WebSocketException("Data on failed connection"); 111 throw new WebSocketException("Data on failed connection");
105 } 112 }
106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { 113 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
107 int byte = buffer[index]; 114 int byte = buffer[index];
108 if (_state <= LEN_REST) { 115 if (_state <= LEN_REST) {
109 if (_state == START) { 116 if (_state == START) {
110 _fin = (byte & 0x80) != 0; 117 _fin = (byte & FIN) != 0;
111 if ((byte & 0x70) != 0) { 118
112 // The RSV1, RSV2 bits RSV3 must be all zero. 119 if((byte & (RSV2 | RSV3)) != 0) {
120 // The RSV2, RSV3 bits must both be zero.
113 throw new WebSocketException("Protocol error"); 121 throw new WebSocketException("Protocol error");
114 } 122 }
115 _opcode = (byte & 0xF); 123
124 if ((byte & RSV1) != 0) {
125 _compressed = true;
126 } else {
127 _compressed = false;
128 }
129 _opcode = (byte & OPCODE);
130
116 if (_opcode <= _WebSocketOpcode.BINARY) { 131 if (_opcode <= _WebSocketOpcode.BINARY) {
117 if (_opcode == _WebSocketOpcode.CONTINUATION) { 132 if (_opcode == _WebSocketOpcode.CONTINUATION) {
118 if (_currentMessageType == _WebSocketMessageType.NONE) { 133 if (_currentMessageType == _WebSocketMessageType.NONE) {
119 throw new WebSocketException("Protocol error"); 134 throw new WebSocketException("Protocol error");
120 } 135 }
121 } else { 136 } else {
122 assert(_opcode == _WebSocketOpcode.TEXT || 137 assert(_opcode == _WebSocketOpcode.TEXT ||
123 _opcode == _WebSocketOpcode.BINARY); 138 _opcode == _WebSocketOpcode.BINARY);
124 if (_currentMessageType != _WebSocketMessageType.NONE) { 139 if (_currentMessageType != _WebSocketMessageType.NONE) {
125 throw new WebSocketException("Protocol error"); 140 throw new WebSocketException("Protocol error");
126 } 141 }
127 _currentMessageType = _opcode; 142 _currentMessageType = _opcode;
128 } 143 }
129 } else if (_opcode >= _WebSocketOpcode.CLOSE && 144 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
130 _opcode <= _WebSocketOpcode.PONG) { 145 _opcode <= _WebSocketOpcode.PONG) {
131 // Control frames cannot be fragmented. 146 // Control frames cannot be fragmented.
132 if (!_fin) throw new WebSocketException("Protocol error"); 147 if (!_fin) throw new WebSocketException("Protocol error");
133 } else { 148 } else {
134 throw new WebSocketException("Protocol error"); 149 throw new WebSocketException("Protocol error");
135 } 150 }
136 _state = LEN_FIRST; 151 _state = LEN_FIRST;
137 } else if (_state == LEN_FIRST) { 152 } else if (_state == LEN_FIRST) {
138 _masked = (byte & 0x80) != 0; 153 _masked = (byte & 0x80) != 0;
139 _len = byte & 0x7F; 154 _len = byte & 0x7F;
140 if (_isControlFrame() && _len > 125) { 155 if (_isControlFrame() && _len > 125) {
(...skipping 28 matching lines...) Expand all
169 } else { 184 } else {
170 assert(_state == PAYLOAD); 185 assert(_state == PAYLOAD);
171 // The payload is not handled one byte at a time but in blocks. 186 // The payload is not handled one byte at a time but in blocks.
172 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); 187 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
173 _remainingPayloadBytes -= payloadLength; 188 _remainingPayloadBytes -= payloadLength;
174 // Unmask payload if masked. 189 // Unmask payload if masked.
175 if (_masked) { 190 if (_masked) {
176 _unmask(index, payloadLength, buffer); 191 _unmask(index, payloadLength, buffer);
177 } 192 }
178 // Control frame and data frame share _payloads. 193 // Control frame and data frame share _payloads.
179 _payload.add( 194 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
180 new Uint8List.view(buffer.buffer, index, payloadLength));
181 index += payloadLength; 195 index += payloadLength;
182 if (_isControlFrame()) { 196 if (_isControlFrame()) {
183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 197 if (_remainingPayloadBytes == 0) _controlFrameEnd();
184 } else { 198 } else {
185 if (_currentMessageType != _WebSocketMessageType.TEXT && 199 if (_currentMessageType != _WebSocketMessageType.TEXT &&
186 _currentMessageType != _WebSocketMessageType.BINARY) { 200 _currentMessageType != _WebSocketMessageType.BINARY) {
187 throw new WebSocketException("Protocol error"); 201 throw new WebSocketException("Protocol error");
188 } 202 }
189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 203 if (_remainingPayloadBytes == 0) _messageFrameEnd();
190 } 204 }
191 205
192 // Hack - as we always do index++ below. 206 // Hack - as we always do index++ below.
193 index--; 207 index--;
194 } 208 }
195 } 209 }
196 210
197 // Move to the next byte. 211 // Move to the next byte.
(...skipping 14 matching lines...) Expand all
212 index += startOffset; 226 index += startOffset;
213 length -= startOffset; 227 length -= startOffset;
214 final int blockCount = length ~/ BLOCK_SIZE; 228 final int blockCount = length ~/ BLOCK_SIZE;
215 if (blockCount > 0) { 229 if (blockCount > 0) {
216 // Create mask block. 230 // Create mask block.
217 int mask = 0; 231 int mask = 0;
218 for (int i = 3; i >= 0; i--) { 232 for (int i = 3; i >= 0; i--) {
219 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; 233 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
220 } 234 }
221 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 235 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
222 Int32x4List blockBuffer = new Int32x4List.view( 236 Int32x4List blockBuffer =
223 buffer.buffer, index, blockCount); 237 new Int32x4List.view(buffer.buffer, index, blockCount);
224 for (int i = 0; i < blockBuffer.length; i++) { 238 for (int i = 0; i < blockBuffer.length; i++) {
225 blockBuffer[i] ^= blockMask; 239 blockBuffer[i] ^= blockMask;
226 } 240 }
227 final int bytes = blockCount * BLOCK_SIZE; 241 final int bytes = blockCount * BLOCK_SIZE;
228 index += bytes; 242 index += bytes;
229 length -= bytes; 243 length -= bytes;
230 } 244 }
231 } 245 }
232 // Handle end. 246 // Handle end.
233 final int end = index + length; 247 final int end = index + length;
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
277 } else { 291 } else {
278 _messageFrameEnd(); 292 _messageFrameEnd();
279 } 293 }
280 } else { 294 } else {
281 _state = PAYLOAD; 295 _state = PAYLOAD;
282 } 296 }
283 } 297 }
284 298
285 void _messageFrameEnd() { 299 void _messageFrameEnd() {
286 if (_fin) { 300 if (_fin) {
301 var bytes = _payload.takeBytes();
302 if (_deflate != null && _compressed) {
303 bytes = _deflate.processIncomingMessage(bytes);
304 }
305
287 switch (_currentMessageType) { 306 switch (_currentMessageType) {
288 case _WebSocketMessageType.TEXT: 307 case _WebSocketMessageType.TEXT:
289 _eventSink.add(UTF8.decode(_payload.takeBytes())); 308 _eventSink.add(UTF8.decode(bytes));
290 break; 309 break;
291 case _WebSocketMessageType.BINARY: 310 case _WebSocketMessageType.BINARY:
292 _eventSink.add(_payload.takeBytes()); 311 _eventSink.add(bytes);
293 break; 312 break;
294 } 313 }
295 _currentMessageType = _WebSocketMessageType.NONE; 314 _currentMessageType = _WebSocketMessageType.NONE;
296 } 315 }
297 _prepareForNextFrame(); 316 _prepareForNextFrame();
298 } 317 }
299 318
300 void _controlFrameEnd() { 319 void _controlFrameEnd() {
301 switch (_opcode) { 320 switch (_opcode) {
302 case _WebSocketOpcode.CLOSE: 321 case _WebSocketOpcode.CLOSE:
(...skipping 21 matching lines...) Expand all
324 343
325 case _WebSocketOpcode.PONG: 344 case _WebSocketOpcode.PONG:
326 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); 345 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
327 break; 346 break;
328 } 347 }
329 _prepareForNextFrame(); 348 _prepareForNextFrame();
330 } 349 }
331 350
332 bool _isControlFrame() { 351 bool _isControlFrame() {
333 return _opcode == _WebSocketOpcode.CLOSE || 352 return _opcode == _WebSocketOpcode.CLOSE ||
334 _opcode == _WebSocketOpcode.PING || 353 _opcode == _WebSocketOpcode.PING ||
335 _opcode == _WebSocketOpcode.PONG; 354 _opcode == _WebSocketOpcode.PONG;
336 } 355 }
337 356
338 void _prepareForNextFrame() { 357 void _prepareForNextFrame() {
339 if (_state != CLOSED && _state != FAILURE) _state = START; 358 if (_state != CLOSED && _state != FAILURE) _state = START;
340 _fin = false; 359 _fin = false;
341 _opcode = -1; 360 _opcode = -1;
342 _len = -1; 361 _len = -1;
343 _remainingLenBytes = -1; 362 _remainingLenBytes = -1;
344 _remainingMaskingKeyBytes = 4; 363 _remainingMaskingKeyBytes = 4;
345 _remainingPayloadBytes = -1; 364 _remainingPayloadBytes = -1;
346 _unmaskingIndex = 0; 365 _unmaskingIndex = 0;
347 } 366 }
348 } 367 }
349 368
350
351 class _WebSocketPing { 369 class _WebSocketPing {
352 final List<int> payload; 370 final List<int> payload;
353 _WebSocketPing([this.payload = null]); 371 _WebSocketPing([this.payload = null]);
354 } 372 }
355 373
356
357 class _WebSocketPong { 374 class _WebSocketPong {
358 final List<int> payload; 375 final List<int> payload;
359 _WebSocketPong([this.payload = null]); 376 _WebSocketPong([this.payload = null]);
360 } 377 }
361 378
362
363 class _WebSocketTransformerImpl implements WebSocketTransformer { 379 class _WebSocketTransformerImpl implements WebSocketTransformer {
364 final StreamController<WebSocket> _controller = 380 final StreamController<WebSocket> _controller =
365 new StreamController<WebSocket>(sync: true); 381 new StreamController<WebSocket>(sync: true);
366 final Function _protocolSelector; 382 final Function _protocolSelector;
383 final CompressionOptions _compression;
367 384
368 _WebSocketTransformerImpl(this._protocolSelector); 385 _WebSocketTransformerImpl(this._protocolSelector, this._compression);
369 386
370 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 387 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
371 stream.listen((request) { 388 stream.listen((request) {
372 _upgrade(request, _protocolSelector) 389 _upgrade(request, _protocolSelector, _compression)
373 .then((WebSocket webSocket) => _controller.add(webSocket)) 390 .then((WebSocket webSocket) => _controller.add(webSocket))
374 .catchError(_controller.addError); 391 .catchError(_controller.addError);
375 }, onDone: () { 392 }, onDone: () {
376 _controller.close(); 393 _controller.close();
377 }); 394 });
378 395
379 return _controller.stream; 396 return _controller.stream;
380 } 397 }
381 398
382 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { 399 static Future<WebSocket> _upgrade(
400 HttpRequest request, _protocolSelector, CompressionOptions compression) {
383 var response = request.response; 401 var response = request.response;
384 if (!_isUpgradeRequest(request)) { 402 if (!_isUpgradeRequest(request)) {
385 // Send error response. 403 // Send error response.
386 response 404 response
387 ..statusCode = HttpStatus.BAD_REQUEST 405 ..statusCode = HttpStatus.BAD_REQUEST
388 ..close(); 406 ..close();
389 return new Future.error( 407 return new Future.error(
390 new WebSocketException("Invalid WebSocket upgrade request")); 408 new WebSocketException("Invalid WebSocket upgrade request"));
391 } 409 }
392 410
393 Future upgrade(String protocol) { 411 Future upgrade(String protocol) {
394 // Send the upgrade response. 412 // Send the upgrade response.
395 response 413 response
396 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS 414 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") 415 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
398 ..headers.add(HttpHeaders.UPGRADE, "websocket"); 416 ..headers.add(HttpHeaders.UPGRADE, "websocket");
399 String key = request.headers.value("Sec-WebSocket-Key"); 417 String key = request.headers.value("Sec-WebSocket-Key");
400 _SHA1 sha1 = new _SHA1(); 418 _SHA1 sha1 = new _SHA1();
401 sha1.add("$key$_webSocketGUID".codeUnits); 419 sha1.add("$key$_webSocketGUID".codeUnits);
402 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 420 String accept = _CryptoUtils.bytesToBase64(sha1.close());
403 response.headers.add("Sec-WebSocket-Accept", accept); 421 response.headers.add("Sec-WebSocket-Accept", accept);
404 if (protocol != null) { 422 if (protocol != null) {
405 response.headers.add("Sec-WebSocket-Protocol", protocol); 423 response.headers.add("Sec-WebSocket-Protocol", protocol);
406 } 424 }
425
426 var deflate = _negotiateCompression(request, response, compression);
427
407 response.headers.contentLength = 0; 428 response.headers.contentLength = 0;
408 return response.detachSocket() 429 return response.detachSocket().then((socket) =>
409 .then((socket) => new _WebSocketImpl._fromSocket( 430 new _WebSocketImpl._fromSocket(
410 socket, protocol, true)); 431 socket, protocol, compression, true, deflate));
411 } 432 }
412 433
413 var protocols = request.headers['Sec-WebSocket-Protocol']; 434 var protocols = request.headers['Sec-WebSocket-Protocol'];
414 if (protocols != null && _protocolSelector != null) { 435 if (protocols != null && _protocolSelector != null) {
415 // The suggested protocols can be spread over multiple lines, each 436 // The suggested protocols can be spread over multiple lines, each
416 // consisting of multiple protocols. To unify all of them, first join 437 // consisting of multiple protocols. To unify all of them, first join
417 // the lists with ', ' and then tokenize. 438 // the lists with ', ' and then tokenize.
418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); 439 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
419 return new Future(() => _protocolSelector(protocols)) 440 return new Future(() => _protocolSelector(protocols)).then((protocol) {
420 .then((protocol) { 441 if (protocols.indexOf(protocol) < 0) {
421 if (protocols.indexOf(protocol) < 0) { 442 throw new WebSocketException(
422 throw new WebSocketException( 443 "Selected protocol is not in the list of available protocols");
423 "Selected protocol is not in the list of available protocols"); 444 }
424 } 445 return protocol;
425 return protocol; 446 }).catchError((error) {
426 }) 447 response
427 .catchError((error) { 448 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
428 response 449 ..close();
429 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR 450 throw error;
430 ..close(); 451 }).then(upgrade);
431 throw error;
432 })
433 .then(upgrade);
434 } else { 452 } else {
435 return upgrade(null); 453 return upgrade(null);
436 } 454 }
437 } 455 }
438 456
457 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
458 HttpResponse response, CompressionOptions compression) {
459 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
460
461 if (extensionHeader == null) {
462 extensionHeader = "";
463 }
464
465 var hv = HeaderValue.parse(extensionHeader);
466 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
467 var info = compression._createHeader(hv);
468
469 response.headers.add("Sec-WebSocket-Extensions", info[0]);
470 var serverNoContextTakeover =
471 hv.parameters.containsKey(_serverNoContextTakeover);
472 var clientNoContextTakeover =
473 hv.parameters.containsKey(_clientNoContextTakeover);
474 var deflate = new _WebSocketPerMessageDeflate(
475 serverNoContextTakeover: serverNoContextTakeover,
476 clientNoContextTakeover: clientNoContextTakeover,
477 serverMaxWindowBits: info[1],
478 clientMaxWindowBits: info[1],
479 serverSide: true);
480
481 return deflate;
482 }
483
484 return null;
485 }
486
439 static bool _isUpgradeRequest(HttpRequest request) { 487 static bool _isUpgradeRequest(HttpRequest request) {
440 if (request.method != "GET") { 488 if (request.method != "GET") {
441 return false; 489 return false;
442 } 490 }
443 if (request.headers[HttpHeaders.CONNECTION] == null) { 491 if (request.headers[HttpHeaders.CONNECTION] == null) {
444 return false; 492 return false;
445 } 493 }
446 bool isUpgrade = false; 494 bool isUpgrade = false;
447 request.headers[HttpHeaders.CONNECTION].forEach((String value) { 495 request.headers[HttpHeaders.CONNECTION].forEach((String value) {
448 if (value.toLowerCase() == "upgrade") isUpgrade = true; 496 if (value.toLowerCase() == "upgrade") isUpgrade = true;
449 }); 497 });
450 if (!isUpgrade) return false; 498 if (!isUpgrade) return false;
451 String upgrade = request.headers.value(HttpHeaders.UPGRADE); 499 String upgrade = request.headers.value(HttpHeaders.UPGRADE);
452 if (upgrade == null || upgrade.toLowerCase() != "websocket") { 500 if (upgrade == null || upgrade.toLowerCase() != "websocket") {
453 return false; 501 return false;
454 } 502 }
455 String version = request.headers.value("Sec-WebSocket-Version"); 503 String version = request.headers.value("Sec-WebSocket-Version");
456 if (version == null || version != "13") { 504 if (version == null || version != "13") {
457 return false; 505 return false;
458 } 506 }
459 String key = request.headers.value("Sec-WebSocket-Key"); 507 String key = request.headers.value("Sec-WebSocket-Key");
460 if (key == null) { 508 if (key == null) {
461 return false; 509 return false;
462 } 510 }
463 return true; 511 return true;
464 } 512 }
465 } 513 }
466 514
515 class _WebSocketPerMessageDeflate {
516 bool serverNoContextTakeover;
517 bool clientNoContextTakeover;
518 int clientMaxWindowBits;
519 int serverMaxWindowBits;
520 bool serverSide;
521
522 _Filter decoder;
523 _Filter encoder;
524
525 _WebSocketPerMessageDeflate(
526 {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
527 this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS,
528 this.serverNoContextTakeover: false,
529 this.clientNoContextTakeover: false,
530 this.serverSide: false});
531
532 void _ensureDecoder() {
533 if (decoder == null) {
534 decoder = _Filter._newZLibInflateFilter(
535 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
536 }
537 }
538
539 void _ensureEncoder() {
540 if (encoder == null) {
541 encoder = _Filter._newZLibDeflateFilter(
542 false,
543 ZLibOption.DEFAULT_LEVEL,
544 serverSide ? serverMaxWindowBits : clientMaxWindowBits,
545 ZLibOption.DEFAULT_MEM_LEVEL,
546 ZLibOption.STRATEGY_DEFAULT,
547 null,
548 true);
549 }
550 }
551
552 List<int> processIncomingMessage(List<int> msg) {
553 _ensureDecoder();
554
555 var data = [];
556 data.addAll(msg);
557 data.addAll(const [0x00, 0x00, 0xff, 0xff]);
558
559 decoder.process(data, 0, data.length);
560 var reuse =
561 !(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
562 var result = [];
563 var out;
564
565 while ((out = decoder.processed(flush: reuse)) != null) {
566 result.addAll(out);
567 }
568
569 decoder.processed(flush: reuse);
570
571 if (!reuse) {
572 decoder.end();
573 decoder = null;
574 }
575 return result;
576 }
577
578 List<int> processOutgoingMessage(List<int> msg) {
579 _ensureEncoder();
580 var reuse =
581 !(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
582 var result = [];
583 Uint8List buffer;
584 var out;
585
586 if(msg is! Uint8List) {
Søren Gjesse 2015/10/23 15:09:59 nit: Space after if.
butlermatt 2015/10/23 15:44:12 Done.
587 buffer = new Uint8List(msg.length);
Søren Gjesse 2015/10/23 15:09:59 New Uint8List.fromList(msg) will be simpler here.
butlermatt 2015/10/23 15:44:12 Done.
588 for (var i = 0; i < msg.length; i++) {
589 if (msg[i] < 0 || 255 < msg[i]) {
590 throw new ArgumentError("List element is not a byte value "
591 "(value ${msg[i]} at index $i)");
592 }
593 buffer[i] = msg[i];
594 }
595 } else {
596 buffer = msg;
597 if (buffer.any((el) => el < 0 || 255 < el)) {
598 throw new ArgumentError("List element is not a byte value "
599 "(value ${msg[i]} at index $i)");
600 }
601 }
602
603 encoder.process(buffer, 0, buffer.length);
604
605 while ((out = encoder.processed(flush: reuse)) != null) {
606 result.addAll(out);
607 }
608
609 if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
610 encoder.end();
611 encoder = null;
612 }
613
614 if (result.length > 4) {
615 result = result.sublist(0, result.length - 4);
616 }
617
618 return result;
619 }
620 }
467 621
468 // TODO(ajohnsen): Make this transformer reusable. 622 // TODO(ajohnsen): Make this transformer reusable.
469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 623 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
470 final _WebSocketImpl webSocket; 624 final _WebSocketImpl webSocket;
471 EventSink _eventSink; 625 EventSink _eventSink;
472 626
473 _WebSocketOutgoingTransformer(this.webSocket); 627 _WebSocketPerMessageDeflate _deflateHelper;
628
629 _WebSocketOutgoingTransformer(this.webSocket) {
630 _deflateHelper = webSocket._deflate;
631 }
474 632
475 Stream bind(Stream stream) { 633 Stream bind(Stream stream) {
476 return new Stream.eventTransformed( 634 return new Stream.eventTransformed(stream, (EventSink eventSink) {
477 stream, 635 if (_eventSink != null) {
478 (EventSink eventSink) { 636 throw new StateError("WebSocket transformer already used");
479 if (_eventSink != null) { 637 }
480 throw new StateError("WebSocket transformer already used"); 638 _eventSink = eventSink;
481 } 639 return this;
482 _eventSink = eventSink; 640 });
483 return this;
484 });
485 } 641 }
486 642
487 void add(message) { 643 void add(message) {
488 if (message is _WebSocketPong) { 644 if (message is _WebSocketPong) {
489 addFrame(_WebSocketOpcode.PONG, message.payload); 645 addFrame(_WebSocketOpcode.PONG, message.payload);
490 return; 646 return;
491 } 647 }
492 if (message is _WebSocketPing) { 648 if (message is _WebSocketPing) {
493 addFrame(_WebSocketOpcode.PING, message.payload); 649 addFrame(_WebSocketOpcode.PING, message.payload);
494 return; 650 return;
495 } 651 }
496 List<int> data; 652 List<int> data;
497 int opcode; 653 int opcode;
498 if (message != null) { 654 if (message != null) {
499 if (message is String) { 655 if (message is String) {
500 opcode = _WebSocketOpcode.TEXT; 656 opcode = _WebSocketOpcode.TEXT;
501 data = UTF8.encode(message); 657 data = UTF8.encode(message);
502 } else { 658 } else {
503 if (message is !List<int>) { 659 if (message is! List<int>) {
504 throw new ArgumentError(message); 660 throw new ArgumentError(message);
505 } 661 }
506 opcode = _WebSocketOpcode.BINARY; 662 opcode = _WebSocketOpcode.BINARY;
507 data = message; 663 data = message;
508 } 664 }
665
666 if (_deflateHelper != null) {
667 data = _deflateHelper.processOutgoingMessage(data);
668 }
509 } else { 669 } else {
510 opcode = _WebSocketOpcode.TEXT; 670 opcode = _WebSocketOpcode.TEXT;
511 } 671 }
512 addFrame(opcode, data); 672 addFrame(opcode, data);
513 } 673 }
514 674
515 void addError(Object error, [StackTrace stackTrace]) => 675 void addError(Object error, [StackTrace stackTrace]) =>
516 _eventSink.addError(error, stackTrace); 676 _eventSink.addError(error, stackTrace);
517 677
518 void close() { 678 void close() {
519 int code = webSocket._outCloseCode; 679 int code = webSocket._outCloseCode;
520 String reason = webSocket._outCloseReason; 680 String reason = webSocket._outCloseReason;
521 List<int> data; 681 List<int> data;
522 if (code != null) { 682 if (code != null) {
523 data = new List<int>(); 683 data = new List<int>();
524 data.add((code >> 8) & 0xFF); 684 data.add((code >> 8) & 0xFF);
525 data.add(code & 0xFF); 685 data.add(code & 0xFF);
526 if (reason != null) { 686 if (reason != null) {
527 data.addAll(UTF8.encode(reason)); 687 data.addAll(UTF8.encode(reason));
528 } 688 }
529 } 689 }
530 addFrame(_WebSocketOpcode.CLOSE, data); 690 addFrame(_WebSocketOpcode.CLOSE, data);
531 _eventSink.close(); 691 _eventSink.close();
532 } 692 }
533 693
534 void addFrame(int opcode, List<int> data) => 694 void addFrame(int opcode, List<int> data) => createFrame(
535 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); 695 opcode,
696 data,
697 webSocket._serverSide,
698 _deflateHelper != null &&
699 (opcode == _WebSocketOpcode.TEXT ||
700 opcode == _WebSocketOpcode.BINARY)).forEach((e) {
701 _eventSink.add(e);
702 });
536 703
537 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 704 static Iterable createFrame(
538 bool mask = !serverSide; // Masking not implemented for server. 705 int opcode, List<int> data, bool serverSide, bool compressed) {
706 bool mask = !serverSide; // Masking not implemented for server.
539 int dataLength = data == null ? 0 : data.length; 707 int dataLength = data == null ? 0 : data.length;
540 // Determine the header size. 708 // Determine the header size.
541 int headerSize = (mask) ? 6 : 2; 709 int headerSize = (mask) ? 6 : 2;
542 if (dataLength > 65535) { 710 if (dataLength > 65535) {
543 headerSize += 8; 711 headerSize += 8;
544 } else if (dataLength > 125) { 712 } else if (dataLength > 125) {
545 headerSize += 2; 713 headerSize += 2;
546 } 714 }
547 Uint8List header = new Uint8List(headerSize); 715 Uint8List header = new Uint8List(headerSize);
548 int index = 0; 716 int index = 0;
717
549 // Set FIN and opcode. 718 // Set FIN and opcode.
550 header[index++] = 0x80 | opcode; 719 var hoc = _WebSocketProtocolTransformer.FIN
720 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
721 | (opcode & _WebSocketProtocolTransformer.OPCODE);
722
723 header[index++] = hoc;
551 // Determine size and position of length field. 724 // Determine size and position of length field.
552 int lengthBytes = 1; 725 int lengthBytes = 1;
553 int firstLengthByte = 1;
554 if (dataLength > 65535) { 726 if (dataLength > 65535) {
555 header[index++] = 127; 727 header[index++] = 127;
556 lengthBytes = 8; 728 lengthBytes = 8;
557 } else if (dataLength > 125) { 729 } else if (dataLength > 125) {
558 header[index++] = 126; 730 header[index++] = 126;
559 lengthBytes = 2; 731 lengthBytes = 2;
560 } 732 }
561 // Write the length in network byte order into the header. 733 // Write the length in network byte order into the header.
562 for (int i = 0; i < lengthBytes; i++) { 734 for (int i = 0; i < lengthBytes; i++) {
563 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; 735 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
564 } 736 }
565 if (mask) { 737 if (mask) {
566 header[1] |= 1 << 7; 738 header[1] |= 1 << 7;
567 var maskBytes = _IOCrypto.getRandomBytes(4); 739 var maskBytes = _IOCrypto.getRandomBytes(4);
568 header.setRange(index, index + 4, maskBytes); 740 header.setRange(index, index + 4, maskBytes);
569 index += 4; 741 index += 4;
570 if (data != null) { 742 if (data != null) {
571 Uint8List list; 743 Uint8List list;
572 // If this is a text message just do the masking inside the 744 // If this is a text message just do the masking inside the
573 // encoded data. 745 // encoded data.
574 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 746 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
575 list = data; 747 list = data;
576 } else { 748 } else {
577 if (data is Uint8List) { 749 if (data is Uint8List) {
578 list = new Uint8List.fromList(data); 750 list = new Uint8List.fromList(data);
579 } else { 751 } else {
580 list = new Uint8List(data.length); 752 list = new Uint8List(data.length);
581 for (int i = 0; i < data.length; i++) { 753 for (int i = 0; i < data.length; i++) {
582 if (data[i] < 0 || 255 < data[i]) { 754 if (data[i] < 0 || 255 < data[i]) {
583 throw new ArgumentError( 755 throw new ArgumentError("List element is not a byte value "
584 "List element is not a byte value "
585 "(value ${data[i]} at index $i)"); 756 "(value ${data[i]} at index $i)");
586 } 757 }
587 list[i] = data[i]; 758 list[i] = data[i];
588 } 759 }
589 } 760 }
590 } 761 }
591 const int BLOCK_SIZE = 16; 762 const int BLOCK_SIZE = 16;
592 int blockCount = list.length ~/ BLOCK_SIZE; 763 int blockCount = list.length ~/ BLOCK_SIZE;
593 if (blockCount > 0) { 764 if (blockCount > 0) {
594 // Create mask block. 765 // Create mask block.
595 int mask = 0; 766 int mask = 0;
596 for (int i = 3; i >= 0; i--) { 767 for (int i = 3; i >= 0; i--) {
597 mask = (mask << 8) | maskBytes[i]; 768 mask = (mask << 8) | maskBytes[i];
598 } 769 }
599 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 770 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
600 Int32x4List blockBuffer = new Int32x4List.view( 771 Int32x4List blockBuffer =
601 list.buffer, 0, blockCount); 772 new Int32x4List.view(list.buffer, 0, blockCount);
602 for (int i = 0; i < blockBuffer.length; i++) { 773 for (int i = 0; i < blockBuffer.length; i++) {
603 blockBuffer[i] ^= blockMask; 774 blockBuffer[i] ^= blockMask;
604 } 775 }
605 } 776 }
606 // Handle end. 777 // Handle end.
607 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { 778 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
608 list[i] ^= maskBytes[i & 3]; 779 list[i] ^= maskBytes[i & 3];
609 } 780 }
610 data = list; 781 data = list;
611 } 782 }
612 } 783 }
613 assert(index == headerSize); 784 assert(index == headerSize);
614 if (data == null) { 785 if (data == null) {
615 return [header]; 786 return [header];
616 } else { 787 } else {
617 return [header, data]; 788 return [header, data];
618 } 789 }
619 } 790 }
620 } 791 }
621 792
622
623 class _WebSocketConsumer implements StreamConsumer { 793 class _WebSocketConsumer implements StreamConsumer {
624 final _WebSocketImpl webSocket; 794 final _WebSocketImpl webSocket;
625 final Socket socket; 795 final Socket socket;
626 StreamController _controller; 796 StreamController _controller;
627 StreamSubscription _subscription; 797 StreamSubscription _subscription;
628 bool _issuedPause = false; 798 bool _issuedPause = false;
629 bool _closed = false; 799 bool _closed = false;
630 Completer _closeCompleter = new Completer(); 800 Completer _closeCompleter = new Completer();
631 Completer _completer; 801 Completer _completer;
632 802
(...skipping 24 matching lines...) Expand all
657 void _cancel() { 827 void _cancel() {
658 if (_subscription != null) { 828 if (_subscription != null) {
659 var subscription = _subscription; 829 var subscription = _subscription;
660 _subscription = null; 830 _subscription = null;
661 subscription.cancel(); 831 subscription.cancel();
662 } 832 }
663 } 833 }
664 834
665 _ensureController() { 835 _ensureController() {
666 if (_controller != null) return; 836 if (_controller != null) return;
667 _controller = new StreamController(sync: true, 837 _controller = new StreamController(
668 onPause: _onPause, 838 sync: true,
669 onResume: _onResume, 839 onPause: _onPause,
670 onCancel: _onListen); 840 onResume: _onResume,
671 var stream = _controller.stream.transform( 841 onCancel: _onListen);
672 new _WebSocketOutgoingTransformer(webSocket)); 842 var stream = _controller.stream
673 socket.addStream(stream) 843 .transform(new _WebSocketOutgoingTransformer(webSocket));
674 .then((_) { 844 socket.addStream(stream).then((_) {
675 _done(); 845 _done();
676 _closeCompleter.complete(webSocket); 846 _closeCompleter.complete(webSocket);
677 }, onError: (error, StackTrace stackTrace) { 847 }, onError: (error, StackTrace stackTrace) {
678 _closed = true; 848 _closed = true;
679 _cancel(); 849 _cancel();
680 if (error is ArgumentError) { 850 if (error is ArgumentError) {
681 if (!_done(error, stackTrace)) { 851 if (!_done(error, stackTrace)) {
682 _closeCompleter.completeError(error, stackTrace); 852 _closeCompleter.completeError(error, stackTrace);
683 } 853 }
684 } else { 854 } else {
685 _done(); 855 _done();
686 _closeCompleter.complete(webSocket); 856 _closeCompleter.complete(webSocket);
687 } 857 }
688 }); 858 });
689 } 859 }
690 860
691 bool _done([error, StackTrace stackTrace]) { 861 bool _done([error, StackTrace stackTrace]) {
692 if (_completer == null) return false; 862 if (_completer == null) return false;
693 if (error != null) { 863 if (error != null) {
694 _completer.completeError(error, stackTrace); 864 _completer.completeError(error, stackTrace);
695 } else { 865 } else {
696 _completer.complete(webSocket); 866 _completer.complete(webSocket);
697 } 867 }
698 _completer = null; 868 _completer = null;
699 return true; 869 return true;
700 } 870 }
701 871
702 Future addStream(var stream) { 872 Future addStream(var stream) {
703 if (_closed) { 873 if (_closed) {
704 stream.listen(null).cancel(); 874 stream.listen(null).cancel();
705 return new Future.value(webSocket); 875 return new Future.value(webSocket);
706 } 876 }
707 _ensureController(); 877 _ensureController();
708 _completer = new Completer(); 878 _completer = new Completer();
709 _subscription = stream.listen( 879 _subscription = stream.listen((data) {
710 (data) { 880 _controller.add(data);
711 _controller.add(data); 881 }, onDone: _done, onError: _done, cancelOnError: true);
712 },
713 onDone: _done,
714 onError: _done,
715 cancelOnError: true);
716 if (_issuedPause) { 882 if (_issuedPause) {
717 _subscription.pause(); 883 _subscription.pause();
718 _issuedPause = false; 884 _issuedPause = false;
719 } 885 }
720 return _completer.future; 886 return _completer.future;
721 } 887 }
722 888
723 Future close() { 889 Future close() {
724 _ensureController(); 890 _ensureController();
725 Future closeSocket() { 891 Future closeSocket() {
726 return socket.close().catchError((_) {}).then((_) => webSocket); 892 return socket.close().catchError((_) {}).then((_) => webSocket);
727 } 893 }
728 _controller.close(); 894 _controller.close();
729 return _closeCompleter.future.then((_) => closeSocket()); 895 return _closeCompleter.future.then((_) => closeSocket());
730 } 896 }
731 897
732 void add(data) { 898 void add(data) {
733 if (_closed) return; 899 if (_closed) return;
734 _ensureController(); 900 _ensureController();
735 _controller.add(data); 901 _controller.add(data);
736 } 902 }
737 903
738 void closeSocket() { 904 void closeSocket() {
739 _closed = true; 905 _closed = true;
740 _cancel(); 906 _cancel();
741 close(); 907 close();
742 } 908 }
743 } 909 }
744 910
745
746 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { 911 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
747 // Use default Map so we keep order. 912 // Use default Map so we keep order.
748 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); 913 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
914 static const int DEFAULT_WINDOW_BITS = 15;
915 static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
749 916
750 final String protocol; 917 final String protocol;
751 918
752 StreamController _controller; 919 StreamController _controller;
753 StreamSubscription _subscription; 920 StreamSubscription _subscription;
754 StreamSink _sink; 921 StreamSink _sink;
755 922
756 final _socket; 923 final _socket;
757 final bool _serverSide; 924 final bool _serverSide;
758 int _readyState = WebSocket.CONNECTING; 925 int _readyState = WebSocket.CONNECTING;
759 bool _writeClosed = false; 926 bool _writeClosed = false;
760 int _closeCode; 927 int _closeCode;
761 String _closeReason; 928 String _closeReason;
762 Duration _pingInterval; 929 Duration _pingInterval;
763 Timer _pingTimer; 930 Timer _pingTimer;
764 _WebSocketConsumer _consumer; 931 _WebSocketConsumer _consumer;
765 932
766 int _outCloseCode; 933 int _outCloseCode;
767 String _outCloseReason; 934 String _outCloseReason;
768 Timer _closeTimer; 935 Timer _closeTimer;
936 _WebSocketPerMessageDeflate _deflate;
769 937
770 static final HttpClient _httpClient = new HttpClient(); 938 static final HttpClient _httpClient = new HttpClient();
771 939
772 static Future<WebSocket> connect( 940 static Future<WebSocket> connect(
773 String url, Iterable<String> protocols, Map<String, dynamic> headers) { 941 String url, Iterable<String> protocols, Map<String, dynamic> headers,
942 {CompressionOptions compression: CompressionOptions.DEFAULT}) {
774 Uri uri = Uri.parse(url); 943 Uri uri = Uri.parse(url);
775 if (uri.scheme != "ws" && uri.scheme != "wss") { 944 if (uri.scheme != "ws" && uri.scheme != "wss") {
776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 945 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
777 } 946 }
778 947
779 Random random = new Random(); 948 Random random = new Random();
780 // Generate 16 random bytes. 949 // Generate 16 random bytes.
781 Uint8List nonceData = new Uint8List(16); 950 Uint8List nonceData = new Uint8List(16);
782 for (int i = 0; i < 16; i++) { 951 for (int i = 0; i < 16; i++) {
783 nonceData[i] = random.nextInt(256); 952 nonceData[i] = random.nextInt(256);
784 } 953 }
785 String nonce = _CryptoUtils.bytesToBase64(nonceData); 954 String nonce = _CryptoUtils.bytesToBase64(nonceData);
786 955
787 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", 956 uri = new Uri(
788 userInfo: uri.userInfo, 957 scheme: uri.scheme == "wss" ? "https" : "http",
789 host: uri.host, 958 userInfo: uri.userInfo,
790 port: uri.port, 959 host: uri.host,
791 path: uri.path, 960 port: uri.port,
792 query: uri.query, 961 path: uri.path,
793 fragment: uri.fragment); 962 query: uri.query,
794 return _httpClient.openUrl("GET", uri) 963 fragment: uri.fragment);
795 .then((request) { 964 return _httpClient.openUrl("GET", uri).then((request) {
796 if (uri.userInfo != null && !uri.userInfo.isEmpty) { 965 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
797 // If the URL contains user information use that for basic 966 // If the URL contains user information use that for basic
798 // authorization. 967 // authorization.
799 String auth = 968 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
800 _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); 969 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
801 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); 970 }
971 if (headers != null) {
972 headers.forEach((field, value) => request.headers.add(field, value));
973 }
974 // Setup the initial handshake.
975 request.headers
976 ..set(HttpHeaders.CONNECTION, "Upgrade")
977 ..set(HttpHeaders.UPGRADE, "websocket")
978 ..set("Sec-WebSocket-Key", nonce)
979 ..set("Cache-Control", "no-cache")
980 ..set("Sec-WebSocket-Version", "13");
981 if (protocols != null) {
982 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
983 }
984
985 if (compression.enabled) {
986 request.headers
987 .add("Sec-WebSocket-Extensions", compression._createHeader());
988 }
989
990 return request.close();
991 }).then((response) {
992 void error(String message) {
993 // Flush data.
994 response.detachSocket().then((socket) {
995 socket.destroy();
996 });
997 throw new WebSocketException(message);
998 }
999 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
1000 response.headers[HttpHeaders.CONNECTION] == null ||
1001 !response.headers[HttpHeaders.CONNECTION]
1002 .any((value) => value.toLowerCase() == "upgrade") ||
1003 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
1004 "websocket") {
1005 error("Connection to '$uri' was not upgraded to websocket");
1006 }
1007 String accept = response.headers.value("Sec-WebSocket-Accept");
1008 if (accept == null) {
1009 error("Response did not contain a 'Sec-WebSocket-Accept' header");
1010 }
1011 _SHA1 sha1 = new _SHA1();
1012 sha1.add("$nonce$_webSocketGUID".codeUnits);
1013 List<int> expectedAccept = sha1.close();
1014 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
1015 if (expectedAccept.length != receivedAccept.length) {
1016 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
1017 }
1018 for (int i = 0; i < expectedAccept.length; i++) {
1019 if (expectedAccept[i] != receivedAccept[i]) {
1020 error("Bad response 'Sec-WebSocket-Accept' header");
802 } 1021 }
803 if (headers != null) { 1022 }
804 headers.forEach((field, value) => request.headers.add(field, value)); 1023 var protocol = response.headers.value('Sec-WebSocket-Protocol');
805 } 1024
806 // Setup the initial handshake. 1025 _WebSocketPerMessageDeflate deflate =
807 request.headers 1026 negotiateClientCompression(response, compression);
808 ..set(HttpHeaders.CONNECTION, "Upgrade") 1027
809 ..set(HttpHeaders.UPGRADE, "websocket") 1028 return response.detachSocket().then((socket) =>
810 ..set("Sec-WebSocket-Key", nonce) 1029 new _WebSocketImpl._fromSocket(
811 ..set("Cache-Control", "no-cache") 1030 socket, protocol, compression, false, deflate));
812 ..set("Sec-WebSocket-Version", "13"); 1031 });
813 if (protocols != null) {
814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
815 }
816 return request.close();
817 })
818 .then((response) {
819 void error(String message) {
820 // Flush data.
821 response.detachSocket().then((socket) {
822 socket.destroy();
823 });
824 throw new WebSocketException(message);
825 }
826 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
827 response.headers[HttpHeaders.CONNECTION] == null ||
828 !response.headers[HttpHeaders.CONNECTION].any(
829 (value) => value.toLowerCase() == "upgrade") ||
830 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
831 "websocket") {
832 error("Connection to '$uri' was not upgraded to websocket");
833 }
834 String accept = response.headers.value("Sec-WebSocket-Accept");
835 if (accept == null) {
836 error("Response did not contain a 'Sec-WebSocket-Accept' header");
837 }
838 _SHA1 sha1 = new _SHA1();
839 sha1.add("$nonce$_webSocketGUID".codeUnits);
840 List<int> expectedAccept = sha1.close();
841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
842 if (expectedAccept.length != receivedAccept.length) {
843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
844 }
845 for (int i = 0; i < expectedAccept.length; i++) {
846 if (expectedAccept[i] != receivedAccept[i]) {
847 error("Bad response 'Sec-WebSocket-Accept' header");
848 }
849 }
850 var protocol = response.headers.value('Sec-WebSocket-Protocol');
851 return response.detachSocket()
852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
853 });
854 } 1032 }
855 1033
856 _WebSocketImpl._fromSocket(this._socket, this.protocol, 1034 static _WebSocketPerMessageDeflate negotiateClientCompression(
857 [this._serverSide = false]) { 1035 HttpClientResponse response, CompressionOptions compression) {
1036 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
1037
1038 if (extensionHeader == null) {
1039 extensionHeader = "";
1040 }
1041
1042 var hv = HeaderValue.parse(extensionHeader);
1043
1044 if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) {
1045 var serverNoContextTakeover =
1046 hv.parameters.containsKey(_serverNoContextTakeover);
1047 var clientNoContextTakeover =
1048 hv.parameters.containsKey(_clientNoContextTakeover);
1049
1050 int getWindowBits(String type) {
1051 var o = hv.parameters['${type}_max_window_bits'];
Søren Gjesse 2015/10/23 15:09:59 Please use the constants here and switch on the ty
butlermatt 2015/10/23 15:44:12 Done. I changed the call to use the constants her
1052 if (o == null) {
1053 return DEFAULT_WINDOW_BITS;
1054 }
1055
1056 o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS);
1057 return o;
1058 }
1059
1060 return new _WebSocketPerMessageDeflate(
1061 clientMaxWindowBits: getWindowBits("client"),
1062 serverMaxWindowBits: getWindowBits("server"),
1063 clientNoContextTakeover: clientNoContextTakeover,
1064 serverNoContextTakeover: serverNoContextTakeover);
1065 }
1066
1067 return null;
1068 }
1069
1070 _WebSocketImpl._fromSocket(
1071 this._socket, this.protocol, CompressionOptions compression,
1072 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
858 _consumer = new _WebSocketConsumer(this, _socket); 1073 _consumer = new _WebSocketConsumer(this, _socket);
859 _sink = new _StreamSinkImpl(_consumer); 1074 _sink = new _StreamSinkImpl(_consumer);
860 _readyState = WebSocket.OPEN; 1075 _readyState = WebSocket.OPEN;
1076 _deflate = deflate;
861 1077
862 var transformer = new _WebSocketProtocolTransformer(_serverSide); 1078 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
863 _subscription = _socket.transform(transformer).listen( 1079 _subscription = _socket.transform(transformer).listen((data) {
864 (data) { 1080 if (data is _WebSocketPing) {
865 if (data is _WebSocketPing) { 1081 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 1082 } else if (data is _WebSocketPong) {
867 } else if (data is _WebSocketPong) { 1083 // Simply set pingInterval, as it'll cancel any timers.
868 // Simply set pingInterval, as it'll cancel any timers. 1084 pingInterval = _pingInterval;
869 pingInterval = _pingInterval; 1085 } else {
870 } else { 1086 _controller.add(data);
871 _controller.add(data); 1087 }
872 } 1088 }, onError: (error, stackTrace) {
873 }, 1089 if (_closeTimer != null) _closeTimer.cancel();
874 onError: (error) { 1090 if (error is FormatException) {
875 if (_closeTimer != null) _closeTimer.cancel(); 1091 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
876 if (error is FormatException) { 1092 } else {
877 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 1093 _close(WebSocketStatus.PROTOCOL_ERROR);
878 } else { 1094 }
879 _close(WebSocketStatus.PROTOCOL_ERROR); 1095 // An error happened, set the close code set above.
880 } 1096 _closeCode = _outCloseCode;
881 // An error happened, set the close code set above. 1097 _closeReason = _outCloseReason;
882 _closeCode = _outCloseCode; 1098 _controller.close();
883 _closeReason = _outCloseReason; 1099 }, onDone: () {
884 _controller.close(); 1100 if (_closeTimer != null) _closeTimer.cancel();
885 }, 1101 if (_readyState == WebSocket.OPEN) {
886 onDone: () { 1102 _readyState = WebSocket.CLOSING;
887 if (_closeTimer != null) _closeTimer.cancel(); 1103 if (!_isReservedStatusCode(transformer.closeCode)) {
888 if (_readyState == WebSocket.OPEN) { 1104 _close(transformer.closeCode, transformer.closeReason);
889 _readyState = WebSocket.CLOSING; 1105 } else {
890 if (!_isReservedStatusCode(transformer.closeCode)) { 1106 _close();
891 _close(transformer.closeCode, transformer.closeReason); 1107 }
892 } else { 1108 _readyState = WebSocket.CLOSED;
893 _close(); 1109 }
894 } 1110 // Protocol close, use close code from transformer.
895 _readyState = WebSocket.CLOSED; 1111 _closeCode = transformer.closeCode;
896 } 1112 _closeReason = transformer.closeReason;
897 // Protocol close, use close code from transformer. 1113 _controller.close();
898 _closeCode = transformer.closeCode; 1114 }, cancelOnError: true);
899 _closeReason = transformer.closeReason;
900 _controller.close();
901 },
902 cancelOnError: true);
903 _subscription.pause(); 1115 _subscription.pause();
904 _controller = new StreamController(sync: true, 1116 _controller = new StreamController(
905 onListen: _subscription.resume, 1117 sync: true, onListen: _subscription.resume, onCancel: () {
906 onCancel: () { 1118 _subscription.cancel();
907 _subscription.cancel(); 1119 _subscription = null;
908 _subscription = null; 1120 }, onPause: _subscription.pause, onResume: _subscription.resume);
909 },
910 onPause: _subscription.pause,
911 onResume: _subscription.resume);
912 1121
913 _webSockets[_serviceId] = this; 1122 _webSockets[_serviceId] = this;
914 try { _socket._owner = this; } catch (_) {} 1123 try {
1124 _socket._owner = this;
1125 } catch (_) {}
915 } 1126 }
916 1127
917 StreamSubscription listen(void onData(message), 1128 StreamSubscription listen(void onData(message),
918 {Function onError, 1129 {Function onError, void onDone(), bool cancelOnError}) {
919 void onDone(),
920 bool cancelOnError}) {
921 return _controller.stream.listen(onData, 1130 return _controller.stream.listen(onData,
922 onError: onError, 1131 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
923 onDone: onDone,
924 cancelOnError: cancelOnError);
925 } 1132 }
926 1133
927 Duration get pingInterval => _pingInterval; 1134 Duration get pingInterval => _pingInterval;
928 1135
929 void set pingInterval(Duration interval) { 1136 void set pingInterval(Duration interval) {
930 if (_writeClosed) return; 1137 if (_writeClosed) return;
931 if (_pingTimer != null) _pingTimer.cancel(); 1138 if (_pingTimer != null) _pingTimer.cancel();
932 _pingInterval = interval; 1139 _pingInterval = interval;
933 1140
934 if (_pingInterval == null) return; 1141 if (_pingInterval == null) return;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
1020 'type': '@Socket', 1227 'type': '@Socket',
1021 'name': 'UserSocket', 1228 'name': 'UserSocket',
1022 'user_name': 'UserSocket', 1229 'user_name': 'UserSocket',
1023 }; 1230 };
1024 } 1231 }
1025 return r; 1232 return r;
1026 } 1233 }
1027 1234
1028 static bool _isReservedStatusCode(int code) { 1235 static bool _isReservedStatusCode(int code) {
1029 return code != null && 1236 return code != null &&
1030 (code < WebSocketStatus.NORMAL_CLOSURE || 1237 (code < WebSocketStatus.NORMAL_CLOSURE ||
1031 code == WebSocketStatus.RESERVED_1004 || 1238 code == WebSocketStatus.RESERVED_1004 ||
1032 code == WebSocketStatus.NO_STATUS_RECEIVED || 1239 code == WebSocketStatus.NO_STATUS_RECEIVED ||
1033 code == WebSocketStatus.ABNORMAL_CLOSURE || 1240 code == WebSocketStatus.ABNORMAL_CLOSURE ||
1034 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 1241 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
1035 code < WebSocketStatus.RESERVED_1015) || 1242 code < WebSocketStatus.RESERVED_1015) ||
1036 (code >= WebSocketStatus.RESERVED_1015 && 1243 (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
1037 code < 3000));
1038 } 1244 }
1039 } 1245 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698