OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 | 8 |
9 // Matches _WebSocketOpcode. | 9 // Matches _WebSocketOpcode. |
10 class _WebSocketMessageType { | 10 class _WebSocketMessageType { |
11 static const int NONE = 0; | 11 static const int NONE = 0; |
12 static const int TEXT = 1; | 12 static const int TEXT = 1; |
13 static const int BINARY = 2; | 13 static const int BINARY = 2; |
14 } | 14 } |
15 | 15 |
16 | |
17 class _WebSocketOpcode { | 16 class _WebSocketOpcode { |
18 static const int CONTINUATION = 0; | 17 static const int CONTINUATION = 0; |
19 static const int TEXT = 1; | 18 static const int TEXT = 1; |
20 static const int BINARY = 2; | 19 static const int BINARY = 2; |
21 static const int RESERVED_3 = 3; | 20 static const int RESERVED_3 = 3; |
22 static const int RESERVED_4 = 4; | 21 static const int RESERVED_4 = 4; |
23 static const int RESERVED_5 = 5; | 22 static const int RESERVED_5 = 5; |
24 static const int RESERVED_6 = 6; | 23 static const int RESERVED_6 = 6; |
25 static const int RESERVED_7 = 7; | 24 static const int RESERVED_7 = 7; |
26 static const int CLOSE = 8; | 25 static const int CLOSE = 8; |
27 static const int PING = 9; | 26 static const int PING = 9; |
28 static const int PONG = 10; | 27 static const int PONG = 10; |
29 static const int RESERVED_B = 11; | 28 static const int RESERVED_B = 11; |
30 static const int RESERVED_C = 12; | 29 static const int RESERVED_C = 12; |
31 static const int RESERVED_D = 13; | 30 static const int RESERVED_D = 13; |
32 static const int RESERVED_E = 14; | 31 static const int RESERVED_E = 14; |
33 static const int RESERVED_F = 15; | 32 static const int RESERVED_F = 15; |
34 } | 33 } |
35 | 34 |
36 /** | 35 /** |
37 * The web socket protocol transformer handles the protocol byte stream | 36 * The web socket protocol transformer handles the protocol byte stream |
38 * which is supplied through the [:handleData:]. As the protocol is processed, | 37 * which is supplied through the [:handleData:]. As the protocol is processed, |
39 * it'll output frame data as either a List<int> or String. | 38 * it'll output frame data as either a List<int> or String. |
40 * | 39 * |
41 * Important infomation about usage: Be sure you use cancelOnError, so the | 40 * Important information about usage: Be sure you use cancelOnError, so the |
42 * socket will be closed when the processer encounter an error. Not using it | 41 * socket will be closed when the processor encounter an error. Not using it |
43 * will lead to undefined behaviour. | 42 * will lead to undefined behaviour. |
44 */ | 43 */ |
45 // TODO(ajohnsen): make this transformer reusable? | 44 // TODO(ajohnsen): make this transformer reusable? |
46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 45 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
47 static const int START = 0; | 46 static const int START = 0; |
48 static const int LEN_FIRST = 1; | 47 static const int LEN_FIRST = 1; |
49 static const int LEN_REST = 2; | 48 static const int LEN_REST = 2; |
50 static const int MASK = 3; | 49 static const int MASK = 3; |
51 static const int PAYLOAD = 4; | 50 static const int PAYLOAD = 4; |
52 static const int CLOSED = 5; | 51 static const int CLOSED = 5; |
53 static const int FAILURE = 6; | 52 static const int FAILURE = 6; |
53 static const int FIN = 0x80; | |
54 static const int RSV1 = 0x40; | |
55 static const int RSV2 = 0x20; | |
56 static const int RSV3 = 0x10; | |
57 static const int OPCODE = 0xF; | |
54 | 58 |
55 int _state = START; | 59 int _state = START; |
56 bool _fin = false; | 60 bool _fin = false; |
61 bool _compressed = false; | |
57 int _opcode = -1; | 62 int _opcode = -1; |
58 int _len = -1; | 63 int _len = -1; |
59 bool _masked = false; | 64 bool _masked = false; |
60 int _remainingLenBytes = -1; | 65 int _remainingLenBytes = -1; |
61 int _remainingMaskingKeyBytes = 4; | 66 int _remainingMaskingKeyBytes = 4; |
62 int _remainingPayloadBytes = -1; | 67 int _remainingPayloadBytes = -1; |
63 int _unmaskingIndex = 0; | 68 int _unmaskingIndex = 0; |
64 int _currentMessageType = _WebSocketMessageType.NONE; | 69 int _currentMessageType = _WebSocketMessageType.NONE; |
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | 70 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
66 String closeReason = ""; | 71 String closeReason = ""; |
67 | 72 |
68 EventSink _eventSink; | 73 EventSink _eventSink; |
69 | 74 |
70 final bool _serverSide; | 75 final bool _serverSide; |
71 final List _maskingBytes = new List(4); | 76 final List _maskingBytes = new List(4); |
72 final BytesBuilder _payload = new BytesBuilder(copy: false); | 77 final BytesBuilder _payload = new BytesBuilder(copy: false); |
73 | 78 |
74 _WebSocketProtocolTransformer([this._serverSide = false]); | 79 _WebSocketPerMessageDeflate _deflate; |
80 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); | |
75 | 81 |
76 Stream bind(Stream stream) { | 82 Stream bind(Stream stream) { |
77 return new Stream.eventTransformed( | 83 return new Stream.eventTransformed(stream, (EventSink eventSink) { |
78 stream, | 84 if (_eventSink != null) { |
79 (EventSink eventSink) { | 85 throw new StateError("WebSocket transformer already used."); |
80 if (_eventSink != null) { | 86 } |
81 throw new StateError("WebSocket transformer already used."); | 87 _eventSink = eventSink; |
82 } | 88 return this; |
83 _eventSink = eventSink; | 89 }); |
84 return this; | |
85 }); | |
86 } | 90 } |
87 | 91 |
88 void addError(Object error, [StackTrace stackTrace]) => | 92 void addError(Object error, [StackTrace stackTrace]) => |
89 _eventSink.addError(error, stackTrace); | 93 _eventSink.addError(error, stackTrace); |
90 | 94 |
91 void close() => _eventSink.close(); | 95 void close() => _eventSink.close(); |
92 | 96 |
93 /** | 97 /** |
94 * Process data received from the underlying communication channel. | 98 * Process data received from the underlying communication channel. |
95 */ | 99 */ |
96 void add(Uint8List buffer) { | 100 void add(Uint8List buffer) { |
97 int count = buffer.length; | 101 int count = buffer.length; |
98 int index = 0; | 102 int index = 0; |
99 int lastIndex = count; | 103 int lastIndex = count; |
100 if (_state == CLOSED) { | 104 if (_state == CLOSED) { |
101 throw new WebSocketException("Data on closed connection"); | 105 throw new WebSocketException("Data on closed connection"); |
102 } | 106 } |
103 if (_state == FAILURE) { | 107 if (_state == FAILURE) { |
104 throw new WebSocketException("Data on failed connection"); | 108 throw new WebSocketException("Data on failed connection"); |
105 } | 109 } |
106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { | 110 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { |
107 int byte = buffer[index]; | 111 int byte = buffer[index]; |
108 if (_state <= LEN_REST) { | 112 if (_state <= LEN_REST) { |
109 if (_state == START) { | 113 if (_state == START) { |
110 _fin = (byte & 0x80) != 0; | 114 _fin = (byte & 0x80) != 0; |
111 if ((byte & 0x70) != 0) { | 115 var rsv = (byte & 0x70) >> 4; |
Søren Gjesse
2015/08/24 08:21:50
I don't think this temp is required - see below.
| |
116 | |
117 if (rsv != 0 && rsv != 4) { | |
Søren Gjesse
2015/08/24 08:21:50
Please use
if ((byte & (RSV1 | RSV2)) != 0)
to
| |
112 // The RSV1, RSV2 bits RSV3 must be all zero. | 118 // The RSV1, RSV2 bits RSV3 must be all zero. |
Søren Gjesse
2015/08/24 08:21:50
Please remove RSV3 from this comment.
| |
113 throw new WebSocketException("Protocol error"); | 119 throw new WebSocketException("Protocol error"); |
114 } | 120 } |
115 _opcode = (byte & 0xF); | 121 |
122 if (rsv == 4) { | |
Søren Gjesse
2015/08/24 08:21:50
Please use
if ((byte & RSV3) != 0) {
to test a
| |
123 _compressed = true; | |
124 } else { | |
125 _compressed = false; | |
126 } | |
127 _opcode = (byte & 0x0F); | |
Søren Gjesse
2015/08/24 08:21:50
Please use the new constant OPCODE here.
| |
128 | |
116 if (_opcode <= _WebSocketOpcode.BINARY) { | 129 if (_opcode <= _WebSocketOpcode.BINARY) { |
117 if (_opcode == _WebSocketOpcode.CONTINUATION) { | 130 if (_opcode == _WebSocketOpcode.CONTINUATION) { |
118 if (_currentMessageType == _WebSocketMessageType.NONE) { | 131 if (_currentMessageType == _WebSocketMessageType.NONE) { |
119 throw new WebSocketException("Protocol error"); | 132 throw new WebSocketException("Protocol error"); |
120 } | 133 } |
121 } else { | 134 } else { |
122 assert(_opcode == _WebSocketOpcode.TEXT || | 135 assert(_opcode == _WebSocketOpcode.TEXT || |
123 _opcode == _WebSocketOpcode.BINARY); | 136 _opcode == _WebSocketOpcode.BINARY); |
124 if (_currentMessageType != _WebSocketMessageType.NONE) { | 137 if (_currentMessageType != _WebSocketMessageType.NONE) { |
125 throw new WebSocketException("Protocol error"); | 138 throw new WebSocketException("Protocol error"); |
126 } | 139 } |
127 _currentMessageType = _opcode; | 140 _currentMessageType = _opcode; |
128 } | 141 } |
129 } else if (_opcode >= _WebSocketOpcode.CLOSE && | 142 } else if (_opcode >= _WebSocketOpcode.CLOSE && |
130 _opcode <= _WebSocketOpcode.PONG) { | 143 _opcode <= _WebSocketOpcode.PONG) { |
131 // Control frames cannot be fragmented. | 144 // Control frames cannot be fragmented. |
132 if (!_fin) throw new WebSocketException("Protocol error"); | 145 if (!_fin) throw new WebSocketException("Protocol error"); |
133 } else { | 146 } else { |
134 throw new WebSocketException("Protocol error"); | 147 throw new WebSocketException("Protocol error"); |
135 } | 148 } |
136 _state = LEN_FIRST; | 149 _state = LEN_FIRST; |
137 } else if (_state == LEN_FIRST) { | 150 } else if (_state == LEN_FIRST) { |
138 _masked = (byte & 0x80) != 0; | 151 _masked = (byte & 0x80) != 0; |
139 _len = byte & 0x7F; | 152 _len = byte & 0x7F; |
140 if (_isControlFrame() && _len > 125) { | 153 if (_isControlFrame() && _len > 125) { |
(...skipping 28 matching lines...) Expand all Loading... | |
169 } else { | 182 } else { |
170 assert(_state == PAYLOAD); | 183 assert(_state == PAYLOAD); |
171 // The payload is not handled one byte at a time but in blocks. | 184 // The payload is not handled one byte at a time but in blocks. |
172 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); | 185 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); |
173 _remainingPayloadBytes -= payloadLength; | 186 _remainingPayloadBytes -= payloadLength; |
174 // Unmask payload if masked. | 187 // Unmask payload if masked. |
175 if (_masked) { | 188 if (_masked) { |
176 _unmask(index, payloadLength, buffer); | 189 _unmask(index, payloadLength, buffer); |
177 } | 190 } |
178 // Control frame and data frame share _payloads. | 191 // Control frame and data frame share _payloads. |
179 _payload.add( | 192 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); |
180 new Uint8List.view(buffer.buffer, index, payloadLength)); | |
181 index += payloadLength; | 193 index += payloadLength; |
182 if (_isControlFrame()) { | 194 if (_isControlFrame()) { |
183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | 195 if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
184 } else { | 196 } else { |
185 if (_currentMessageType != _WebSocketMessageType.TEXT && | 197 if (_currentMessageType != _WebSocketMessageType.TEXT && |
186 _currentMessageType != _WebSocketMessageType.BINARY) { | 198 _currentMessageType != _WebSocketMessageType.BINARY) { |
187 throw new WebSocketException("Protocol error"); | 199 throw new WebSocketException("Protocol error"); |
188 } | 200 } |
189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | 201 if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
190 } | 202 } |
191 | 203 |
192 // Hack - as we always do index++ below. | 204 // Hack - as we always do index++ below. |
193 index--; | 205 index--; |
194 } | 206 } |
195 } | 207 } |
196 | 208 |
197 // Move to the next byte. | 209 // Move to the next byte. |
(...skipping 14 matching lines...) Expand all Loading... | |
212 index += startOffset; | 224 index += startOffset; |
213 length -= startOffset; | 225 length -= startOffset; |
214 final int blockCount = length ~/ BLOCK_SIZE; | 226 final int blockCount = length ~/ BLOCK_SIZE; |
215 if (blockCount > 0) { | 227 if (blockCount > 0) { |
216 // Create mask block. | 228 // Create mask block. |
217 int mask = 0; | 229 int mask = 0; |
218 for (int i = 3; i >= 0; i--) { | 230 for (int i = 3; i >= 0; i--) { |
219 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | 231 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
220 } | 232 } |
221 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 233 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
222 Int32x4List blockBuffer = new Int32x4List.view( | 234 Int32x4List blockBuffer = |
223 buffer.buffer, index, blockCount); | 235 new Int32x4List.view(buffer.buffer, index, blockCount); |
224 for (int i = 0; i < blockBuffer.length; i++) { | 236 for (int i = 0; i < blockBuffer.length; i++) { |
225 blockBuffer[i] ^= blockMask; | 237 blockBuffer[i] ^= blockMask; |
226 } | 238 } |
227 final int bytes = blockCount * BLOCK_SIZE; | 239 final int bytes = blockCount * BLOCK_SIZE; |
228 index += bytes; | 240 index += bytes; |
229 length -= bytes; | 241 length -= bytes; |
230 } | 242 } |
231 } | 243 } |
232 // Handle end. | 244 // Handle end. |
233 final int end = index + length; | 245 final int end = index + length; |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
277 } else { | 289 } else { |
278 _messageFrameEnd(); | 290 _messageFrameEnd(); |
279 } | 291 } |
280 } else { | 292 } else { |
281 _state = PAYLOAD; | 293 _state = PAYLOAD; |
282 } | 294 } |
283 } | 295 } |
284 | 296 |
285 void _messageFrameEnd() { | 297 void _messageFrameEnd() { |
286 if (_fin) { | 298 if (_fin) { |
299 var bytes = _payload.takeBytes(); | |
300 if (_deflate != null && _compressed) { | |
301 bytes = _deflate.processIncomingMessage(bytes); | |
302 } | |
303 | |
287 switch (_currentMessageType) { | 304 switch (_currentMessageType) { |
288 case _WebSocketMessageType.TEXT: | 305 case _WebSocketMessageType.TEXT: |
289 _eventSink.add(UTF8.decode(_payload.takeBytes())); | 306 _eventSink.add(UTF8.decode(bytes)); |
290 break; | 307 break; |
291 case _WebSocketMessageType.BINARY: | 308 case _WebSocketMessageType.BINARY: |
292 _eventSink.add(_payload.takeBytes()); | 309 _eventSink.add(bytes); |
293 break; | 310 break; |
294 } | 311 } |
295 _currentMessageType = _WebSocketMessageType.NONE; | 312 _currentMessageType = _WebSocketMessageType.NONE; |
296 } | 313 } |
297 _prepareForNextFrame(); | 314 _prepareForNextFrame(); |
298 } | 315 } |
299 | 316 |
300 void _controlFrameEnd() { | 317 void _controlFrameEnd() { |
301 switch (_opcode) { | 318 switch (_opcode) { |
302 case _WebSocketOpcode.CLOSE: | 319 case _WebSocketOpcode.CLOSE: |
(...skipping 21 matching lines...) Expand all Loading... | |
324 | 341 |
325 case _WebSocketOpcode.PONG: | 342 case _WebSocketOpcode.PONG: |
326 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); | 343 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); |
327 break; | 344 break; |
328 } | 345 } |
329 _prepareForNextFrame(); | 346 _prepareForNextFrame(); |
330 } | 347 } |
331 | 348 |
332 bool _isControlFrame() { | 349 bool _isControlFrame() { |
333 return _opcode == _WebSocketOpcode.CLOSE || | 350 return _opcode == _WebSocketOpcode.CLOSE || |
334 _opcode == _WebSocketOpcode.PING || | 351 _opcode == _WebSocketOpcode.PING || |
335 _opcode == _WebSocketOpcode.PONG; | 352 _opcode == _WebSocketOpcode.PONG; |
336 } | 353 } |
337 | 354 |
338 void _prepareForNextFrame() { | 355 void _prepareForNextFrame() { |
339 if (_state != CLOSED && _state != FAILURE) _state = START; | 356 if (_state != CLOSED && _state != FAILURE) _state = START; |
340 _fin = false; | 357 _fin = false; |
341 _opcode = -1; | 358 _opcode = -1; |
342 _len = -1; | 359 _len = -1; |
343 _remainingLenBytes = -1; | 360 _remainingLenBytes = -1; |
344 _remainingMaskingKeyBytes = 4; | 361 _remainingMaskingKeyBytes = 4; |
345 _remainingPayloadBytes = -1; | 362 _remainingPayloadBytes = -1; |
346 _unmaskingIndex = 0; | 363 _unmaskingIndex = 0; |
347 } | 364 } |
348 } | 365 } |
349 | 366 |
350 | |
351 class _WebSocketPing { | 367 class _WebSocketPing { |
352 final List<int> payload; | 368 final List<int> payload; |
353 _WebSocketPing([this.payload = null]); | 369 _WebSocketPing([this.payload = null]); |
354 } | 370 } |
355 | 371 |
356 | |
357 class _WebSocketPong { | 372 class _WebSocketPong { |
358 final List<int> payload; | 373 final List<int> payload; |
359 _WebSocketPong([this.payload = null]); | 374 _WebSocketPong([this.payload = null]); |
360 } | 375 } |
361 | 376 |
362 | |
363 class _WebSocketTransformerImpl implements WebSocketTransformer { | 377 class _WebSocketTransformerImpl implements WebSocketTransformer { |
364 final StreamController<WebSocket> _controller = | 378 final StreamController<WebSocket> _controller = |
365 new StreamController<WebSocket>(sync: true); | 379 new StreamController<WebSocket>(sync: true); |
366 final Function _protocolSelector; | 380 final Function _protocolSelector; |
381 final CompressionOptions _compression; | |
367 | 382 |
368 _WebSocketTransformerImpl(this._protocolSelector); | 383 _WebSocketTransformerImpl(this._protocolSelector, this._compression); |
369 | 384 |
370 Stream<WebSocket> bind(Stream<HttpRequest> stream) { | 385 Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
371 stream.listen((request) { | 386 stream.listen((request) { |
372 _upgrade(request, _protocolSelector) | 387 _upgrade(request, _protocolSelector, _compression) |
373 .then((WebSocket webSocket) => _controller.add(webSocket)) | 388 .then((WebSocket webSocket) => _controller.add(webSocket)) |
374 .catchError(_controller.addError); | 389 .catchError(_controller.addError); |
375 }, onDone: () { | 390 }, onDone: () { |
376 _controller.close(); | 391 _controller.close(); |
377 }); | 392 }); |
378 | 393 |
379 return _controller.stream; | 394 return _controller.stream; |
380 } | 395 } |
381 | 396 |
382 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { | 397 static Future<WebSocket> _upgrade( |
398 HttpRequest request, _protocolSelector, CompressionOptions compression) { | |
383 var response = request.response; | 399 var response = request.response; |
384 if (!_isUpgradeRequest(request)) { | 400 if (!_isUpgradeRequest(request)) { |
385 // Send error response. | 401 // Send error response. |
386 response | 402 response |
387 ..statusCode = HttpStatus.BAD_REQUEST | 403 ..statusCode = HttpStatus.BAD_REQUEST |
388 ..close(); | 404 ..close(); |
389 return new Future.error( | 405 return new Future.error( |
390 new WebSocketException("Invalid WebSocket upgrade request")); | 406 new WebSocketException("Invalid WebSocket upgrade request")); |
391 } | 407 } |
392 | 408 |
393 Future upgrade(String protocol) { | 409 Future upgrade(String protocol) { |
394 // Send the upgrade response. | 410 // Send the upgrade response. |
395 response | 411 response |
396 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS | 412 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") | 413 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
398 ..headers.add(HttpHeaders.UPGRADE, "websocket"); | 414 ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
399 String key = request.headers.value("Sec-WebSocket-Key"); | 415 String key = request.headers.value("Sec-WebSocket-Key"); |
400 _SHA1 sha1 = new _SHA1(); | 416 _SHA1 sha1 = new _SHA1(); |
401 sha1.add("$key$_webSocketGUID".codeUnits); | 417 sha1.add("$key$_webSocketGUID".codeUnits); |
402 String accept = _CryptoUtils.bytesToBase64(sha1.close()); | 418 String accept = _CryptoUtils.bytesToBase64(sha1.close()); |
403 response.headers.add("Sec-WebSocket-Accept", accept); | 419 response.headers.add("Sec-WebSocket-Accept", accept); |
404 if (protocol != null) { | 420 if (protocol != null) { |
405 response.headers.add("Sec-WebSocket-Protocol", protocol); | 421 response.headers.add("Sec-WebSocket-Protocol", protocol); |
406 } | 422 } |
423 | |
424 var deflate = _negotiateCompression(request, response, compression); | |
425 | |
407 response.headers.contentLength = 0; | 426 response.headers.contentLength = 0; |
408 return response.detachSocket() | 427 return response.detachSocket().then((socket) => |
409 .then((socket) => new _WebSocketImpl._fromSocket( | 428 new _WebSocketImpl._fromSocket( |
410 socket, protocol, true)); | 429 socket, protocol, compression, true, deflate)); |
411 } | 430 } |
412 | 431 |
413 var protocols = request.headers['Sec-WebSocket-Protocol']; | 432 var protocols = request.headers['Sec-WebSocket-Protocol']; |
414 if (protocols != null && _protocolSelector != null) { | 433 if (protocols != null && _protocolSelector != null) { |
415 // The suggested protocols can be spread over multiple lines, each | 434 // The suggested protocols can be spread over multiple lines, each |
416 // consisting of multiple protocols. To unify all of them, first join | 435 // consisting of multiple protocols. To unify all of them, first join |
417 // the lists with ', ' and then tokenize. | 436 // the lists with ', ' and then tokenize. |
418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); | 437 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); |
419 return new Future(() => _protocolSelector(protocols)) | 438 return new Future(() => _protocolSelector(protocols)).then((protocol) { |
420 .then((protocol) { | 439 if (protocols.indexOf(protocol) < 0) { |
421 if (protocols.indexOf(protocol) < 0) { | 440 throw new WebSocketException( |
422 throw new WebSocketException( | 441 "Selected protocol is not in the list of available protocols"); |
423 "Selected protocol is not in the list of available protocols"); | 442 } |
424 } | 443 return protocol; |
425 return protocol; | 444 }).catchError((error) { |
426 }) | 445 response |
427 .catchError((error) { | 446 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
428 response | 447 ..close(); |
429 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR | 448 throw error; |
430 ..close(); | 449 }).then(upgrade); |
431 throw error; | |
432 }) | |
433 .then(upgrade); | |
434 } else { | 450 } else { |
435 return upgrade(null); | 451 return upgrade(null); |
436 } | 452 } |
437 } | 453 } |
438 | 454 |
455 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request, | |
456 HttpResponse response, CompressionOptions compression) { | |
457 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); | |
458 | |
459 if (extensionHeader == null) { | |
460 extensionHeader = ""; | |
461 } | |
462 | |
463 Iterable<List<String>> extensions = | |
464 extensionHeader.split(",").map((it) => it.split("; ")); | |
Søren Gjesse
2015/08/24 08:21:50
Both the splitting using "," and "; " seems fragil
| |
465 | |
466 var perMessageDeflate = extensions.firstWhere( | |
467 (x) => x[0] == _WebSocketImpl.PER_MESSAGE_DEFLATE, | |
468 orElse: () => null); | |
469 if (compression.enabled && perMessageDeflate != null) { | |
470 var info = compression._createHeader(perMessageDeflate); | |
471 | |
472 response.headers.add("Sec-WebSocket-Extensions", info[0]); | |
473 var serverNoContextTakeover = | |
474 perMessageDeflate.contains("server_no_context_takeover"); | |
475 var clientNoContextTakeover = | |
476 perMessageDeflate.contains("client_no_context_takeover"); | |
477 var deflate = new _WebSocketPerMessageDeflate( | |
478 serverNoContextTakeover: serverNoContextTakeover, | |
479 clientNoContextTakeover: clientNoContextTakeover, | |
480 serverMaxWindowBits: info[1], | |
481 clientMaxWindowBits: info[1], | |
482 serverSide: true); | |
483 | |
484 return deflate; | |
485 } | |
486 | |
487 return null; | |
488 } | |
489 | |
439 static bool _isUpgradeRequest(HttpRequest request) { | 490 static bool _isUpgradeRequest(HttpRequest request) { |
440 if (request.method != "GET") { | 491 if (request.method != "GET") { |
441 return false; | 492 return false; |
442 } | 493 } |
443 if (request.headers[HttpHeaders.CONNECTION] == null) { | 494 if (request.headers[HttpHeaders.CONNECTION] == null) { |
444 return false; | 495 return false; |
445 } | 496 } |
446 bool isUpgrade = false; | 497 bool isUpgrade = false; |
447 request.headers[HttpHeaders.CONNECTION].forEach((String value) { | 498 request.headers[HttpHeaders.CONNECTION].forEach((String value) { |
448 if (value.toLowerCase() == "upgrade") isUpgrade = true; | 499 if (value.toLowerCase() == "upgrade") isUpgrade = true; |
449 }); | 500 }); |
450 if (!isUpgrade) return false; | 501 if (!isUpgrade) return false; |
451 String upgrade = request.headers.value(HttpHeaders.UPGRADE); | 502 String upgrade = request.headers.value(HttpHeaders.UPGRADE); |
452 if (upgrade == null || upgrade.toLowerCase() != "websocket") { | 503 if (upgrade == null || upgrade.toLowerCase() != "websocket") { |
453 return false; | 504 return false; |
454 } | 505 } |
455 String version = request.headers.value("Sec-WebSocket-Version"); | 506 String version = request.headers.value("Sec-WebSocket-Version"); |
456 if (version == null || version != "13") { | 507 if (version == null || version != "13") { |
457 return false; | 508 return false; |
458 } | 509 } |
459 String key = request.headers.value("Sec-WebSocket-Key"); | 510 String key = request.headers.value("Sec-WebSocket-Key"); |
460 if (key == null) { | 511 if (key == null) { |
461 return false; | 512 return false; |
462 } | 513 } |
463 return true; | 514 return true; |
464 } | 515 } |
465 } | 516 } |
466 | 517 |
518 class _WebSocketPerMessageDeflate { | |
519 bool serverNoContextTakeover; | |
520 bool clientNoContextTakeover; | |
521 int clientMaxWindowBits; | |
522 int serverMaxWindowBits; | |
523 bool serverSide; | |
524 | |
525 _Filter decoder; | |
526 _Filter encoder; | |
527 | |
528 _WebSocketPerMessageDeflate( | |
529 {this.clientMaxWindowBits, | |
Søren Gjesse
2015/08/24 08:21:50
nit: Please indent like this (one space more for t
| |
530 this.serverMaxWindowBits, | |
531 this.serverNoContextTakeover: false, | |
532 this.clientNoContextTakeover: false, | |
533 this.serverSide: false}) { | |
534 if (clientMaxWindowBits == null) { | |
535 clientMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS; | |
536 } | |
537 | |
538 if (serverMaxWindowBits == null) { | |
539 serverMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS; | |
540 } | |
541 } | |
542 | |
543 void _ensureDecoder() { | |
544 if (decoder == null) { | |
545 decoder = _Filter._newZLibInflateFilter( | |
546 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true); | |
547 } | |
548 } | |
549 | |
550 void _ensureEncoder() { | |
551 if (encoder == null) { | |
552 encoder = _Filter._newZLibDeflateFilter( | |
553 false, | |
554 ZLibOption.DEFAULT_LEVEL, | |
555 serverSide ? serverMaxWindowBits : clientMaxWindowBits, | |
556 ZLibOption.DEFAULT_MEM_LEVEL, | |
557 ZLibOption.STRATEGY_DEFAULT, | |
558 null, | |
559 true); | |
560 } | |
561 } | |
562 | |
563 List<int> processIncomingMessage(List<int> msg) { | |
564 _ensureDecoder(); | |
565 | |
566 var data = []; | |
567 data.addAll(msg); | |
568 data.addAll(const [0x00, 0x00, 0xff, 0xff]); | |
569 | |
570 decoder.process(data, 0, data.length); | |
571 var reuse = | |
572 !(serverSide ? clientNoContextTakeover : serverNoContextTakeover); | |
573 var result = []; | |
574 var out; | |
575 | |
576 while ((out = decoder.processed(flush: reuse)) != null) { | |
577 result.addAll(out); | |
578 } | |
579 | |
580 decoder.processed(flush: reuse); | |
581 | |
582 if (!reuse) { | |
583 decoder.end(); | |
584 decoder = null; | |
585 } | |
586 return result; | |
587 } | |
588 | |
589 List<int> processOutgoingMessage(List<int> msg) { | |
590 _ensureEncoder(); | |
591 var reuse = | |
592 !(serverSide ? serverNoContextTakeover : clientNoContextTakeover); | |
593 var result = []; | |
594 var out; | |
595 | |
596 encoder.process(msg, 0, msg.length); | |
597 | |
598 while ((out = encoder.processed(flush: reuse)) != null) { | |
599 result.addAll(out); | |
600 } | |
601 | |
602 if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) { | |
603 encoder.end(); | |
604 encoder = null; | |
605 } | |
606 | |
607 if (result.length > 4) { | |
608 result = result.sublist(0, result.length - 4); | |
609 } | |
610 | |
611 return result; | |
612 } | |
613 } | |
467 | 614 |
468 // TODO(ajohnsen): Make this transformer reusable. | 615 // TODO(ajohnsen): Make this transformer reusable. |
469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 616 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
470 final _WebSocketImpl webSocket; | 617 final _WebSocketImpl webSocket; |
471 EventSink _eventSink; | 618 EventSink _eventSink; |
472 | 619 |
473 _WebSocketOutgoingTransformer(this.webSocket); | 620 _WebSocketPerMessageDeflate _deflateHelper; |
621 | |
622 _WebSocketOutgoingTransformer(this.webSocket) { | |
623 _deflateHelper = webSocket._deflate; | |
624 } | |
474 | 625 |
475 Stream bind(Stream stream) { | 626 Stream bind(Stream stream) { |
476 return new Stream.eventTransformed( | 627 return new Stream.eventTransformed(stream, (EventSink eventSink) { |
477 stream, | 628 if (_eventSink != null) { |
478 (EventSink eventSink) { | 629 throw new StateError("WebSocket transformer already used"); |
479 if (_eventSink != null) { | 630 } |
480 throw new StateError("WebSocket transformer already used"); | 631 _eventSink = eventSink; |
481 } | 632 return this; |
482 _eventSink = eventSink; | 633 }); |
483 return this; | |
484 }); | |
485 } | 634 } |
486 | 635 |
487 void add(message) { | 636 void add(message) { |
488 if (message is _WebSocketPong) { | 637 if (message is _WebSocketPong) { |
489 addFrame(_WebSocketOpcode.PONG, message.payload); | 638 addFrame(_WebSocketOpcode.PONG, message.payload); |
490 return; | 639 return; |
491 } | 640 } |
492 if (message is _WebSocketPing) { | 641 if (message is _WebSocketPing) { |
493 addFrame(_WebSocketOpcode.PING, message.payload); | 642 addFrame(_WebSocketOpcode.PING, message.payload); |
494 return; | 643 return; |
495 } | 644 } |
496 List<int> data; | 645 List<int> data; |
497 int opcode; | 646 int opcode; |
498 if (message != null) { | 647 if (message != null) { |
499 if (message is String) { | 648 if (message is String) { |
500 opcode = _WebSocketOpcode.TEXT; | 649 opcode = _WebSocketOpcode.TEXT; |
501 data = UTF8.encode(message); | 650 data = UTF8.encode(message); |
502 } else { | 651 } else { |
503 if (message is !List<int>) { | 652 if (message is! List<int>) { |
504 throw new ArgumentError(message); | 653 throw new ArgumentError(message); |
505 } | 654 } |
506 opcode = _WebSocketOpcode.BINARY; | 655 opcode = _WebSocketOpcode.BINARY; |
507 data = message; | 656 data = message; |
508 } | 657 } |
658 | |
659 if (_deflateHelper != null) { | |
660 data = _deflateHelper.processOutgoingMessage(data); | |
661 } | |
509 } else { | 662 } else { |
510 opcode = _WebSocketOpcode.TEXT; | 663 opcode = _WebSocketOpcode.TEXT; |
511 } | 664 } |
512 addFrame(opcode, data); | 665 addFrame(opcode, data); |
513 } | 666 } |
514 | 667 |
515 void addError(Object error, [StackTrace stackTrace]) => | 668 void addError(Object error, [StackTrace stackTrace]) => |
516 _eventSink.addError(error, stackTrace); | 669 _eventSink.addError(error, stackTrace); |
517 | 670 |
518 void close() { | 671 void close() { |
519 int code = webSocket._outCloseCode; | 672 int code = webSocket._outCloseCode; |
520 String reason = webSocket._outCloseReason; | 673 String reason = webSocket._outCloseReason; |
521 List<int> data; | 674 List<int> data; |
522 if (code != null) { | 675 if (code != null) { |
523 data = new List<int>(); | 676 data = new List<int>(); |
524 data.add((code >> 8) & 0xFF); | 677 data.add((code >> 8) & 0xFF); |
525 data.add(code & 0xFF); | 678 data.add(code & 0xFF); |
526 if (reason != null) { | 679 if (reason != null) { |
527 data.addAll(UTF8.encode(reason)); | 680 data.addAll(UTF8.encode(reason)); |
528 } | 681 } |
529 } | 682 } |
530 addFrame(_WebSocketOpcode.CLOSE, data); | 683 addFrame(_WebSocketOpcode.CLOSE, data); |
531 _eventSink.close(); | 684 _eventSink.close(); |
532 } | 685 } |
533 | 686 |
534 void addFrame(int opcode, List<int> data) => | 687 void addFrame(int opcode, List<int> data) => createFrame( |
535 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); | 688 opcode, |
689 data, | |
690 webSocket._serverSide, | |
691 _deflateHelper != null && | |
692 (opcode == _WebSocketOpcode.TEXT || | |
693 opcode == _WebSocketOpcode.BINARY)).forEach((e) { | |
694 _eventSink.add(e); | |
695 }); | |
536 | 696 |
537 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { | 697 static Iterable createFrame( |
538 bool mask = !serverSide; // Masking not implemented for server. | 698 int opcode, List<int> data, bool serverSide, bool compressed) { |
699 bool mask = !serverSide; // Masking not implemented for server. | |
539 int dataLength = data == null ? 0 : data.length; | 700 int dataLength = data == null ? 0 : data.length; |
540 // Determine the header size. | 701 // Determine the header size. |
541 int headerSize = (mask) ? 6 : 2; | 702 int headerSize = (mask) ? 6 : 2; |
542 if (dataLength > 65535) { | 703 if (dataLength > 65535) { |
543 headerSize += 8; | 704 headerSize += 8; |
544 } else if (dataLength > 125) { | 705 } else if (dataLength > 125) { |
545 headerSize += 2; | 706 headerSize += 2; |
546 } | 707 } |
547 Uint8List header = new Uint8List(headerSize); | 708 Uint8List header = new Uint8List(headerSize); |
548 int index = 0; | 709 int index = 0; |
710 | |
711 var rsv = 0; | |
712 if (compressed) { | |
713 rsv = 4; | |
714 } | |
715 | |
549 // Set FIN and opcode. | 716 // Set FIN and opcode. |
550 header[index++] = 0x80 | opcode; | 717 var hoc = (1 << 7) | (rsv % 8) << 4 | (opcode % 128); |
718 | |
719 header[index++] = hoc; | |
Søren Gjesse
2015/08/24 08:21:50
Just set this byte like this (no need to the rsv a
| |
551 // Determine size and position of length field. | 720 // Determine size and position of length field. |
552 int lengthBytes = 1; | 721 int lengthBytes = 1; |
553 int firstLengthByte = 1; | |
554 if (dataLength > 65535) { | 722 if (dataLength > 65535) { |
555 header[index++] = 127; | 723 header[index++] = 127; |
556 lengthBytes = 8; | 724 lengthBytes = 8; |
557 } else if (dataLength > 125) { | 725 } else if (dataLength > 125) { |
558 header[index++] = 126; | 726 header[index++] = 126; |
559 lengthBytes = 2; | 727 lengthBytes = 2; |
560 } | 728 } |
561 // Write the length in network byte order into the header. | 729 // Write the length in network byte order into the header. |
562 for (int i = 0; i < lengthBytes; i++) { | 730 for (int i = 0; i < lengthBytes; i++) { |
563 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | 731 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; |
564 } | 732 } |
565 if (mask) { | 733 if (mask) { |
566 header[1] |= 1 << 7; | 734 header[1] |= 1 << 7; |
567 var maskBytes = _IOCrypto.getRandomBytes(4); | 735 var maskBytes = _IOCrypto.getRandomBytes(4); |
568 header.setRange(index, index + 4, maskBytes); | 736 header.setRange(index, index + 4, maskBytes); |
569 index += 4; | 737 index += 4; |
570 if (data != null) { | 738 if (data != null) { |
571 Uint8List list; | 739 Uint8List list; |
572 // If this is a text message just do the masking inside the | 740 // If this is a text message just do the masking inside the |
573 // encoded data. | 741 // encoded data. |
574 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | 742 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { |
575 list = data; | 743 list = data; |
576 } else { | 744 } else { |
577 if (data is Uint8List) { | 745 if (data is Uint8List) { |
578 list = new Uint8List.fromList(data); | 746 list = new Uint8List.fromList(data); |
579 } else { | 747 } else { |
580 list = new Uint8List(data.length); | 748 list = new Uint8List(data.length); |
581 for (int i = 0; i < data.length; i++) { | 749 for (int i = 0; i < data.length; i++) { |
582 if (data[i] < 0 || 255 < data[i]) { | 750 if (data[i] < 0 || 255 < data[i]) { |
583 throw new ArgumentError( | 751 throw new ArgumentError("List element is not a byte value " |
584 "List element is not a byte value " | |
585 "(value ${data[i]} at index $i)"); | 752 "(value ${data[i]} at index $i)"); |
586 } | 753 } |
587 list[i] = data[i]; | 754 list[i] = data[i]; |
588 } | 755 } |
589 } | 756 } |
590 } | 757 } |
591 const int BLOCK_SIZE = 16; | 758 const int BLOCK_SIZE = 16; |
592 int blockCount = list.length ~/ BLOCK_SIZE; | 759 int blockCount = list.length ~/ BLOCK_SIZE; |
593 if (blockCount > 0) { | 760 if (blockCount > 0) { |
594 // Create mask block. | 761 // Create mask block. |
595 int mask = 0; | 762 int mask = 0; |
596 for (int i = 3; i >= 0; i--) { | 763 for (int i = 3; i >= 0; i--) { |
597 mask = (mask << 8) | maskBytes[i]; | 764 mask = (mask << 8) | maskBytes[i]; |
598 } | 765 } |
599 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 766 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
600 Int32x4List blockBuffer = new Int32x4List.view( | 767 Int32x4List blockBuffer = |
601 list.buffer, 0, blockCount); | 768 new Int32x4List.view(list.buffer, 0, blockCount); |
602 for (int i = 0; i < blockBuffer.length; i++) { | 769 for (int i = 0; i < blockBuffer.length; i++) { |
603 blockBuffer[i] ^= blockMask; | 770 blockBuffer[i] ^= blockMask; |
604 } | 771 } |
605 } | 772 } |
606 // Handle end. | 773 // Handle end. |
607 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { | 774 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { |
608 list[i] ^= maskBytes[i & 3]; | 775 list[i] ^= maskBytes[i & 3]; |
609 } | 776 } |
610 data = list; | 777 data = list; |
611 } | 778 } |
612 } | 779 } |
613 assert(index == headerSize); | 780 assert(index == headerSize); |
614 if (data == null) { | 781 if (data == null) { |
615 return [header]; | 782 return [header]; |
616 } else { | 783 } else { |
617 return [header, data]; | 784 return [header, data]; |
618 } | 785 } |
619 } | 786 } |
620 } | 787 } |
621 | 788 |
622 | |
623 class _WebSocketConsumer implements StreamConsumer { | 789 class _WebSocketConsumer implements StreamConsumer { |
624 final _WebSocketImpl webSocket; | 790 final _WebSocketImpl webSocket; |
625 final Socket socket; | 791 final Socket socket; |
626 StreamController _controller; | 792 StreamController _controller; |
627 StreamSubscription _subscription; | 793 StreamSubscription _subscription; |
628 bool _issuedPause = false; | 794 bool _issuedPause = false; |
629 bool _closed = false; | 795 bool _closed = false; |
630 Completer _closeCompleter = new Completer(); | 796 Completer _closeCompleter = new Completer(); |
631 Completer _completer; | 797 Completer _completer; |
632 | 798 |
(...skipping 24 matching lines...) Expand all Loading... | |
657 void _cancel() { | 823 void _cancel() { |
658 if (_subscription != null) { | 824 if (_subscription != null) { |
659 var subscription = _subscription; | 825 var subscription = _subscription; |
660 _subscription = null; | 826 _subscription = null; |
661 subscription.cancel(); | 827 subscription.cancel(); |
662 } | 828 } |
663 } | 829 } |
664 | 830 |
665 _ensureController() { | 831 _ensureController() { |
666 if (_controller != null) return; | 832 if (_controller != null) return; |
667 _controller = new StreamController(sync: true, | 833 _controller = new StreamController( |
668 onPause: _onPause, | 834 sync: true, |
669 onResume: _onResume, | 835 onPause: _onPause, |
670 onCancel: _onListen); | 836 onResume: _onResume, |
671 var stream = _controller.stream.transform( | 837 onCancel: _onListen); |
672 new _WebSocketOutgoingTransformer(webSocket)); | 838 var stream = _controller.stream |
673 socket.addStream(stream) | 839 .transform(new _WebSocketOutgoingTransformer(webSocket)); |
674 .then((_) { | 840 socket.addStream(stream).then((_) { |
675 _done(); | 841 _done(); |
676 _closeCompleter.complete(webSocket); | 842 _closeCompleter.complete(webSocket); |
677 }, onError: (error, StackTrace stackTrace) { | 843 }, onError: (error, StackTrace stackTrace) { |
678 _closed = true; | 844 _closed = true; |
679 _cancel(); | 845 _cancel(); |
680 if (error is ArgumentError) { | 846 if (error is ArgumentError) { |
681 if (!_done(error, stackTrace)) { | 847 if (!_done(error, stackTrace)) { |
682 _closeCompleter.completeError(error, stackTrace); | 848 _closeCompleter.completeError(error, stackTrace); |
683 } | 849 } |
684 } else { | 850 } else { |
685 _done(); | 851 _done(); |
686 _closeCompleter.complete(webSocket); | 852 _closeCompleter.complete(webSocket); |
687 } | 853 } |
688 }); | 854 }); |
689 } | 855 } |
690 | 856 |
691 bool _done([error, StackTrace stackTrace]) { | 857 bool _done([error, StackTrace stackTrace]) { |
692 if (_completer == null) return false; | 858 if (_completer == null) return false; |
693 if (error != null) { | 859 if (error != null) { |
694 _completer.completeError(error, stackTrace); | 860 _completer.completeError(error, stackTrace); |
695 } else { | 861 } else { |
696 _completer.complete(webSocket); | 862 _completer.complete(webSocket); |
697 } | 863 } |
698 _completer = null; | 864 _completer = null; |
699 return true; | 865 return true; |
700 } | 866 } |
701 | 867 |
702 Future addStream(var stream) { | 868 Future addStream(var stream) { |
703 if (_closed) { | 869 if (_closed) { |
704 stream.listen(null).cancel(); | 870 stream.listen(null).cancel(); |
705 return new Future.value(webSocket); | 871 return new Future.value(webSocket); |
706 } | 872 } |
707 _ensureController(); | 873 _ensureController(); |
708 _completer = new Completer(); | 874 _completer = new Completer(); |
709 _subscription = stream.listen( | 875 _subscription = stream.listen((data) { |
710 (data) { | 876 _controller.add(data); |
711 _controller.add(data); | 877 }, onDone: _done, onError: _done, cancelOnError: true); |
712 }, | |
713 onDone: _done, | |
714 onError: _done, | |
715 cancelOnError: true); | |
716 if (_issuedPause) { | 878 if (_issuedPause) { |
717 _subscription.pause(); | 879 _subscription.pause(); |
718 _issuedPause = false; | 880 _issuedPause = false; |
719 } | 881 } |
720 return _completer.future; | 882 return _completer.future; |
721 } | 883 } |
722 | 884 |
723 Future close() { | 885 Future close() { |
724 _ensureController(); | 886 _ensureController(); |
725 Future closeSocket() { | 887 Future closeSocket() { |
726 return socket.close().catchError((_) {}).then((_) => webSocket); | 888 return socket.close().catchError((_) {}).then((_) => webSocket); |
727 } | 889 } |
728 _controller.close(); | 890 _controller.close(); |
729 return _closeCompleter.future.then((_) => closeSocket()); | 891 return _closeCompleter.future.then((_) => closeSocket()); |
730 } | 892 } |
731 | 893 |
732 void add(data) { | 894 void add(data) { |
733 if (_closed) return; | 895 if (_closed) return; |
734 _ensureController(); | 896 _ensureController(); |
735 _controller.add(data); | 897 _controller.add(data); |
736 } | 898 } |
737 | 899 |
738 void closeSocket() { | 900 void closeSocket() { |
739 _closed = true; | 901 _closed = true; |
740 _cancel(); | 902 _cancel(); |
741 close(); | 903 close(); |
742 } | 904 } |
743 } | 905 } |
744 | 906 |
745 | |
746 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { | 907 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
747 // Use default Map so we keep order. | 908 // Use default Map so we keep order. |
748 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); | 909 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); |
910 static const int DEFAULT_WINDOW_BITS = 15; | |
911 static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; | |
749 | 912 |
750 final String protocol; | 913 final String protocol; |
751 | 914 |
752 StreamController _controller; | 915 StreamController _controller; |
753 StreamSubscription _subscription; | 916 StreamSubscription _subscription; |
754 StreamSink _sink; | 917 StreamSink _sink; |
755 | 918 |
756 final _socket; | 919 final _socket; |
757 final bool _serverSide; | 920 final bool _serverSide; |
758 int _readyState = WebSocket.CONNECTING; | 921 int _readyState = WebSocket.CONNECTING; |
759 bool _writeClosed = false; | 922 bool _writeClosed = false; |
760 int _closeCode; | 923 int _closeCode; |
761 String _closeReason; | 924 String _closeReason; |
762 Duration _pingInterval; | 925 Duration _pingInterval; |
763 Timer _pingTimer; | 926 Timer _pingTimer; |
764 _WebSocketConsumer _consumer; | 927 _WebSocketConsumer _consumer; |
765 | 928 |
766 int _outCloseCode; | 929 int _outCloseCode; |
767 String _outCloseReason; | 930 String _outCloseReason; |
768 Timer _closeTimer; | 931 Timer _closeTimer; |
932 _WebSocketPerMessageDeflate _deflate; | |
769 | 933 |
770 static final HttpClient _httpClient = new HttpClient(); | 934 static final HttpClient _httpClient = new HttpClient(); |
771 | 935 |
772 static Future<WebSocket> connect( | 936 static Future<WebSocket> connect( |
773 String url, Iterable<String> protocols, Map<String, dynamic> headers) { | 937 String url, Iterable<String> protocols, Map<String, dynamic> headers, |
938 {CompressionOptions compression: CompressionOptions.DEFAULT}) { | |
774 Uri uri = Uri.parse(url); | 939 Uri uri = Uri.parse(url); |
775 if (uri.scheme != "ws" && uri.scheme != "wss") { | 940 if (uri.scheme != "ws" && uri.scheme != "wss") { |
776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); | 941 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); |
777 } | 942 } |
778 | 943 |
779 Random random = new Random(); | 944 Random random = new Random(); |
780 // Generate 16 random bytes. | 945 // Generate 16 random bytes. |
781 Uint8List nonceData = new Uint8List(16); | 946 Uint8List nonceData = new Uint8List(16); |
782 for (int i = 0; i < 16; i++) { | 947 for (int i = 0; i < 16; i++) { |
783 nonceData[i] = random.nextInt(256); | 948 nonceData[i] = random.nextInt(256); |
784 } | 949 } |
785 String nonce = _CryptoUtils.bytesToBase64(nonceData); | 950 String nonce = _CryptoUtils.bytesToBase64(nonceData); |
786 | 951 |
787 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", | 952 uri = new Uri( |
788 userInfo: uri.userInfo, | 953 scheme: uri.scheme == "wss" ? "https" : "http", |
789 host: uri.host, | 954 userInfo: uri.userInfo, |
790 port: uri.port, | 955 host: uri.host, |
791 path: uri.path, | 956 port: uri.port, |
792 query: uri.query, | 957 path: uri.path, |
793 fragment: uri.fragment); | 958 query: uri.query, |
794 return _httpClient.openUrl("GET", uri) | 959 fragment: uri.fragment); |
795 .then((request) { | 960 return _httpClient.openUrl("GET", uri).then((request) { |
796 if (uri.userInfo != null && !uri.userInfo.isEmpty) { | 961 if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
797 // If the URL contains user information use that for basic | 962 // If the URL contains user information use that for basic |
798 // authorization. | 963 // authorization. |
799 String auth = | 964 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); |
800 _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); | 965 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
801 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 966 } |
967 if (headers != null) { | |
968 headers.forEach((field, value) => request.headers.add(field, value)); | |
969 } | |
970 // Setup the initial handshake. | |
971 request.headers | |
972 ..set(HttpHeaders.CONNECTION, "Upgrade") | |
973 ..set(HttpHeaders.UPGRADE, "websocket") | |
974 ..set("Sec-WebSocket-Key", nonce) | |
975 ..set("Cache-Control", "no-cache") | |
976 ..set("Sec-WebSocket-Version", "13"); | |
977 if (protocols != null) { | |
978 request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); | |
979 } | |
980 | |
981 request.headers | |
982 .add("Sec-WebSocket-Extensions", compression._createHeader()); | |
983 | |
984 return request.close(); | |
985 }).then((response) { | |
986 void error(String message) { | |
987 // Flush data. | |
988 response.detachSocket().then((socket) { | |
989 socket.destroy(); | |
990 }); | |
991 throw new WebSocketException(message); | |
992 } | |
993 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || | |
994 response.headers[HttpHeaders.CONNECTION] == null || | |
995 !response.headers[HttpHeaders.CONNECTION] | |
996 .any((value) => value.toLowerCase() == "upgrade") || | |
997 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != | |
998 "websocket") { | |
999 error("Connection to '$uri' was not upgraded to websocket"); | |
1000 } | |
1001 String accept = response.headers.value("Sec-WebSocket-Accept"); | |
1002 if (accept == null) { | |
1003 error("Response did not contain a 'Sec-WebSocket-Accept' header"); | |
1004 } | |
1005 _SHA1 sha1 = new _SHA1(); | |
1006 sha1.add("$nonce$_webSocketGUID".codeUnits); | |
1007 List<int> expectedAccept = sha1.close(); | |
1008 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); | |
1009 if (expectedAccept.length != receivedAccept.length) { | |
1010 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); | |
1011 } | |
1012 for (int i = 0; i < expectedAccept.length; i++) { | |
1013 if (expectedAccept[i] != receivedAccept[i]) { | |
1014 error("Bad response 'Sec-WebSocket-Accept' header"); | |
802 } | 1015 } |
803 if (headers != null) { | 1016 } |
804 headers.forEach((field, value) => request.headers.add(field, value)); | 1017 var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
805 } | 1018 |
806 // Setup the initial handshake. | 1019 _WebSocketPerMessageDeflate deflate = |
807 request.headers | 1020 negotiateClientCompression(response, compression); |
808 ..set(HttpHeaders.CONNECTION, "Upgrade") | 1021 |
809 ..set(HttpHeaders.UPGRADE, "websocket") | 1022 return response.detachSocket().then((socket) => |
810 ..set("Sec-WebSocket-Key", nonce) | 1023 new _WebSocketImpl._fromSocket( |
811 ..set("Cache-Control", "no-cache") | 1024 socket, protocol, compression, false, deflate)); |
812 ..set("Sec-WebSocket-Version", "13"); | 1025 }); |
813 if (protocols != null) { | |
814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); | |
815 } | |
816 return request.close(); | |
817 }) | |
818 .then((response) { | |
819 void error(String message) { | |
820 // Flush data. | |
821 response.detachSocket().then((socket) { | |
822 socket.destroy(); | |
823 }); | |
824 throw new WebSocketException(message); | |
825 } | |
826 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || | |
827 response.headers[HttpHeaders.CONNECTION] == null || | |
828 !response.headers[HttpHeaders.CONNECTION].any( | |
829 (value) => value.toLowerCase() == "upgrade") || | |
830 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != | |
831 "websocket") { | |
832 error("Connection to '$uri' was not upgraded to websocket"); | |
833 } | |
834 String accept = response.headers.value("Sec-WebSocket-Accept"); | |
835 if (accept == null) { | |
836 error("Response did not contain a 'Sec-WebSocket-Accept' header"); | |
837 } | |
838 _SHA1 sha1 = new _SHA1(); | |
839 sha1.add("$nonce$_webSocketGUID".codeUnits); | |
840 List<int> expectedAccept = sha1.close(); | |
841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); | |
842 if (expectedAccept.length != receivedAccept.length) { | |
843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); | |
844 } | |
845 for (int i = 0; i < expectedAccept.length; i++) { | |
846 if (expectedAccept[i] != receivedAccept[i]) { | |
847 error("Bad response 'Sec-WebSocket-Accept' header"); | |
848 } | |
849 } | |
850 var protocol = response.headers.value('Sec-WebSocket-Protocol'); | |
851 return response.detachSocket() | |
852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); | |
853 }); | |
854 } | 1026 } |
855 | 1027 |
856 _WebSocketImpl._fromSocket(this._socket, this.protocol, | 1028 static _WebSocketPerMessageDeflate negotiateClientCompression( |
857 [this._serverSide = false]) { | 1029 HttpClientResponse response, CompressionOptions compression) { |
1030 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); | |
1031 | |
1032 if (extensionHeader == null) { | |
1033 extensionHeader = ""; | |
1034 } | |
1035 | |
1036 Iterable<List<String>> extensions = | |
1037 extensionHeader.split(", ").map((it) => it.split("; ")); | |
Søren Gjesse
2015/08/24 08:21:50
Please see above (comment on _negoatiateCompressio
| |
1038 | |
1039 if (compression.enabled && | |
1040 extensions.any((x) => x[0] == PER_MESSAGE_DEFLATE)) { | |
1041 var opts = extensions.firstWhere((x) => x[0] == PER_MESSAGE_DEFLATE); | |
1042 var serverNoContextTakeover = opts.contains("server_no_context_takeover"); | |
1043 var clientNoContextTakeover = opts.contains("client_no_context_takeover"); | |
1044 | |
1045 int getWindowBits(String type) { | |
1046 var o = opts.firstWhere((x) => x.startsWith("${type}_max_window_bits="), | |
1047 orElse: () => null); | |
1048 | |
1049 if (o == null) { | |
1050 return DEFAULT_WINDOW_BITS; | |
1051 } | |
1052 | |
1053 try { | |
1054 o = o.substring("${type}_max_window_bits=".length); | |
1055 o = int.parse(o); | |
1056 } catch (e) { | |
1057 return DEFAULT_WINDOW_BITS; | |
1058 } | |
1059 | |
1060 return o; | |
1061 } | |
1062 | |
1063 return new _WebSocketPerMessageDeflate( | |
1064 clientMaxWindowBits: getWindowBits("client"), | |
1065 serverMaxWindowBits: getWindowBits("server"), | |
1066 clientNoContextTakeover: clientNoContextTakeover, | |
1067 serverNoContextTakeover: serverNoContextTakeover); | |
1068 } | |
1069 | |
1070 return null; | |
1071 } | |
1072 | |
1073 _WebSocketImpl._fromSocket( | |
1074 this._socket, this.protocol, CompressionOptions compression, | |
1075 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) { | |
858 _consumer = new _WebSocketConsumer(this, _socket); | 1076 _consumer = new _WebSocketConsumer(this, _socket); |
859 _sink = new _StreamSinkImpl(_consumer); | 1077 _sink = new _StreamSinkImpl(_consumer); |
860 _readyState = WebSocket.OPEN; | 1078 _readyState = WebSocket.OPEN; |
1079 _deflate = deflate; | |
861 | 1080 |
862 var transformer = new _WebSocketProtocolTransformer(_serverSide); | 1081 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); |
863 _subscription = _socket.transform(transformer).listen( | 1082 _subscription = _socket.transform(transformer).listen((data) { |
864 (data) { | 1083 if (data is _WebSocketPing) { |
865 if (data is _WebSocketPing) { | 1084 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 1085 } else if (data is _WebSocketPong) { |
867 } else if (data is _WebSocketPong) { | 1086 // Simply set pingInterval, as it'll cancel any timers. |
868 // Simply set pingInterval, as it'll cancel any timers. | 1087 pingInterval = _pingInterval; |
869 pingInterval = _pingInterval; | 1088 } else { |
870 } else { | 1089 _controller.add(data); |
871 _controller.add(data); | 1090 } |
872 } | 1091 }, onError: (error, stackTrace) { |
873 }, | 1092 if (_closeTimer != null) _closeTimer.cancel(); |
874 onError: (error) { | 1093 if (error is FormatException) { |
875 if (_closeTimer != null) _closeTimer.cancel(); | 1094 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
876 if (error is FormatException) { | 1095 } else { |
877 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 1096 _close(WebSocketStatus.PROTOCOL_ERROR); |
878 } else { | 1097 } |
879 _close(WebSocketStatus.PROTOCOL_ERROR); | 1098 // An error happened, set the close code set above. |
880 } | 1099 _closeCode = _outCloseCode; |
881 // An error happened, set the close code set above. | 1100 _closeReason = _outCloseReason; |
882 _closeCode = _outCloseCode; | 1101 _controller.close(); |
883 _closeReason = _outCloseReason; | 1102 }, onDone: () { |
884 _controller.close(); | 1103 if (_closeTimer != null) _closeTimer.cancel(); |
885 }, | 1104 if (_readyState == WebSocket.OPEN) { |
886 onDone: () { | 1105 _readyState = WebSocket.CLOSING; |
887 if (_closeTimer != null) _closeTimer.cancel(); | 1106 if (!_isReservedStatusCode(transformer.closeCode)) { |
888 if (_readyState == WebSocket.OPEN) { | 1107 _close(transformer.closeCode, transformer.closeReason); |
889 _readyState = WebSocket.CLOSING; | 1108 } else { |
890 if (!_isReservedStatusCode(transformer.closeCode)) { | 1109 _close(); |
891 _close(transformer.closeCode, transformer.closeReason); | 1110 } |
892 } else { | 1111 _readyState = WebSocket.CLOSED; |
893 _close(); | 1112 } |
894 } | 1113 // Protocol close, use close code from transformer. |
895 _readyState = WebSocket.CLOSED; | 1114 _closeCode = transformer.closeCode; |
896 } | 1115 _closeReason = transformer.closeReason; |
897 // Protocol close, use close code from transformer. | 1116 _controller.close(); |
898 _closeCode = transformer.closeCode; | 1117 }, cancelOnError: true); |
899 _closeReason = transformer.closeReason; | |
900 _controller.close(); | |
901 }, | |
902 cancelOnError: true); | |
903 _subscription.pause(); | 1118 _subscription.pause(); |
904 _controller = new StreamController(sync: true, | 1119 _controller = new StreamController( |
905 onListen: _subscription.resume, | 1120 sync: true, onListen: _subscription.resume, onCancel: () { |
906 onCancel: () { | 1121 _subscription.cancel(); |
907 _subscription.cancel(); | 1122 _subscription = null; |
908 _subscription = null; | 1123 }, onPause: _subscription.pause, onResume: _subscription.resume); |
909 }, | |
910 onPause: _subscription.pause, | |
911 onResume: _subscription.resume); | |
912 | 1124 |
913 _webSockets[_serviceId] = this; | 1125 _webSockets[_serviceId] = this; |
914 try { _socket._owner = this; } catch (_) {} | 1126 try { |
1127 _socket._owner = this; | |
1128 } catch (_) {} | |
915 } | 1129 } |
916 | 1130 |
917 StreamSubscription listen(void onData(message), | 1131 StreamSubscription listen(void onData(message), |
918 {Function onError, | 1132 {Function onError, void onDone(), bool cancelOnError}) { |
919 void onDone(), | |
920 bool cancelOnError}) { | |
921 return _controller.stream.listen(onData, | 1133 return _controller.stream.listen(onData, |
922 onError: onError, | 1134 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
923 onDone: onDone, | |
924 cancelOnError: cancelOnError); | |
925 } | 1135 } |
926 | 1136 |
927 Duration get pingInterval => _pingInterval; | 1137 Duration get pingInterval => _pingInterval; |
928 | 1138 |
929 void set pingInterval(Duration interval) { | 1139 void set pingInterval(Duration interval) { |
930 if (_writeClosed) return; | 1140 if (_writeClosed) return; |
931 if (_pingTimer != null) _pingTimer.cancel(); | 1141 if (_pingTimer != null) _pingTimer.cancel(); |
932 _pingInterval = interval; | 1142 _pingInterval = interval; |
933 | 1143 |
934 if (_pingInterval == null) return; | 1144 if (_pingInterval == null) return; |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1020 'type': '@Socket', | 1230 'type': '@Socket', |
1021 'name': 'UserSocket', | 1231 'name': 'UserSocket', |
1022 'user_name': 'UserSocket', | 1232 'user_name': 'UserSocket', |
1023 }; | 1233 }; |
1024 } | 1234 } |
1025 return r; | 1235 return r; |
1026 } | 1236 } |
1027 | 1237 |
1028 static bool _isReservedStatusCode(int code) { | 1238 static bool _isReservedStatusCode(int code) { |
1029 return code != null && | 1239 return code != null && |
1030 (code < WebSocketStatus.NORMAL_CLOSURE || | 1240 (code < WebSocketStatus.NORMAL_CLOSURE || |
1031 code == WebSocketStatus.RESERVED_1004 || | 1241 code == WebSocketStatus.RESERVED_1004 || |
1032 code == WebSocketStatus.NO_STATUS_RECEIVED || | 1242 code == WebSocketStatus.NO_STATUS_RECEIVED || |
1033 code == WebSocketStatus.ABNORMAL_CLOSURE || | 1243 code == WebSocketStatus.ABNORMAL_CLOSURE || |
1034 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 1244 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
1035 code < WebSocketStatus.RESERVED_1015) || | 1245 code < WebSocketStatus.RESERVED_1015) || |
1036 (code >= WebSocketStatus.RESERVED_1015 && | 1246 (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); |
1037 code < 3000)); | |
1038 } | 1247 } |
1039 } | 1248 } |
OLD | NEW |