Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 1208473005: WebSocket Compression (Closed) Base URL: https://github.com/dart-lang/sdk.git
Patch Set: Remove Debugging Line Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 // Matches _WebSocketOpcode. 9 // Matches _WebSocketOpcode.
10 class _WebSocketMessageType { 10 class _WebSocketMessageType {
11 static const int NONE = 0; 11 static const int NONE = 0;
12 static const int TEXT = 1; 12 static const int TEXT = 1;
13 static const int BINARY = 2; 13 static const int BINARY = 2;
14 } 14 }
15 15
16
17 class _WebSocketOpcode { 16 class _WebSocketOpcode {
18 static const int CONTINUATION = 0; 17 static const int CONTINUATION = 0;
19 static const int TEXT = 1; 18 static const int TEXT = 1;
20 static const int BINARY = 2; 19 static const int BINARY = 2;
21 static const int RESERVED_3 = 3; 20 static const int RESERVED_3 = 3;
22 static const int RESERVED_4 = 4; 21 static const int RESERVED_4 = 4;
23 static const int RESERVED_5 = 5; 22 static const int RESERVED_5 = 5;
24 static const int RESERVED_6 = 6; 23 static const int RESERVED_6 = 6;
25 static const int RESERVED_7 = 7; 24 static const int RESERVED_7 = 7;
26 static const int CLOSE = 8; 25 static const int CLOSE = 8;
27 static const int PING = 9; 26 static const int PING = 9;
28 static const int PONG = 10; 27 static const int PONG = 10;
29 static const int RESERVED_B = 11; 28 static const int RESERVED_B = 11;
30 static const int RESERVED_C = 12; 29 static const int RESERVED_C = 12;
31 static const int RESERVED_D = 13; 30 static const int RESERVED_D = 13;
32 static const int RESERVED_E = 14; 31 static const int RESERVED_E = 14;
33 static const int RESERVED_F = 15; 32 static const int RESERVED_F = 15;
34 } 33 }
35 34
36 /** 35 /**
37 * The web socket protocol transformer handles the protocol byte stream 36 * The web socket protocol transformer handles the protocol byte stream
38 * which is supplied through the [:handleData:]. As the protocol is processed, 37 * which is supplied through the [:handleData:]. As the protocol is processed,
39 * it'll output frame data as either a List<int> or String. 38 * it'll output frame data as either a List<int> or String.
40 * 39 *
41 * Important infomation about usage: Be sure you use cancelOnError, so the 40 * Important information about usage: Be sure you use cancelOnError, so the
42 * socket will be closed when the processer encounter an error. Not using it 41 * socket will be closed when the processor encounter an error. Not using it
43 * will lead to undefined behaviour. 42 * will lead to undefined behaviour.
44 */ 43 */
45 // TODO(ajohnsen): make this transformer reusable? 44 // TODO(ajohnsen): make this transformer reusable?
46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 45 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
47 static const int START = 0; 46 static const int START = 0;
48 static const int LEN_FIRST = 1; 47 static const int LEN_FIRST = 1;
49 static const int LEN_REST = 2; 48 static const int LEN_REST = 2;
50 static const int MASK = 3; 49 static const int MASK = 3;
51 static const int PAYLOAD = 4; 50 static const int PAYLOAD = 4;
52 static const int CLOSED = 5; 51 static const int CLOSED = 5;
53 static const int FAILURE = 6; 52 static const int FAILURE = 6;
53 static const int FIN = 0x80;
54 static const int RSV1 = 0x40;
55 static const int RSV2 = 0x20;
56 static const int RSV3 = 0x10;
57 static const int OPCODE = 0xF;
54 58
55 int _state = START; 59 int _state = START;
56 bool _fin = false; 60 bool _fin = false;
61 bool _compressed = false;
57 int _opcode = -1; 62 int _opcode = -1;
58 int _len = -1; 63 int _len = -1;
59 bool _masked = false; 64 bool _masked = false;
60 int _remainingLenBytes = -1; 65 int _remainingLenBytes = -1;
61 int _remainingMaskingKeyBytes = 4; 66 int _remainingMaskingKeyBytes = 4;
62 int _remainingPayloadBytes = -1; 67 int _remainingPayloadBytes = -1;
63 int _unmaskingIndex = 0; 68 int _unmaskingIndex = 0;
64 int _currentMessageType = _WebSocketMessageType.NONE; 69 int _currentMessageType = _WebSocketMessageType.NONE;
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 70 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
66 String closeReason = ""; 71 String closeReason = "";
67 72
68 EventSink _eventSink; 73 EventSink _eventSink;
69 74
70 final bool _serverSide; 75 final bool _serverSide;
71 final List _maskingBytes = new List(4); 76 final List _maskingBytes = new List(4);
72 final BytesBuilder _payload = new BytesBuilder(copy: false); 77 final BytesBuilder _payload = new BytesBuilder(copy: false);
73 78
74 _WebSocketProtocolTransformer([this._serverSide = false]); 79 _WebSocketPerMessageDeflate _deflate;
80 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
75 81
76 Stream bind(Stream stream) { 82 Stream bind(Stream stream) {
77 return new Stream.eventTransformed( 83 return new Stream.eventTransformed(stream, (EventSink eventSink) {
78 stream, 84 if (_eventSink != null) {
79 (EventSink eventSink) { 85 throw new StateError("WebSocket transformer already used.");
80 if (_eventSink != null) { 86 }
81 throw new StateError("WebSocket transformer already used."); 87 _eventSink = eventSink;
82 } 88 return this;
83 _eventSink = eventSink; 89 });
84 return this;
85 });
86 } 90 }
87 91
88 void addError(Object error, [StackTrace stackTrace]) => 92 void addError(Object error, [StackTrace stackTrace]) =>
89 _eventSink.addError(error, stackTrace); 93 _eventSink.addError(error, stackTrace);
90 94
91 void close() => _eventSink.close(); 95 void close() => _eventSink.close();
92 96
93 /** 97 /**
94 * Process data received from the underlying communication channel. 98 * Process data received from the underlying communication channel.
95 */ 99 */
96 void add(Uint8List buffer) { 100 void add(Uint8List buffer) {
97 int count = buffer.length; 101 int count = buffer.length;
98 int index = 0; 102 int index = 0;
99 int lastIndex = count; 103 int lastIndex = count;
100 if (_state == CLOSED) { 104 if (_state == CLOSED) {
101 throw new WebSocketException("Data on closed connection"); 105 throw new WebSocketException("Data on closed connection");
102 } 106 }
103 if (_state == FAILURE) { 107 if (_state == FAILURE) {
104 throw new WebSocketException("Data on failed connection"); 108 throw new WebSocketException("Data on failed connection");
105 } 109 }
106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { 110 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
107 int byte = buffer[index]; 111 int byte = buffer[index];
108 if (_state <= LEN_REST) { 112 if (_state <= LEN_REST) {
109 if (_state == START) { 113 if (_state == START) {
110 _fin = (byte & 0x80) != 0; 114 _fin = (byte & 0x80) != 0;
111 if ((byte & 0x70) != 0) { 115 var rsv = (byte & 0x70) >> 4;
Søren Gjesse 2015/08/24 08:21:50 I don't think this temp is required - see below.
116
117 if (rsv != 0 && rsv != 4) {
Søren Gjesse 2015/08/24 08:21:50 Please use if ((byte & (RSV1 | RSV2)) != 0) to
112 // The RSV1, RSV2 bits RSV3 must be all zero. 118 // The RSV1, RSV2 bits RSV3 must be all zero.
Søren Gjesse 2015/08/24 08:21:50 Please remove RSV3 from this comment.
113 throw new WebSocketException("Protocol error"); 119 throw new WebSocketException("Protocol error");
114 } 120 }
115 _opcode = (byte & 0xF); 121
122 if (rsv == 4) {
Søren Gjesse 2015/08/24 08:21:50 Please use if ((byte & RSV3) != 0) { to test a
123 _compressed = true;
124 } else {
125 _compressed = false;
126 }
127 _opcode = (byte & 0x0F);
Søren Gjesse 2015/08/24 08:21:50 Please use the new constant OPCODE here.
128
116 if (_opcode <= _WebSocketOpcode.BINARY) { 129 if (_opcode <= _WebSocketOpcode.BINARY) {
117 if (_opcode == _WebSocketOpcode.CONTINUATION) { 130 if (_opcode == _WebSocketOpcode.CONTINUATION) {
118 if (_currentMessageType == _WebSocketMessageType.NONE) { 131 if (_currentMessageType == _WebSocketMessageType.NONE) {
119 throw new WebSocketException("Protocol error"); 132 throw new WebSocketException("Protocol error");
120 } 133 }
121 } else { 134 } else {
122 assert(_opcode == _WebSocketOpcode.TEXT || 135 assert(_opcode == _WebSocketOpcode.TEXT ||
123 _opcode == _WebSocketOpcode.BINARY); 136 _opcode == _WebSocketOpcode.BINARY);
124 if (_currentMessageType != _WebSocketMessageType.NONE) { 137 if (_currentMessageType != _WebSocketMessageType.NONE) {
125 throw new WebSocketException("Protocol error"); 138 throw new WebSocketException("Protocol error");
126 } 139 }
127 _currentMessageType = _opcode; 140 _currentMessageType = _opcode;
128 } 141 }
129 } else if (_opcode >= _WebSocketOpcode.CLOSE && 142 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
130 _opcode <= _WebSocketOpcode.PONG) { 143 _opcode <= _WebSocketOpcode.PONG) {
131 // Control frames cannot be fragmented. 144 // Control frames cannot be fragmented.
132 if (!_fin) throw new WebSocketException("Protocol error"); 145 if (!_fin) throw new WebSocketException("Protocol error");
133 } else { 146 } else {
134 throw new WebSocketException("Protocol error"); 147 throw new WebSocketException("Protocol error");
135 } 148 }
136 _state = LEN_FIRST; 149 _state = LEN_FIRST;
137 } else if (_state == LEN_FIRST) { 150 } else if (_state == LEN_FIRST) {
138 _masked = (byte & 0x80) != 0; 151 _masked = (byte & 0x80) != 0;
139 _len = byte & 0x7F; 152 _len = byte & 0x7F;
140 if (_isControlFrame() && _len > 125) { 153 if (_isControlFrame() && _len > 125) {
(...skipping 28 matching lines...) Expand all
169 } else { 182 } else {
170 assert(_state == PAYLOAD); 183 assert(_state == PAYLOAD);
171 // The payload is not handled one byte at a time but in blocks. 184 // The payload is not handled one byte at a time but in blocks.
172 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); 185 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
173 _remainingPayloadBytes -= payloadLength; 186 _remainingPayloadBytes -= payloadLength;
174 // Unmask payload if masked. 187 // Unmask payload if masked.
175 if (_masked) { 188 if (_masked) {
176 _unmask(index, payloadLength, buffer); 189 _unmask(index, payloadLength, buffer);
177 } 190 }
178 // Control frame and data frame share _payloads. 191 // Control frame and data frame share _payloads.
179 _payload.add( 192 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
180 new Uint8List.view(buffer.buffer, index, payloadLength));
181 index += payloadLength; 193 index += payloadLength;
182 if (_isControlFrame()) { 194 if (_isControlFrame()) {
183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 195 if (_remainingPayloadBytes == 0) _controlFrameEnd();
184 } else { 196 } else {
185 if (_currentMessageType != _WebSocketMessageType.TEXT && 197 if (_currentMessageType != _WebSocketMessageType.TEXT &&
186 _currentMessageType != _WebSocketMessageType.BINARY) { 198 _currentMessageType != _WebSocketMessageType.BINARY) {
187 throw new WebSocketException("Protocol error"); 199 throw new WebSocketException("Protocol error");
188 } 200 }
189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 201 if (_remainingPayloadBytes == 0) _messageFrameEnd();
190 } 202 }
191 203
192 // Hack - as we always do index++ below. 204 // Hack - as we always do index++ below.
193 index--; 205 index--;
194 } 206 }
195 } 207 }
196 208
197 // Move to the next byte. 209 // Move to the next byte.
(...skipping 14 matching lines...) Expand all
212 index += startOffset; 224 index += startOffset;
213 length -= startOffset; 225 length -= startOffset;
214 final int blockCount = length ~/ BLOCK_SIZE; 226 final int blockCount = length ~/ BLOCK_SIZE;
215 if (blockCount > 0) { 227 if (blockCount > 0) {
216 // Create mask block. 228 // Create mask block.
217 int mask = 0; 229 int mask = 0;
218 for (int i = 3; i >= 0; i--) { 230 for (int i = 3; i >= 0; i--) {
219 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; 231 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
220 } 232 }
221 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 233 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
222 Int32x4List blockBuffer = new Int32x4List.view( 234 Int32x4List blockBuffer =
223 buffer.buffer, index, blockCount); 235 new Int32x4List.view(buffer.buffer, index, blockCount);
224 for (int i = 0; i < blockBuffer.length; i++) { 236 for (int i = 0; i < blockBuffer.length; i++) {
225 blockBuffer[i] ^= blockMask; 237 blockBuffer[i] ^= blockMask;
226 } 238 }
227 final int bytes = blockCount * BLOCK_SIZE; 239 final int bytes = blockCount * BLOCK_SIZE;
228 index += bytes; 240 index += bytes;
229 length -= bytes; 241 length -= bytes;
230 } 242 }
231 } 243 }
232 // Handle end. 244 // Handle end.
233 final int end = index + length; 245 final int end = index + length;
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
277 } else { 289 } else {
278 _messageFrameEnd(); 290 _messageFrameEnd();
279 } 291 }
280 } else { 292 } else {
281 _state = PAYLOAD; 293 _state = PAYLOAD;
282 } 294 }
283 } 295 }
284 296
285 void _messageFrameEnd() { 297 void _messageFrameEnd() {
286 if (_fin) { 298 if (_fin) {
299 var bytes = _payload.takeBytes();
300 if (_deflate != null && _compressed) {
301 bytes = _deflate.processIncomingMessage(bytes);
302 }
303
287 switch (_currentMessageType) { 304 switch (_currentMessageType) {
288 case _WebSocketMessageType.TEXT: 305 case _WebSocketMessageType.TEXT:
289 _eventSink.add(UTF8.decode(_payload.takeBytes())); 306 _eventSink.add(UTF8.decode(bytes));
290 break; 307 break;
291 case _WebSocketMessageType.BINARY: 308 case _WebSocketMessageType.BINARY:
292 _eventSink.add(_payload.takeBytes()); 309 _eventSink.add(bytes);
293 break; 310 break;
294 } 311 }
295 _currentMessageType = _WebSocketMessageType.NONE; 312 _currentMessageType = _WebSocketMessageType.NONE;
296 } 313 }
297 _prepareForNextFrame(); 314 _prepareForNextFrame();
298 } 315 }
299 316
300 void _controlFrameEnd() { 317 void _controlFrameEnd() {
301 switch (_opcode) { 318 switch (_opcode) {
302 case _WebSocketOpcode.CLOSE: 319 case _WebSocketOpcode.CLOSE:
(...skipping 21 matching lines...) Expand all
324 341
325 case _WebSocketOpcode.PONG: 342 case _WebSocketOpcode.PONG:
326 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); 343 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
327 break; 344 break;
328 } 345 }
329 _prepareForNextFrame(); 346 _prepareForNextFrame();
330 } 347 }
331 348
332 bool _isControlFrame() { 349 bool _isControlFrame() {
333 return _opcode == _WebSocketOpcode.CLOSE || 350 return _opcode == _WebSocketOpcode.CLOSE ||
334 _opcode == _WebSocketOpcode.PING || 351 _opcode == _WebSocketOpcode.PING ||
335 _opcode == _WebSocketOpcode.PONG; 352 _opcode == _WebSocketOpcode.PONG;
336 } 353 }
337 354
338 void _prepareForNextFrame() { 355 void _prepareForNextFrame() {
339 if (_state != CLOSED && _state != FAILURE) _state = START; 356 if (_state != CLOSED && _state != FAILURE) _state = START;
340 _fin = false; 357 _fin = false;
341 _opcode = -1; 358 _opcode = -1;
342 _len = -1; 359 _len = -1;
343 _remainingLenBytes = -1; 360 _remainingLenBytes = -1;
344 _remainingMaskingKeyBytes = 4; 361 _remainingMaskingKeyBytes = 4;
345 _remainingPayloadBytes = -1; 362 _remainingPayloadBytes = -1;
346 _unmaskingIndex = 0; 363 _unmaskingIndex = 0;
347 } 364 }
348 } 365 }
349 366
350
351 class _WebSocketPing { 367 class _WebSocketPing {
352 final List<int> payload; 368 final List<int> payload;
353 _WebSocketPing([this.payload = null]); 369 _WebSocketPing([this.payload = null]);
354 } 370 }
355 371
356
357 class _WebSocketPong { 372 class _WebSocketPong {
358 final List<int> payload; 373 final List<int> payload;
359 _WebSocketPong([this.payload = null]); 374 _WebSocketPong([this.payload = null]);
360 } 375 }
361 376
362
363 class _WebSocketTransformerImpl implements WebSocketTransformer { 377 class _WebSocketTransformerImpl implements WebSocketTransformer {
364 final StreamController<WebSocket> _controller = 378 final StreamController<WebSocket> _controller =
365 new StreamController<WebSocket>(sync: true); 379 new StreamController<WebSocket>(sync: true);
366 final Function _protocolSelector; 380 final Function _protocolSelector;
381 final CompressionOptions _compression;
367 382
368 _WebSocketTransformerImpl(this._protocolSelector); 383 _WebSocketTransformerImpl(this._protocolSelector, this._compression);
369 384
370 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 385 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
371 stream.listen((request) { 386 stream.listen((request) {
372 _upgrade(request, _protocolSelector) 387 _upgrade(request, _protocolSelector, _compression)
373 .then((WebSocket webSocket) => _controller.add(webSocket)) 388 .then((WebSocket webSocket) => _controller.add(webSocket))
374 .catchError(_controller.addError); 389 .catchError(_controller.addError);
375 }, onDone: () { 390 }, onDone: () {
376 _controller.close(); 391 _controller.close();
377 }); 392 });
378 393
379 return _controller.stream; 394 return _controller.stream;
380 } 395 }
381 396
382 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { 397 static Future<WebSocket> _upgrade(
398 HttpRequest request, _protocolSelector, CompressionOptions compression) {
383 var response = request.response; 399 var response = request.response;
384 if (!_isUpgradeRequest(request)) { 400 if (!_isUpgradeRequest(request)) {
385 // Send error response. 401 // Send error response.
386 response 402 response
387 ..statusCode = HttpStatus.BAD_REQUEST 403 ..statusCode = HttpStatus.BAD_REQUEST
388 ..close(); 404 ..close();
389 return new Future.error( 405 return new Future.error(
390 new WebSocketException("Invalid WebSocket upgrade request")); 406 new WebSocketException("Invalid WebSocket upgrade request"));
391 } 407 }
392 408
393 Future upgrade(String protocol) { 409 Future upgrade(String protocol) {
394 // Send the upgrade response. 410 // Send the upgrade response.
395 response 411 response
396 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS 412 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") 413 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
398 ..headers.add(HttpHeaders.UPGRADE, "websocket"); 414 ..headers.add(HttpHeaders.UPGRADE, "websocket");
399 String key = request.headers.value("Sec-WebSocket-Key"); 415 String key = request.headers.value("Sec-WebSocket-Key");
400 _SHA1 sha1 = new _SHA1(); 416 _SHA1 sha1 = new _SHA1();
401 sha1.add("$key$_webSocketGUID".codeUnits); 417 sha1.add("$key$_webSocketGUID".codeUnits);
402 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 418 String accept = _CryptoUtils.bytesToBase64(sha1.close());
403 response.headers.add("Sec-WebSocket-Accept", accept); 419 response.headers.add("Sec-WebSocket-Accept", accept);
404 if (protocol != null) { 420 if (protocol != null) {
405 response.headers.add("Sec-WebSocket-Protocol", protocol); 421 response.headers.add("Sec-WebSocket-Protocol", protocol);
406 } 422 }
423
424 var deflate = _negotiateCompression(request, response, compression);
425
407 response.headers.contentLength = 0; 426 response.headers.contentLength = 0;
408 return response.detachSocket() 427 return response.detachSocket().then((socket) =>
409 .then((socket) => new _WebSocketImpl._fromSocket( 428 new _WebSocketImpl._fromSocket(
410 socket, protocol, true)); 429 socket, protocol, compression, true, deflate));
411 } 430 }
412 431
413 var protocols = request.headers['Sec-WebSocket-Protocol']; 432 var protocols = request.headers['Sec-WebSocket-Protocol'];
414 if (protocols != null && _protocolSelector != null) { 433 if (protocols != null && _protocolSelector != null) {
415 // The suggested protocols can be spread over multiple lines, each 434 // The suggested protocols can be spread over multiple lines, each
416 // consisting of multiple protocols. To unify all of them, first join 435 // consisting of multiple protocols. To unify all of them, first join
417 // the lists with ', ' and then tokenize. 436 // the lists with ', ' and then tokenize.
418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); 437 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
419 return new Future(() => _protocolSelector(protocols)) 438 return new Future(() => _protocolSelector(protocols)).then((protocol) {
420 .then((protocol) { 439 if (protocols.indexOf(protocol) < 0) {
421 if (protocols.indexOf(protocol) < 0) { 440 throw new WebSocketException(
422 throw new WebSocketException( 441 "Selected protocol is not in the list of available protocols");
423 "Selected protocol is not in the list of available protocols"); 442 }
424 } 443 return protocol;
425 return protocol; 444 }).catchError((error) {
426 }) 445 response
427 .catchError((error) { 446 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
428 response 447 ..close();
429 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR 448 throw error;
430 ..close(); 449 }).then(upgrade);
431 throw error;
432 })
433 .then(upgrade);
434 } else { 450 } else {
435 return upgrade(null); 451 return upgrade(null);
436 } 452 }
437 } 453 }
438 454
455 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
456 HttpResponse response, CompressionOptions compression) {
457 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
458
459 if (extensionHeader == null) {
460 extensionHeader = "";
461 }
462
463 Iterable<List<String>> extensions =
464 extensionHeader.split(",").map((it) => it.split("; "));
Søren Gjesse 2015/08/24 08:21:50 Both the splitting using "," and "; " seems fragil
465
466 var perMessageDeflate = extensions.firstWhere(
467 (x) => x[0] == _WebSocketImpl.PER_MESSAGE_DEFLATE,
468 orElse: () => null);
469 if (compression.enabled && perMessageDeflate != null) {
470 var info = compression._createHeader(perMessageDeflate);
471
472 response.headers.add("Sec-WebSocket-Extensions", info[0]);
473 var serverNoContextTakeover =
474 perMessageDeflate.contains("server_no_context_takeover");
475 var clientNoContextTakeover =
476 perMessageDeflate.contains("client_no_context_takeover");
477 var deflate = new _WebSocketPerMessageDeflate(
478 serverNoContextTakeover: serverNoContextTakeover,
479 clientNoContextTakeover: clientNoContextTakeover,
480 serverMaxWindowBits: info[1],
481 clientMaxWindowBits: info[1],
482 serverSide: true);
483
484 return deflate;
485 }
486
487 return null;
488 }
489
439 static bool _isUpgradeRequest(HttpRequest request) { 490 static bool _isUpgradeRequest(HttpRequest request) {
440 if (request.method != "GET") { 491 if (request.method != "GET") {
441 return false; 492 return false;
442 } 493 }
443 if (request.headers[HttpHeaders.CONNECTION] == null) { 494 if (request.headers[HttpHeaders.CONNECTION] == null) {
444 return false; 495 return false;
445 } 496 }
446 bool isUpgrade = false; 497 bool isUpgrade = false;
447 request.headers[HttpHeaders.CONNECTION].forEach((String value) { 498 request.headers[HttpHeaders.CONNECTION].forEach((String value) {
448 if (value.toLowerCase() == "upgrade") isUpgrade = true; 499 if (value.toLowerCase() == "upgrade") isUpgrade = true;
449 }); 500 });
450 if (!isUpgrade) return false; 501 if (!isUpgrade) return false;
451 String upgrade = request.headers.value(HttpHeaders.UPGRADE); 502 String upgrade = request.headers.value(HttpHeaders.UPGRADE);
452 if (upgrade == null || upgrade.toLowerCase() != "websocket") { 503 if (upgrade == null || upgrade.toLowerCase() != "websocket") {
453 return false; 504 return false;
454 } 505 }
455 String version = request.headers.value("Sec-WebSocket-Version"); 506 String version = request.headers.value("Sec-WebSocket-Version");
456 if (version == null || version != "13") { 507 if (version == null || version != "13") {
457 return false; 508 return false;
458 } 509 }
459 String key = request.headers.value("Sec-WebSocket-Key"); 510 String key = request.headers.value("Sec-WebSocket-Key");
460 if (key == null) { 511 if (key == null) {
461 return false; 512 return false;
462 } 513 }
463 return true; 514 return true;
464 } 515 }
465 } 516 }
466 517
518 class _WebSocketPerMessageDeflate {
519 bool serverNoContextTakeover;
520 bool clientNoContextTakeover;
521 int clientMaxWindowBits;
522 int serverMaxWindowBits;
523 bool serverSide;
524
525 _Filter decoder;
526 _Filter encoder;
527
528 _WebSocketPerMessageDeflate(
529 {this.clientMaxWindowBits,
Søren Gjesse 2015/08/24 08:21:50 nit: Please indent like this (one space more for t
530 this.serverMaxWindowBits,
531 this.serverNoContextTakeover: false,
532 this.clientNoContextTakeover: false,
533 this.serverSide: false}) {
534 if (clientMaxWindowBits == null) {
535 clientMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS;
536 }
537
538 if (serverMaxWindowBits == null) {
539 serverMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS;
540 }
541 }
542
543 void _ensureDecoder() {
544 if (decoder == null) {
545 decoder = _Filter._newZLibInflateFilter(
546 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
547 }
548 }
549
550 void _ensureEncoder() {
551 if (encoder == null) {
552 encoder = _Filter._newZLibDeflateFilter(
553 false,
554 ZLibOption.DEFAULT_LEVEL,
555 serverSide ? serverMaxWindowBits : clientMaxWindowBits,
556 ZLibOption.DEFAULT_MEM_LEVEL,
557 ZLibOption.STRATEGY_DEFAULT,
558 null,
559 true);
560 }
561 }
562
563 List<int> processIncomingMessage(List<int> msg) {
564 _ensureDecoder();
565
566 var data = [];
567 data.addAll(msg);
568 data.addAll(const [0x00, 0x00, 0xff, 0xff]);
569
570 decoder.process(data, 0, data.length);
571 var reuse =
572 !(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
573 var result = [];
574 var out;
575
576 while ((out = decoder.processed(flush: reuse)) != null) {
577 result.addAll(out);
578 }
579
580 decoder.processed(flush: reuse);
581
582 if (!reuse) {
583 decoder.end();
584 decoder = null;
585 }
586 return result;
587 }
588
589 List<int> processOutgoingMessage(List<int> msg) {
590 _ensureEncoder();
591 var reuse =
592 !(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
593 var result = [];
594 var out;
595
596 encoder.process(msg, 0, msg.length);
597
598 while ((out = encoder.processed(flush: reuse)) != null) {
599 result.addAll(out);
600 }
601
602 if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
603 encoder.end();
604 encoder = null;
605 }
606
607 if (result.length > 4) {
608 result = result.sublist(0, result.length - 4);
609 }
610
611 return result;
612 }
613 }
467 614
468 // TODO(ajohnsen): Make this transformer reusable. 615 // TODO(ajohnsen): Make this transformer reusable.
469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 616 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
470 final _WebSocketImpl webSocket; 617 final _WebSocketImpl webSocket;
471 EventSink _eventSink; 618 EventSink _eventSink;
472 619
473 _WebSocketOutgoingTransformer(this.webSocket); 620 _WebSocketPerMessageDeflate _deflateHelper;
621
622 _WebSocketOutgoingTransformer(this.webSocket) {
623 _deflateHelper = webSocket._deflate;
624 }
474 625
475 Stream bind(Stream stream) { 626 Stream bind(Stream stream) {
476 return new Stream.eventTransformed( 627 return new Stream.eventTransformed(stream, (EventSink eventSink) {
477 stream, 628 if (_eventSink != null) {
478 (EventSink eventSink) { 629 throw new StateError("WebSocket transformer already used");
479 if (_eventSink != null) { 630 }
480 throw new StateError("WebSocket transformer already used"); 631 _eventSink = eventSink;
481 } 632 return this;
482 _eventSink = eventSink; 633 });
483 return this;
484 });
485 } 634 }
486 635
487 void add(message) { 636 void add(message) {
488 if (message is _WebSocketPong) { 637 if (message is _WebSocketPong) {
489 addFrame(_WebSocketOpcode.PONG, message.payload); 638 addFrame(_WebSocketOpcode.PONG, message.payload);
490 return; 639 return;
491 } 640 }
492 if (message is _WebSocketPing) { 641 if (message is _WebSocketPing) {
493 addFrame(_WebSocketOpcode.PING, message.payload); 642 addFrame(_WebSocketOpcode.PING, message.payload);
494 return; 643 return;
495 } 644 }
496 List<int> data; 645 List<int> data;
497 int opcode; 646 int opcode;
498 if (message != null) { 647 if (message != null) {
499 if (message is String) { 648 if (message is String) {
500 opcode = _WebSocketOpcode.TEXT; 649 opcode = _WebSocketOpcode.TEXT;
501 data = UTF8.encode(message); 650 data = UTF8.encode(message);
502 } else { 651 } else {
503 if (message is !List<int>) { 652 if (message is! List<int>) {
504 throw new ArgumentError(message); 653 throw new ArgumentError(message);
505 } 654 }
506 opcode = _WebSocketOpcode.BINARY; 655 opcode = _WebSocketOpcode.BINARY;
507 data = message; 656 data = message;
508 } 657 }
658
659 if (_deflateHelper != null) {
660 data = _deflateHelper.processOutgoingMessage(data);
661 }
509 } else { 662 } else {
510 opcode = _WebSocketOpcode.TEXT; 663 opcode = _WebSocketOpcode.TEXT;
511 } 664 }
512 addFrame(opcode, data); 665 addFrame(opcode, data);
513 } 666 }
514 667
515 void addError(Object error, [StackTrace stackTrace]) => 668 void addError(Object error, [StackTrace stackTrace]) =>
516 _eventSink.addError(error, stackTrace); 669 _eventSink.addError(error, stackTrace);
517 670
518 void close() { 671 void close() {
519 int code = webSocket._outCloseCode; 672 int code = webSocket._outCloseCode;
520 String reason = webSocket._outCloseReason; 673 String reason = webSocket._outCloseReason;
521 List<int> data; 674 List<int> data;
522 if (code != null) { 675 if (code != null) {
523 data = new List<int>(); 676 data = new List<int>();
524 data.add((code >> 8) & 0xFF); 677 data.add((code >> 8) & 0xFF);
525 data.add(code & 0xFF); 678 data.add(code & 0xFF);
526 if (reason != null) { 679 if (reason != null) {
527 data.addAll(UTF8.encode(reason)); 680 data.addAll(UTF8.encode(reason));
528 } 681 }
529 } 682 }
530 addFrame(_WebSocketOpcode.CLOSE, data); 683 addFrame(_WebSocketOpcode.CLOSE, data);
531 _eventSink.close(); 684 _eventSink.close();
532 } 685 }
533 686
534 void addFrame(int opcode, List<int> data) => 687 void addFrame(int opcode, List<int> data) => createFrame(
535 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); 688 opcode,
689 data,
690 webSocket._serverSide,
691 _deflateHelper != null &&
692 (opcode == _WebSocketOpcode.TEXT ||
693 opcode == _WebSocketOpcode.BINARY)).forEach((e) {
694 _eventSink.add(e);
695 });
536 696
537 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 697 static Iterable createFrame(
538 bool mask = !serverSide; // Masking not implemented for server. 698 int opcode, List<int> data, bool serverSide, bool compressed) {
699 bool mask = !serverSide; // Masking not implemented for server.
539 int dataLength = data == null ? 0 : data.length; 700 int dataLength = data == null ? 0 : data.length;
540 // Determine the header size. 701 // Determine the header size.
541 int headerSize = (mask) ? 6 : 2; 702 int headerSize = (mask) ? 6 : 2;
542 if (dataLength > 65535) { 703 if (dataLength > 65535) {
543 headerSize += 8; 704 headerSize += 8;
544 } else if (dataLength > 125) { 705 } else if (dataLength > 125) {
545 headerSize += 2; 706 headerSize += 2;
546 } 707 }
547 Uint8List header = new Uint8List(headerSize); 708 Uint8List header = new Uint8List(headerSize);
548 int index = 0; 709 int index = 0;
710
711 var rsv = 0;
712 if (compressed) {
713 rsv = 4;
714 }
715
549 // Set FIN and opcode. 716 // Set FIN and opcode.
550 header[index++] = 0x80 | opcode; 717 var hoc = (1 << 7) | (rsv % 8) << 4 | (opcode % 128);
718
719 header[index++] = hoc;
Søren Gjesse 2015/08/24 08:21:50 Just set this byte like this (no need to the rsv a
551 // Determine size and position of length field. 720 // Determine size and position of length field.
552 int lengthBytes = 1; 721 int lengthBytes = 1;
553 int firstLengthByte = 1;
554 if (dataLength > 65535) { 722 if (dataLength > 65535) {
555 header[index++] = 127; 723 header[index++] = 127;
556 lengthBytes = 8; 724 lengthBytes = 8;
557 } else if (dataLength > 125) { 725 } else if (dataLength > 125) {
558 header[index++] = 126; 726 header[index++] = 126;
559 lengthBytes = 2; 727 lengthBytes = 2;
560 } 728 }
561 // Write the length in network byte order into the header. 729 // Write the length in network byte order into the header.
562 for (int i = 0; i < lengthBytes; i++) { 730 for (int i = 0; i < lengthBytes; i++) {
563 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; 731 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
564 } 732 }
565 if (mask) { 733 if (mask) {
566 header[1] |= 1 << 7; 734 header[1] |= 1 << 7;
567 var maskBytes = _IOCrypto.getRandomBytes(4); 735 var maskBytes = _IOCrypto.getRandomBytes(4);
568 header.setRange(index, index + 4, maskBytes); 736 header.setRange(index, index + 4, maskBytes);
569 index += 4; 737 index += 4;
570 if (data != null) { 738 if (data != null) {
571 Uint8List list; 739 Uint8List list;
572 // If this is a text message just do the masking inside the 740 // If this is a text message just do the masking inside the
573 // encoded data. 741 // encoded data.
574 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 742 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
575 list = data; 743 list = data;
576 } else { 744 } else {
577 if (data is Uint8List) { 745 if (data is Uint8List) {
578 list = new Uint8List.fromList(data); 746 list = new Uint8List.fromList(data);
579 } else { 747 } else {
580 list = new Uint8List(data.length); 748 list = new Uint8List(data.length);
581 for (int i = 0; i < data.length; i++) { 749 for (int i = 0; i < data.length; i++) {
582 if (data[i] < 0 || 255 < data[i]) { 750 if (data[i] < 0 || 255 < data[i]) {
583 throw new ArgumentError( 751 throw new ArgumentError("List element is not a byte value "
584 "List element is not a byte value "
585 "(value ${data[i]} at index $i)"); 752 "(value ${data[i]} at index $i)");
586 } 753 }
587 list[i] = data[i]; 754 list[i] = data[i];
588 } 755 }
589 } 756 }
590 } 757 }
591 const int BLOCK_SIZE = 16; 758 const int BLOCK_SIZE = 16;
592 int blockCount = list.length ~/ BLOCK_SIZE; 759 int blockCount = list.length ~/ BLOCK_SIZE;
593 if (blockCount > 0) { 760 if (blockCount > 0) {
594 // Create mask block. 761 // Create mask block.
595 int mask = 0; 762 int mask = 0;
596 for (int i = 3; i >= 0; i--) { 763 for (int i = 3; i >= 0; i--) {
597 mask = (mask << 8) | maskBytes[i]; 764 mask = (mask << 8) | maskBytes[i];
598 } 765 }
599 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 766 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
600 Int32x4List blockBuffer = new Int32x4List.view( 767 Int32x4List blockBuffer =
601 list.buffer, 0, blockCount); 768 new Int32x4List.view(list.buffer, 0, blockCount);
602 for (int i = 0; i < blockBuffer.length; i++) { 769 for (int i = 0; i < blockBuffer.length; i++) {
603 blockBuffer[i] ^= blockMask; 770 blockBuffer[i] ^= blockMask;
604 } 771 }
605 } 772 }
606 // Handle end. 773 // Handle end.
607 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { 774 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
608 list[i] ^= maskBytes[i & 3]; 775 list[i] ^= maskBytes[i & 3];
609 } 776 }
610 data = list; 777 data = list;
611 } 778 }
612 } 779 }
613 assert(index == headerSize); 780 assert(index == headerSize);
614 if (data == null) { 781 if (data == null) {
615 return [header]; 782 return [header];
616 } else { 783 } else {
617 return [header, data]; 784 return [header, data];
618 } 785 }
619 } 786 }
620 } 787 }
621 788
622
623 class _WebSocketConsumer implements StreamConsumer { 789 class _WebSocketConsumer implements StreamConsumer {
624 final _WebSocketImpl webSocket; 790 final _WebSocketImpl webSocket;
625 final Socket socket; 791 final Socket socket;
626 StreamController _controller; 792 StreamController _controller;
627 StreamSubscription _subscription; 793 StreamSubscription _subscription;
628 bool _issuedPause = false; 794 bool _issuedPause = false;
629 bool _closed = false; 795 bool _closed = false;
630 Completer _closeCompleter = new Completer(); 796 Completer _closeCompleter = new Completer();
631 Completer _completer; 797 Completer _completer;
632 798
(...skipping 24 matching lines...) Expand all
657 void _cancel() { 823 void _cancel() {
658 if (_subscription != null) { 824 if (_subscription != null) {
659 var subscription = _subscription; 825 var subscription = _subscription;
660 _subscription = null; 826 _subscription = null;
661 subscription.cancel(); 827 subscription.cancel();
662 } 828 }
663 } 829 }
664 830
665 _ensureController() { 831 _ensureController() {
666 if (_controller != null) return; 832 if (_controller != null) return;
667 _controller = new StreamController(sync: true, 833 _controller = new StreamController(
668 onPause: _onPause, 834 sync: true,
669 onResume: _onResume, 835 onPause: _onPause,
670 onCancel: _onListen); 836 onResume: _onResume,
671 var stream = _controller.stream.transform( 837 onCancel: _onListen);
672 new _WebSocketOutgoingTransformer(webSocket)); 838 var stream = _controller.stream
673 socket.addStream(stream) 839 .transform(new _WebSocketOutgoingTransformer(webSocket));
674 .then((_) { 840 socket.addStream(stream).then((_) {
675 _done(); 841 _done();
676 _closeCompleter.complete(webSocket); 842 _closeCompleter.complete(webSocket);
677 }, onError: (error, StackTrace stackTrace) { 843 }, onError: (error, StackTrace stackTrace) {
678 _closed = true; 844 _closed = true;
679 _cancel(); 845 _cancel();
680 if (error is ArgumentError) { 846 if (error is ArgumentError) {
681 if (!_done(error, stackTrace)) { 847 if (!_done(error, stackTrace)) {
682 _closeCompleter.completeError(error, stackTrace); 848 _closeCompleter.completeError(error, stackTrace);
683 } 849 }
684 } else { 850 } else {
685 _done(); 851 _done();
686 _closeCompleter.complete(webSocket); 852 _closeCompleter.complete(webSocket);
687 } 853 }
688 }); 854 });
689 } 855 }
690 856
691 bool _done([error, StackTrace stackTrace]) { 857 bool _done([error, StackTrace stackTrace]) {
692 if (_completer == null) return false; 858 if (_completer == null) return false;
693 if (error != null) { 859 if (error != null) {
694 _completer.completeError(error, stackTrace); 860 _completer.completeError(error, stackTrace);
695 } else { 861 } else {
696 _completer.complete(webSocket); 862 _completer.complete(webSocket);
697 } 863 }
698 _completer = null; 864 _completer = null;
699 return true; 865 return true;
700 } 866 }
701 867
702 Future addStream(var stream) { 868 Future addStream(var stream) {
703 if (_closed) { 869 if (_closed) {
704 stream.listen(null).cancel(); 870 stream.listen(null).cancel();
705 return new Future.value(webSocket); 871 return new Future.value(webSocket);
706 } 872 }
707 _ensureController(); 873 _ensureController();
708 _completer = new Completer(); 874 _completer = new Completer();
709 _subscription = stream.listen( 875 _subscription = stream.listen((data) {
710 (data) { 876 _controller.add(data);
711 _controller.add(data); 877 }, onDone: _done, onError: _done, cancelOnError: true);
712 },
713 onDone: _done,
714 onError: _done,
715 cancelOnError: true);
716 if (_issuedPause) { 878 if (_issuedPause) {
717 _subscription.pause(); 879 _subscription.pause();
718 _issuedPause = false; 880 _issuedPause = false;
719 } 881 }
720 return _completer.future; 882 return _completer.future;
721 } 883 }
722 884
723 Future close() { 885 Future close() {
724 _ensureController(); 886 _ensureController();
725 Future closeSocket() { 887 Future closeSocket() {
726 return socket.close().catchError((_) {}).then((_) => webSocket); 888 return socket.close().catchError((_) {}).then((_) => webSocket);
727 } 889 }
728 _controller.close(); 890 _controller.close();
729 return _closeCompleter.future.then((_) => closeSocket()); 891 return _closeCompleter.future.then((_) => closeSocket());
730 } 892 }
731 893
732 void add(data) { 894 void add(data) {
733 if (_closed) return; 895 if (_closed) return;
734 _ensureController(); 896 _ensureController();
735 _controller.add(data); 897 _controller.add(data);
736 } 898 }
737 899
738 void closeSocket() { 900 void closeSocket() {
739 _closed = true; 901 _closed = true;
740 _cancel(); 902 _cancel();
741 close(); 903 close();
742 } 904 }
743 } 905 }
744 906
745
746 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { 907 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
747 // Use default Map so we keep order. 908 // Use default Map so we keep order.
748 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); 909 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
910 static const int DEFAULT_WINDOW_BITS = 15;
911 static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
749 912
750 final String protocol; 913 final String protocol;
751 914
752 StreamController _controller; 915 StreamController _controller;
753 StreamSubscription _subscription; 916 StreamSubscription _subscription;
754 StreamSink _sink; 917 StreamSink _sink;
755 918
756 final _socket; 919 final _socket;
757 final bool _serverSide; 920 final bool _serverSide;
758 int _readyState = WebSocket.CONNECTING; 921 int _readyState = WebSocket.CONNECTING;
759 bool _writeClosed = false; 922 bool _writeClosed = false;
760 int _closeCode; 923 int _closeCode;
761 String _closeReason; 924 String _closeReason;
762 Duration _pingInterval; 925 Duration _pingInterval;
763 Timer _pingTimer; 926 Timer _pingTimer;
764 _WebSocketConsumer _consumer; 927 _WebSocketConsumer _consumer;
765 928
766 int _outCloseCode; 929 int _outCloseCode;
767 String _outCloseReason; 930 String _outCloseReason;
768 Timer _closeTimer; 931 Timer _closeTimer;
932 _WebSocketPerMessageDeflate _deflate;
769 933
770 static final HttpClient _httpClient = new HttpClient(); 934 static final HttpClient _httpClient = new HttpClient();
771 935
772 static Future<WebSocket> connect( 936 static Future<WebSocket> connect(
773 String url, Iterable<String> protocols, Map<String, dynamic> headers) { 937 String url, Iterable<String> protocols, Map<String, dynamic> headers,
938 {CompressionOptions compression: CompressionOptions.DEFAULT}) {
774 Uri uri = Uri.parse(url); 939 Uri uri = Uri.parse(url);
775 if (uri.scheme != "ws" && uri.scheme != "wss") { 940 if (uri.scheme != "ws" && uri.scheme != "wss") {
776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 941 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
777 } 942 }
778 943
779 Random random = new Random(); 944 Random random = new Random();
780 // Generate 16 random bytes. 945 // Generate 16 random bytes.
781 Uint8List nonceData = new Uint8List(16); 946 Uint8List nonceData = new Uint8List(16);
782 for (int i = 0; i < 16; i++) { 947 for (int i = 0; i < 16; i++) {
783 nonceData[i] = random.nextInt(256); 948 nonceData[i] = random.nextInt(256);
784 } 949 }
785 String nonce = _CryptoUtils.bytesToBase64(nonceData); 950 String nonce = _CryptoUtils.bytesToBase64(nonceData);
786 951
787 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", 952 uri = new Uri(
788 userInfo: uri.userInfo, 953 scheme: uri.scheme == "wss" ? "https" : "http",
789 host: uri.host, 954 userInfo: uri.userInfo,
790 port: uri.port, 955 host: uri.host,
791 path: uri.path, 956 port: uri.port,
792 query: uri.query, 957 path: uri.path,
793 fragment: uri.fragment); 958 query: uri.query,
794 return _httpClient.openUrl("GET", uri) 959 fragment: uri.fragment);
795 .then((request) { 960 return _httpClient.openUrl("GET", uri).then((request) {
796 if (uri.userInfo != null && !uri.userInfo.isEmpty) { 961 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
797 // If the URL contains user information use that for basic 962 // If the URL contains user information use that for basic
798 // authorization. 963 // authorization.
799 String auth = 964 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
800 _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); 965 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
801 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); 966 }
967 if (headers != null) {
968 headers.forEach((field, value) => request.headers.add(field, value));
969 }
970 // Setup the initial handshake.
971 request.headers
972 ..set(HttpHeaders.CONNECTION, "Upgrade")
973 ..set(HttpHeaders.UPGRADE, "websocket")
974 ..set("Sec-WebSocket-Key", nonce)
975 ..set("Cache-Control", "no-cache")
976 ..set("Sec-WebSocket-Version", "13");
977 if (protocols != null) {
978 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
979 }
980
981 request.headers
982 .add("Sec-WebSocket-Extensions", compression._createHeader());
983
984 return request.close();
985 }).then((response) {
986 void error(String message) {
987 // Flush data.
988 response.detachSocket().then((socket) {
989 socket.destroy();
990 });
991 throw new WebSocketException(message);
992 }
993 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
994 response.headers[HttpHeaders.CONNECTION] == null ||
995 !response.headers[HttpHeaders.CONNECTION]
996 .any((value) => value.toLowerCase() == "upgrade") ||
997 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
998 "websocket") {
999 error("Connection to '$uri' was not upgraded to websocket");
1000 }
1001 String accept = response.headers.value("Sec-WebSocket-Accept");
1002 if (accept == null) {
1003 error("Response did not contain a 'Sec-WebSocket-Accept' header");
1004 }
1005 _SHA1 sha1 = new _SHA1();
1006 sha1.add("$nonce$_webSocketGUID".codeUnits);
1007 List<int> expectedAccept = sha1.close();
1008 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
1009 if (expectedAccept.length != receivedAccept.length) {
1010 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
1011 }
1012 for (int i = 0; i < expectedAccept.length; i++) {
1013 if (expectedAccept[i] != receivedAccept[i]) {
1014 error("Bad response 'Sec-WebSocket-Accept' header");
802 } 1015 }
803 if (headers != null) { 1016 }
804 headers.forEach((field, value) => request.headers.add(field, value)); 1017 var protocol = response.headers.value('Sec-WebSocket-Protocol');
805 } 1018
806 // Setup the initial handshake. 1019 _WebSocketPerMessageDeflate deflate =
807 request.headers 1020 negotiateClientCompression(response, compression);
808 ..set(HttpHeaders.CONNECTION, "Upgrade") 1021
809 ..set(HttpHeaders.UPGRADE, "websocket") 1022 return response.detachSocket().then((socket) =>
810 ..set("Sec-WebSocket-Key", nonce) 1023 new _WebSocketImpl._fromSocket(
811 ..set("Cache-Control", "no-cache") 1024 socket, protocol, compression, false, deflate));
812 ..set("Sec-WebSocket-Version", "13"); 1025 });
813 if (protocols != null) {
814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
815 }
816 return request.close();
817 })
818 .then((response) {
819 void error(String message) {
820 // Flush data.
821 response.detachSocket().then((socket) {
822 socket.destroy();
823 });
824 throw new WebSocketException(message);
825 }
826 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
827 response.headers[HttpHeaders.CONNECTION] == null ||
828 !response.headers[HttpHeaders.CONNECTION].any(
829 (value) => value.toLowerCase() == "upgrade") ||
830 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
831 "websocket") {
832 error("Connection to '$uri' was not upgraded to websocket");
833 }
834 String accept = response.headers.value("Sec-WebSocket-Accept");
835 if (accept == null) {
836 error("Response did not contain a 'Sec-WebSocket-Accept' header");
837 }
838 _SHA1 sha1 = new _SHA1();
839 sha1.add("$nonce$_webSocketGUID".codeUnits);
840 List<int> expectedAccept = sha1.close();
841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
842 if (expectedAccept.length != receivedAccept.length) {
843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
844 }
845 for (int i = 0; i < expectedAccept.length; i++) {
846 if (expectedAccept[i] != receivedAccept[i]) {
847 error("Bad response 'Sec-WebSocket-Accept' header");
848 }
849 }
850 var protocol = response.headers.value('Sec-WebSocket-Protocol');
851 return response.detachSocket()
852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
853 });
854 } 1026 }
855 1027
856 _WebSocketImpl._fromSocket(this._socket, this.protocol, 1028 static _WebSocketPerMessageDeflate negotiateClientCompression(
857 [this._serverSide = false]) { 1029 HttpClientResponse response, CompressionOptions compression) {
1030 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
1031
1032 if (extensionHeader == null) {
1033 extensionHeader = "";
1034 }
1035
1036 Iterable<List<String>> extensions =
1037 extensionHeader.split(", ").map((it) => it.split("; "));
Søren Gjesse 2015/08/24 08:21:50 Please see above (comment on _negoatiateCompressio
1038
1039 if (compression.enabled &&
1040 extensions.any((x) => x[0] == PER_MESSAGE_DEFLATE)) {
1041 var opts = extensions.firstWhere((x) => x[0] == PER_MESSAGE_DEFLATE);
1042 var serverNoContextTakeover = opts.contains("server_no_context_takeover");
1043 var clientNoContextTakeover = opts.contains("client_no_context_takeover");
1044
1045 int getWindowBits(String type) {
1046 var o = opts.firstWhere((x) => x.startsWith("${type}_max_window_bits="),
1047 orElse: () => null);
1048
1049 if (o == null) {
1050 return DEFAULT_WINDOW_BITS;
1051 }
1052
1053 try {
1054 o = o.substring("${type}_max_window_bits=".length);
1055 o = int.parse(o);
1056 } catch (e) {
1057 return DEFAULT_WINDOW_BITS;
1058 }
1059
1060 return o;
1061 }
1062
1063 return new _WebSocketPerMessageDeflate(
1064 clientMaxWindowBits: getWindowBits("client"),
1065 serverMaxWindowBits: getWindowBits("server"),
1066 clientNoContextTakeover: clientNoContextTakeover,
1067 serverNoContextTakeover: serverNoContextTakeover);
1068 }
1069
1070 return null;
1071 }
1072
1073 _WebSocketImpl._fromSocket(
1074 this._socket, this.protocol, CompressionOptions compression,
1075 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
858 _consumer = new _WebSocketConsumer(this, _socket); 1076 _consumer = new _WebSocketConsumer(this, _socket);
859 _sink = new _StreamSinkImpl(_consumer); 1077 _sink = new _StreamSinkImpl(_consumer);
860 _readyState = WebSocket.OPEN; 1078 _readyState = WebSocket.OPEN;
1079 _deflate = deflate;
861 1080
862 var transformer = new _WebSocketProtocolTransformer(_serverSide); 1081 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
863 _subscription = _socket.transform(transformer).listen( 1082 _subscription = _socket.transform(transformer).listen((data) {
864 (data) { 1083 if (data is _WebSocketPing) {
865 if (data is _WebSocketPing) { 1084 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 1085 } else if (data is _WebSocketPong) {
867 } else if (data is _WebSocketPong) { 1086 // Simply set pingInterval, as it'll cancel any timers.
868 // Simply set pingInterval, as it'll cancel any timers. 1087 pingInterval = _pingInterval;
869 pingInterval = _pingInterval; 1088 } else {
870 } else { 1089 _controller.add(data);
871 _controller.add(data); 1090 }
872 } 1091 }, onError: (error, stackTrace) {
873 }, 1092 if (_closeTimer != null) _closeTimer.cancel();
874 onError: (error) { 1093 if (error is FormatException) {
875 if (_closeTimer != null) _closeTimer.cancel(); 1094 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
876 if (error is FormatException) { 1095 } else {
877 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 1096 _close(WebSocketStatus.PROTOCOL_ERROR);
878 } else { 1097 }
879 _close(WebSocketStatus.PROTOCOL_ERROR); 1098 // An error happened, set the close code set above.
880 } 1099 _closeCode = _outCloseCode;
881 // An error happened, set the close code set above. 1100 _closeReason = _outCloseReason;
882 _closeCode = _outCloseCode; 1101 _controller.close();
883 _closeReason = _outCloseReason; 1102 }, onDone: () {
884 _controller.close(); 1103 if (_closeTimer != null) _closeTimer.cancel();
885 }, 1104 if (_readyState == WebSocket.OPEN) {
886 onDone: () { 1105 _readyState = WebSocket.CLOSING;
887 if (_closeTimer != null) _closeTimer.cancel(); 1106 if (!_isReservedStatusCode(transformer.closeCode)) {
888 if (_readyState == WebSocket.OPEN) { 1107 _close(transformer.closeCode, transformer.closeReason);
889 _readyState = WebSocket.CLOSING; 1108 } else {
890 if (!_isReservedStatusCode(transformer.closeCode)) { 1109 _close();
891 _close(transformer.closeCode, transformer.closeReason); 1110 }
892 } else { 1111 _readyState = WebSocket.CLOSED;
893 _close(); 1112 }
894 } 1113 // Protocol close, use close code from transformer.
895 _readyState = WebSocket.CLOSED; 1114 _closeCode = transformer.closeCode;
896 } 1115 _closeReason = transformer.closeReason;
897 // Protocol close, use close code from transformer. 1116 _controller.close();
898 _closeCode = transformer.closeCode; 1117 }, cancelOnError: true);
899 _closeReason = transformer.closeReason;
900 _controller.close();
901 },
902 cancelOnError: true);
903 _subscription.pause(); 1118 _subscription.pause();
904 _controller = new StreamController(sync: true, 1119 _controller = new StreamController(
905 onListen: _subscription.resume, 1120 sync: true, onListen: _subscription.resume, onCancel: () {
906 onCancel: () { 1121 _subscription.cancel();
907 _subscription.cancel(); 1122 _subscription = null;
908 _subscription = null; 1123 }, onPause: _subscription.pause, onResume: _subscription.resume);
909 },
910 onPause: _subscription.pause,
911 onResume: _subscription.resume);
912 1124
913 _webSockets[_serviceId] = this; 1125 _webSockets[_serviceId] = this;
914 try { _socket._owner = this; } catch (_) {} 1126 try {
1127 _socket._owner = this;
1128 } catch (_) {}
915 } 1129 }
916 1130
917 StreamSubscription listen(void onData(message), 1131 StreamSubscription listen(void onData(message),
918 {Function onError, 1132 {Function onError, void onDone(), bool cancelOnError}) {
919 void onDone(),
920 bool cancelOnError}) {
921 return _controller.stream.listen(onData, 1133 return _controller.stream.listen(onData,
922 onError: onError, 1134 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
923 onDone: onDone,
924 cancelOnError: cancelOnError);
925 } 1135 }
926 1136
927 Duration get pingInterval => _pingInterval; 1137 Duration get pingInterval => _pingInterval;
928 1138
929 void set pingInterval(Duration interval) { 1139 void set pingInterval(Duration interval) {
930 if (_writeClosed) return; 1140 if (_writeClosed) return;
931 if (_pingTimer != null) _pingTimer.cancel(); 1141 if (_pingTimer != null) _pingTimer.cancel();
932 _pingInterval = interval; 1142 _pingInterval = interval;
933 1143
934 if (_pingInterval == null) return; 1144 if (_pingInterval == null) return;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
1020 'type': '@Socket', 1230 'type': '@Socket',
1021 'name': 'UserSocket', 1231 'name': 'UserSocket',
1022 'user_name': 'UserSocket', 1232 'user_name': 'UserSocket',
1023 }; 1233 };
1024 } 1234 }
1025 return r; 1235 return r;
1026 } 1236 }
1027 1237
1028 static bool _isReservedStatusCode(int code) { 1238 static bool _isReservedStatusCode(int code) {
1029 return code != null && 1239 return code != null &&
1030 (code < WebSocketStatus.NORMAL_CLOSURE || 1240 (code < WebSocketStatus.NORMAL_CLOSURE ||
1031 code == WebSocketStatus.RESERVED_1004 || 1241 code == WebSocketStatus.RESERVED_1004 ||
1032 code == WebSocketStatus.NO_STATUS_RECEIVED || 1242 code == WebSocketStatus.NO_STATUS_RECEIVED ||
1033 code == WebSocketStatus.ABNORMAL_CLOSURE || 1243 code == WebSocketStatus.ABNORMAL_CLOSURE ||
1034 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 1244 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
1035 code < WebSocketStatus.RESERVED_1015) || 1245 code < WebSocketStatus.RESERVED_1015) ||
1036 (code >= WebSocketStatus.RESERVED_1015 && 1246 (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
1037 code < 3000));
1038 } 1247 }
1039 } 1248 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698