OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 library http_parser.web_socket; |
| 6 |
| 7 import 'dart:async'; |
| 8 import 'dart:convert'; |
| 9 import 'dart:math'; |
| 10 import 'dart:typed_data'; |
| 11 |
| 12 import 'package:crypto/crypto.dart'; |
| 13 |
| 14 import 'bytes_builder.dart'; |
| 15 |
| 16 /// An implementation of the WebSocket protocol that's not specific to "dart:io" |
| 17 /// or to any particular HTTP API. |
| 18 /// |
| 19 /// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket |
| 20 /// handshake][]. This needs to be handled manually by the user of the code. |
| 21 /// Once that's been done, [new CompatibleWebSocket] can be called with the |
| 22 /// underlying socket and it will handle the remainder of the protocol. |
| 23 /// |
| 24 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
| 25 abstract class CompatibleWebSocket implements Stream, StreamSink { |
| 26 /// The interval for sending ping signals. |
| 27 /// |
| 28 /// If a ping message is not answered by a pong message from the peer, the |
| 29 /// `WebSocket` is assumed disconnected and the connection is closed with a |
| 30 /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the |
| 31 /// pong message must be received within [pingInterval]. |
| 32 /// |
| 33 /// There are never two outstanding pings at any given time, and the next ping |
| 34 /// timer starts when the pong is received. |
| 35 /// |
| 36 /// By default, the [pingInterval] is `null`, indicating that ping messages |
| 37 /// are disabled. |
| 38 Duration pingInterval; |
| 39 |
| 40 /// The [close code][] set when the WebSocket connection is closed. |
| 41 /// |
| 42 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 |
| 43 /// |
| 44 /// Before the connection has been closed, this will be `null`. |
| 45 int get closeCode; |
| 46 |
| 47 /// The [close reason][] set when the WebSocket connection is closed. |
| 48 /// |
| 49 /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 |
| 50 /// |
| 51 /// Before the connection has been closed, this will be `null`. |
| 52 String get closeReason; |
| 53 |
| 54 /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of |
| 55 /// the [initial handshake]. |
| 56 /// |
| 57 /// The return value should be sent back to the client in a |
| 58 /// `Sec-WebSocket-Accept` header. |
| 59 /// |
| 60 /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2 |
| 61 static String signKey(String key) { |
| 62 var hash = new SHA1(); |
| 63 // We use [codeUnits] here rather than UTF-8-decoding the string because |
| 64 // [key] is expected to be base64 encoded, and so will be pure ASCII. |
| 65 hash.add((key + _webSocketGUID).codeUnits); |
| 66 return CryptoUtils.bytesToBase64(hash.close()); |
| 67 } |
| 68 |
| 69 /// Creates a new WebSocket handling messaging across an existing socket. |
| 70 /// |
| 71 /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][] |
| 72 /// must have already been completed on the socket before this is called. |
| 73 /// |
| 74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" |
| 75 /// `Socket`), it will be used for both sending and receiving data. Otherwise, |
| 76 /// it will be used for receiving data and [sink] will be used for sending it. |
| 77 /// |
| 78 /// If this is a WebSocket server, [serverSide] should be `true` (the |
| 79 /// default); if it's a client, [serverSide] should be `false`. |
| 80 /// |
| 81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
| 82 factory CompatibleWebSocket(Stream<List<int>> stream, |
| 83 {StreamSink<List<int>> sink, bool serverSide: true}) { |
| 84 if (sink == null) { |
| 85 if (stream is! StreamSink) { |
| 86 throw new ArgumentError("If stream isn't also a StreamSink, sink must " |
| 87 "be passed explicitly."); |
| 88 } |
| 89 sink = stream as StreamSink; |
| 90 } |
| 91 |
| 92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide); |
| 93 } |
| 94 |
| 95 /// Closes the web socket connection. |
| 96 /// |
| 97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent |
| 98 /// to the remote peer, respectively. If they are omitted, the peer will see |
| 99 /// a "no status received" code with no reason. |
| 100 /// |
| 101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 |
| 102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 |
| 103 Future close([int closeCode, String closeReason]); |
| 104 } |
| 105 |
| 106 /// An exception thrown by [CompatibleWebSocket]. |
| 107 class CompatibleWebSocketException implements Exception { |
| 108 final String message; |
| 109 |
| 110 CompatibleWebSocketException([this.message]); |
| 111 |
| 112 String toString() => message == null |
| 113 ? "CompatibleWebSocketException" : |
| 114 "CompatibleWebSocketException: $message"; |
| 115 } |
| 116 |
| 117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The |
| 118 // "dart:io" implementation isn't used directly both to support non-"dart:io" |
| 119 // applications, and because it's incompatible with non-"dart:io" HTTP requests |
| 120 // (issue 18172). |
| 121 // |
| 122 // Because it's copied directly, only modifications necessary to support the |
| 123 // desired public API and to remove "dart:io" dependencies have been made. |
| 124 |
| 125 /** |
| 126 * Web socket status codes used when closing a web socket connection. |
| 127 */ |
| 128 abstract class _WebSocketStatus { |
| 129 static const int NORMAL_CLOSURE = 1000; |
| 130 static const int GOING_AWAY = 1001; |
| 131 static const int PROTOCOL_ERROR = 1002; |
| 132 static const int UNSUPPORTED_DATA = 1003; |
| 133 static const int RESERVED_1004 = 1004; |
| 134 static const int NO_STATUS_RECEIVED = 1005; |
| 135 static const int ABNORMAL_CLOSURE = 1006; |
| 136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007; |
| 137 static const int POLICY_VIOLATION = 1008; |
| 138 static const int MESSAGE_TOO_BIG = 1009; |
| 139 static const int MISSING_MANDATORY_EXTENSION = 1010; |
| 140 static const int INTERNAL_SERVER_ERROR = 1011; |
| 141 static const int RESERVED_1015 = 1015; |
| 142 } |
| 143 |
| 144 abstract class _WebSocketState { |
| 145 static const int CONNECTING = 0; |
| 146 static const int OPEN = 1; |
| 147 static const int CLOSING = 2; |
| 148 static const int CLOSED = 3; |
| 149 } |
6 | 150 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 | 152 |
| 153 final _random = new Random(); |
| 154 |
9 // Matches _WebSocketOpcode. | 155 // Matches _WebSocketOpcode. |
10 class _WebSocketMessageType { | 156 class _WebSocketMessageType { |
11 static const int NONE = 0; | 157 static const int NONE = 0; |
12 static const int TEXT = 1; | 158 static const int TEXT = 1; |
13 static const int BINARY = 2; | 159 static const int BINARY = 2; |
14 } | 160 } |
15 | 161 |
16 | 162 |
17 class _WebSocketOpcode { | 163 class _WebSocketOpcode { |
18 static const int CONTINUATION = 0; | 164 static const int CONTINUATION = 0; |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
55 int _state = START; | 201 int _state = START; |
56 bool _fin = false; | 202 bool _fin = false; |
57 int _opcode = -1; | 203 int _opcode = -1; |
58 int _len = -1; | 204 int _len = -1; |
59 bool _masked = false; | 205 bool _masked = false; |
60 int _remainingLenBytes = -1; | 206 int _remainingLenBytes = -1; |
61 int _remainingMaskingKeyBytes = 4; | 207 int _remainingMaskingKeyBytes = 4; |
62 int _remainingPayloadBytes = -1; | 208 int _remainingPayloadBytes = -1; |
63 int _unmaskingIndex = 0; | 209 int _unmaskingIndex = 0; |
64 int _currentMessageType = _WebSocketMessageType.NONE; | 210 int _currentMessageType = _WebSocketMessageType.NONE; |
65 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | 211 int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; |
66 String closeReason = ""; | 212 String closeReason = ""; |
67 | 213 |
68 EventSink _eventSink; | 214 EventSink _eventSink; |
69 | 215 |
70 final bool _serverSide; | 216 final bool _serverSide; |
71 final List _maskingBytes = new List(4); | 217 final List _maskingBytes = new List(4); |
72 final BytesBuilder _payload = new BytesBuilder(copy: false); | 218 final BytesBuilder _payload = new BytesBuilder(copy: false); |
73 | 219 |
74 _WebSocketProtocolTransformer([this._serverSide = false]); | 220 _WebSocketProtocolTransformer([this._serverSide = false]); |
75 | 221 |
(...skipping 15 matching lines...) Expand all Loading... |
91 void close() => _eventSink.close(); | 237 void close() => _eventSink.close(); |
92 | 238 |
93 /** | 239 /** |
94 * Process data received from the underlying communication channel. | 240 * Process data received from the underlying communication channel. |
95 */ | 241 */ |
96 void add(Uint8List buffer) { | 242 void add(Uint8List buffer) { |
97 int count = buffer.length; | 243 int count = buffer.length; |
98 int index = 0; | 244 int index = 0; |
99 int lastIndex = count; | 245 int lastIndex = count; |
100 if (_state == CLOSED) { | 246 if (_state == CLOSED) { |
101 throw new WebSocketException("Data on closed connection"); | 247 throw new CompatibleWebSocketException("Data on closed connection"); |
102 } | 248 } |
103 if (_state == FAILURE) { | 249 if (_state == FAILURE) { |
104 throw new WebSocketException("Data on failed connection"); | 250 throw new CompatibleWebSocketException("Data on failed connection"); |
105 } | 251 } |
106 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { | 252 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { |
107 int byte = buffer[index]; | 253 int byte = buffer[index]; |
108 if (_state <= LEN_REST) { | 254 if (_state <= LEN_REST) { |
109 if (_state == START) { | 255 if (_state == START) { |
110 _fin = (byte & 0x80) != 0; | 256 _fin = (byte & 0x80) != 0; |
111 if ((byte & 0x70) != 0) { | 257 if ((byte & 0x70) != 0) { |
112 // The RSV1, RSV2 bits RSV3 must be all zero. | 258 // The RSV1, RSV2 bits RSV3 must be all zero. |
113 throw new WebSocketException("Protocol error"); | 259 throw new CompatibleWebSocketException("Protocol error"); |
114 } | 260 } |
115 _opcode = (byte & 0xF); | 261 _opcode = (byte & 0xF); |
116 if (_opcode <= _WebSocketOpcode.BINARY) { | 262 if (_opcode <= _WebSocketOpcode.BINARY) { |
117 if (_opcode == _WebSocketOpcode.CONTINUATION) { | 263 if (_opcode == _WebSocketOpcode.CONTINUATION) { |
118 if (_currentMessageType == _WebSocketMessageType.NONE) { | 264 if (_currentMessageType == _WebSocketMessageType.NONE) { |
119 throw new WebSocketException("Protocol error"); | 265 throw new CompatibleWebSocketException("Protocol error"); |
120 } | 266 } |
121 } else { | 267 } else { |
122 assert(_opcode == _WebSocketOpcode.TEXT || | 268 assert(_opcode == _WebSocketOpcode.TEXT || |
123 _opcode == _WebSocketOpcode.BINARY); | 269 _opcode == _WebSocketOpcode.BINARY); |
124 if (_currentMessageType != _WebSocketMessageType.NONE) { | 270 if (_currentMessageType != _WebSocketMessageType.NONE) { |
125 throw new WebSocketException("Protocol error"); | 271 throw new CompatibleWebSocketException("Protocol error"); |
126 } | 272 } |
127 _currentMessageType = _opcode; | 273 _currentMessageType = _opcode; |
128 } | 274 } |
129 } else if (_opcode >= _WebSocketOpcode.CLOSE && | 275 } else if (_opcode >= _WebSocketOpcode.CLOSE && |
130 _opcode <= _WebSocketOpcode.PONG) { | 276 _opcode <= _WebSocketOpcode.PONG) { |
131 // Control frames cannot be fragmented. | 277 // Control frames cannot be fragmented. |
132 if (!_fin) throw new WebSocketException("Protocol error"); | 278 if (!_fin) throw new CompatibleWebSocketException("Protocol error"); |
133 } else { | 279 } else { |
134 throw new WebSocketException("Protocol error"); | 280 throw new CompatibleWebSocketException("Protocol error"); |
135 } | 281 } |
136 _state = LEN_FIRST; | 282 _state = LEN_FIRST; |
137 } else if (_state == LEN_FIRST) { | 283 } else if (_state == LEN_FIRST) { |
138 _masked = (byte & 0x80) != 0; | 284 _masked = (byte & 0x80) != 0; |
139 _len = byte & 0x7F; | 285 _len = byte & 0x7F; |
140 if (_isControlFrame() && _len > 125) { | 286 if (_isControlFrame() && _len > 125) { |
141 throw new WebSocketException("Protocol error"); | 287 throw new CompatibleWebSocketException("Protocol error"); |
142 } | 288 } |
143 if (_len == 126) { | 289 if (_len == 126) { |
144 _len = 0; | 290 _len = 0; |
145 _remainingLenBytes = 2; | 291 _remainingLenBytes = 2; |
146 _state = LEN_REST; | 292 _state = LEN_REST; |
147 } else if (_len == 127) { | 293 } else if (_len == 127) { |
148 _len = 0; | 294 _len = 0; |
149 _remainingLenBytes = 8; | 295 _remainingLenBytes = 8; |
150 _state = LEN_REST; | 296 _state = LEN_REST; |
151 } else { | 297 } else { |
(...skipping 25 matching lines...) Expand all Loading... |
177 } | 323 } |
178 // Control frame and data frame share _payloads. | 324 // Control frame and data frame share _payloads. |
179 _payload.add( | 325 _payload.add( |
180 new Uint8List.view(buffer.buffer, index, payloadLength)); | 326 new Uint8List.view(buffer.buffer, index, payloadLength)); |
181 index += payloadLength; | 327 index += payloadLength; |
182 if (_isControlFrame()) { | 328 if (_isControlFrame()) { |
183 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | 329 if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
184 } else { | 330 } else { |
185 if (_currentMessageType != _WebSocketMessageType.TEXT && | 331 if (_currentMessageType != _WebSocketMessageType.TEXT && |
186 _currentMessageType != _WebSocketMessageType.BINARY) { | 332 _currentMessageType != _WebSocketMessageType.BINARY) { |
187 throw new WebSocketException("Protocol error"); | 333 throw new CompatibleWebSocketException("Protocol error"); |
188 } | 334 } |
189 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | 335 if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
190 } | 336 } |
191 | 337 |
192 // Hack - as we always do index++ below. | 338 // Hack - as we always do index++ below. |
193 index--; | 339 index--; |
194 } | 340 } |
195 } | 341 } |
196 | 342 |
197 // Move to the next byte. | 343 // Move to the next byte. |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
232 // Handle end. | 378 // Handle end. |
233 final int end = index + length; | 379 final int end = index + length; |
234 for (int i = index; i < end; i++) { | 380 for (int i = index; i < end; i++) { |
235 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | 381 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; |
236 } | 382 } |
237 } | 383 } |
238 | 384 |
239 void _lengthDone() { | 385 void _lengthDone() { |
240 if (_masked) { | 386 if (_masked) { |
241 if (!_serverSide) { | 387 if (!_serverSide) { |
242 throw new WebSocketException("Received masked frame from server"); | 388 throw new CompatibleWebSocketException( |
| 389 "Received masked frame from server"); |
243 } | 390 } |
244 _state = MASK; | 391 _state = MASK; |
245 } else { | 392 } else { |
246 if (_serverSide) { | 393 if (_serverSide) { |
247 throw new WebSocketException("Received unmasked frame from client"); | 394 throw new CompatibleWebSocketException( |
| 395 "Received unmasked frame from client"); |
248 } | 396 } |
249 _remainingPayloadBytes = _len; | 397 _remainingPayloadBytes = _len; |
250 _startPayload(); | 398 _startPayload(); |
251 } | 399 } |
252 } | 400 } |
253 | 401 |
254 void _maskDone() { | 402 void _maskDone() { |
255 _remainingPayloadBytes = _len; | 403 _remainingPayloadBytes = _len; |
256 _startPayload(); | 404 _startPayload(); |
257 } | 405 } |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
293 break; | 441 break; |
294 } | 442 } |
295 _currentMessageType = _WebSocketMessageType.NONE; | 443 _currentMessageType = _WebSocketMessageType.NONE; |
296 } | 444 } |
297 _prepareForNextFrame(); | 445 _prepareForNextFrame(); |
298 } | 446 } |
299 | 447 |
300 void _controlFrameEnd() { | 448 void _controlFrameEnd() { |
301 switch (_opcode) { | 449 switch (_opcode) { |
302 case _WebSocketOpcode.CLOSE: | 450 case _WebSocketOpcode.CLOSE: |
303 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | 451 closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; |
304 var payload = _payload.takeBytes(); | 452 var payload = _payload.takeBytes(); |
305 if (payload.length > 0) { | 453 if (payload.length > 0) { |
306 if (payload.length == 1) { | 454 if (payload.length == 1) { |
307 throw new WebSocketException("Protocol error"); | 455 throw new CompatibleWebSocketException("Protocol error"); |
308 } | 456 } |
309 closeCode = payload[0] << 8 | payload[1]; | 457 closeCode = payload[0] << 8 | payload[1]; |
310 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { | 458 if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) { |
311 throw new WebSocketException("Protocol error"); | 459 throw new CompatibleWebSocketException("Protocol error"); |
312 } | 460 } |
313 if (payload.length > 2) { | 461 if (payload.length > 2) { |
314 closeReason = UTF8.decode(payload.sublist(2)); | 462 closeReason = UTF8.decode(payload.sublist(2)); |
315 } | 463 } |
316 } | 464 } |
317 _state = CLOSED; | 465 _state = CLOSED; |
318 _eventSink.close(); | 466 _eventSink.close(); |
319 break; | 467 break; |
320 | 468 |
321 case _WebSocketOpcode.PING: | 469 case _WebSocketOpcode.PING: |
(...skipping 30 matching lines...) Expand all Loading... |
352 final List<int> payload; | 500 final List<int> payload; |
353 _WebSocketPing([this.payload = null]); | 501 _WebSocketPing([this.payload = null]); |
354 } | 502 } |
355 | 503 |
356 | 504 |
357 class _WebSocketPong { | 505 class _WebSocketPong { |
358 final List<int> payload; | 506 final List<int> payload; |
359 _WebSocketPong([this.payload = null]); | 507 _WebSocketPong([this.payload = null]); |
360 } | 508 } |
361 | 509 |
362 | |
363 class _WebSocketTransformerImpl implements WebSocketTransformer { | |
364 final StreamController<WebSocket> _controller = | |
365 new StreamController<WebSocket>(sync: true); | |
366 final Function _protocolSelector; | |
367 | |
368 _WebSocketTransformerImpl(this._protocolSelector); | |
369 | |
370 Stream<WebSocket> bind(Stream<HttpRequest> stream) { | |
371 stream.listen((request) { | |
372 _upgrade(request, _protocolSelector) | |
373 .then((WebSocket webSocket) => _controller.add(webSocket)) | |
374 .catchError(_controller.addError); | |
375 }); | |
376 | |
377 return _controller.stream; | |
378 } | |
379 | |
380 static Future<WebSocket> _upgrade(HttpRequest request, _protocolSelector) { | |
381 var response = request.response; | |
382 if (!_isUpgradeRequest(request)) { | |
383 // Send error response. | |
384 response | |
385 ..statusCode = HttpStatus.BAD_REQUEST | |
386 ..close(); | |
387 return new Future.error( | |
388 new WebSocketException("Invalid WebSocket upgrade request")); | |
389 } | |
390 | |
391 Future upgrade(String protocol) { | |
392 // Send the upgrade response. | |
393 response | |
394 ..statusCode = HttpStatus.SWITCHING_PROTOCOLS | |
395 ..headers.add(HttpHeaders.CONNECTION, "Upgrade") | |
396 ..headers.add(HttpHeaders.UPGRADE, "websocket"); | |
397 String key = request.headers.value("Sec-WebSocket-Key"); | |
398 _SHA1 sha1 = new _SHA1(); | |
399 sha1.add("$key$_webSocketGUID".codeUnits); | |
400 String accept = _CryptoUtils.bytesToBase64(sha1.close()); | |
401 response.headers.add("Sec-WebSocket-Accept", accept); | |
402 if (protocol != null && protocol.isNotEmpty) { | |
403 response.headers.add("Sec-WebSocket-Protocol", protocol); | |
404 } | |
405 response.headers.contentLength = 0; | |
406 return response.detachSocket() | |
407 .then((socket) => new _WebSocketImpl._fromSocket( | |
408 socket, protocol, true)); | |
409 } | |
410 | |
411 var protocols = request.headers['Sec-WebSocket-Protocol']; | |
412 if (protocols != null && _protocolSelector != null) { | |
413 // The suggested protocols can be spread over multiple lines, each | |
414 // consisting of multiple protocols. To unify all of them, first join | |
415 // the lists with ', ' and then tokenize. | |
416 protocols = _HttpParser._tokenizeFieldValue(protocols.join(', ')); | |
417 return new Future(() => _protocolSelector(protocols)) | |
418 .then((protocol) { | |
419 if (protocols.indexOf(protocol) < 0) { | |
420 throw new WebSocketException( | |
421 "Selected protocol is not in the list of available protocols"); | |
422 } | |
423 return protocol; | |
424 }) | |
425 .catchError((error) { | |
426 response | |
427 ..statusCode = HttpStatus.INTERNAL_SERVER_ERROR | |
428 ..close(); | |
429 throw error; | |
430 }) | |
431 .then(upgrade); | |
432 } else { | |
433 return upgrade(null); | |
434 } | |
435 } | |
436 | |
437 static bool _isUpgradeRequest(HttpRequest request) { | |
438 if (request.method != "GET") { | |
439 return false; | |
440 } | |
441 if (request.headers[HttpHeaders.CONNECTION] == null) { | |
442 return false; | |
443 } | |
444 bool isUpgrade = false; | |
445 request.headers[HttpHeaders.CONNECTION].forEach((String value) { | |
446 if (value.toLowerCase() == "upgrade") isUpgrade = true; | |
447 }); | |
448 if (!isUpgrade) return false; | |
449 String upgrade = request.headers.value(HttpHeaders.UPGRADE); | |
450 if (upgrade == null || upgrade.toLowerCase() != "websocket") { | |
451 return false; | |
452 } | |
453 String version = request.headers.value("Sec-WebSocket-Version"); | |
454 if (version == null || version != "13") { | |
455 return false; | |
456 } | |
457 String key = request.headers.value("Sec-WebSocket-Key"); | |
458 if (key == null) { | |
459 return false; | |
460 } | |
461 return true; | |
462 } | |
463 } | |
464 | |
465 | |
466 // TODO(ajohnsen): Make this transformer reusable. | 510 // TODO(ajohnsen): Make this transformer reusable. |
467 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | 511 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
468 final _WebSocketImpl webSocket; | 512 final _WebSocketImpl webSocket; |
469 EventSink _eventSink; | 513 EventSink _eventSink; |
470 | 514 |
471 _WebSocketOutgoingTransformer(this.webSocket); | 515 _WebSocketOutgoingTransformer(this.webSocket); |
472 | 516 |
473 Stream bind(Stream stream) { | 517 Stream bind(Stream stream) { |
474 return new Stream.eventTransformed( | 518 return new Stream.eventTransformed( |
475 stream, | 519 stream, |
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
555 } else if (dataLength > 125) { | 599 } else if (dataLength > 125) { |
556 header[index++] = 126; | 600 header[index++] = 126; |
557 lengthBytes = 2; | 601 lengthBytes = 2; |
558 } | 602 } |
559 // Write the length in network byte order into the header. | 603 // Write the length in network byte order into the header. |
560 for (int i = 0; i < lengthBytes; i++) { | 604 for (int i = 0; i < lengthBytes; i++) { |
561 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | 605 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; |
562 } | 606 } |
563 if (mask) { | 607 if (mask) { |
564 header[1] |= 1 << 7; | 608 header[1] |= 1 << 7; |
565 var maskBytes = _IOCrypto.getRandomBytes(4); | 609 var maskBytes = [_random.nextInt(256), _random.nextInt(256), |
| 610 _random.nextInt(256), _random.nextInt(256)]; |
566 header.setRange(index, index + 4, maskBytes); | 611 header.setRange(index, index + 4, maskBytes); |
567 index += 4; | 612 index += 4; |
568 if (data != null) { | 613 if (data != null) { |
569 Uint8List list; | 614 Uint8List list; |
570 // If this is a text message just do the masking inside the | 615 // If this is a text message just do the masking inside the |
571 // encoded data. | 616 // encoded data. |
572 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | 617 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { |
573 list = data; | 618 list = data; |
574 } else { | 619 } else { |
575 if (data is Uint8List) { | 620 if (data is Uint8List) { |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
613 return [header]; | 658 return [header]; |
614 } else { | 659 } else { |
615 return [header, data]; | 660 return [header, data]; |
616 } | 661 } |
617 } | 662 } |
618 } | 663 } |
619 | 664 |
620 | 665 |
621 class _WebSocketConsumer implements StreamConsumer { | 666 class _WebSocketConsumer implements StreamConsumer { |
622 final _WebSocketImpl webSocket; | 667 final _WebSocketImpl webSocket; |
623 final Socket socket; | 668 final StreamSink<List<int>> sink; |
624 StreamController _controller; | 669 StreamController _controller; |
625 StreamSubscription _subscription; | 670 StreamSubscription _subscription; |
626 bool _issuedPause = false; | 671 bool _issuedPause = false; |
627 bool _closed = false; | 672 bool _closed = false; |
628 Completer _closeCompleter = new Completer(); | 673 Completer _closeCompleter = new Completer(); |
629 Completer _completer; | 674 Completer _completer; |
630 | 675 |
631 _WebSocketConsumer(this.webSocket, this.socket); | 676 _WebSocketConsumer(this.webSocket, this.sink); |
632 | 677 |
633 void _onListen() { | 678 void _onListen() { |
634 if (_subscription != null) { | 679 if (_subscription != null) { |
635 _subscription.cancel(); | 680 _subscription.cancel(); |
636 } | 681 } |
637 } | 682 } |
638 | 683 |
639 void _onPause() { | 684 void _onPause() { |
640 if (_subscription != null) { | 685 if (_subscription != null) { |
641 _subscription.pause(); | 686 _subscription.pause(); |
(...skipping 19 matching lines...) Expand all Loading... |
661 } | 706 } |
662 | 707 |
663 _ensureController() { | 708 _ensureController() { |
664 if (_controller != null) return; | 709 if (_controller != null) return; |
665 _controller = new StreamController(sync: true, | 710 _controller = new StreamController(sync: true, |
666 onPause: _onPause, | 711 onPause: _onPause, |
667 onResume: _onResume, | 712 onResume: _onResume, |
668 onCancel: _onListen); | 713 onCancel: _onListen); |
669 var stream = _controller.stream.transform( | 714 var stream = _controller.stream.transform( |
670 new _WebSocketOutgoingTransformer(webSocket)); | 715 new _WebSocketOutgoingTransformer(webSocket)); |
671 socket.addStream(stream) | 716 sink.addStream(stream) |
672 .then((_) { | 717 .then((_) { |
673 _done(); | 718 _done(); |
674 _closeCompleter.complete(webSocket); | 719 _closeCompleter.complete(webSocket); |
675 }, onError: (error, StackTrace stackTrace) { | 720 }, onError: (error, StackTrace stackTrace) { |
676 _closed = true; | 721 _closed = true; |
677 _cancel(); | 722 _cancel(); |
678 if (error is ArgumentError) { | 723 if (error is ArgumentError) { |
679 if (!_done(error, stackTrace)) { | 724 if (!_done(error, stackTrace)) { |
680 _closeCompleter.completeError(error, stackTrace); | 725 _closeCompleter.completeError(error, stackTrace); |
681 } | 726 } |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
714 if (_issuedPause) { | 759 if (_issuedPause) { |
715 _subscription.pause(); | 760 _subscription.pause(); |
716 _issuedPause = false; | 761 _issuedPause = false; |
717 } | 762 } |
718 return _completer.future; | 763 return _completer.future; |
719 } | 764 } |
720 | 765 |
721 Future close() { | 766 Future close() { |
722 _ensureController(); | 767 _ensureController(); |
723 Future closeSocket() { | 768 Future closeSocket() { |
724 return socket.close().catchError((_) {}).then((_) => webSocket); | 769 return sink.close().catchError((_) {}).then((_) => webSocket); |
725 } | 770 } |
726 _controller.close(); | 771 _controller.close(); |
727 return _closeCompleter.future.then((_) => closeSocket()); | 772 return _closeCompleter.future.then((_) => closeSocket()); |
728 } | 773 } |
729 | 774 |
730 void add(data) { | 775 void add(data) { |
731 if (_closed) return; | 776 if (_closed) return; |
732 _ensureController(); | 777 _ensureController(); |
733 _controller.add(data); | 778 _controller.add(data); |
734 } | 779 } |
735 | 780 |
736 void closeSocket() { | 781 void closeSocket() { |
737 _closed = true; | 782 _closed = true; |
738 _cancel(); | 783 _cancel(); |
739 close(); | 784 close(); |
740 } | 785 } |
741 } | 786 } |
742 | 787 |
743 | 788 |
744 class _WebSocketImpl extends Stream implements WebSocket { | 789 class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
745 final String protocol; | |
746 | |
747 StreamController _controller; | 790 StreamController _controller; |
748 StreamSubscription _subscription; | 791 StreamSubscription _subscription; |
749 StreamSink _sink; | 792 StreamController _sink; |
750 | 793 |
751 final Socket _socket; | |
752 final bool _serverSide; | 794 final bool _serverSide; |
753 int _readyState = WebSocket.CONNECTING; | 795 int _readyState = _WebSocketState.CONNECTING; |
754 bool _writeClosed = false; | 796 bool _writeClosed = false; |
755 int _closeCode; | 797 int _closeCode; |
756 String _closeReason; | 798 String _closeReason; |
757 Duration _pingInterval; | 799 Duration _pingInterval; |
758 Timer _pingTimer; | 800 Timer _pingTimer; |
759 _WebSocketConsumer _consumer; | 801 _WebSocketConsumer _consumer; |
760 | 802 |
761 int _outCloseCode; | 803 int _outCloseCode; |
762 String _outCloseReason; | 804 String _outCloseReason; |
763 Timer _closeTimer; | 805 Timer _closeTimer; |
764 | 806 |
765 static final HttpClient _httpClient = new HttpClient(); | 807 _WebSocketImpl._fromSocket(Stream<List<int>> stream, |
766 | 808 StreamSink<List<int>> sink, [this._serverSide = false]) { |
767 static Future<WebSocket> connect(String url, List<String> protocols) { | 809 _consumer = new _WebSocketConsumer(this, sink); |
768 Uri uri = Uri.parse(url); | 810 _sink = new StreamController(); |
769 if (uri.scheme != "ws" && uri.scheme != "wss") { | 811 _sink.stream.pipe(_consumer); |
770 throw new WebSocketException("Unsupported URL scheme '${uri.scheme}'"); | 812 _readyState = _WebSocketState.OPEN; |
771 } | |
772 if (uri.userInfo != "") { | |
773 throw new WebSocketException("Unsupported user info '${uri.userInfo}'"); | |
774 } | |
775 | |
776 Random random = new Random(); | |
777 // Generate 16 random bytes. | |
778 Uint8List nonceData = new Uint8List(16); | |
779 for (int i = 0; i < 16; i++) { | |
780 nonceData[i] = random.nextInt(256); | |
781 } | |
782 String nonce = _CryptoUtils.bytesToBase64(nonceData); | |
783 | |
784 uri = new Uri(scheme: uri.scheme == "wss" ? "https" : "http", | |
785 userInfo: uri.userInfo, | |
786 host: uri.host, | |
787 port: uri.port, | |
788 path: uri.path, | |
789 query: uri.query, | |
790 fragment: uri.fragment); | |
791 return _httpClient.openUrl("GET", uri) | |
792 .then((request) { | |
793 // Setup the initial handshake. | |
794 request.headers | |
795 ..add(HttpHeaders.CONNECTION, "Upgrade") | |
796 ..set(HttpHeaders.UPGRADE, "websocket") | |
797 ..set("Sec-WebSocket-Key", nonce) | |
798 ..set("Cache-Control", "no-cache") | |
799 ..set("Sec-WebSocket-Version", "13"); | |
800 if (protocols.isNotEmpty) { | |
801 request.headers.add("Sec-WebSocket-Protocol", protocols); | |
802 } | |
803 return request.close(); | |
804 }) | |
805 .then((response) { | |
806 void error(String message) { | |
807 // Flush data. | |
808 response.detachSocket().then((socket) { | |
809 socket.destroy(); | |
810 }); | |
811 throw new WebSocketException(message); | |
812 } | |
813 if (response.statusCode != HttpStatus.SWITCHING_PROTOCOLS || | |
814 response.headers[HttpHeaders.CONNECTION] == null || | |
815 !response.headers[HttpHeaders.CONNECTION].any( | |
816 (value) => value.toLowerCase() == "upgrade") || | |
817 response.headers.value(HttpHeaders.UPGRADE).toLowerCase() != | |
818 "websocket") { | |
819 error("Connection to '$uri' was not upgraded to websocket"); | |
820 } | |
821 String accept = response.headers.value("Sec-WebSocket-Accept"); | |
822 if (accept == null) { | |
823 error("Response did not contain a 'Sec-WebSocket-Accept' header"); | |
824 } | |
825 _SHA1 sha1 = new _SHA1(); | |
826 sha1.add("$nonce$_webSocketGUID".codeUnits); | |
827 List<int> expectedAccept = sha1.close(); | |
828 List<int> receivedAccept = _CryptoUtils.base64StringToBytes(accept); | |
829 if (expectedAccept.length != receivedAccept.length) { | |
830 error("Reasponse header 'Sec-WebSocket-Accept' is the wrong length"); | |
831 } | |
832 for (int i = 0; i < expectedAccept.length; i++) { | |
833 if (expectedAccept[i] != receivedAccept[i]) { | |
834 error("Bad response 'Sec-WebSocket-Accept' header"); | |
835 } | |
836 } | |
837 var protocol = response.headers.value('Sec-WebSocket-Protocol'); | |
838 return response.detachSocket() | |
839 .then((socket) => new _WebSocketImpl._fromSocket(socket, protocol)); | |
840 }); | |
841 } | |
842 | |
843 _WebSocketImpl._fromSocket(this._socket, this.protocol, | |
844 [this._serverSide = false]) { | |
845 _consumer = new _WebSocketConsumer(this, _socket); | |
846 _sink = new _StreamSinkImpl(_consumer); | |
847 _readyState = WebSocket.OPEN; | |
848 | 813 |
849 var transformer = new _WebSocketProtocolTransformer(_serverSide); | 814 var transformer = new _WebSocketProtocolTransformer(_serverSide); |
850 _subscription = _socket.transform(transformer).listen( | 815 _subscription = stream.transform(transformer).listen( |
851 (data) { | 816 (data) { |
852 if (data is _WebSocketPing) { | 817 if (data is _WebSocketPing) { |
853 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | 818 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
854 } else if (data is _WebSocketPong) { | 819 } else if (data is _WebSocketPong) { |
855 // Simply set pingInterval, as it'll cancel any timers. | 820 // Simply set pingInterval, as it'll cancel any timers. |
856 pingInterval = _pingInterval; | 821 pingInterval = _pingInterval; |
857 } else { | 822 } else { |
858 _controller.add(data); | 823 _controller.add(data); |
859 } | 824 } |
860 }, | 825 }, |
861 onError: (error) { | 826 onError: (error) { |
862 if (_closeTimer != null) _closeTimer.cancel(); | 827 if (_closeTimer != null) _closeTimer.cancel(); |
863 if (error is FormatException) { | 828 if (error is FormatException) { |
864 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | 829 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
865 } else { | 830 } else { |
866 _close(WebSocketStatus.PROTOCOL_ERROR); | 831 _close(_WebSocketStatus.PROTOCOL_ERROR); |
867 } | 832 } |
868 _controller.close(); | 833 _controller.close(); |
869 }, | 834 }, |
870 onDone: () { | 835 onDone: () { |
871 if (_closeTimer != null) _closeTimer.cancel(); | 836 if (_closeTimer != null) _closeTimer.cancel(); |
872 if (_readyState == WebSocket.OPEN) { | 837 if (_readyState == _WebSocketState.OPEN) { |
873 _readyState = WebSocket.CLOSING; | 838 _readyState = _WebSocketState.CLOSING; |
874 if (!_isReservedStatusCode(transformer.closeCode)) { | 839 if (!_isReservedStatusCode(transformer.closeCode)) { |
875 _close(transformer.closeCode); | 840 _close(transformer.closeCode); |
876 } else { | 841 } else { |
877 _close(); | 842 _close(); |
878 } | 843 } |
879 _readyState = WebSocket.CLOSED; | 844 _readyState = _WebSocketState.CLOSED; |
880 } | 845 } |
881 _closeCode = transformer.closeCode; | 846 _closeCode = transformer.closeCode; |
882 _closeReason = transformer.closeReason; | 847 _closeReason = transformer.closeReason; |
883 _controller.close(); | 848 _controller.close(); |
884 }, | 849 }, |
885 cancelOnError: true); | 850 cancelOnError: true); |
886 _subscription.pause(); | 851 _subscription.pause(); |
887 _controller = new StreamController(sync: true, | 852 _controller = new StreamController(sync: true, |
888 onListen: _subscription.resume, | 853 onListen: _subscription.resume, |
889 onPause: _subscription.pause, | 854 onPause: _subscription.pause, |
(...skipping 17 matching lines...) Expand all Loading... |
907 if (_pingTimer != null) _pingTimer.cancel(); | 872 if (_pingTimer != null) _pingTimer.cancel(); |
908 _pingInterval = interval; | 873 _pingInterval = interval; |
909 | 874 |
910 if (_pingInterval == null) return; | 875 if (_pingInterval == null) return; |
911 | 876 |
912 _pingTimer = new Timer(_pingInterval, () { | 877 _pingTimer = new Timer(_pingInterval, () { |
913 if (_writeClosed) return; | 878 if (_writeClosed) return; |
914 _consumer.add(new _WebSocketPing()); | 879 _consumer.add(new _WebSocketPing()); |
915 _pingTimer = new Timer(_pingInterval, () { | 880 _pingTimer = new Timer(_pingInterval, () { |
916 // No pong received. | 881 // No pong received. |
917 _close(WebSocketStatus.GOING_AWAY); | 882 _close(_WebSocketStatus.GOING_AWAY); |
918 }); | 883 }); |
919 }); | 884 }); |
920 } | 885 } |
921 | 886 |
922 int get readyState => _readyState; | |
923 | |
924 String get extensions => null; | |
925 int get closeCode => _closeCode; | 887 int get closeCode => _closeCode; |
926 String get closeReason => _closeReason; | 888 String get closeReason => _closeReason; |
927 | 889 |
928 void add(data) => _sink.add(data); | 890 void add(data) => _sink.add(data); |
929 void addError(error, [StackTrace stackTrace]) => | 891 void addError(error, [StackTrace stackTrace]) => |
930 _sink.addError(error, stackTrace); | 892 _sink.addError(error, stackTrace); |
931 Future addStream(Stream stream) => _sink.addStream(stream); | 893 Future addStream(Stream stream) => _sink.addStream(stream); |
932 Future get done => _sink.done; | 894 Future get done => _sink.done; |
933 | 895 |
934 Future close([int code, String reason]) { | 896 Future close([int code, String reason]) { |
935 if (_isReservedStatusCode(code)) { | 897 if (_isReservedStatusCode(code)) { |
936 throw new WebSocketException("Reserved status code $code"); | 898 throw new CompatibleWebSocketException("Reserved status code $code"); |
937 } | 899 } |
938 if (_outCloseCode == null) { | 900 if (_outCloseCode == null) { |
939 _outCloseCode = code; | 901 _outCloseCode = code; |
940 _outCloseReason = reason; | 902 _outCloseReason = reason; |
941 } | 903 } |
942 if (_closeTimer == null && !_controller.isClosed) { | 904 if (_closeTimer == null && !_controller.isClosed) { |
943 // When closing the web-socket, we no longer accept data. | 905 // When closing the web-socket, we no longer accept data. |
944 _closeTimer = new Timer(const Duration(seconds: 5), () { | 906 _closeTimer = new Timer(const Duration(seconds: 5), () { |
945 _subscription.cancel(); | 907 _subscription.cancel(); |
946 _controller.close(); | 908 _controller.close(); |
947 }); | 909 }); |
948 } | 910 } |
949 return _sink.close(); | 911 return _sink.close(); |
950 } | 912 } |
951 | 913 |
952 void _close([int code, String reason]) { | 914 void _close([int code, String reason]) { |
953 if (_writeClosed) return; | 915 if (_writeClosed) return; |
954 if (_outCloseCode == null) { | 916 if (_outCloseCode == null) { |
955 _outCloseCode = code; | 917 _outCloseCode = code; |
956 _outCloseReason = reason; | 918 _outCloseReason = reason; |
957 } | 919 } |
958 _writeClosed = true; | 920 _writeClosed = true; |
959 _consumer.closeSocket(); | 921 _consumer.closeSocket(); |
960 } | 922 } |
961 | 923 |
962 static bool _isReservedStatusCode(int code) { | 924 static bool _isReservedStatusCode(int code) { |
963 return code != null && | 925 return code != null && |
964 (code < WebSocketStatus.NORMAL_CLOSURE || | 926 (code < _WebSocketStatus.NORMAL_CLOSURE || |
965 code == WebSocketStatus.RESERVED_1004 || | 927 code == _WebSocketStatus.RESERVED_1004 || |
966 code == WebSocketStatus.NO_STATUS_RECEIVED || | 928 code == _WebSocketStatus.NO_STATUS_RECEIVED || |
967 code == WebSocketStatus.ABNORMAL_CLOSURE || | 929 code == _WebSocketStatus.ABNORMAL_CLOSURE || |
968 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && | 930 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR && |
969 code < WebSocketStatus.RESERVED_1015) || | 931 code < _WebSocketStatus.RESERVED_1015) || |
970 (code >= WebSocketStatus.RESERVED_1015 && | 932 (code >= _WebSocketStatus.RESERVED_1015 && |
971 code < 3000)); | 933 code < 3000)); |
972 } | 934 } |
973 } | 935 } |
| 936 |
OLD | NEW |