Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(184)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 1390353005: Web Socket compression - take two (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 // Matches _WebSocketOpcode. 9 // Matches _WebSocketOpcode.
10 class _WebSocketMessageType { 10 class _WebSocketMessageType {
11 static const int NONE = 0; 11 static const int NONE = 0;
12 static const int TEXT = 1; 12 static const int TEXT = 1;
13 static const int BINARY = 2; 13 static const int BINARY = 2;
14 } 14 }
15 15
16
17 class _WebSocketOpcode { 16 class _WebSocketOpcode {
18 static const int CONTINUATION = 0; 17 static const int CONTINUATION = 0;
19 static const int TEXT = 1; 18 static const int TEXT = 1;
20 static const int BINARY = 2; 19 static const int BINARY = 2;
21 static const int RESERVED_3 = 3; 20 static const int RESERVED_3 = 3;
22 static const int RESERVED_4 = 4; 21 static const int RESERVED_4 = 4;
23 static const int RESERVED_5 = 5; 22 static const int RESERVED_5 = 5;
24 static const int RESERVED_6 = 6; 23 static const int RESERVED_6 = 6;
25 static const int RESERVED_7 = 7; 24 static const int RESERVED_7 = 7;
26 static const int CLOSE = 8; 25 static const int CLOSE = 8;
27 static const int PING = 9; 26 static const int PING = 9;
28 static const int PONG = 10; 27 static const int PONG = 10;
29 static const int RESERVED_B = 11; 28 static const int RESERVED_B = 11;
30 static const int RESERVED_C = 12; 29 static const int RESERVED_C = 12;
31 static const int RESERVED_D = 13; 30 static const int RESERVED_D = 13;
32 static const int RESERVED_E = 14; 31 static const int RESERVED_E = 14;
33 static const int RESERVED_F = 15; 32 static const int RESERVED_F = 15;
34 } 33 }
35 34
36 /** 35 /**
37 * The web socket protocol transformer handles the protocol byte stream 36 * The web socket protocol transformer handles the protocol byte stream
38 * which is supplied through the [:handleData:]. As the protocol is processed, 37 * which is supplied through the [:handleData:]. As the protocol is processed,
39 * it'll output frame data as either a List<int> or String. 38 * it'll output frame data as either a List<int> or String.
40 * 39 *
41 * Important infomation about usage: Be sure you use cancelOnError, so the 40 * Important information about usage: Be sure you use cancelOnError, so the
42 * socket will be closed when the processer encounter an error. Not using it 41 * socket will be closed when the processor encounter an error. Not using it
43 * will lead to undefined behaviour. 42 * will lead to undefined behaviour.
44 */ 43 */
45 // TODO(ajohnsen): make this transformer reusable? 44 // TODO(ajohnsen): make this transformer reusable?
46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 45 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
47 static const int START = 0; 46 static const int START = 0;
48 static const int LEN_FIRST = 1; 47 static const int LEN_FIRST = 1;
49 static const int LEN_REST = 2; 48 static const int LEN_REST = 2;
50 static const int MASK = 3; 49 static const int MASK = 3;
51 static const int PAYLOAD = 4; 50 static const int PAYLOAD = 4;
52 static const int CLOSED = 5; 51 static const int CLOSED = 5;
53 static const int FAILURE = 6; 52 static const int FAILURE = 6;
53 static const int FIN = 0x80;
54 static const int RSV1 = 0x40;
55 static const int RSV2 = 0x20;
56 static const int RSV3 = 0x10;
57 static const int OPCODE = 0xF;
54 58
55 int _state = START; 59 int _state = START;
56 bool _fin = false; 60 bool _fin = false;
61 bool _compressed = false;
57 int _opcode = -1; 62 int _opcode = -1;
58 int _len = -1; 63 int _len = -1;
59 bool _masked = false; 64 bool _masked = false;
60 int _remainingLenBytes = -1; 65 int _remainingLenBytes = -1;
61 int _remainingMaskingKeyBytes = 4; 66 int _remainingMaskingKeyBytes = 4;
62 int _remainingPayloadBytes = -1; 67 int _remainingPayloadBytes = -1;
63 int _unmaskingIndex = 0; 68 int _unmaskingIndex = 0;
64 int _currentMessageType = _WebSocketMessageType.NONE; 69 int _currentMessageType = _WebSocketMessageType.NONE;
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 70 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
66 String closeReason = ""; 71 String closeReason = "";
67 72
68 EventSink _eventSink; 73 EventSink _eventSink;
69 74
70 final bool _serverSide; 75 final bool _serverSide;
71 final List _maskingBytes = new List(4); 76 final List _maskingBytes = new List(4);
72 final BytesBuilder _payload = new BytesBuilder(copy: false); 77 final BytesBuilder _payload = new BytesBuilder(copy: false);
73 78
74 _WebSocketProtocolTransformer([this._serverSide = false]); 79 _WebSocketPerMessageDeflate _deflate;
80 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]);
75 81
76 Stream bind(Stream stream) { 82 Stream bind(Stream stream) {
77 return new Stream.eventTransformed( 83 return new Stream.eventTransformed(stream, (EventSink eventSink) {
78 stream, 84 if (_eventSink != null) {
79 (EventSink eventSink) { 85 throw new StateError("WebSocket transformer already used.");
80 if (_eventSink != null) { 86 }
81 throw new StateError("WebSocket transformer already used."); 87 _eventSink = eventSink;
82 } 88 return this;
83 _eventSink = eventSink; 89 });
84 return this;
85 });
86 } 90 }
87 91
88 void addError(Object error, [StackTrace stackTrace]) => 92 void addError(Object error, [StackTrace stackTrace]) =>
89 _eventSink.addError(error, stackTrace); 93 _eventSink.addError(error, stackTrace);
90 94
91 void close() => _eventSink.close(); 95 void close() => _eventSink.close();
92 96
93 /** 97 /**
94 * Process data received from the underlying communication channel. 98 * Process data received from the underlying communication channel.
95 */ 99 */
96 void add(Uint8List buffer) { 100 void add(Uint8List buffer) {
97 int count = buffer.length;
98 int index = 0; 101 int index = 0;
99 int lastIndex = count; 102 int lastIndex = buffer.length;
100 if (_state == CLOSED) { 103 if (_state == CLOSED) {
101 throw new WebSocketException("Data on closed connection"); 104 throw new WebSocketException("Data on closed connection");
102 } 105 }
103 if (_state == FAILURE) { 106 if (_state == FAILURE) {
104 throw new WebSocketException("Data on failed connection"); 107 throw new WebSocketException("Data on failed connection");
105 } 108 }
106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { 109 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
107 int byte = buffer[index]; 110 int byte = buffer[index];
108 if (_state <= LEN_REST) { 111 if (_state <= LEN_REST) {
109 if (_state == START) { 112 if (_state == START) {
110 _fin = (byte & 0x80) != 0; 113 _fin = (byte & FIN) != 0;
111 if ((byte & 0x70) != 0) { 114 var rsv = (byte & (RSV1 | RSV2 | RSV3)) >> 4;
Søren Gjesse 2015/10/19 16:53:20 I don't think this temp is needed (see below).
butlermatt 2015/10/22 19:36:08 Done.
112 // The RSV1, RSV2 bits RSV3 must be all zero. 115
116 if (rsv != 0 && rsv != 4) {
Søren Gjesse 2015/10/19 16:53:20 Just check ((byte & (RSV2 | RSV3)) != 0)
butlermatt 2015/10/22 19:36:08 Done.
117 // The RSV1, RSV2, RSV3 bits must be all zero.
Søren Gjesse 2015/10/19 16:53:20 This comment is no longer true.
butlermatt 2015/10/22 19:36:08 Done.
113 throw new WebSocketException("Protocol error"); 118 throw new WebSocketException("Protocol error");
114 } 119 }
115 _opcode = (byte & 0xF); 120
121 if (rsv == 4) {
Søren Gjesse 2015/10/19 16:53:20 4 -> RSV1 with no shift above.
butlermatt 2015/10/22 19:36:08 Done.
122 _compressed = true;
123 } else {
124 _compressed = false;
125 }
126 _opcode = (byte & OPCODE);
127
116 if (_opcode <= _WebSocketOpcode.BINARY) { 128 if (_opcode <= _WebSocketOpcode.BINARY) {
117 if (_opcode == _WebSocketOpcode.CONTINUATION) { 129 if (_opcode == _WebSocketOpcode.CONTINUATION) {
118 if (_currentMessageType == _WebSocketMessageType.NONE) { 130 if (_currentMessageType == _WebSocketMessageType.NONE) {
119 throw new WebSocketException("Protocol error"); 131 throw new WebSocketException("Protocol error");
120 } 132 }
121 } else { 133 } else {
122 assert(_opcode == _WebSocketOpcode.TEXT || 134 assert(_opcode == _WebSocketOpcode.TEXT ||
123 _opcode == _WebSocketOpcode.BINARY); 135 _opcode == _WebSocketOpcode.BINARY);
124 if (_currentMessageType != _WebSocketMessageType.NONE) { 136 if (_currentMessageType != _WebSocketMessageType.NONE) {
125 throw new WebSocketException("Protocol error"); 137 throw new WebSocketException("Protocol error");
126 } 138 }
127 _currentMessageType = _opcode; 139 _currentMessageType = _opcode;
128 } 140 }
129 } else if (_opcode >= _WebSocketOpcode.CLOSE && 141 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
130 _opcode <= _WebSocketOpcode.PONG) { 142 _opcode <= _WebSocketOpcode.PONG) {
131 // Control frames cannot be fragmented. 143 // Control frames cannot be fragmented.
132 if (!_fin) throw new WebSocketException("Protocol error"); 144 if (!_fin) throw new WebSocketException("Protocol error");
133 } else { 145 } else {
134 throw new WebSocketException("Protocol error"); 146 throw new WebSocketException("Protocol error");
135 } 147 }
136 _state = LEN_FIRST; 148 _state = LEN_FIRST;
137 } else if (_state == LEN_FIRST) { 149 } else if (_state == LEN_FIRST) {
138 _masked = (byte & 0x80) != 0; 150 _masked = (byte & 0x80) != 0;
139 _len = byte & 0x7F; 151 _len = byte & 0x7F;
140 if (_isControlFrame() && _len > 125) { 152 if (_isControlFrame() && _len > 125) {
(...skipping 28 matching lines...) Expand all
169 } else { 181 } else {
170 assert(_state == PAYLOAD); 182 assert(_state == PAYLOAD);
171 // The payload is not handled one byte at a time but in blocks. 183 // The payload is not handled one byte at a time but in blocks.
172 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); 184 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
173 _remainingPayloadBytes -= payloadLength; 185 _remainingPayloadBytes -= payloadLength;
174 // Unmask payload if masked. 186 // Unmask payload if masked.
175 if (_masked) { 187 if (_masked) {
176 _unmask(index, payloadLength, buffer); 188 _unmask(index, payloadLength, buffer);
177 } 189 }
178 // Control frame and data frame share _payloads. 190 // Control frame and data frame share _payloads.
179 _payload.add( 191 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
180 new Uint8List.view(buffer.buffer, index, payloadLength));
181 index += payloadLength; 192 index += payloadLength;
182 if (_isControlFrame()) { 193 if (_isControlFrame()) {
183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 194 if (_remainingPayloadBytes == 0) _controlFrameEnd();
184 } else { 195 } else {
185 if (_currentMessageType != _WebSocketMessageType.TEXT && 196 if (_currentMessageType != _WebSocketMessageType.TEXT &&
186 _currentMessageType != _WebSocketMessageType.BINARY) { 197 _currentMessageType != _WebSocketMessageType.BINARY) {
187 throw new WebSocketException("Protocol error"); 198 throw new WebSocketException("Protocol error");
188 } 199 }
189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 200 if (_remainingPayloadBytes == 0) _messageFrameEnd();
190 } 201 }
191 202
192 // Hack - as we always do index++ below. 203 // Hack - as we always do index++ below.
193 index--; 204 index--;
194 } 205 }
195 } 206 }
196 207
197 // Move to the next byte. 208 // Move to the next byte.
(...skipping 14 matching lines...) Expand all
212 index += startOffset; 223 index += startOffset;
213 length -= startOffset; 224 length -= startOffset;
214 final int blockCount = length ~/ BLOCK_SIZE; 225 final int blockCount = length ~/ BLOCK_SIZE;
215 if (blockCount > 0) { 226 if (blockCount > 0) {
216 // Create mask block. 227 // Create mask block.
217 int mask = 0; 228 int mask = 0;
218 for (int i = 3; i >= 0; i--) { 229 for (int i = 3; i >= 0; i--) {
219 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; 230 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
220 } 231 }
221 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 232 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
222 Int32x4List blockBuffer = new Int32x4List.view( 233 Int32x4List blockBuffer =
223 buffer.buffer, index, blockCount); 234 new Int32x4List.view(buffer.buffer, index, blockCount);
224 for (int i = 0; i < blockBuffer.length; i++) { 235 for (int i = 0; i < blockBuffer.length; i++) {
225 blockBuffer[i] ^= blockMask; 236 blockBuffer[i] ^= blockMask;
226 } 237 }
227 final int bytes = blockCount * BLOCK_SIZE; 238 final int bytes = blockCount * BLOCK_SIZE;
228 index += bytes; 239 index += bytes;
229 length -= bytes; 240 length -= bytes;
230 } 241 }
231 } 242 }
232 // Handle end. 243 // Handle end.
233 final int end = index + length; 244 final int end = index + length;
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
275 } 286 }
276 _prepareForNextFrame(); 287 _prepareForNextFrame();
277 } else { 288 } else {
278 _messageFrameEnd(); 289 _messageFrameEnd();
279 } 290 }
280 } else { 291 } else {
281 _state = PAYLOAD; 292 _state = PAYLOAD;
282 } 293 }
283 } 294 }
284 295
296 static Utf8Decoder _utfDecoder;
297
285 void _messageFrameEnd() { 298 void _messageFrameEnd() {
286 if (_fin) { 299 if (_fin) {
300 var bytes = _payload.takeBytes();
301 if (_deflate != null && _compressed) {
302 bytes = _deflate.processIncomingMessage(bytes);
303 }
304
287 switch (_currentMessageType) { 305 switch (_currentMessageType) {
288 case _WebSocketMessageType.TEXT: 306 case _WebSocketMessageType.TEXT:
289 _eventSink.add(UTF8.decode(_payload.takeBytes())); 307 if (_utfDecoder == null) {
308 _utfDecoder = new Utf8Decoder();
Søren Gjesse 2015/10/19 16:53:20 Why not just use UTF8.decode?
butlermatt 2015/10/22 19:36:09 Done.
alexandre.ardhuin 2015/10/22 20:06:04 Unlike `UTF8.encode` that uses const, `UTF8.decode
309 }
alexandre.ardhuin 2015/10/13 20:22:57 You can remove this `if` by either : - `_eventSink
butlermatt 2015/10/22 19:36:08 Done.
310 _eventSink.add(_utfDecoder.convert(bytes));
290 break; 311 break;
291 case _WebSocketMessageType.BINARY: 312 case _WebSocketMessageType.BINARY:
292 _eventSink.add(_payload.takeBytes()); 313 _eventSink.add(bytes);
293 break; 314 break;
294 } 315 }
295 _currentMessageType = _WebSocketMessageType.NONE; 316 _currentMessageType = _WebSocketMessageType.NONE;
296 } 317 }
297 _prepareForNextFrame(); 318 _prepareForNextFrame();
298 } 319 }
299 320
300 void _controlFrameEnd() { 321 void _controlFrameEnd() {
301 switch (_opcode) { 322 switch (_opcode) {
302 case _WebSocketOpcode.CLOSE: 323 case _WebSocketOpcode.CLOSE:
(...skipping 21 matching lines...) Expand all
324 345
325 case _WebSocketOpcode.PONG: 346 case _WebSocketOpcode.PONG:
326 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); 347 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
327 break; 348 break;
328 } 349 }
329 _prepareForNextFrame(); 350 _prepareForNextFrame();
330 } 351 }
331 352
332 bool _isControlFrame() { 353 bool _isControlFrame() {
333 return _opcode == _WebSocketOpcode.CLOSE || 354 return _opcode == _WebSocketOpcode.CLOSE ||
334 _opcode == _WebSocketOpcode.PING || 355 _opcode == _WebSocketOpcode.PING ||
335 _opcode == _WebSocketOpcode.PONG; 356 _opcode == _WebSocketOpcode.PONG;
336 } 357 }
337 358
338 void _prepareForNextFrame() { 359 void _prepareForNextFrame() {
339 if (_state != CLOSED && _state != FAILURE) _state = START; 360 if (_state != CLOSED && _state != FAILURE) _state = START;
340 _fin = false; 361 _fin = false;
341 _opcode = -1; 362 _opcode = -1;
342 _len = -1; 363 _len = -1;
343 _remainingLenBytes = -1; 364 _remainingLenBytes = -1;
344 _remainingMaskingKeyBytes = 4; 365 _remainingMaskingKeyBytes = 4;
345 _remainingPayloadBytes = -1; 366 _remainingPayloadBytes = -1;
346 _unmaskingIndex = 0; 367 _unmaskingIndex = 0;
347 } 368 }
348 } 369 }
349 370
350
351 class _WebSocketPing { 371 class _WebSocketPing {
352 final List<int> payload; 372 final List<int> payload;
353 _WebSocketPing([this.payload = null]); 373 _WebSocketPing([this.payload = null]);
354 } 374 }
355 375
356
357 class _WebSocketPong { 376 class _WebSocketPong {
358 final List<int> payload; 377 final List<int> payload;
359 _WebSocketPong([this.payload = null]); 378 _WebSocketPong([this.payload = null]);
360 } 379 }
361 380
362
363 class _WebSocketTransformerImpl implements WebSocketTransformer { 381 class _WebSocketTransformerImpl implements WebSocketTransformer {
364 final StreamController<WebSocket> _controller = 382 final StreamController<WebSocket> _controller =
365 new StreamController<WebSocket>(sync: true); 383 new StreamController<WebSocket>(sync: true);
366 final Function _protocolSelector; 384 final Function _protocolSelector;
385 final CompressionOptions _compression;
367 386
368 _WebSocketTransformerImpl(this._protocolSelector); 387 _WebSocketTransformerImpl(this._protocolSelector, this._compression);
369 388
370 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 389 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
371 stream.listen((request) { 390 stream.listen((request) {
372 _upgrade(request, _protocolSelector) 391 _upgrade(request, _protocolSelector, _compression)
373 .then((WebSocket webSocket) => _controller.add(webSocket)) 392 .then((WebSocket webSocket) => _controller.add(webSocket))
374 .catchError(_controller.addError); 393 .catchError(_controller.addError);
375 }, onDone: () { 394 }, onDone: () {
376 _controller.close(); 395 _controller.close();
377 }); 396 });
378 397
379 return _controller.stream; 398 return _controller.stream;
380 } 399 }
381 400
382 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { 401 static Future<WebSocket> _upgrade(
402 HttpRequest request, _protocolSelector, CompressionOptions compression) {
383 var response = request.response; 403 var response = request.response;
384 if (!_isUpgradeRequest(request)) { 404 if (!_isUpgradeRequest(request)) {
385 // Send error response. 405 // Send error response.
386 response 406 response
387 ..statusCode = HttpStatus.BAD_REQUEST 407 ..statusCode = HttpStatus.BAD_REQUEST
388 ..close(); 408 ..close();
389 return new Future.error( 409 return new Future.error(
390 new WebSocketException("Invalid WebSocket upgrade request")); 410 new WebSocketException("Invalid WebSocket upgrade request"));
391 } 411 }
392 412
393 Future upgrade(String protocol) { 413 Future upgrade(String protocol) {
394 // Send the upgrade response. 414 // Send the upgrade response.
395 response 415 response
396 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS 416 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS
397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") 417 ..headers.add(HttpHeaders.CONNECTION, "Upgrade")
398 ..headers.add(HttpHeaders.UPGRADE, "websocket"); 418 ..headers.add(HttpHeaders.UPGRADE, "websocket");
399 String key = request.headers.value("Sec-WebSocket-Key"); 419 String key = request.headers.value("Sec-WebSocket-Key");
400 _SHA1 sha1 = new _SHA1(); 420 _SHA1 sha1 = new _SHA1();
401 sha1.add("$key$_webSocketGUID".codeUnits); 421 sha1.add("$key$_webSocketGUID".codeUnits);
402 String accept = _CryptoUtils.bytesToBase64(sha1.close()); 422 String accept = _CryptoUtils.bytesToBase64(sha1.close());
403 response.headers.add("Sec-WebSocket-Accept", accept); 423 response.headers.add("Sec-WebSocket-Accept", accept);
404 if (protocol != null) { 424 if (protocol != null) {
405 response.headers.add("Sec-WebSocket-Protocol", protocol); 425 response.headers.add("Sec-WebSocket-Protocol", protocol);
406 } 426 }
427
428 var deflate = _negotiateCompression(request, response, compression);
429
407 response.headers.contentLength = 0; 430 response.headers.contentLength = 0;
408 return response.detachSocket() 431 return response.detachSocket().then((socket) =>
409 .then((socket) => new _WebSocketImpl._fromSocket( 432 new _WebSocketImpl._fromSocket(
410 socket, protocol, true)); 433 socket, protocol, compression, true, deflate));
411 } 434 }
412 435
413 var protocols = request.headers['Sec-WebSocket-Protocol']; 436 var protocols = request.headers['Sec-WebSocket-Protocol'];
414 if (protocols != null && _protocolSelector != null) { 437 if (protocols != null && _protocolSelector != null) {
415 // The suggested protocols can be spread over multiple lines, each 438 // The suggested protocols can be spread over multiple lines, each
416 // consisting of multiple protocols. To unify all of them, first join 439 // consisting of multiple protocols. To unify all of them, first join
417 // the lists with ', ' and then tokenize. 440 // the lists with ', ' and then tokenize.
418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); 441 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', '));
419 return new Future(() => _protocolSelector(protocols)) 442 return new Future(() => _protocolSelector(protocols)).then((protocol) {
420 .then((protocol) { 443 if (protocols.indexOf(protocol) < 0) {
421 if (protocols.indexOf(protocol) < 0) { 444 throw new WebSocketException(
422 throw new WebSocketException( 445 "Selected protocol is not in the list of available protocols");
423 "Selected protocol is not in the list of available protocols"); 446 }
424 } 447 return protocol;
425 return protocol; 448 }).catchError((error) {
426 }) 449 response
427 .catchError((error) { 450 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR
428 response 451 ..close();
429 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR 452 throw error;
430 ..close(); 453 }).then(upgrade);
431 throw error;
432 })
433 .then(upgrade);
434 } else { 454 } else {
435 return upgrade(null); 455 return upgrade(null);
436 } 456 }
437 } 457 }
438 458
459 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request,
460 HttpResponse response, CompressionOptions compression) {
461 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions");
462
463 if (extensionHeader == null) {
464 extensionHeader = "";
465 }
466
467 var hv = HeaderValue.parse(extensionHeader);
468 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) {
469 var info = compression._createHeader(hv);
470
471 response.headers.add("Sec-WebSocket-Extensions", info[0]);
472 var serverNoContextTakeover =
473 hv.parameters.containsKey("server_no_context_takeover");
474 var clientNoContextTakeover =
475 hv.parameters.containsKey("client_no_context_takeover");
476 var deflate = new _WebSocketPerMessageDeflate(
477 serverNoContextTakeover: serverNoContextTakeover,
478 clientNoContextTakeover: clientNoContextTakeover,
479 serverMaxWindowBits: info[1],
480 clientMaxWindowBits: info[1],
481 serverSide: true);
482
483 return deflate;
484 }
485
486 return null;
487 }
488
439 static bool _isUpgradeRequest(HttpRequest request) { 489 static bool _isUpgradeRequest(HttpRequest request) {
440 if (request.method != "GET") { 490 if (request.method != "GET") {
441 return false; 491 return false;
442 } 492 }
443 if (request.headers[HttpHeaders.CONNECTION] == null) { 493 if (request.headers[HttpHeaders.CONNECTION] == null) {
444 return false; 494 return false;
445 } 495 }
446 bool isUpgrade = false; 496 bool isUpgrade = false;
447 request.headers[HttpHeaders.CONNECTION].forEach((String value) { 497 request.headers[HttpHeaders.CONNECTION].forEach((String value) {
448 if (value.toLowerCase() == "upgrade") isUpgrade = true; 498 if (value.toLowerCase() == "upgrade") isUpgrade = true;
449 }); 499 });
450 if (!isUpgrade) return false; 500 if (!isUpgrade) return false;
451 String upgrade = request.headers.value(HttpHeaders.UPGRADE); 501 String upgrade = request.headers.value(HttpHeaders.UPGRADE);
452 if (upgrade == null || upgrade.toLowerCase() != "websocket") { 502 if (upgrade == null || upgrade.toLowerCase() != "websocket") {
453 return false; 503 return false;
454 } 504 }
455 String version = request.headers.value("Sec-WebSocket-Version"); 505 String version = request.headers.value("Sec-WebSocket-Version");
456 if (version == null || version != "13") { 506 if (version == null || version != "13") {
457 return false; 507 return false;
458 } 508 }
459 String key = request.headers.value("Sec-WebSocket-Key"); 509 String key = request.headers.value("Sec-WebSocket-Key");
460 if (key == null) { 510 if (key == null) {
461 return false; 511 return false;
462 } 512 }
463 return true; 513 return true;
464 } 514 }
465 } 515 }
466 516
517 class _WebSocketPerMessageDeflate {
518 bool serverNoContextTakeover;
519 bool clientNoContextTakeover;
520 int clientMaxWindowBits;
521 int serverMaxWindowBits;
522 bool serverSide;
523
524 _Filter decoder;
525 _Filter encoder;
526
527 _WebSocketPerMessageDeflate(
528 {this.clientMaxWindowBits,
529 this.serverMaxWindowBits,
530 this.serverNoContextTakeover: false,
531 this.clientNoContextTakeover: false,
532 this.serverSide: false}) {
533 if (clientMaxWindowBits == null) {
Søren Gjesse 2015/10/19 16:53:20 Just add default values in the argument list?
butlermatt 2015/10/22 19:36:08 Done.
534 clientMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS;
535 }
536
537 if (serverMaxWindowBits == null) {
538 serverMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS;
539 }
540 }
541
542 void _ensureDecoder() {
543 if (decoder == null) {
544 decoder = _Filter._newZLibInflateFilter(
545 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true);
546 }
547 }
548
549 void _ensureEncoder() {
550 if (encoder == null) {
551 encoder = _Filter._newZLibDeflateFilter(
552 false,
553 ZLibOption.DEFAULT_LEVEL,
554 serverSide ? serverMaxWindowBits : clientMaxWindowBits,
555 ZLibOption.DEFAULT_MEM_LEVEL,
556 ZLibOption.STRATEGY_DEFAULT,
557 null,
558 true);
559 }
560 }
561
562 List<int> processIncomingMessage(List<int> msg) {
563 _ensureDecoder();
564
565 var data = [];
566 data.addAll(msg);
567 data.addAll(const [0x00, 0x00, 0xff, 0xff]);
568
569 decoder.process(data, 0, data.length);
570 var reuse =
571 !(serverSide ? clientNoContextTakeover : serverNoContextTakeover);
572 var result = [];
573 var out;
574
575 while ((out = decoder.processed(flush: reuse)) != null) {
576 result.addAll(out);
577 }
578
579 decoder.processed(flush: reuse);
580
581 if (!reuse) {
582 decoder.end();
583 decoder = null;
584 }
585 return result;
586 }
587
588 List<int> processOutgoingMessage(List<int> msg) {
589 _ensureEncoder();
590 var reuse =
591 !(serverSide ? serverNoContextTakeover : clientNoContextTakeover);
592 var result = [];
593 var buffer = new Uint8List(msg.length);
Søren Gjesse 2015/10/19 16:53:20 Avoid new buffer if msg is Uint8List.
butlermatt 2015/10/22 19:36:08 Done.
594 var out;
595
596 for(var i = 0; i < msg.length; i++) {
Søren Gjesse 2015/10/19 16:53:20 Space after "for".
butlermatt 2015/10/22 19:36:08 Done.
597 if (msg[i] < 0 || 255 < msg[i]) {
598 throw new ArgumentError("List element is not a byte value "
599 "(value ${msg[i]} at index $i)");
600 }
601 buffer[i] = msg[i];
602 }
603
604 encoder.process(buffer, 0, buffer.length);
605
606 while ((out = encoder.processed(flush: reuse)) != null) {
607 result.addAll(out);
608 }
609
610 if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) {
611 encoder.end();
612 encoder = null;
613 }
614
615 if (result.length > 4) {
616 result = result.sublist(0, result.length - 4);
617 }
618
619 return result;
620 }
621 }
467 622
468 // TODO(ajohnsen): Make this transformer reusable. 623 // TODO(ajohnsen): Make this transformer reusable.
469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 624 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
470 final _WebSocketImpl webSocket; 625 final _WebSocketImpl webSocket;
471 EventSink _eventSink; 626 EventSink _eventSink;
472 627
473 _WebSocketOutgoingTransformer(this.webSocket); 628 _WebSocketPerMessageDeflate _deflateHelper;
629
630 _WebSocketOutgoingTransformer(this.webSocket) {
631 _deflateHelper = webSocket._deflate;
632 }
474 633
475 Stream bind(Stream stream) { 634 Stream bind(Stream stream) {
476 return new Stream.eventTransformed( 635 return new Stream.eventTransformed(stream, (EventSink eventSink) {
477 stream, 636 if (_eventSink != null) {
478 (EventSink eventSink) { 637 throw new StateError("WebSocket transformer already used");
479 if (_eventSink != null) { 638 }
480 throw new StateError("WebSocket transformer already used"); 639 _eventSink = eventSink;
481 } 640 return this;
482 _eventSink = eventSink; 641 });
483 return this;
484 });
485 } 642 }
486 643
487 void add(message) { 644 void add(message) {
488 if (message is _WebSocketPong) { 645 if (message is _WebSocketPong) {
489 addFrame(_WebSocketOpcode.PONG, message.payload); 646 addFrame(_WebSocketOpcode.PONG, message.payload);
490 return; 647 return;
491 } 648 }
492 if (message is _WebSocketPing) { 649 if (message is _WebSocketPing) {
493 addFrame(_WebSocketOpcode.PING, message.payload); 650 addFrame(_WebSocketOpcode.PING, message.payload);
494 return; 651 return;
495 } 652 }
496 List<int> data; 653 List<int> data;
497 int opcode; 654 int opcode;
498 if (message != null) { 655 if (message != null) {
499 if (message is String) { 656 if (message is String) {
500 opcode = _WebSocketOpcode.TEXT; 657 opcode = _WebSocketOpcode.TEXT;
501 data = UTF8.encode(message); 658 if (_utfEncoder == null) {
659 _utfEncoder = new Utf8Encoder();
660 }
alexandre.ardhuin 2015/10/13 20:22:57 You can remove this `if` with `static final Utf8En
butlermatt 2015/10/22 19:36:08 Converted to UTF8.encode to match previous comment
661 data = _utfEncoder.convert(message);
502 } else { 662 } else {
503 if (message is !List<int>) { 663 if (message is! List<int>) {
504 throw new ArgumentError(message); 664 throw new ArgumentError(message);
505 } 665 }
506 opcode = _WebSocketOpcode.BINARY; 666 opcode = _WebSocketOpcode.BINARY;
507 data = message; 667 data = message;
508 } 668 }
669
670 if (_deflateHelper != null) {
671 data = _deflateHelper.processOutgoingMessage(data);
672 }
509 } else { 673 } else {
510 opcode = _WebSocketOpcode.TEXT; 674 opcode = _WebSocketOpcode.TEXT;
511 } 675 }
512 addFrame(opcode, data); 676 addFrame(opcode, data);
513 } 677 }
514 678
679 static Utf8Encoder _utfEncoder;
680
515 void addError(Object error, [StackTrace stackTrace]) => 681 void addError(Object error, [StackTrace stackTrace]) =>
516 _eventSink.addError(error, stackTrace); 682 _eventSink.addError(error, stackTrace);
517 683
518 void close() { 684 void close() {
519 int code = webSocket._outCloseCode; 685 int code = webSocket._outCloseCode;
520 String reason = webSocket._outCloseReason; 686 String reason = webSocket._outCloseReason;
521 List<int> data; 687 List<int> data;
522 if (code != null) { 688 if (code != null) {
523 data = new List<int>(); 689 data = new List<int>();
524 data.add((code >> 8) & 0xFF); 690 data.add((code >> 8) & 0xFF);
525 data.add(code & 0xFF); 691 data.add(code & 0xFF);
526 if (reason != null) { 692 if (reason != null) {
527 data.addAll(UTF8.encode(reason)); 693 data.addAll(UTF8.encode(reason));
528 } 694 }
529 } 695 }
530 addFrame(_WebSocketOpcode.CLOSE, data); 696 addFrame(_WebSocketOpcode.CLOSE, data);
531 _eventSink.close(); 697 _eventSink.close();
532 } 698 }
533 699
534 void addFrame(int opcode, List<int> data) => 700 void addFrame(int opcode, List<int> data) => createFrame(
535 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); 701 opcode,
702 data,
703 webSocket._serverSide,
704 _deflateHelper != null &&
705 (opcode == _WebSocketOpcode.TEXT ||
706 opcode == _WebSocketOpcode.BINARY)).forEach((e) {
707 _eventSink.add(e);
708 });
536 709
537 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 710 static Iterable createFrame(
538 bool mask = !serverSide; // Masking not implemented for server. 711 int opcode, List<int> data, bool serverSide, bool compressed) {
712 bool mask = !serverSide; // Masking not implemented for server.
539 int dataLength = data == null ? 0 : data.length; 713 int dataLength = data == null ? 0 : data.length;
540 // Determine the header size. 714 // Determine the header size.
541 int headerSize = (mask) ? 6 : 2; 715 int headerSize = (mask) ? 6 : 2;
542 if (dataLength > 65535) { 716 if (dataLength > 65535) {
543 headerSize += 8; 717 headerSize += 8;
544 } else if (dataLength > 125) { 718 } else if (dataLength > 125) {
545 headerSize += 2; 719 headerSize += 2;
546 } 720 }
547 Uint8List header = new Uint8List(headerSize); 721 Uint8List header = new Uint8List(headerSize);
548 int index = 0; 722 int index = 0;
723
724 var rsv = 0;
Søren Gjesse 2015/10/19 16:53:20 Use the RSVx constants.
butlermatt 2015/10/22 19:36:08 Acknowledged.
725 if (compressed) {
726 rsv = 4;
727 }
728
549 // Set FIN and opcode. 729 // Set FIN and opcode.
550 header[index++] = 0x80 | opcode; 730 var hoc = (1 << 7) | (rsv % 8) << 4 | (opcode % 128);
Søren Gjesse 2015/10/19 16:53:20 How about var hoc = FIN | (compressed ? RSV1 : 0)
butlermatt 2015/10/22 19:36:08 Done.
731
732 header[index++] = hoc;
551 // Determine size and position of length field. 733 // Determine size and position of length field.
552 int lengthBytes = 1; 734 int lengthBytes = 1;
553 int firstLengthByte = 1;
554 if (dataLength > 65535) { 735 if (dataLength > 65535) {
555 header[index++] = 127; 736 header[index++] = 127;
556 lengthBytes = 8; 737 lengthBytes = 8;
557 } else if (dataLength > 125) { 738 } else if (dataLength > 125) {
558 header[index++] = 126; 739 header[index++] = 126;
559 lengthBytes = 2; 740 lengthBytes = 2;
560 } 741 }
561 // Write the length in network byte order into the header. 742 // Write the length in network byte order into the header.
562 for (int i = 0; i < lengthBytes; i++) { 743 for (int i = 0; i < lengthBytes; i++) {
563 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; 744 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
564 } 745 }
565 if (mask) { 746 if (mask) {
566 header[1] |= 1 << 7; 747 header[1] |= 1 << 7;
567 var maskBytes = _IOCrypto.getRandomBytes(4); 748 var maskBytes = _IOCrypto.getRandomBytes(4);
568 header.setRange(index, index + 4, maskBytes); 749 header.setRange(index, index + 4, maskBytes);
569 index += 4; 750 index += 4;
570 if (data != null) { 751 if (data != null) {
571 Uint8List list; 752 Uint8List list;
572 // If this is a text message just do the masking inside the 753 // If this is a text message just do the masking inside the
573 // encoded data. 754 // encoded data.
574 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 755 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
575 list = data; 756 list = data;
576 } else { 757 } else {
577 if (data is Uint8List) { 758 if (data is Uint8List) {
578 list = new Uint8List.fromList(data); 759 list = new Uint8List.fromList(data);
579 } else { 760 } else {
580 list = new Uint8List(data.length); 761 list = new Uint8List(data.length);
581 for (int i = 0; i < data.length; i++) { 762 for (int i = 0; i < data.length; i++) {
582 if (data[i] < 0 || 255 < data[i]) { 763 if (data[i] < 0 || 255 < data[i]) {
583 throw new ArgumentError( 764 throw new ArgumentError("List element is not a byte value "
584 "List element is not a byte value "
585 "(value ${data[i]} at index $i)"); 765 "(value ${data[i]} at index $i)");
586 } 766 }
587 list[i] = data[i]; 767 list[i] = data[i];
588 } 768 }
589 } 769 }
590 } 770 }
591 const int BLOCK_SIZE = 16; 771 const int BLOCK_SIZE = 16;
592 int blockCount = list.length ~/ BLOCK_SIZE; 772 int blockCount = list.length ~/ BLOCK_SIZE;
593 if (blockCount > 0) { 773 if (blockCount > 0) {
594 // Create mask block. 774 // Create mask block.
595 int mask = 0; 775 int mask = 0;
596 for (int i = 3; i >= 0; i--) { 776 for (int i = 3; i >= 0; i--) {
597 mask = (mask << 8) | maskBytes[i]; 777 mask = (mask << 8) | maskBytes[i];
598 } 778 }
599 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 779 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
600 Int32x4List blockBuffer = new Int32x4List.view( 780 Int32x4List blockBuffer =
601 list.buffer, 0, blockCount); 781 new Int32x4List.view(list.buffer, 0, blockCount);
602 for (int i = 0; i < blockBuffer.length; i++) { 782 for (int i = 0; i < blockBuffer.length; i++) {
603 blockBuffer[i] ^= blockMask; 783 blockBuffer[i] ^= blockMask;
604 } 784 }
605 } 785 }
606 // Handle end. 786 // Handle end.
607 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { 787 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
608 list[i] ^= maskBytes[i & 3]; 788 list[i] ^= maskBytes[i & 3];
609 } 789 }
610 data = list; 790 data = list;
611 } 791 }
612 } 792 }
613 assert(index == headerSize); 793 assert(index == headerSize);
614 if (data == null) { 794 if (data == null) {
615 return [header]; 795 return [header];
616 } else { 796 } else {
617 return [header, data]; 797 return [header, data];
618 } 798 }
619 } 799 }
620 } 800 }
621 801
622
623 class _WebSocketConsumer implements StreamConsumer { 802 class _WebSocketConsumer implements StreamConsumer {
624 final _WebSocketImpl webSocket; 803 final _WebSocketImpl webSocket;
625 final Socket socket; 804 final Socket socket;
626 StreamController _controller; 805 StreamController _controller;
627 StreamSubscription _subscription; 806 StreamSubscription _subscription;
628 bool _issuedPause = false; 807 bool _issuedPause = false;
629 bool _closed = false; 808 bool _closed = false;
630 Completer _closeCompleter = new Completer(); 809 Completer _closeCompleter = new Completer();
631 Completer _completer; 810 Completer _completer;
632 811
(...skipping 24 matching lines...) Expand all
657 void _cancel() { 836 void _cancel() {
658 if (_subscription != null) { 837 if (_subscription != null) {
659 var subscription = _subscription; 838 var subscription = _subscription;
660 _subscription = null; 839 _subscription = null;
661 subscription.cancel(); 840 subscription.cancel();
662 } 841 }
663 } 842 }
664 843
665 _ensureController() { 844 _ensureController() {
666 if (_controller != null) return; 845 if (_controller != null) return;
667 _controller = new StreamController(sync: true, 846 _controller = new StreamController(
668 onPause: _onPause, 847 sync: true,
669 onResume: _onResume, 848 onPause: _onPause,
670 onCancel: _onListen); 849 onResume: _onResume,
671 var stream = _controller.stream.transform( 850 onCancel: _onListen);
672 new _WebSocketOutgoingTransformer(webSocket)); 851 var stream = _controller.stream
673 socket.addStream(stream) 852 .transform(new _WebSocketOutgoingTransformer(webSocket));
674 .then((_) { 853 socket.addStream(stream).then((_) {
675 _done(); 854 _done();
676 _closeCompleter.complete(webSocket); 855 _closeCompleter.complete(webSocket);
677 }, onError: (error, StackTrace stackTrace) { 856 }, onError: (error, StackTrace stackTrace) {
678 _closed = true; 857 _closed = true;
679 _cancel(); 858 _cancel();
680 if (error is ArgumentError) { 859 if (error is ArgumentError) {
681 if (!_done(error, stackTrace)) { 860 if (!_done(error, stackTrace)) {
682 _closeCompleter.completeError(error, stackTrace); 861 _closeCompleter.completeError(error, stackTrace);
683 } 862 }
684 } else { 863 } else {
685 _done(); 864 _done();
686 _closeCompleter.complete(webSocket); 865 _closeCompleter.complete(webSocket);
687 } 866 }
688 }); 867 });
689 } 868 }
690 869
691 bool _done([error, StackTrace stackTrace]) { 870 bool _done([error, StackTrace stackTrace]) {
692 if (_completer == null) return false; 871 if (_completer == null) return false;
693 if (error != null) { 872 if (error != null) {
694 _completer.completeError(error, stackTrace); 873 _completer.completeError(error, stackTrace);
695 } else { 874 } else {
696 _completer.complete(webSocket); 875 _completer.complete(webSocket);
697 } 876 }
698 _completer = null; 877 _completer = null;
699 return true; 878 return true;
700 } 879 }
701 880
702 Future addStream(var stream) { 881 Future addStream(var stream) {
703 if (_closed) { 882 if (_closed) {
704 stream.listen(null).cancel(); 883 stream.listen(null).cancel();
705 return new Future.value(webSocket); 884 return new Future.value(webSocket);
706 } 885 }
707 _ensureController(); 886 _ensureController();
708 _completer = new Completer(); 887 _completer = new Completer();
709 _subscription = stream.listen( 888 _subscription = stream.listen((data) {
710 (data) { 889 _controller.add(data);
711 _controller.add(data); 890 }, onDone: _done, onError: _done, cancelOnError: true);
712 },
713 onDone: _done,
714 onError: _done,
715 cancelOnError: true);
716 if (_issuedPause) { 891 if (_issuedPause) {
717 _subscription.pause(); 892 _subscription.pause();
718 _issuedPause = false; 893 _issuedPause = false;
719 } 894 }
720 return _completer.future; 895 return _completer.future;
721 } 896 }
722 897
723 Future close() { 898 Future close() {
724 _ensureController(); 899 _ensureController();
725 Future closeSocket() { 900 Future closeSocket() {
726 return socket.close().catchError((_) {}).then((_) => webSocket); 901 return socket.close().catchError((_) {}).then((_) => webSocket);
727 } 902 }
728 _controller.close(); 903 _controller.close();
729 return _closeCompleter.future.then((_) => closeSocket()); 904 return _closeCompleter.future.then((_) => closeSocket());
730 } 905 }
731 906
732 void add(data) { 907 void add(data) {
733 if (_closed) return; 908 if (_closed) return;
734 _ensureController(); 909 _ensureController();
735 _controller.add(data); 910 _controller.add(data);
736 } 911 }
737 912
738 void closeSocket() { 913 void closeSocket() {
739 _closed = true; 914 _closed = true;
740 _cancel(); 915 _cancel();
741 close(); 916 close();
742 } 917 }
743 } 918 }
744 919
745
746 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { 920 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
747 // Use default Map so we keep order. 921 // Use default Map so we keep order.
748 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); 922 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>();
923 static const int DEFAULT_WINDOW_BITS = 15;
924 static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
749 925
750 final String protocol; 926 final String protocol;
751 927
752 StreamController _controller; 928 StreamController _controller;
753 StreamSubscription _subscription; 929 StreamSubscription _subscription;
754 StreamSink _sink; 930 StreamSink _sink;
755 931
756 final _socket; 932 final _socket;
757 final bool _serverSide; 933 final bool _serverSide;
758 int _readyState = WebSocket.CONNECTING; 934 int _readyState = WebSocket.CONNECTING;
759 bool _writeClosed = false; 935 bool _writeClosed = false;
760 int _closeCode; 936 int _closeCode;
761 String _closeReason; 937 String _closeReason;
762 Duration _pingInterval; 938 Duration _pingInterval;
763 Timer _pingTimer; 939 Timer _pingTimer;
764 _WebSocketConsumer _consumer; 940 _WebSocketConsumer _consumer;
765 941
766 int _outCloseCode; 942 int _outCloseCode;
767 String _outCloseReason; 943 String _outCloseReason;
768 Timer _closeTimer; 944 Timer _closeTimer;
945 _WebSocketPerMessageDeflate _deflate;
769 946
770 static final HttpClient _httpClient = new HttpClient(); 947 static final HttpClient _httpClient = new HttpClient();
771 948
772 static Future<WebSocket> connect( 949 static Future<WebSocket> connect(
773 String url, Iterable<String> protocols, Map<String, dynamic> headers) { 950 String url, Iterable<String> protocols, Map<String, dynamic> headers,
951 {CompressionOptions compression: CompressionOptions.DEFAULT}) {
774 Uri uri = Uri.parse(url); 952 Uri uri = Uri.parse(url);
775 if (uri.scheme != "ws" && uri.scheme != "wss") { 953 if (uri.scheme != "ws" && uri.scheme != "wss") {
776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); 954 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'");
777 } 955 }
778 956
779 Random random = new Random(); 957 Random random = new Random();
780 // Generate 16 random bytes. 958 // Generate 16 random bytes.
781 Uint8List nonceData = new Uint8List(16); 959 Uint8List nonceData = new Uint8List(16);
782 for (int i = 0; i < 16; i++) { 960 for (int i = 0; i < 16; i++) {
783 nonceData[i] = random.nextInt(256); 961 nonceData[i] = random.nextInt(256);
784 } 962 }
785 String nonce = _CryptoUtils.bytesToBase64(nonceData); 963 String nonce = _CryptoUtils.bytesToBase64(nonceData);
786 964
787 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", 965 uri = new Uri(
788 userInfo: uri.userInfo, 966 scheme: uri.scheme == "wss" ? "https" : "http",
789 host: uri.host, 967 userInfo: uri.userInfo,
790 port: uri.port, 968 host: uri.host,
791 path: uri.path, 969 port: uri.port,
792 query: uri.query, 970 path: uri.path,
793 fragment: uri.fragment); 971 query: uri.query,
794 return _httpClient.openUrl("GET", uri) 972 fragment: uri.fragment);
795 .then((request) { 973 return _httpClient.openUrl("GET", uri).then((request) {
796 if (uri.userInfo != null && !uri.userInfo.isEmpty) { 974 if (uri.userInfo != null && !uri.userInfo.isEmpty) {
797 // If the URL contains user information use that for basic 975 // If the URL contains user information use that for basic
798 // authorization. 976 // authorization.
799 String auth = 977 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo));
800 _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); 978 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth");
801 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); 979 }
980 if (headers != null) {
981 headers.forEach((field, value) => request.headers.add(field, value));
982 }
983 // Setup the initial handshake.
984 request.headers
985 ..set(HttpHeaders.CONNECTION, "Upgrade")
986 ..set(HttpHeaders.UPGRADE, "websocket")
987 ..set("Sec-WebSocket-Key", nonce)
988 ..set("Cache-Control", "no-cache")
989 ..set("Sec-WebSocket-Version", "13");
990 if (protocols != null) {
991 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
992 }
993
994 if(compression.enabled) {
Søren Gjesse 2015/10/19 16:53:20 Space after "if"
butlermatt 2015/10/22 19:36:08 Done.
995 request.headers
996 .add("Sec-WebSocket-Extensions", compression._createHeader());
997 }
998
999 return request.close();
1000 }).then((response) {
1001 void error(String message) {
1002 // Flush data.
1003 response.detachSocket().then((socket) {
1004 socket.destroy();
1005 });
1006 throw new WebSocketException(message);
1007 }
1008 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
1009 response.headers[HttpHeaders.CONNECTION] == null ||
1010 !response.headers[HttpHeaders.CONNECTION]
1011 .any((value) => value.toLowerCase() == "upgrade") ||
1012 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
1013 "websocket") {
1014 error("Connection to '$uri' was not upgraded to websocket");
1015 }
1016 String accept = response.headers.value("Sec-WebSocket-Accept");
1017 if (accept == null) {
1018 error("Response did not contain a 'Sec-WebSocket-Accept' header");
1019 }
1020 _SHA1 sha1 = new _SHA1();
1021 sha1.add("$nonce$_webSocketGUID".codeUnits);
1022 List<int> expectedAccept = sha1.close();
1023 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
1024 if (expectedAccept.length != receivedAccept.length) {
1025 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
1026 }
1027 for (int i = 0; i < expectedAccept.length; i++) {
1028 if (expectedAccept[i] != receivedAccept[i]) {
1029 error("Bad response 'Sec-WebSocket-Accept' header");
802 } 1030 }
803 if (headers != null) { 1031 }
804 headers.forEach((field, value) => request.headers.add(field, value)); 1032 var protocol = response.headers.value('Sec-WebSocket-Protocol');
805 } 1033
806 // Setup the initial handshake. 1034 _WebSocketPerMessageDeflate deflate =
807 request.headers 1035 negotiateClientCompression(response, compression);
808 ..set(HttpHeaders.CONNECTION, "Upgrade") 1036
809 ..set(HttpHeaders.UPGRADE, "websocket") 1037 return response.detachSocket().then((socket) =>
810 ..set("Sec-WebSocket-Key", nonce) 1038 new _WebSocketImpl._fromSocket(
811 ..set("Cache-Control", "no-cache") 1039 socket, protocol, compression, false, deflate));
812 ..set("Sec-WebSocket-Version", "13"); 1040 });
813 if (protocols != null) {
814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList());
815 }
816 return request.close();
817 })
818 .then((response) {
819 void error(String message) {
820 // Flush data.
821 response.detachSocket().then((socket) {
822 socket.destroy();
823 });
824 throw new WebSocketException(message);
825 }
826 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS ||
827 response.headers[HttpHeaders.CONNECTION] == null ||
828 !response.headers[HttpHeaders.CONNECTION].any(
829 (value) => value.toLowerCase() == "upgrade") ||
830 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() !=
831 "websocket") {
832 error("Connection to '$uri' was not upgraded to websocket");
833 }
834 String accept = response.headers.value("Sec-WebSocket-Accept");
835 if (accept == null) {
836 error("Response did not contain a 'Sec-WebSocket-Accept' header");
837 }
838 _SHA1 sha1 = new _SHA1();
839 sha1.add("$nonce$_webSocketGUID".codeUnits);
840 List<int> expectedAccept = sha1.close();
841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept);
842 if (expectedAccept.length != receivedAccept.length) {
843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length");
844 }
845 for (int i = 0; i < expectedAccept.length; i++) {
846 if (expectedAccept[i] != receivedAccept[i]) {
847 error("Bad response 'Sec-WebSocket-Accept' header");
848 }
849 }
850 var protocol = response.headers.value('Sec-WebSocket-Protocol');
851 return response.detachSocket()
852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol));
853 });
854 } 1041 }
855 1042
856 _WebSocketImpl._fromSocket(this._socket, this.protocol, 1043 static _WebSocketPerMessageDeflate negotiateClientCompression(
857 [this._serverSide = false]) { 1044 HttpClientResponse response, CompressionOptions compression) {
1045 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions');
1046
1047 if (extensionHeader == null) {
1048 extensionHeader = "";
1049 }
1050
1051 Iterable<List<String>> extensions =
1052 extensionHeader.split(", ").map((it) => it.split("; "));
Søren Gjesse 2015/10/19 16:53:20 This seems fragile. I don't think the specificatio
butlermatt 2015/10/22 19:36:08 Done.
1053
1054 if (compression.enabled &&
1055 extensions.any((x) => x[0] == PER_MESSAGE_DEFLATE)) {
1056 var opts = extensions.firstWhere((x) => x[0] == PER_MESSAGE_DEFLATE);
1057 var serverNoContextTakeover = opts.contains("server_no_context_takeover");
1058 var clientNoContextTakeover = opts.contains("client_no_context_takeover");
1059
1060 int getWindowBits(String type) {
1061 var o = opts.firstWhere((x) => x.startsWith("${type}_max_window_bits="),
Søren Gjesse 2015/10/19 16:53:20 This is also fragile. Doesn't the spec allow for w
butlermatt 2015/10/22 19:36:08 Acknowledged.
1062 orElse: () => null);
1063
1064 if (o == null) {
1065 return DEFAULT_WINDOW_BITS;
1066 }
1067
1068 try {
1069 o = o.substring("${type}_max_window_bits=".length);
1070 o = int.parse(o);
Søren Gjesse 2015/10/19 16:53:20 According to the spec the value can be quoted.
butlermatt 2015/10/22 19:36:08 Acknowledged.
1071 } catch (e) {
1072 return DEFAULT_WINDOW_BITS;
1073 }
1074
1075 return o;
1076 }
1077
1078 return new _WebSocketPerMessageDeflate(
1079 clientMaxWindowBits: getWindowBits("client"),
1080 serverMaxWindowBits: getWindowBits("server"),
1081 clientNoContextTakeover: clientNoContextTakeover,
1082 serverNoContextTakeover: serverNoContextTakeover);
1083 }
1084
1085 return null;
1086 }
1087
1088 _WebSocketImpl._fromSocket(
1089 this._socket, this.protocol, CompressionOptions compression,
1090 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) {
858 _consumer = new _WebSocketConsumer(this, _socket); 1091 _consumer = new _WebSocketConsumer(this, _socket);
859 _sink = new _StreamSinkImpl(_consumer); 1092 _sink = new _StreamSinkImpl(_consumer);
860 _readyState = WebSocket.OPEN; 1093 _readyState = WebSocket.OPEN;
1094 _deflate = deflate;
861 1095
862 var transformer = new _WebSocketProtocolTransformer(_serverSide); 1096 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
863 _subscription = _socket.transform(transformer).listen( 1097 _subscription = _socket.transform(transformer).listen((data) {
864 (data) { 1098 if (data is _WebSocketPing) {
865 if (data is _WebSocketPing) { 1099 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 1100 } else if (data is _WebSocketPong) {
867 } else if (data is _WebSocketPong) { 1101 // Simply set pingInterval, as it'll cancel any timers.
868 // Simply set pingInterval, as it'll cancel any timers. 1102 pingInterval = _pingInterval;
869 pingInterval = _pingInterval; 1103 } else {
870 } else { 1104 _controller.add(data);
871 _controller.add(data); 1105 }
872 } 1106 }, onError: (error, stackTrace) {
873 }, 1107 if (_closeTimer != null) _closeTimer.cancel();
874 onError: (error) { 1108 if (error is FormatException) {
875 if (_closeTimer != null) _closeTimer.cancel(); 1109 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
876 if (error is FormatException) { 1110 } else {
877 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 1111 _close(WebSocketStatus.PROTOCOL_ERROR);
878 } else { 1112 }
879 _close(WebSocketStatus.PROTOCOL_ERROR); 1113 // An error happened, set the close code set above.
880 } 1114 _closeCode = _outCloseCode;
881 // An error happened, set the close code set above. 1115 _closeReason = _outCloseReason;
882 _closeCode = _outCloseCode; 1116 _controller.close();
883 _closeReason = _outCloseReason; 1117 }, onDone: () {
884 _controller.close(); 1118 if (_closeTimer != null) _closeTimer.cancel();
885 }, 1119 if (_readyState == WebSocket.OPEN) {
886 onDone: () { 1120 _readyState = WebSocket.CLOSING;
887 if (_closeTimer != null) _closeTimer.cancel(); 1121 if (!_isReservedStatusCode(transformer.closeCode)) {
888 if (_readyState == WebSocket.OPEN) { 1122 _close(transformer.closeCode, transformer.closeReason);
889 _readyState = WebSocket.CLOSING; 1123 } else {
890 if (!_isReservedStatusCode(transformer.closeCode)) { 1124 _close();
891 _close(transformer.closeCode, transformer.closeReason); 1125 }
892 } else { 1126 _readyState = WebSocket.CLOSED;
893 _close(); 1127 }
894 } 1128 // Protocol close, use close code from transformer.
895 _readyState = WebSocket.CLOSED; 1129 _closeCode = transformer.closeCode;
896 } 1130 _closeReason = transformer.closeReason;
897 // Protocol close, use close code from transformer. 1131 _controller.close();
898 _closeCode = transformer.closeCode; 1132 }, cancelOnError: true);
899 _closeReason = transformer.closeReason;
900 _controller.close();
901 },
902 cancelOnError: true);
903 _subscription.pause(); 1133 _subscription.pause();
904 _controller = new StreamController(sync: true, 1134 _controller = new StreamController(
905 onListen: _subscription.resume, 1135 sync: true, onListen: _subscription.resume, onCancel: () {
906 onCancel: () { 1136 _subscription.cancel();
907 _subscription.cancel(); 1137 _subscription = null;
908 _subscription = null; 1138 }, onPause: _subscription.pause, onResume: _subscription.resume);
909 },
910 onPause: _subscription.pause,
911 onResume: _subscription.resume);
912 1139
913 _webSockets[_serviceId] = this; 1140 _webSockets[_serviceId] = this;
914 try { _socket._owner = this; } catch (_) {} 1141 try {
1142 _socket._owner = this;
1143 } catch (_) {}
915 } 1144 }
916 1145
917 StreamSubscription listen(void onData(message), 1146 StreamSubscription listen(void onData(message),
918 {Function onError, 1147 {Function onError, void onDone(), bool cancelOnError}) {
919 void onDone(),
920 bool cancelOnError}) {
921 return _controller.stream.listen(onData, 1148 return _controller.stream.listen(onData,
922 onError: onError, 1149 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
923 onDone: onDone,
924 cancelOnError: cancelOnError);
925 } 1150 }
926 1151
927 Duration get pingInterval => _pingInterval; 1152 Duration get pingInterval => _pingInterval;
928 1153
929 void set pingInterval(Duration interval) { 1154 void set pingInterval(Duration interval) {
930 if (_writeClosed) return; 1155 if (_writeClosed) return;
931 if (_pingTimer != null) _pingTimer.cancel(); 1156 if (_pingTimer != null) _pingTimer.cancel();
932 _pingInterval = interval; 1157 _pingInterval = interval;
933 1158
934 if (_pingInterval == null) return; 1159 if (_pingInterval == null) return;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
1020 'type': '@Socket', 1245 'type': '@Socket',
1021 'name': 'UserSocket', 1246 'name': 'UserSocket',
1022 'user_name': 'UserSocket', 1247 'user_name': 'UserSocket',
1023 }; 1248 };
1024 } 1249 }
1025 return r; 1250 return r;
1026 } 1251 }
1027 1252
1028 static bool _isReservedStatusCode(int code) { 1253 static bool _isReservedStatusCode(int code) {
1029 return code != null && 1254 return code != null &&
1030 (code < WebSocketStatus.NORMAL_CLOSURE || 1255 (code < WebSocketStatus.NORMAL_CLOSURE ||
1031 code == WebSocketStatus.RESERVED_1004 || 1256 code == WebSocketStatus.RESERVED_1004 ||
1032 code == WebSocketStatus.NO_STATUS_RECEIVED || 1257 code == WebSocketStatus.NO_STATUS_RECEIVED ||
1033 code == WebSocketStatus.ABNORMAL_CLOSURE || 1258 code == WebSocketStatus.ABNORMAL_CLOSURE ||
1034 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 1259 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
1035 code < WebSocketStatus.RESERVED_1015) || 1260 code < WebSocketStatus.RESERVED_1015) ||
1036 (code >= WebSocketStatus.RESERVED_1015 && 1261 (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
1037 code < 3000));
1038 } 1262 }
1039 } 1263 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698