OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 const String _clientNoContextTakeover = "client_no_context_takeover"; | |
9 const String _serverNoContextTakeover = "server_no_context_takeover"; | |
10 const String _clientMaxWindowBits = "client_max_window_bits"; | |
11 const String _serverMaxWindowBits = "server_max_window_bits"; | |
12 | 8 |
13 // Matches _WebSocketOpcode. | 9 // Matches _WebSocketOpcode. |
14 class _WebSocketMessageType { | 10 class _WebSocketMessageType { |
15 static const int NONE = 0; | 11 static const int NONE = 0; |
16 static const int TEXT = 1; | 12 static const int TEXT = 1; |
17 static const int BINARY = 2; | 13 static const int BINARY = 2; |
18 } | 14 } |
19 | 15 |
| 16 |
20 class _WebSocketOpcode { | 17 class _WebSocketOpcode { |
21 static const int CONTINUATION = 0; | 18 static const int CONTINUATION = 0; |
22 static const int TEXT = 1; | 19 static const int TEXT = 1; |
23 static const int BINARY = 2; | 20 static const int BINARY = 2; |
24 static const int RESERVED_3 = 3; | 21 static const int RESERVED_3 = 3; |
25 static const int RESERVED_4 = 4; | 22 static const int RESERVED_4 = 4; |
26 static const int RESERVED_5 = 5; | 23 static const int RESERVED_5 = 5; |
27 static const int RESERVED_6 = 6; | 24 static const int RESERVED_6 = 6; |
28 static const int RESERVED_7 = 7; | 25 static const int RESERVED_7 = 7; |
29 static const int CLOSE = 8; | 26 static const int CLOSE = 8; |
30 static const int PING = 9; | 27 static const int PING = 9; |
31 static const int PONG = 10; | 28 static const int PONG = 10; |
32 static const int RESERVED_B = 11; | 29 static const int RESERVED_B = 11; |
33 static const int RESERVED_C = 12; | 30 static const int RESERVED_C = 12; |
34 static const int RESERVED_D = 13; | 31 static const int RESERVED_D = 13; |
35 static const int RESERVED_E = 14; | 32 static const int RESERVED_E = 14; |
36 static const int RESERVED_F = 15; | 33 static const int RESERVED_F = 15; |
37 } | 34 } |
38 | 35 |
39 /** | 36 /** |
40 * The web socket protocol transformer handles the protocol byte stream | 37 * The web socket protocol transformer handles the protocol byte stream |
41 * which is supplied through the [:handleData:]. As the protocol is processed, | 38 * which is supplied through the [:handleData:]. As the protocol is processed, |
42 * it'll output frame data as either a List<int> or String. | 39 * it'll output frame data as either a List<int> or String. |
43 * | 40 * |
44 * Important information about usage: Be sure you use cancelOnError, so the | 41 * Important infomation about usage: Be sure you use cancelOnError, so the |
45 * socket will be closed when the processor encounter an error. Not using it | 42 * socket will be closed when the processer encounter an error. Not using it |
46 * will lead to undefined behaviour. | 43 * will lead to undefined behaviour. |
47 */ | 44 */ |
48 // TODO(ajohnsen): make this transformer reusable? | 45 // TODO(ajohnsen): make this transformer reusable? |
49 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | 46 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
50 static const int START = 0; | 47 static const int START = 0; |
51 static const int LEN_FIRST = 1; | 48 static const int LEN_FIRST = 1; |
52 static const int LEN_REST = 2; | 49 static const int LEN_REST = 2; |
53 static const int MASK = 3; | 50 static const int MASK = 3; |
54 static const int PAYLOAD = 4; | 51 static const int PAYLOAD = 4; |
55 static const int CLOSED = 5; | 52 static const int CLOSED = 5; |
56 static const int FAILURE = 6; | 53 static const int FAILURE = 6; |
57 static const int FIN = 0x80; | |
58 static const int RSV1 = 0x40; | |
59 static const int RSV2 = 0x20; | |
60 static const int RSV3 = 0x10; | |
61 static const int OPCODE = 0xF; | |
62 | 54 |
63 int _state = START; | 55 int _state = START; |
64 bool _fin = false; | 56 bool _fin = false; |
65 bool _compressed = false; | |
66 int _opcode = -1; | 57 int _opcode = -1; |
67 int _len = -1; | 58 int _len = -1; |
68 bool _masked = false; | 59 bool _masked = false; |
69 int _remainingLenBytes = -1; | 60 int _remainingLenBytes = -1; |
70 int _remainingMaskingKeyBytes = 4; | 61 int _remainingMaskingKeyBytes = 4; |
71 int _remainingPayloadBytes = -1; | 62 int _remainingPayloadBytes = -1; |
72 int _unmaskingIndex = 0; | 63 int _unmaskingIndex = 0; |
73 int _currentMessageType = _WebSocketMessageType.NONE; | 64 int _currentMessageType = _WebSocketMessageType.NONE; |
74 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | 65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
75 String closeReason = ""; | 66 String closeReason = ""; |
76 | 67 |
77 EventSink _eventSink; | 68 EventSink _eventSink; |
78 | 69 |
79 final bool _serverSide; | 70 final bool _serverSide; |
80 final List _maskingBytes = new List(4); | 71 final List _maskingBytes = new List(4); |
81 final BytesBuilder _payload = new BytesBuilder(copy: false); | 72 final BytesBuilder _payload = new BytesBuilder(copy: false); |
82 | 73 |
83 _WebSocketPerMessageDeflate _deflate; | 74 _WebSocketProtocolTransformer([this._serverSide = false]); |
84 _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); | |
85 | 75 |
86 Stream bind(Stream stream) { | 76 Stream bind(Stream stream) { |
87 return new Stream.eventTransformed(stream, (EventSink eventSink) { | 77 return new Stream.eventTransformed( |
88 if (_eventSink != null) { | 78 stream, |
89 throw new StateError("WebSocket transformer already used."); | 79 (EventSink eventSink) { |
90 } | 80 if (_eventSink != null) { |
91 _eventSink = eventSink; | 81 throw new StateError("WebSocket transformer already used."); |
92 return this; | 82 } |
93 }); | 83 _eventSink = eventSink; |
| 84 return this; |
| 85 }); |
94 } | 86 } |
95 | 87 |
96 void addError(Object error, [StackTrace stackTrace]) => | 88 void addError(Object error, [StackTrace stackTrace]) => |
97 _eventSink.addError(error, stackTrace); | 89 _eventSink.addError(error, stackTrace); |
98 | 90 |
99 void close() => _eventSink.close(); | 91 void close() => _eventSink.close(); |
100 | 92 |
101 /** | 93 /** |
102 * Process data received from the underlying communication channel. | 94 * Process data received from the underlying communication channel. |
103 */ | 95 */ |
104 void add(Uint8List buffer) { | 96 void add(Uint8List buffer) { |
| 97 int count = buffer.length; |
105 int index = 0; | 98 int index = 0; |
106 int lastIndex = buffer.length; | 99 int lastIndex = count; |
107 if (_state == CLOSED) { | 100 if (_state == CLOSED) { |
108 throw new WebSocketException("Data on closed connection"); | 101 throw new WebSocketException("Data on closed connection"); |
109 } | 102 } |
110 if (_state == FAILURE) { | 103 if (_state == FAILURE) { |
111 throw new WebSocketException("Data on failed connection"); | 104 throw new WebSocketException("Data on failed connection"); |
112 } | 105 } |
113 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { | 106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { |
114 int byte = buffer[index]; | 107 int byte = buffer[index]; |
115 if (_state <= LEN_REST) { | 108 if (_state <= LEN_REST) { |
116 if (_state == START) { | 109 if (_state == START) { |
117 _fin = (byte & FIN) != 0; | 110 _fin = (byte & 0x80) != 0; |
118 | 111 if ((byte & 0x70) != 0) { |
119 if((byte & (RSV2 | RSV3)) != 0) { | 112 // The RSV1, RSV2 bits RSV3 must be all zero. |
120 // The RSV2, RSV3 bits must both be zero. | |
121 throw new WebSocketException("Protocol error"); | 113 throw new WebSocketException("Protocol error"); |
122 } | 114 } |
123 | 115 _opcode = (byte & 0xF); |
124 if ((byte & RSV1) != 0) { | |
125 _compressed = true; | |
126 } else { | |
127 _compressed = false; | |
128 } | |
129 _opcode = (byte & OPCODE); | |
130 | |
131 if (_opcode <= _WebSocketOpcode.BINARY) { | 116 if (_opcode <= _WebSocketOpcode.BINARY) { |
132 if (_opcode == _WebSocketOpcode.CONTINUATION) { | 117 if (_opcode == _WebSocketOpcode.CONTINUATION) { |
133 if (_currentMessageType == _WebSocketMessageType.NONE) { | 118 if (_currentMessageType == _WebSocketMessageType.NONE) { |
134 throw new WebSocketException("Protocol error"); | 119 throw new WebSocketException("Protocol error"); |
135 } | 120 } |
136 } else { | 121 } else { |
137 assert(_opcode == _WebSocketOpcode.TEXT || | 122 assert(_opcode == _WebSocketOpcode.TEXT || |
138 _opcode == _WebSocketOpcode.BINARY); | 123 _opcode == _WebSocketOpcode.BINARY); |
139 if (_currentMessageType != _WebSocketMessageType.NONE) { | 124 if (_currentMessageType != _WebSocketMessageType.NONE) { |
140 throw new WebSocketException("Protocol error"); | 125 throw new WebSocketException("Protocol error"); |
141 } | 126 } |
142 _currentMessageType = _opcode; | 127 _currentMessageType = _opcode; |
143 } | 128 } |
144 } else if (_opcode >= _WebSocketOpcode.CLOSE && | 129 } else if (_opcode >= _WebSocketOpcode.CLOSE && |
145 _opcode <= _WebSocketOpcode.PONG) { | 130 _opcode <= _WebSocketOpcode.PONG) { |
146 // Control frames cannot be fragmented. | 131 // Control frames cannot be fragmented. |
147 if (!_fin) throw new WebSocketException("Protocol error"); | 132 if (!_fin) throw new WebSocketException("Protocol error"); |
148 } else { | 133 } else { |
149 throw new WebSocketException("Protocol error"); | 134 throw new WebSocketException("Protocol error"); |
150 } | 135 } |
151 _state = LEN_FIRST; | 136 _state = LEN_FIRST; |
152 } else if (_state == LEN_FIRST) { | 137 } else if (_state == LEN_FIRST) { |
153 _masked = (byte & 0x80) != 0; | 138 _masked = (byte & 0x80) != 0; |
154 _len = byte & 0x7F; | 139 _len = byte & 0x7F; |
155 if (_isControlFrame() && _len > 125) { | 140 if (_isControlFrame() && _len > 125) { |
(...skipping 28 matching lines...) Expand all Loading... |
184 } else { | 169 } else { |
185 assert(_state == PAYLOAD); | 170 assert(_state == PAYLOAD); |
186 // The payload is not handled one byte at a time but in blocks. | 171 // The payload is not handled one byte at a time but in blocks. |
187 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); | 172 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); |
188 _remainingPayloadBytes -= payloadLength; | 173 _remainingPayloadBytes -= payloadLength; |
189 // Unmask payload if masked. | 174 // Unmask payload if masked. |
190 if (_masked) { | 175 if (_masked) { |
191 _unmask(index, payloadLength, buffer); | 176 _unmask(index, payloadLength, buffer); |
192 } | 177 } |
193 // Control frame and data frame share _payloads. | 178 // Control frame and data frame share _payloads. |
194 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); | 179 _payload.add( |
| 180 new Uint8List.view(buffer.buffer, index, payloadLength)); |
195 index += payloadLength; | 181 index += payloadLength; |
196 if (_isControlFrame()) { | 182 if (_isControlFrame()) { |
197 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | 183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
198 } else { | 184 } else { |
199 if (_currentMessageType != _WebSocketMessageType.TEXT && | 185 if (_currentMessageType != _WebSocketMessageType.TEXT && |
200 _currentMessageType != _WebSocketMessageType.BINARY) { | 186 _currentMessageType != _WebSocketMessageType.BINARY) { |
201 throw new WebSocketException("Protocol error"); | 187 throw new WebSocketException("Protocol error"); |
202 } | 188 } |
203 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | 189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
204 } | 190 } |
205 | 191 |
206 // Hack - as we always do index++ below. | 192 // Hack - as we always do index++ below. |
207 index--; | 193 index--; |
208 } | 194 } |
209 } | 195 } |
210 | 196 |
211 // Move to the next byte. | 197 // Move to the next byte. |
(...skipping 14 matching lines...) Expand all Loading... |
226 index += startOffset; | 212 index += startOffset; |
227 length -= startOffset; | 213 length -= startOffset; |
228 final int blockCount = length ~/ BLOCK_SIZE; | 214 final int blockCount = length ~/ BLOCK_SIZE; |
229 if (blockCount > 0) { | 215 if (blockCount > 0) { |
230 // Create mask block. | 216 // Create mask block. |
231 int mask = 0; | 217 int mask = 0; |
232 for (int i = 3; i >= 0; i--) { | 218 for (int i = 3; i >= 0; i--) { |
233 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | 219 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
234 } | 220 } |
235 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 221 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
236 Int32x4List blockBuffer = | 222 Int32x4List blockBuffer = new Int32x4List.view( |
237 new Int32x4List.view(buffer.buffer, index, blockCount); | 223 buffer.buffer, index, blockCount); |
238 for (int i = 0; i < blockBuffer.length; i++) { | 224 for (int i = 0; i < blockBuffer.length; i++) { |
239 blockBuffer[i] ^= blockMask; | 225 blockBuffer[i] ^= blockMask; |
240 } | 226 } |
241 final int bytes = blockCount * BLOCK_SIZE; | 227 final int bytes = blockCount * BLOCK_SIZE; |
242 index += bytes; | 228 index += bytes; |
243 length -= bytes; | 229 length -= bytes; |
244 } | 230 } |
245 } | 231 } |
246 // Handle end. | 232 // Handle end. |
247 final int end = index + length; | 233 final int end = index + length; |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
291 } else { | 277 } else { |
292 _messageFrameEnd(); | 278 _messageFrameEnd(); |
293 } | 279 } |
294 } else { | 280 } else { |
295 _state = PAYLOAD; | 281 _state = PAYLOAD; |
296 } | 282 } |
297 } | 283 } |
298 | 284 |
299 void _messageFrameEnd() { | 285 void _messageFrameEnd() { |
300 if (_fin) { | 286 if (_fin) { |
301 var bytes = _payload.takeBytes(); | |
302 if (_deflate != null && _compressed) { | |
303 bytes = _deflate.processIncomingMessage(bytes); | |
304 } | |
305 | |
306 switch (_currentMessageType) { | 287 switch (_currentMessageType) { |
307 case _WebSocketMessageType.TEXT: | 288 case _WebSocketMessageType.TEXT: |
308 _eventSink.add(UTF8.decode(bytes)); | 289 _eventSink.add(UTF8.decode(_payload.takeBytes())); |
309 break; | 290 break; |
310 case _WebSocketMessageType.BINARY: | 291 case _WebSocketMessageType.BINARY: |
311 _eventSink.add(bytes); | 292 _eventSink.add(_payload.takeBytes()); |
312 break; | 293 break; |
313 } | 294 } |
314 _currentMessageType = _WebSocketMessageType.NONE; | 295 _currentMessageType = _WebSocketMessageType.NONE; |
315 } | 296 } |
316 _prepareForNextFrame(); | 297 _prepareForNextFrame(); |
317 } | 298 } |
318 | 299 |
319 void _controlFrameEnd() { | 300 void _controlFrameEnd() { |
320 switch (_opcode) { | 301 switch (_opcode) { |
321 case _WebSocketOpcode.CLOSE: | 302 case _WebSocketOpcode.CLOSE: |
(...skipping 21 matching lines...) Expand all Loading... |
343 | 324 |
344 case _WebSocketOpcode.PONG: | 325 case _WebSocketOpcode.PONG: |
345 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); | 326 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); |
346 break; | 327 break; |
347 } | 328 } |
348 _prepareForNextFrame(); | 329 _prepareForNextFrame(); |
349 } | 330 } |
350 | 331 |
351 bool _isControlFrame() { | 332 bool _isControlFrame() { |
352 return _opcode == _WebSocketOpcode.CLOSE || | 333 return _opcode == _WebSocketOpcode.CLOSE || |
353 _opcode == _WebSocketOpcode.PING || | 334 _opcode == _WebSocketOpcode.PING || |
354 _opcode == _WebSocketOpcode.PONG; | 335 _opcode == _WebSocketOpcode.PONG; |
355 } | 336 } |
356 | 337 |
357 void _prepareForNextFrame() { | 338 void _prepareForNextFrame() { |
358 if (_state != CLOSED && _state != FAILURE) _state = START; | 339 if (_state != CLOSED && _state != FAILURE) _state = START; |
359 _fin = false; | 340 _fin = false; |
360 _opcode = -1; | 341 _opcode = -1; |
361 _len = -1; | 342 _len = -1; |
362 _remainingLenBytes = -1; | 343 _remainingLenBytes = -1; |
363 _remainingMaskingKeyBytes = 4; | 344 _remainingMaskingKeyBytes = 4; |
364 _remainingPayloadBytes = -1; | 345 _remainingPayloadBytes = -1; |
365 _unmaskingIndex = 0; | 346 _unmaskingIndex = 0; |
366 } | 347 } |
367 } | 348 } |
368 | 349 |
| 350 |
369 class _WebSocketPing { | 351 class _WebSocketPing { |
370 final List<int> payload; | 352 final List<int> payload; |
371 _WebSocketPing([this.payload = null]); | 353 _WebSocketPing([this.payload = null]); |
372 } | 354 } |
373 | 355 |
| 356 |
374 class _WebSocketPong { | 357 class _WebSocketPong { |
375 final List<int> payload; | 358 final List<int> payload; |
376 _WebSocketPong([this.payload = null]); | 359 _WebSocketPong([this.payload = null]); |
377 } | 360 } |
378 | 361 |
| 362 |
379 class _WebSocketTransformerImpl implements WebSocketTransformer { | 363 class _WebSocketTransformerImpl implements WebSocketTransformer { |
380 final StreamController<WebSocket> _controller = | 364 final StreamController<WebSocket> _controller = |
381 new StreamController<WebSocket>(sync: true); | 365 new StreamController<WebSocket>(sync: true); |
382 final Function _protocolSelector; | 366 final Function _protocolSelector; |
383 final CompressionOptions _compression; | |
384 | 367 |
385 _WebSocketTransformerImpl(this._protocolSelector, this._compression); | 368 _WebSocketTransformerImpl(this._protocolSelector); |
386 | 369 |
387 Stream<WebSocket> bind(Stream<HttpRequest> stream) { | 370 Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
388 stream.listen((request) { | 371 stream.listen((request) { |
389 _upgrade(request, _protocolSelector, _compression) | 372 _upgrade(request, _protocolSelector) |
390 .then((WebSocket webSocket) => _controller.add(webSocket)) | 373 .then((WebSocket webSocket) => _controller.add(webSocket)) |
391 .catchError(_controller.addError); | 374 .catchError(_controller.addError); |
392 }, onDone: () { | 375 }, onDone: () { |
393 _controller.close(); | 376 _controller.close(); |
394 }); | 377 }); |
395 | 378 |
396 return _controller.stream; | 379 return _controller.stream; |
397 } | 380 } |
398 | 381 |
399 static Future<WebSocket> _upgrade( | 382 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { |
400 HttpRequest request, _protocolSelector, CompressionOptions compression) { | |
401 var response = request.response; | 383 var response = request.response; |
402 if (!_isUpgradeRequest(request)) { | 384 if (!_isUpgradeRequest(request)) { |
403 // Send error response. | 385 // Send error response. |
404 response | 386 response |
405 ..statusCode = HttpStatus.BAD_REQUEST | 387 ..statusCode = HttpStatus.BAD_REQUEST |
406 ..close(); | 388 ..close(); |
407 return new Future.error( | 389 return new Future.error( |
408 new WebSocketException("Invalid WebSocket upgrade request")); | 390 new WebSocketException("Invalid WebSocket upgrade request")); |
409 } | 391 } |
410 | 392 |
411 Future upgrade(String protocol) { | 393 Future upgrade(String protocol) { |
412 // Send the upgrade response. | 394 // Send the upgrade response. |
413 response | 395 response |
414 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS | 396 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS |
415 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") | 397 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") |
416 ..headers.add(HttpHeaders.UPGRADE, "websocket"); | 398 ..headers.add(HttpHeaders.UPGRADE, "websocket"); |
417 String key = request.headers.value("Sec-WebSocket-Key"); | 399 String key = request.headers.value("Sec-WebSocket-Key"); |
418 _SHA1 sha1 = new _SHA1(); | 400 _SHA1 sha1 = new _SHA1(); |
419 sha1.add("$key$_webSocketGUID".codeUnits); | 401 sha1.add("$key$_webSocketGUID".codeUnits); |
420 String accept = _CryptoUtils.bytesToBase64(sha1.close()); | 402 String accept = _CryptoUtils.bytesToBase64(sha1.close()); |
421 response.headers.add("Sec-WebSocket-Accept", accept); | 403 response.headers.add("Sec-WebSocket-Accept", accept); |
422 if (protocol != null) { | 404 if (protocol != null) { |
423 response.headers.add("Sec-WebSocket-Protocol", protocol); | 405 response.headers.add("Sec-WebSocket-Protocol", protocol); |
424 } | 406 } |
425 | |
426 var deflate = _negotiateCompression(request, response, compression); | |
427 | |
428 response.headers.contentLength = 0; | 407 response.headers.contentLength = 0; |
429 return response.detachSocket().then((socket) => | 408 return response.detachSocket() |
430 new _WebSocketImpl._fromSocket( | 409 .then((socket) => new _WebSocketImpl._fromSocket( |
431 socket, protocol, compression, true, deflate)); | 410 socket, protocol, true)); |
432 } | 411 } |
433 | 412 |
434 var protocols = request.headers['Sec-WebSocket-Protocol']; | 413 var protocols = request.headers['Sec-WebSocket-Protocol']; |
435 if (protocols != null && _protocolSelector != null) { | 414 if (protocols != null && _protocolSelector != null) { |
436 // The suggested protocols can be spread over multiple lines, each | 415 // The suggested protocols can be spread over multiple lines, each |
437 // consisting of multiple protocols. To unify all of them, first join | 416 // consisting of multiple protocols. To unify all of them, first join |
438 // the lists with ', ' and then tokenize. | 417 // the lists with ', ' and then tokenize. |
439 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); | 418 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); |
440 return new Future(() => _protocolSelector(protocols)).then((protocol) { | 419 return new Future(() => _protocolSelector(protocols)) |
441 if (protocols.indexOf(protocol) < 0) { | 420 .then((protocol) { |
442 throw new WebSocketException( | 421 if (protocols.indexOf(protocol) < 0) { |
443 "Selected protocol is not in the list of available protocols"); | 422 throw new WebSocketException( |
444 } | 423 "Selected protocol is not in the list of available protocols"); |
445 return protocol; | 424 } |
446 }).catchError((error) { | 425 return protocol; |
447 response | 426 }) |
448 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR | 427 .catchError((error) { |
449 ..close(); | 428 response |
450 throw error; | 429 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR |
451 }).then(upgrade); | 430 ..close(); |
| 431 throw error; |
| 432 }) |
| 433 .then(upgrade); |
452 } else { | 434 } else { |
453 return upgrade(null); | 435 return upgrade(null); |
454 } | 436 } |
455 } | 437 } |
456 | 438 |
457 static _WebSocketPerMessageDeflate _negotiateCompression(HttpRequest request, | |
458 HttpResponse response, CompressionOptions compression) { | |
459 var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); | |
460 | |
461 if (extensionHeader == null) { | |
462 extensionHeader = ""; | |
463 } | |
464 | |
465 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); | |
466 if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) { | |
467 var info = compression._createHeader(hv); | |
468 | |
469 response.headers.add("Sec-WebSocket-Extensions", info[0]); | |
470 var serverNoContextTakeover = | |
471 hv.parameters.containsKey(_serverNoContextTakeover); | |
472 var clientNoContextTakeover = | |
473 hv.parameters.containsKey(_clientNoContextTakeover); | |
474 var deflate = new _WebSocketPerMessageDeflate( | |
475 serverNoContextTakeover: serverNoContextTakeover, | |
476 clientNoContextTakeover: clientNoContextTakeover, | |
477 serverMaxWindowBits: info[1], | |
478 clientMaxWindowBits: info[1], | |
479 serverSide: true); | |
480 | |
481 return deflate; | |
482 } | |
483 | |
484 return null; | |
485 } | |
486 | |
487 static bool _isUpgradeRequest(HttpRequest request) { | 439 static bool _isUpgradeRequest(HttpRequest request) { |
488 if (request.method != "GET") { | 440 if (request.method != "GET") { |
489 return false; | 441 return false; |
490 } | 442 } |
491 if (request.headers[HttpHeaders.CONNECTION] == null) { | 443 if (request.headers[HttpHeaders.CONNECTION] == null) { |
492 return false; | 444 return false; |
493 } | 445 } |
494 bool isUpgrade = false; | 446 bool isUpgrade = false; |
495 request.headers[HttpHeaders.CONNECTION].forEach((String value) { | 447 request.headers[HttpHeaders.CONNECTION].forEach((String value) { |
496 if (value.toLowerCase() == "upgrade") isUpgrade = true; | 448 if (value.toLowerCase() == "upgrade") isUpgrade = true; |
497 }); | 449 }); |
498 if (!isUpgrade) return false; | 450 if (!isUpgrade) return false; |
499 String upgrade = request.headers.value(HttpHeaders.UPGRADE); | 451 String upgrade = request.headers.value(HttpHeaders.UPGRADE); |
500 if (upgrade == null || upgrade.toLowerCase() != "websocket") { | 452 if (upgrade == null || upgrade.toLowerCase() != "websocket") { |
501 return false; | 453 return false; |
502 } | 454 } |
503 String version = request.headers.value("Sec-WebSocket-Version"); | 455 String version = request.headers.value("Sec-WebSocket-Version"); |
504 if (version == null || version != "13") { | 456 if (version == null || version != "13") { |
505 return false; | 457 return false; |
506 } | 458 } |
507 String key = request.headers.value("Sec-WebSocket-Key"); | 459 String key = request.headers.value("Sec-WebSocket-Key"); |
508 if (key == null) { | 460 if (key == null) { |
509 return false; | 461 return false; |
510 } | 462 } |
511 return true; | 463 return true; |
512 } | 464 } |
513 } | 465 } |
514 | 466 |
515 class _WebSocketPerMessageDeflate { | |
516 bool serverNoContextTakeover; | |
517 bool clientNoContextTakeover; | |
518 int clientMaxWindowBits; | |
519 int serverMaxWindowBits; | |
520 bool serverSide; | |
521 | |
522 _Filter decoder; | |
523 _Filter encoder; | |
524 | |
525 _WebSocketPerMessageDeflate( | |
526 {this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, | |
527 this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, | |
528 this.serverNoContextTakeover: false, | |
529 this.clientNoContextTakeover: false, | |
530 this.serverSide: false}); | |
531 | |
532 void _ensureDecoder() { | |
533 if (decoder == null) { | |
534 decoder = _Filter._newZLibInflateFilter( | |
535 serverSide ? clientMaxWindowBits : serverMaxWindowBits, null, true); | |
536 } | |
537 } | |
538 | |
539 void _ensureEncoder() { | |
540 if (encoder == null) { | |
541 encoder = _Filter._newZLibDeflateFilter( | |
542 false, | |
543 ZLibOption.DEFAULT_LEVEL, | |
544 serverSide ? serverMaxWindowBits : clientMaxWindowBits, | |
545 ZLibOption.DEFAULT_MEM_LEVEL, | |
546 ZLibOption.STRATEGY_DEFAULT, | |
547 null, | |
548 true); | |
549 } | |
550 } | |
551 | |
552 List<int> processIncomingMessage(List<int> msg) { | |
553 _ensureDecoder(); | |
554 | |
555 var data = []; | |
556 data.addAll(msg); | |
557 data.addAll(const [0x00, 0x00, 0xff, 0xff]); | |
558 | |
559 decoder.process(data, 0, data.length); | |
560 var reuse = | |
561 !(serverSide ? clientNoContextTakeover : serverNoContextTakeover); | |
562 var result = []; | |
563 var out; | |
564 | |
565 while ((out = decoder.processed(flush: reuse)) != null) { | |
566 result.addAll(out); | |
567 } | |
568 | |
569 decoder.processed(flush: reuse); | |
570 | |
571 if (!reuse) { | |
572 decoder.end(); | |
573 decoder = null; | |
574 } | |
575 return result; | |
576 } | |
577 | |
578 List<int> processOutgoingMessage(List<int> msg) { | |
579 _ensureEncoder(); | |
580 var reuse = | |
581 !(serverSide ? serverNoContextTakeover : clientNoContextTakeover); | |
582 var result = []; | |
583 Uint8List buffer; | |
584 var out; | |
585 | |
586 if (msg is! Uint8List) { | |
587 for (var i = 0; i < msg.length; i++) { | |
588 if (msg[i] < 0 || 255 < msg[i]) { | |
589 throw new ArgumentError("List element is not a byte value " | |
590 "(value ${msg[i]} at index $i)"); | |
591 } | |
592 } | |
593 buffer = new Uint8List.fromList(msg); | |
594 } else { | |
595 buffer = msg; | |
596 } | |
597 | |
598 encoder.process(buffer, 0, buffer.length); | |
599 | |
600 while ((out = encoder.processed(flush: reuse)) != null) { | |
601 result.addAll(out); | |
602 } | |
603 | |
604 if (serverSide ? serverNoContextTakeover : clientNoContextTakeover) { | |
605 encoder.end(); | |
606 encoder = null; | |
607 } | |
608 | |
609 if (result.length > 4) { | |
610 result = result.sublist(0, result.length - 4); | |
611 } | |
612 | |
613 return result; | |
614 } | |
615 } | |
616 | 467 |
617 // TODO(ajohnsen): Make this transformer reusable. | 468 // TODO(ajohnsen): Make this transformer reusable. |
618 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 469 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
619 final _WebSocketImpl webSocket; | 470 final _WebSocketImpl webSocket; |
620 EventSink _eventSink; | 471 EventSink _eventSink; |
621 | 472 |
622 _WebSocketPerMessageDeflate _deflateHelper; | 473 _WebSocketOutgoingTransformer(this.webSocket); |
623 | |
624 _WebSocketOutgoingTransformer(this.webSocket) { | |
625 _deflateHelper = webSocket._deflate; | |
626 } | |
627 | 474 |
628 Stream bind(Stream stream) { | 475 Stream bind(Stream stream) { |
629 return new Stream.eventTransformed(stream, (EventSink eventSink) { | 476 return new Stream.eventTransformed( |
630 if (_eventSink != null) { | 477 stream, |
631 throw new StateError("WebSocket transformer already used"); | 478 (EventSink eventSink) { |
632 } | 479 if (_eventSink != null) { |
633 _eventSink = eventSink; | 480 throw new StateError("WebSocket transformer already used"); |
634 return this; | 481 } |
635 }); | 482 _eventSink = eventSink; |
| 483 return this; |
| 484 }); |
636 } | 485 } |
637 | 486 |
638 void add(message) { | 487 void add(message) { |
639 if (message is _WebSocketPong) { | 488 if (message is _WebSocketPong) { |
640 addFrame(_WebSocketOpcode.PONG, message.payload); | 489 addFrame(_WebSocketOpcode.PONG, message.payload); |
641 return; | 490 return; |
642 } | 491 } |
643 if (message is _WebSocketPing) { | 492 if (message is _WebSocketPing) { |
644 addFrame(_WebSocketOpcode.PING, message.payload); | 493 addFrame(_WebSocketOpcode.PING, message.payload); |
645 return; | 494 return; |
646 } | 495 } |
647 List<int> data; | 496 List<int> data; |
648 int opcode; | 497 int opcode; |
649 if (message != null) { | 498 if (message != null) { |
650 if (message is String) { | 499 if (message is String) { |
651 opcode = _WebSocketOpcode.TEXT; | 500 opcode = _WebSocketOpcode.TEXT; |
652 data = UTF8.encode(message); | 501 data = UTF8.encode(message); |
653 } else { | 502 } else { |
654 if (message is! List<int>) { | 503 if (message is !List<int>) { |
655 throw new ArgumentError(message); | 504 throw new ArgumentError(message); |
656 } | 505 } |
657 opcode = _WebSocketOpcode.BINARY; | 506 opcode = _WebSocketOpcode.BINARY; |
658 data = message; | 507 data = message; |
659 } | 508 } |
660 | |
661 if (_deflateHelper != null) { | |
662 data = _deflateHelper.processOutgoingMessage(data); | |
663 } | |
664 } else { | 509 } else { |
665 opcode = _WebSocketOpcode.TEXT; | 510 opcode = _WebSocketOpcode.TEXT; |
666 } | 511 } |
667 addFrame(opcode, data); | 512 addFrame(opcode, data); |
668 } | 513 } |
669 | 514 |
670 void addError(Object error, [StackTrace stackTrace]) => | 515 void addError(Object error, [StackTrace stackTrace]) => |
671 _eventSink.addError(error, stackTrace); | 516 _eventSink.addError(error, stackTrace); |
672 | 517 |
673 void close() { | 518 void close() { |
674 int code = webSocket._outCloseCode; | 519 int code = webSocket._outCloseCode; |
675 String reason = webSocket._outCloseReason; | 520 String reason = webSocket._outCloseReason; |
676 List<int> data; | 521 List<int> data; |
677 if (code != null) { | 522 if (code != null) { |
678 data = new List<int>(); | 523 data = new List<int>(); |
679 data.add((code >> 8) & 0xFF); | 524 data.add((code >> 8) & 0xFF); |
680 data.add(code & 0xFF); | 525 data.add(code & 0xFF); |
681 if (reason != null) { | 526 if (reason != null) { |
682 data.addAll(UTF8.encode(reason)); | 527 data.addAll(UTF8.encode(reason)); |
683 } | 528 } |
684 } | 529 } |
685 addFrame(_WebSocketOpcode.CLOSE, data); | 530 addFrame(_WebSocketOpcode.CLOSE, data); |
686 _eventSink.close(); | 531 _eventSink.close(); |
687 } | 532 } |
688 | 533 |
689 void addFrame(int opcode, List<int> data) => createFrame( | 534 void addFrame(int opcode, List<int> data) => |
690 opcode, | 535 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
691 data, | |
692 webSocket._serverSide, | |
693 _deflateHelper != null && | |
694 (opcode == _WebSocketOpcode.TEXT || | |
695 opcode == _WebSocketOpcode.BINARY)).forEach((e) { | |
696 _eventSink.add(e); | |
697 }); | |
698 | 536 |
699 static Iterable createFrame( | 537 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
700 int opcode, List<int> data, bool serverSide, bool compressed) { | 538 bool mask = !serverSide; // Masking not implemented for server. |
701 bool mask = !serverSide; // Masking not implemented for server. | |
702 int dataLength = data == null ? 0 : data.length; | 539 int dataLength = data == null ? 0 : data.length; |
703 // Determine the header size. | 540 // Determine the header size. |
704 int headerSize = (mask) ? 6 : 2; | 541 int headerSize = (mask) ? 6 : 2; |
705 if (dataLength > 65535) { | 542 if (dataLength > 65535) { |
706 headerSize += 8; | 543 headerSize += 8; |
707 } else if (dataLength > 125) { | 544 } else if (dataLength > 125) { |
708 headerSize += 2; | 545 headerSize += 2; |
709 } | 546 } |
710 Uint8List header = new Uint8List(headerSize); | 547 Uint8List header = new Uint8List(headerSize); |
711 int index = 0; | 548 int index = 0; |
712 | |
713 // Set FIN and opcode. | 549 // Set FIN and opcode. |
714 var hoc = _WebSocketProtocolTransformer.FIN | 550 header[index++] = 0x80 | opcode; |
715 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) | |
716 | (opcode & _WebSocketProtocolTransformer.OPCODE); | |
717 | |
718 header[index++] = hoc; | |
719 // Determine size and position of length field. | 551 // Determine size and position of length field. |
720 int lengthBytes = 1; | 552 int lengthBytes = 1; |
| 553 int firstLengthByte = 1; |
721 if (dataLength > 65535) { | 554 if (dataLength > 65535) { |
722 header[index++] = 127; | 555 header[index++] = 127; |
723 lengthBytes = 8; | 556 lengthBytes = 8; |
724 } else if (dataLength > 125) { | 557 } else if (dataLength > 125) { |
725 header[index++] = 126; | 558 header[index++] = 126; |
726 lengthBytes = 2; | 559 lengthBytes = 2; |
727 } | 560 } |
728 // Write the length in network byte order into the header. | 561 // Write the length in network byte order into the header. |
729 for (int i = 0; i < lengthBytes; i++) { | 562 for (int i = 0; i < lengthBytes; i++) { |
730 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | 563 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; |
731 } | 564 } |
732 if (mask) { | 565 if (mask) { |
733 header[1] |= 1 << 7; | 566 header[1] |= 1 << 7; |
734 var maskBytes = _IOCrypto.getRandomBytes(4); | 567 var maskBytes = _IOCrypto.getRandomBytes(4); |
735 header.setRange(index, index + 4, maskBytes); | 568 header.setRange(index, index + 4, maskBytes); |
736 index += 4; | 569 index += 4; |
737 if (data != null) { | 570 if (data != null) { |
738 Uint8List list; | 571 Uint8List list; |
739 // If this is a text message just do the masking inside the | 572 // If this is a text message just do the masking inside the |
740 // encoded data. | 573 // encoded data. |
741 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | 574 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { |
742 list = data; | 575 list = data; |
743 } else { | 576 } else { |
744 if (data is Uint8List) { | 577 if (data is Uint8List) { |
745 list = new Uint8List.fromList(data); | 578 list = new Uint8List.fromList(data); |
746 } else { | 579 } else { |
747 list = new Uint8List(data.length); | 580 list = new Uint8List(data.length); |
748 for (int i = 0; i < data.length; i++) { | 581 for (int i = 0; i < data.length; i++) { |
749 if (data[i] < 0 || 255 < data[i]) { | 582 if (data[i] < 0 || 255 < data[i]) { |
750 throw new ArgumentError("List element is not a byte value " | 583 throw new ArgumentError( |
| 584 "List element is not a byte value " |
751 "(value ${data[i]} at index $i)"); | 585 "(value ${data[i]} at index $i)"); |
752 } | 586 } |
753 list[i] = data[i]; | 587 list[i] = data[i]; |
754 } | 588 } |
755 } | 589 } |
756 } | 590 } |
757 const int BLOCK_SIZE = 16; | 591 const int BLOCK_SIZE = 16; |
758 int blockCount = list.length ~/ BLOCK_SIZE; | 592 int blockCount = list.length ~/ BLOCK_SIZE; |
759 if (blockCount > 0) { | 593 if (blockCount > 0) { |
760 // Create mask block. | 594 // Create mask block. |
761 int mask = 0; | 595 int mask = 0; |
762 for (int i = 3; i >= 0; i--) { | 596 for (int i = 3; i >= 0; i--) { |
763 mask = (mask << 8) | maskBytes[i]; | 597 mask = (mask << 8) | maskBytes[i]; |
764 } | 598 } |
765 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | 599 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
766 Int32x4List blockBuffer = | 600 Int32x4List blockBuffer = new Int32x4List.view( |
767 new Int32x4List.view(list.buffer, 0, blockCount); | 601 list.buffer, 0, blockCount); |
768 for (int i = 0; i < blockBuffer.length; i++) { | 602 for (int i = 0; i < blockBuffer.length; i++) { |
769 blockBuffer[i] ^= blockMask; | 603 blockBuffer[i] ^= blockMask; |
770 } | 604 } |
771 } | 605 } |
772 // Handle end. | 606 // Handle end. |
773 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { | 607 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { |
774 list[i] ^= maskBytes[i & 3]; | 608 list[i] ^= maskBytes[i & 3]; |
775 } | 609 } |
776 data = list; | 610 data = list; |
777 } | 611 } |
778 } | 612 } |
779 assert(index == headerSize); | 613 assert(index == headerSize); |
780 if (data == null) { | 614 if (data == null) { |
781 return [header]; | 615 return [header]; |
782 } else { | 616 } else { |
783 return [header, data]; | 617 return [header, data]; |
784 } | 618 } |
785 } | 619 } |
786 } | 620 } |
787 | 621 |
| 622 |
788 class _WebSocketConsumer implements StreamConsumer { | 623 class _WebSocketConsumer implements StreamConsumer { |
789 final _WebSocketImpl webSocket; | 624 final _WebSocketImpl webSocket; |
790 final Socket socket; | 625 final Socket socket; |
791 StreamController _controller; | 626 StreamController _controller; |
792 StreamSubscription _subscription; | 627 StreamSubscription _subscription; |
793 bool _issuedPause = false; | 628 bool _issuedPause = false; |
794 bool _closed = false; | 629 bool _closed = false; |
795 Completer _closeCompleter = new Completer(); | 630 Completer _closeCompleter = new Completer(); |
796 Completer _completer; | 631 Completer _completer; |
797 | 632 |
(...skipping 24 matching lines...) Expand all Loading... |
822 void _cancel() { | 657 void _cancel() { |
823 if (_subscription != null) { | 658 if (_subscription != null) { |
824 var subscription = _subscription; | 659 var subscription = _subscription; |
825 _subscription = null; | 660 _subscription = null; |
826 subscription.cancel(); | 661 subscription.cancel(); |
827 } | 662 } |
828 } | 663 } |
829 | 664 |
830 _ensureController() { | 665 _ensureController() { |
831 if (_controller != null) return; | 666 if (_controller != null) return; |
832 _controller = new StreamController( | 667 _controller = new StreamController(sync: true, |
833 sync: true, | 668 onPause: _onPause, |
834 onPause: _onPause, | 669 onResume: _onResume, |
835 onResume: _onResume, | 670 onCancel: _onListen); |
836 onCancel: _onListen); | 671 var stream = _controller.stream.transform( |
837 var stream = _controller.stream | 672 new _WebSocketOutgoingTransformer(webSocket)); |
838 .transform(new _WebSocketOutgoingTransformer(webSocket)); | 673 socket.addStream(stream) |
839 socket.addStream(stream).then((_) { | 674 .then((_) { |
840 _done(); | 675 _done(); |
841 _closeCompleter.complete(webSocket); | 676 _closeCompleter.complete(webSocket); |
842 }, onError: (error, StackTrace stackTrace) { | 677 }, onError: (error, StackTrace stackTrace) { |
843 _closed = true; | 678 _closed = true; |
844 _cancel(); | 679 _cancel(); |
845 if (error is ArgumentError) { | 680 if (error is ArgumentError) { |
846 if (!_done(error, stackTrace)) { | 681 if (!_done(error, stackTrace)) { |
847 _closeCompleter.completeError(error, stackTrace); | 682 _closeCompleter.completeError(error, stackTrace); |
848 } | 683 } |
849 } else { | 684 } else { |
850 _done(); | 685 _done(); |
851 _closeCompleter.complete(webSocket); | 686 _closeCompleter.complete(webSocket); |
852 } | 687 } |
853 }); | 688 }); |
854 } | 689 } |
855 | 690 |
856 bool _done([error, StackTrace stackTrace]) { | 691 bool _done([error, StackTrace stackTrace]) { |
857 if (_completer == null) return false; | 692 if (_completer == null) return false; |
858 if (error != null) { | 693 if (error != null) { |
859 _completer.completeError(error, stackTrace); | 694 _completer.completeError(error, stackTrace); |
860 } else { | 695 } else { |
861 _completer.complete(webSocket); | 696 _completer.complete(webSocket); |
862 } | 697 } |
863 _completer = null; | 698 _completer = null; |
864 return true; | 699 return true; |
865 } | 700 } |
866 | 701 |
867 Future addStream(var stream) { | 702 Future addStream(var stream) { |
868 if (_closed) { | 703 if (_closed) { |
869 stream.listen(null).cancel(); | 704 stream.listen(null).cancel(); |
870 return new Future.value(webSocket); | 705 return new Future.value(webSocket); |
871 } | 706 } |
872 _ensureController(); | 707 _ensureController(); |
873 _completer = new Completer(); | 708 _completer = new Completer(); |
874 _subscription = stream.listen((data) { | 709 _subscription = stream.listen( |
875 _controller.add(data); | 710 (data) { |
876 }, onDone: _done, onError: _done, cancelOnError: true); | 711 _controller.add(data); |
| 712 }, |
| 713 onDone: _done, |
| 714 onError: _done, |
| 715 cancelOnError: true); |
877 if (_issuedPause) { | 716 if (_issuedPause) { |
878 _subscription.pause(); | 717 _subscription.pause(); |
879 _issuedPause = false; | 718 _issuedPause = false; |
880 } | 719 } |
881 return _completer.future; | 720 return _completer.future; |
882 } | 721 } |
883 | 722 |
884 Future close() { | 723 Future close() { |
885 _ensureController(); | 724 _ensureController(); |
886 Future closeSocket() { | 725 Future closeSocket() { |
887 return socket.close().catchError((_) {}).then((_) => webSocket); | 726 return socket.close().catchError((_) {}).then((_) => webSocket); |
888 } | 727 } |
889 _controller.close(); | 728 _controller.close(); |
890 return _closeCompleter.future.then((_) => closeSocket()); | 729 return _closeCompleter.future.then((_) => closeSocket()); |
891 } | 730 } |
892 | 731 |
893 void add(data) { | 732 void add(data) { |
894 if (_closed) return; | 733 if (_closed) return; |
895 _ensureController(); | 734 _ensureController(); |
896 _controller.add(data); | 735 _controller.add(data); |
897 } | 736 } |
898 | 737 |
899 void closeSocket() { | 738 void closeSocket() { |
900 _closed = true; | 739 _closed = true; |
901 _cancel(); | 740 _cancel(); |
902 close(); | 741 close(); |
903 } | 742 } |
904 } | 743 } |
905 | 744 |
| 745 |
906 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { | 746 class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
907 // Use default Map so we keep order. | 747 // Use default Map so we keep order. |
908 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); | 748 static Map<int, _WebSocketImpl> _webSockets = new Map<int, _WebSocketImpl>(); |
909 static const int DEFAULT_WINDOW_BITS = 15; | |
910 static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; | |
911 | 749 |
912 final String protocol; | 750 final String protocol; |
913 | 751 |
914 StreamController _controller; | 752 StreamController _controller; |
915 StreamSubscription _subscription; | 753 StreamSubscription _subscription; |
916 StreamSink _sink; | 754 StreamSink _sink; |
917 | 755 |
918 final _socket; | 756 final _socket; |
919 final bool _serverSide; | 757 final bool _serverSide; |
920 int _readyState = WebSocket.CONNECTING; | 758 int _readyState = WebSocket.CONNECTING; |
921 bool _writeClosed = false; | 759 bool _writeClosed = false; |
922 int _closeCode; | 760 int _closeCode; |
923 String _closeReason; | 761 String _closeReason; |
924 Duration _pingInterval; | 762 Duration _pingInterval; |
925 Timer _pingTimer; | 763 Timer _pingTimer; |
926 _WebSocketConsumer _consumer; | 764 _WebSocketConsumer _consumer; |
927 | 765 |
928 int _outCloseCode; | 766 int _outCloseCode; |
929 String _outCloseReason; | 767 String _outCloseReason; |
930 Timer _closeTimer; | 768 Timer _closeTimer; |
931 _WebSocketPerMessageDeflate _deflate; | |
932 | 769 |
933 static final HttpClient _httpClient = new HttpClient(); | 770 static final HttpClient _httpClient = new HttpClient(); |
934 | 771 |
935 static Future<WebSocket> connect( | 772 static Future<WebSocket> connect( |
936 String url, Iterable<String> protocols, Map<String, dynamic> headers, | 773 String url, Iterable<String> protocols, Map<String, dynamic> headers) { |
937 {CompressionOptions compression: CompressionOptions.DEFAULT}) { | |
938 Uri uri = Uri.parse(url); | 774 Uri uri = Uri.parse(url); |
939 if (uri.scheme != "ws" && uri.scheme != "wss") { | 775 if (uri.scheme != "ws" && uri.scheme != "wss") { |
940 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); | 776 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); |
941 } | 777 } |
942 | 778 |
943 Random random = new Random(); | 779 Random random = new Random(); |
944 // Generate 16 random bytes. | 780 // Generate 16 random bytes. |
945 Uint8List nonceData = new Uint8List(16); | 781 Uint8List nonceData = new Uint8List(16); |
946 for (int i = 0; i < 16; i++) { | 782 for (int i = 0; i < 16; i++) { |
947 nonceData[i] = random.nextInt(256); | 783 nonceData[i] = random.nextInt(256); |
948 } | 784 } |
949 String nonce = _CryptoUtils.bytesToBase64(nonceData); | 785 String nonce = _CryptoUtils.bytesToBase64(nonceData); |
950 | 786 |
951 uri = new Uri( | 787 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", |
952 scheme: uri.scheme == "wss" ? "https" : "http", | 788 userInfo: uri.userInfo, |
953 userInfo: uri.userInfo, | 789 host: uri.host, |
954 host: uri.host, | 790 port: uri.port, |
955 port: uri.port, | 791 path: uri.path, |
956 path: uri.path, | 792 query: uri.query, |
957 query: uri.query, | 793 fragment: uri.fragment); |
958 fragment: uri.fragment); | 794 return _httpClient.openUrl("GET", uri) |
959 return _httpClient.openUrl("GET", uri).then((request) { | 795 .then((request) { |
960 if (uri.userInfo != null && !uri.userInfo.isEmpty) { | 796 if (uri.userInfo != null && !uri.userInfo.isEmpty) { |
961 // If the URL contains user information use that for basic | 797 // If the URL contains user information use that for basic |
962 // authorization. | 798 // authorization. |
963 String auth = _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); | 799 String auth = |
964 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); | 800 _CryptoUtils.bytesToBase64(UTF8.encode(uri.userInfo)); |
965 } | 801 request.headers.set(HttpHeaders.AUTHORIZATION, "Basic $auth"); |
966 if (headers != null) { | 802 } |
967 headers.forEach((field, value) => request.headers.add(field, value)); | 803 if (headers != null) { |
968 } | 804 headers.forEach((field, value) => request.headers.add(field, value)); |
969 // Setup the initial handshake. | 805 } |
970 request.headers | 806 // Setup the initial handshake. |
971 ..set(HttpHeaders.CONNECTION, "Upgrade") | |
972 ..set(HttpHeaders.UPGRADE, "websocket") | |
973 ..set("Sec-WebSocket-Key", nonce) | |
974 ..set("Cache-Control", "no-cache") | |
975 ..set("Sec-WebSocket-Version", "13"); | |
976 if (protocols != null) { | |
977 request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); | |
978 } | |
979 | |
980 if (compression.enabled) { | |
981 request.headers | 807 request.headers |
982 .add("Sec-WebSocket-Extensions", compression._createHeader()); | 808 ..set(HttpHeaders.CONNECTION, "Upgrade") |
983 } | 809 ..set(HttpHeaders.UPGRADE, "websocket") |
984 | 810 ..set("Sec-WebSocket-Key", nonce) |
985 return request.close(); | 811 ..set("Cache-Control", "no-cache") |
986 }).then((response) { | 812 ..set("Sec-WebSocket-Version", "13"); |
987 void error(String message) { | 813 if (protocols != null) { |
988 // Flush data. | 814 request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
989 response.detachSocket().then((socket) { | |
990 socket.destroy(); | |
991 }); | |
992 throw new WebSocketException(message); | |
993 } | |
994 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || | |
995 response.headers[HttpHeaders.CONNECTION] == null || | |
996 !response.headers[HttpHeaders.CONNECTION] | |
997 .any((value) => value.toLowerCase() == "upgrade") || | |
998 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != | |
999 "websocket") { | |
1000 error("Connection to '$uri' was not upgraded to websocket"); | |
1001 } | |
1002 String accept = response.headers.value("Sec-WebSocket-Accept"); | |
1003 if (accept == null) { | |
1004 error("Response did not contain a 'Sec-WebSocket-Accept' header"); | |
1005 } | |
1006 _SHA1 sha1 = new _SHA1(); | |
1007 sha1.add("$nonce$_webSocketGUID".codeUnits); | |
1008 List<int> expectedAccept = sha1.close(); | |
1009 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); | |
1010 if (expectedAccept.length != receivedAccept.length) { | |
1011 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); | |
1012 } | |
1013 for (int i = 0; i < expectedAccept.length; i++) { | |
1014 if (expectedAccept[i] != receivedAccept[i]) { | |
1015 error("Bad response 'Sec-WebSocket-Accept' header"); | |
1016 } | 815 } |
1017 } | 816 return request.close(); |
1018 var protocol = response.headers.value('Sec-WebSocket-Protocol'); | 817 }) |
1019 | 818 .then((response) { |
1020 _WebSocketPerMessageDeflate deflate = | 819 void error(String message) { |
1021 negotiateClientCompression(response, compression); | 820 // Flush data. |
1022 | 821 response.detachSocket().then((socket) { |
1023 return response.detachSocket().then((socket) => | 822 socket.destroy(); |
1024 new _WebSocketImpl._fromSocket( | 823 }); |
1025 socket, protocol, compression, false, deflate)); | 824 throw new WebSocketException(message); |
1026 }); | 825 } |
| 826 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || |
| 827 response.headers[HttpHeaders.CONNECTION] == null || |
| 828 !response.headers[HttpHeaders.CONNECTION].any( |
| 829 (value) => value.toLowerCase() == "upgrade") || |
| 830 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != |
| 831 "websocket") { |
| 832 error("Connection to '$uri' was not upgraded to websocket"); |
| 833 } |
| 834 String accept = response.headers.value("Sec-WebSocket-Accept"); |
| 835 if (accept == null) { |
| 836 error("Response did not contain a 'Sec-WebSocket-Accept' header"); |
| 837 } |
| 838 _SHA1 sha1 = new _SHA1(); |
| 839 sha1.add("$nonce$_webSocketGUID".codeUnits); |
| 840 List<int> expectedAccept = sha1.close(); |
| 841 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); |
| 842 if (expectedAccept.length != receivedAccept.length) { |
| 843 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); |
| 844 } |
| 845 for (int i = 0; i < expectedAccept.length; i++) { |
| 846 if (expectedAccept[i] != receivedAccept[i]) { |
| 847 error("Bad response 'Sec-WebSocket-Accept' header"); |
| 848 } |
| 849 } |
| 850 var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
| 851 return response.detachSocket() |
| 852 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); |
| 853 }); |
1027 } | 854 } |
1028 | 855 |
1029 static _WebSocketPerMessageDeflate negotiateClientCompression( | 856 _WebSocketImpl._fromSocket(this._socket, this.protocol, |
1030 HttpClientResponse response, CompressionOptions compression) { | 857 [this._serverSide = false]) { |
1031 String extensionHeader = response.headers.value('Sec-WebSocket-Extensions'); | |
1032 | |
1033 if (extensionHeader == null) { | |
1034 extensionHeader = ""; | |
1035 } | |
1036 | |
1037 var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); | |
1038 | |
1039 if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) { | |
1040 var serverNoContextTakeover = | |
1041 hv.parameters.containsKey(_serverNoContextTakeover); | |
1042 var clientNoContextTakeover = | |
1043 hv.parameters.containsKey(_clientNoContextTakeover); | |
1044 | |
1045 int getWindowBits(String type) { | |
1046 var o = hv.parameters[type]; | |
1047 if (o == null) { | |
1048 return DEFAULT_WINDOW_BITS; | |
1049 } | |
1050 | |
1051 o = int.parse(o, onError: (s) => DEFAULT_WINDOW_BITS); | |
1052 return o; | |
1053 } | |
1054 | |
1055 return new _WebSocketPerMessageDeflate( | |
1056 clientMaxWindowBits: getWindowBits(_clientMaxWindowBits), | |
1057 serverMaxWindowBits: getWindowBits(_serverMaxWindowBits), | |
1058 clientNoContextTakeover: clientNoContextTakeover, | |
1059 serverNoContextTakeover: serverNoContextTakeover); | |
1060 } | |
1061 | |
1062 return null; | |
1063 } | |
1064 | |
1065 _WebSocketImpl._fromSocket( | |
1066 this._socket, this.protocol, CompressionOptions compression, | |
1067 [this._serverSide = false, _WebSocketPerMessageDeflate deflate]) { | |
1068 _consumer = new _WebSocketConsumer(this, _socket); | 858 _consumer = new _WebSocketConsumer(this, _socket); |
1069 _sink = new _StreamSinkImpl(_consumer); | 859 _sink = new _StreamSinkImpl(_consumer); |
1070 _readyState = WebSocket.OPEN; | 860 _readyState = WebSocket.OPEN; |
1071 _deflate = deflate; | |
1072 | 861 |
1073 var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate); | 862 var transformer = new _WebSocketProtocolTransformer(_serverSide); |
1074 _subscription = _socket.transform(transformer).listen((data) { | 863 _subscription = _socket.transform(transformer).listen( |
1075 if (data is _WebSocketPing) { | 864 (data) { |
1076 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 865 if (data is _WebSocketPing) { |
1077 } else if (data is _WebSocketPong) { | 866 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
1078 // Simply set pingInterval, as it'll cancel any timers. | 867 } else if (data is _WebSocketPong) { |
1079 pingInterval = _pingInterval; | 868 // Simply set pingInterval, as it'll cancel any timers. |
1080 } else { | 869 pingInterval = _pingInterval; |
1081 _controller.add(data); | 870 } else { |
1082 } | 871 _controller.add(data); |
1083 }, onError: (error, stackTrace) { | 872 } |
1084 if (_closeTimer != null) _closeTimer.cancel(); | 873 }, |
1085 if (error is FormatException) { | 874 onError: (error) { |
1086 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 875 if (_closeTimer != null) _closeTimer.cancel(); |
1087 } else { | 876 if (error is FormatException) { |
1088 _close(WebSocketStatus.PROTOCOL_ERROR); | 877 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
1089 } | 878 } else { |
1090 // An error happened, set the close code set above. | 879 _close(WebSocketStatus.PROTOCOL_ERROR); |
1091 _closeCode = _outCloseCode; | 880 } |
1092 _closeReason = _outCloseReason; | 881 // An error happened, set the close code set above. |
1093 _controller.close(); | 882 _closeCode = _outCloseCode; |
1094 }, onDone: () { | 883 _closeReason = _outCloseReason; |
1095 if (_closeTimer != null) _closeTimer.cancel(); | 884 _controller.close(); |
1096 if (_readyState == WebSocket.OPEN) { | 885 }, |
1097 _readyState = WebSocket.CLOSING; | 886 onDone: () { |
1098 if (!_isReservedStatusCode(transformer.closeCode)) { | 887 if (_closeTimer != null) _closeTimer.cancel(); |
1099 _close(transformer.closeCode, transformer.closeReason); | 888 if (_readyState == WebSocket.OPEN) { |
1100 } else { | 889 _readyState = WebSocket.CLOSING; |
1101 _close(); | 890 if (!_isReservedStatusCode(transformer.closeCode)) { |
1102 } | 891 _close(transformer.closeCode, transformer.closeReason); |
1103 _readyState = WebSocket.CLOSED; | 892 } else { |
1104 } | 893 _close(); |
1105 // Protocol close, use close code from transformer. | 894 } |
1106 _closeCode = transformer.closeCode; | 895 _readyState = WebSocket.CLOSED; |
1107 _closeReason = transformer.closeReason; | 896 } |
1108 _controller.close(); | 897 // Protocol close, use close code from transformer. |
1109 }, cancelOnError: true); | 898 _closeCode = transformer.closeCode; |
| 899 _closeReason = transformer.closeReason; |
| 900 _controller.close(); |
| 901 }, |
| 902 cancelOnError: true); |
1110 _subscription.pause(); | 903 _subscription.pause(); |
1111 _controller = new StreamController( | 904 _controller = new StreamController(sync: true, |
1112 sync: true, onListen: _subscription.resume, onCancel: () { | 905 onListen: _subscription.resume, |
1113 _subscription.cancel(); | 906 onCancel: () { |
1114 _subscription = null; | 907 _subscription.cancel(); |
1115 }, onPause: _subscription.pause, onResume: _subscription.resume); | 908 _subscription = null; |
| 909 }, |
| 910 onPause: _subscription.pause, |
| 911 onResume: _subscription.resume); |
1116 | 912 |
1117 _webSockets[_serviceId] = this; | 913 _webSockets[_serviceId] = this; |
1118 try { | 914 try { _socket._owner = this; } catch (_) {} |
1119 _socket._owner = this; | |
1120 } catch (_) {} | |
1121 } | 915 } |
1122 | 916 |
1123 StreamSubscription listen(void onData(message), | 917 StreamSubscription listen(void onData(message), |
1124 {Function onError, void onDone(), bool cancelOnError}) { | 918 {Function onError, |
| 919 void onDone(), |
| 920 bool cancelOnError}) { |
1125 return _controller.stream.listen(onData, | 921 return _controller.stream.listen(onData, |
1126 onError: onError, onDone: onDone, cancelOnError: cancelOnError); | 922 onError: onError, |
| 923 onDone: onDone, |
| 924 cancelOnError: cancelOnError); |
1127 } | 925 } |
1128 | 926 |
1129 Duration get pingInterval => _pingInterval; | 927 Duration get pingInterval => _pingInterval; |
1130 | 928 |
1131 void set pingInterval(Duration interval) { | 929 void set pingInterval(Duration interval) { |
1132 if (_writeClosed) return; | 930 if (_writeClosed) return; |
1133 if (_pingTimer != null) _pingTimer.cancel(); | 931 if (_pingTimer != null) _pingTimer.cancel(); |
1134 _pingInterval = interval; | 932 _pingInterval = interval; |
1135 | 933 |
1136 if (_pingInterval == null) return; | 934 if (_pingInterval == null) return; |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1222 'type': '@Socket', | 1020 'type': '@Socket', |
1223 'name': 'UserSocket', | 1021 'name': 'UserSocket', |
1224 'user_name': 'UserSocket', | 1022 'user_name': 'UserSocket', |
1225 }; | 1023 }; |
1226 } | 1024 } |
1227 return r; | 1025 return r; |
1228 } | 1026 } |
1229 | 1027 |
1230 static bool _isReservedStatusCode(int code) { | 1028 static bool _isReservedStatusCode(int code) { |
1231 return code != null && | 1029 return code != null && |
1232 (code < WebSocketStatus.NORMAL_CLOSURE || | 1030 (code < WebSocketStatus.NORMAL_CLOSURE || |
1233 code == WebSocketStatus.RESERVED_1004 || | 1031 code == WebSocketStatus.RESERVED_1004 || |
1234 code == WebSocketStatus.NO_STATUS_RECEIVED || | 1032 code == WebSocketStatus.NO_STATUS_RECEIVED || |
1235 code == WebSocketStatus.ABNORMAL_CLOSURE || | 1033 code == WebSocketStatus.ABNORMAL_CLOSURE || |
1236 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 1034 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && |
1237 code < WebSocketStatus.RESERVED_1015) || | 1035 code < WebSocketStatus.RESERVED_1015) || |
1238 (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); | 1036 (code >= WebSocketStatus.RESERVED_1015 && |
| 1037 code < 3000)); |
1239 } | 1038 } |
1240 } | 1039 } |
OLD | NEW |