OLD | NEW |
1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library http_parser.web_socket; | 5 library http_parser.web_socket; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:convert'; | |
9 import 'dart:math'; | |
10 import 'dart:typed_data'; | |
11 | 8 |
12 import 'package:crypto/crypto.dart'; | 9 import 'package:crypto/crypto.dart'; |
13 | 10 |
14 import 'bytes_builder.dart'; | 11 import 'copy/web_socket_impl.dart'; |
15 | 12 |
16 /// An implementation of the WebSocket protocol that's not specific to "dart:io" | 13 /// An implementation of the WebSocket protocol that's not specific to "dart:io" |
17 /// or to any particular HTTP API. | 14 /// or to any particular HTTP API. |
18 /// | 15 /// |
19 /// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket | 16 /// Because this is HTTP-API-agnostic, it doesn't handle the initial [WebSocket |
20 /// handshake][]. This needs to be handled manually by the user of the code. | 17 /// handshake][]. This needs to be handled manually by the user of the code. |
21 /// Once that's been done, [new CompatibleWebSocket] can be called with the | 18 /// Once that's been done, [new CompatibleWebSocket] can be called with the |
22 /// underlying socket and it will handle the remainder of the protocol. | 19 /// underlying socket and it will handle the remainder of the protocol. |
23 /// | 20 /// |
24 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 | 21 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
(...skipping 30 matching lines...) Expand all Loading... |
55 /// the [initial handshake]. | 52 /// the [initial handshake]. |
56 /// | 53 /// |
57 /// The return value should be sent back to the client in a | 54 /// The return value should be sent back to the client in a |
58 /// `Sec-WebSocket-Accept` header. | 55 /// `Sec-WebSocket-Accept` header. |
59 /// | 56 /// |
60 /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2 | 57 /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2 |
61 static String signKey(String key) { | 58 static String signKey(String key) { |
62 var hash = new SHA1(); | 59 var hash = new SHA1(); |
63 // We use [codeUnits] here rather than UTF-8-decoding the string because | 60 // We use [codeUnits] here rather than UTF-8-decoding the string because |
64 // [key] is expected to be base64 encoded, and so will be pure ASCII. | 61 // [key] is expected to be base64 encoded, and so will be pure ASCII. |
65 hash.add((key + _webSocketGUID).codeUnits); | 62 hash.add((key + webSocketGUID).codeUnits); |
66 return CryptoUtils.bytesToBase64(hash.close()); | 63 return CryptoUtils.bytesToBase64(hash.close()); |
67 } | 64 } |
68 | 65 |
69 /// Creates a new WebSocket handling messaging across an existing socket. | 66 /// Creates a new WebSocket handling messaging across an existing socket. |
70 /// | 67 /// |
71 /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][] | 68 /// Because this is HTTP-API-agnostic, the initial [WebSocket handshake][] |
72 /// must have already been completed on the socket before this is called. | 69 /// must have already been completed on the socket before this is called. |
73 /// | 70 /// |
74 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" | 71 /// If [stream] is also a [StreamSink] (for example, if it's a "dart:io" |
75 /// `Socket`), it will be used for both sending and receiving data. Otherwise, | 72 /// `Socket`), it will be used for both sending and receiving data. Otherwise, |
76 /// it will be used for receiving data and [sink] will be used for sending it. | 73 /// it will be used for receiving data and [sink] will be used for sending it. |
77 /// | 74 /// |
| 75 /// [protocol] should be the protocol negotiated by this handshake, if any. |
| 76 /// |
78 /// If this is a WebSocket server, [serverSide] should be `true` (the | 77 /// If this is a WebSocket server, [serverSide] should be `true` (the |
79 /// default); if it's a client, [serverSide] should be `false`. | 78 /// default); if it's a client, [serverSide] should be `false`. |
80 /// | 79 /// |
81 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 | 80 /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
82 factory CompatibleWebSocket(Stream<List<int>> stream, | 81 factory CompatibleWebSocket(Stream<List<int>> stream, |
83 {StreamSink<List<int>> sink, bool serverSide: true}) { | 82 {StreamSink<List<int>> sink, String protocol, bool serverSide: true}) { |
84 if (sink == null) { | 83 if (sink == null) { |
85 if (stream is! StreamSink) { | 84 if (stream is! StreamSink) { |
86 throw new ArgumentError("If stream isn't also a StreamSink, sink must " | 85 throw new ArgumentError("If stream isn't also a StreamSink, sink must " |
87 "be passed explicitly."); | 86 "be passed explicitly."); |
88 } | 87 } |
89 sink = stream as StreamSink; | 88 sink = stream as StreamSink; |
90 } | 89 } |
91 | 90 |
92 return new _WebSocketImpl._fromSocket(stream, sink, serverSide); | 91 return new WebSocketImpl.fromSocket(stream, sink, protocol, serverSide); |
93 } | 92 } |
94 | 93 |
95 /// Closes the web socket connection. | 94 /// Closes the web socket connection. |
96 /// | 95 /// |
97 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent | 96 /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent |
98 /// to the remote peer, respectively. If they are omitted, the peer will see | 97 /// to the remote peer, respectively. If they are omitted, the peer will see |
99 /// a "no status received" code with no reason. | 98 /// a "no status received" code with no reason. |
100 /// | 99 /// |
101 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 | 100 /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 |
102 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 | 101 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 |
103 Future close([int closeCode, String closeReason]); | 102 Future close([int closeCode, String closeReason]); |
104 } | 103 } |
105 | 104 |
106 /// An exception thrown by [CompatibleWebSocket]. | 105 /// An exception thrown by [CompatibleWebSocket]. |
107 class CompatibleWebSocketException implements Exception { | 106 class CompatibleWebSocketException implements Exception { |
108 final String message; | 107 final String message; |
109 | 108 |
110 CompatibleWebSocketException([this.message]); | 109 CompatibleWebSocketException([this.message]); |
111 | 110 |
112 String toString() => message == null | 111 String toString() => message == null |
113 ? "CompatibleWebSocketException" : | 112 ? "CompatibleWebSocketException" : |
114 "CompatibleWebSocketException: $message"; | 113 "CompatibleWebSocketException: $message"; |
115 } | 114 } |
116 | |
117 // The following code is copied from sdk/lib/io/websocket_impl.dart. The | |
118 // "dart:io" implementation isn't used directly both to support non-"dart:io" | |
119 // applications, and because it's incompatible with non-"dart:io" HTTP requests | |
120 // (issue 18172). | |
121 // | |
122 // Because it's copied directly, only modifications necessary to support the | |
123 // desired public API and to remove "dart:io" dependencies have been made. | |
124 | |
125 /** | |
126 * Web socket status codes used when closing a web socket connection. | |
127 */ | |
128 abstract class _WebSocketStatus { | |
129 static const int NORMAL_CLOSURE = 1000; | |
130 static const int GOING_AWAY = 1001; | |
131 static const int PROTOCOL_ERROR = 1002; | |
132 static const int UNSUPPORTED_DATA = 1003; | |
133 static const int RESERVED_1004 = 1004; | |
134 static const int NO_STATUS_RECEIVED = 1005; | |
135 static const int ABNORMAL_CLOSURE = 1006; | |
136 static const int INVALID_FRAME_PAYLOAD_DATA = 1007; | |
137 static const int POLICY_VIOLATION = 1008; | |
138 static const int MESSAGE_TOO_BIG = 1009; | |
139 static const int MISSING_MANDATORY_EXTENSION = 1010; | |
140 static const int INTERNAL_SERVER_ERROR = 1011; | |
141 static const int RESERVED_1015 = 1015; | |
142 } | |
143 | |
144 abstract class _WebSocketState { | |
145 static const int CONNECTING = 0; | |
146 static const int OPEN = 1; | |
147 static const int CLOSING = 2; | |
148 static const int CLOSED = 3; | |
149 } | |
150 | |
151 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | |
152 | |
153 final _random = new Random(); | |
154 | |
155 // Matches _WebSocketOpcode. | |
156 class _WebSocketMessageType { | |
157 static const int NONE = 0; | |
158 static const int TEXT = 1; | |
159 static const int BINARY = 2; | |
160 } | |
161 | |
162 | |
163 class _WebSocketOpcode { | |
164 static const int CONTINUATION = 0; | |
165 static const int TEXT = 1; | |
166 static const int BINARY = 2; | |
167 static const int RESERVED_3 = 3; | |
168 static const int RESERVED_4 = 4; | |
169 static const int RESERVED_5 = 5; | |
170 static const int RESERVED_6 = 6; | |
171 static const int RESERVED_7 = 7; | |
172 static const int CLOSE = 8; | |
173 static const int PING = 9; | |
174 static const int PONG = 10; | |
175 static const int RESERVED_B = 11; | |
176 static const int RESERVED_C = 12; | |
177 static const int RESERVED_D = 13; | |
178 static const int RESERVED_E = 14; | |
179 static const int RESERVED_F = 15; | |
180 } | |
181 | |
182 /** | |
183 * The web socket protocol transformer handles the protocol byte stream | |
184 * which is supplied through the [:handleData:]. As the protocol is processed, | |
185 * it'll output frame data as either a List<int> or String. | |
186 * | |
187 * Important infomation about usage: Be sure you use cancelOnError, so the | |
188 * socket will be closed when the processer encounter an error. Not using it | |
189 * will lead to undefined behaviour. | |
190 */ | |
191 // TODO(ajohnsen): make this transformer reusable? | |
192 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { | |
193 static const int START = 0; | |
194 static const int LEN_FIRST = 1; | |
195 static const int LEN_REST = 2; | |
196 static const int MASK = 3; | |
197 static const int PAYLOAD = 4; | |
198 static const int CLOSED = 5; | |
199 static const int FAILURE = 6; | |
200 | |
201 int _state = START; | |
202 bool _fin = false; | |
203 int _opcode = -1; | |
204 int _len = -1; | |
205 bool _masked = false; | |
206 int _remainingLenBytes = -1; | |
207 int _remainingMaskingKeyBytes = 4; | |
208 int _remainingPayloadBytes = -1; | |
209 int _unmaskingIndex = 0; | |
210 int _currentMessageType = _WebSocketMessageType.NONE; | |
211 int closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; | |
212 String closeReason = ""; | |
213 | |
214 EventSink _eventSink; | |
215 | |
216 final bool _serverSide; | |
217 final List _maskingBytes = new List(4); | |
218 final BytesBuilder _payload = new BytesBuilder(copy: false); | |
219 | |
220 _WebSocketProtocolTransformer([this._serverSide = false]); | |
221 | |
222 Stream bind(Stream stream) { | |
223 return new Stream.eventTransformed( | |
224 stream, | |
225 (EventSink eventSink) { | |
226 if (_eventSink != null) { | |
227 throw new StateError("WebSocket transformer already used."); | |
228 } | |
229 _eventSink = eventSink; | |
230 return this; | |
231 }); | |
232 } | |
233 | |
234 void addError(Object error, [StackTrace stackTrace]) => | |
235 _eventSink.addError(error, stackTrace); | |
236 | |
237 void close() => _eventSink.close(); | |
238 | |
239 /** | |
240 * Process data received from the underlying communication channel. | |
241 */ | |
242 void add(Uint8List buffer) { | |
243 int count = buffer.length; | |
244 int index = 0; | |
245 int lastIndex = count; | |
246 if (_state == CLOSED) { | |
247 throw new CompatibleWebSocketException("Data on closed connection"); | |
248 } | |
249 if (_state == FAILURE) { | |
250 throw new CompatibleWebSocketException("Data on failed connection"); | |
251 } | |
252 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { | |
253 int byte = buffer[index]; | |
254 if (_state <= LEN_REST) { | |
255 if (_state == START) { | |
256 _fin = (byte & 0x80) != 0; | |
257 if ((byte & 0x70) != 0) { | |
258 // The RSV1, RSV2 bits RSV3 must be all zero. | |
259 throw new CompatibleWebSocketException("Protocol error"); | |
260 } | |
261 _opcode = (byte & 0xF); | |
262 if (_opcode <= _WebSocketOpcode.BINARY) { | |
263 if (_opcode == _WebSocketOpcode.CONTINUATION) { | |
264 if (_currentMessageType == _WebSocketMessageType.NONE) { | |
265 throw new CompatibleWebSocketException("Protocol error"); | |
266 } | |
267 } else { | |
268 assert(_opcode == _WebSocketOpcode.TEXT || | |
269 _opcode == _WebSocketOpcode.BINARY); | |
270 if (_currentMessageType != _WebSocketMessageType.NONE) { | |
271 throw new CompatibleWebSocketException("Protocol error"); | |
272 } | |
273 _currentMessageType = _opcode; | |
274 } | |
275 } else if (_opcode >= _WebSocketOpcode.CLOSE && | |
276 _opcode <= _WebSocketOpcode.PONG) { | |
277 // Control frames cannot be fragmented. | |
278 if (!_fin) throw new CompatibleWebSocketException("Protocol error"); | |
279 } else { | |
280 throw new CompatibleWebSocketException("Protocol error"); | |
281 } | |
282 _state = LEN_FIRST; | |
283 } else if (_state == LEN_FIRST) { | |
284 _masked = (byte & 0x80) != 0; | |
285 _len = byte & 0x7F; | |
286 if (_isControlFrame() && _len > 125) { | |
287 throw new CompatibleWebSocketException("Protocol error"); | |
288 } | |
289 if (_len == 126) { | |
290 _len = 0; | |
291 _remainingLenBytes = 2; | |
292 _state = LEN_REST; | |
293 } else if (_len == 127) { | |
294 _len = 0; | |
295 _remainingLenBytes = 8; | |
296 _state = LEN_REST; | |
297 } else { | |
298 assert(_len < 126); | |
299 _lengthDone(); | |
300 } | |
301 } else { | |
302 assert(_state == LEN_REST); | |
303 _len = _len << 8 | byte; | |
304 _remainingLenBytes--; | |
305 if (_remainingLenBytes == 0) { | |
306 _lengthDone(); | |
307 } | |
308 } | |
309 } else { | |
310 if (_state == MASK) { | |
311 _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; | |
312 if (_remainingMaskingKeyBytes == 0) { | |
313 _maskDone(); | |
314 } | |
315 } else { | |
316 assert(_state == PAYLOAD); | |
317 // The payload is not handled one byte at a time but in blocks. | |
318 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); | |
319 _remainingPayloadBytes -= payloadLength; | |
320 // Unmask payload if masked. | |
321 if (_masked) { | |
322 _unmask(index, payloadLength, buffer); | |
323 } | |
324 // Control frame and data frame share _payloads. | |
325 _payload.add( | |
326 new Uint8List.view(buffer.buffer, index, payloadLength)); | |
327 index += payloadLength; | |
328 if (_isControlFrame()) { | |
329 if (_remainingPayloadBytes == 0) _controlFrameEnd(); | |
330 } else { | |
331 if (_currentMessageType != _WebSocketMessageType.TEXT && | |
332 _currentMessageType != _WebSocketMessageType.BINARY) { | |
333 throw new CompatibleWebSocketException("Protocol error"); | |
334 } | |
335 if (_remainingPayloadBytes == 0) _messageFrameEnd(); | |
336 } | |
337 | |
338 // Hack - as we always do index++ below. | |
339 index--; | |
340 } | |
341 } | |
342 | |
343 // Move to the next byte. | |
344 index++; | |
345 } | |
346 } | |
347 | |
348 void _unmask(int index, int length, Uint8List buffer) { | |
349 const int BLOCK_SIZE = 16; | |
350 // Skip Int32x4-version if message is small. | |
351 if (length >= BLOCK_SIZE) { | |
352 // Start by aligning to 16 bytes. | |
353 final int startOffset = BLOCK_SIZE - (index & 15); | |
354 final int end = index + startOffset; | |
355 for (int i = index; i < end; i++) { | |
356 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
357 } | |
358 index += startOffset; | |
359 length -= startOffset; | |
360 final int blockCount = length ~/ BLOCK_SIZE; | |
361 if (blockCount > 0) { | |
362 // Create mask block. | |
363 int mask = 0; | |
364 for (int i = 3; i >= 0; i--) { | |
365 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; | |
366 } | |
367 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
368 Int32x4List blockBuffer = new Int32x4List.view( | |
369 buffer.buffer, index, blockCount); | |
370 for (int i = 0; i < blockBuffer.length; i++) { | |
371 blockBuffer[i] ^= blockMask; | |
372 } | |
373 final int bytes = blockCount * BLOCK_SIZE; | |
374 index += bytes; | |
375 length -= bytes; | |
376 } | |
377 } | |
378 // Handle end. | |
379 final int end = index + length; | |
380 for (int i = index; i < end; i++) { | |
381 buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; | |
382 } | |
383 } | |
384 | |
385 void _lengthDone() { | |
386 if (_masked) { | |
387 if (!_serverSide) { | |
388 throw new CompatibleWebSocketException( | |
389 "Received masked frame from server"); | |
390 } | |
391 _state = MASK; | |
392 } else { | |
393 if (_serverSide) { | |
394 throw new CompatibleWebSocketException( | |
395 "Received unmasked frame from client"); | |
396 } | |
397 _remainingPayloadBytes = _len; | |
398 _startPayload(); | |
399 } | |
400 } | |
401 | |
402 void _maskDone() { | |
403 _remainingPayloadBytes = _len; | |
404 _startPayload(); | |
405 } | |
406 | |
407 void _startPayload() { | |
408 // If there is no actual payload perform perform callbacks without | |
409 // going through the PAYLOAD state. | |
410 if (_remainingPayloadBytes == 0) { | |
411 if (_isControlFrame()) { | |
412 switch (_opcode) { | |
413 case _WebSocketOpcode.CLOSE: | |
414 _state = CLOSED; | |
415 _eventSink.close(); | |
416 break; | |
417 case _WebSocketOpcode.PING: | |
418 _eventSink.add(new _WebSocketPing()); | |
419 break; | |
420 case _WebSocketOpcode.PONG: | |
421 _eventSink.add(new _WebSocketPong()); | |
422 break; | |
423 } | |
424 _prepareForNextFrame(); | |
425 } else { | |
426 _messageFrameEnd(); | |
427 } | |
428 } else { | |
429 _state = PAYLOAD; | |
430 } | |
431 } | |
432 | |
433 void _messageFrameEnd() { | |
434 if (_fin) { | |
435 switch (_currentMessageType) { | |
436 case _WebSocketMessageType.TEXT: | |
437 _eventSink.add(UTF8.decode(_payload.takeBytes())); | |
438 break; | |
439 case _WebSocketMessageType.BINARY: | |
440 _eventSink.add(_payload.takeBytes()); | |
441 break; | |
442 } | |
443 _currentMessageType = _WebSocketMessageType.NONE; | |
444 } | |
445 _prepareForNextFrame(); | |
446 } | |
447 | |
448 void _controlFrameEnd() { | |
449 switch (_opcode) { | |
450 case _WebSocketOpcode.CLOSE: | |
451 closeCode = _WebSocketStatus.NO_STATUS_RECEIVED; | |
452 var payload = _payload.takeBytes(); | |
453 if (payload.length > 0) { | |
454 if (payload.length == 1) { | |
455 throw new CompatibleWebSocketException("Protocol error"); | |
456 } | |
457 closeCode = payload[0] << 8 | payload[1]; | |
458 if (closeCode == _WebSocketStatus.NO_STATUS_RECEIVED) { | |
459 throw new CompatibleWebSocketException("Protocol error"); | |
460 } | |
461 if (payload.length > 2) { | |
462 closeReason = UTF8.decode(payload.sublist(2)); | |
463 } | |
464 } | |
465 _state = CLOSED; | |
466 _eventSink.close(); | |
467 break; | |
468 | |
469 case _WebSocketOpcode.PING: | |
470 _eventSink.add(new _WebSocketPing(_payload.takeBytes())); | |
471 break; | |
472 | |
473 case _WebSocketOpcode.PONG: | |
474 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); | |
475 break; | |
476 } | |
477 _prepareForNextFrame(); | |
478 } | |
479 | |
480 bool _isControlFrame() { | |
481 return _opcode == _WebSocketOpcode.CLOSE || | |
482 _opcode == _WebSocketOpcode.PING || | |
483 _opcode == _WebSocketOpcode.PONG; | |
484 } | |
485 | |
486 void _prepareForNextFrame() { | |
487 if (_state != CLOSED && _state != FAILURE) _state = START; | |
488 _fin = false; | |
489 _opcode = -1; | |
490 _len = -1; | |
491 _remainingLenBytes = -1; | |
492 _remainingMaskingKeyBytes = 4; | |
493 _remainingPayloadBytes = -1; | |
494 _unmaskingIndex = 0; | |
495 } | |
496 } | |
497 | |
498 | |
499 class _WebSocketPing { | |
500 final List<int> payload; | |
501 _WebSocketPing([this.payload = null]); | |
502 } | |
503 | |
504 | |
505 class _WebSocketPong { | |
506 final List<int> payload; | |
507 _WebSocketPong([this.payload = null]); | |
508 } | |
509 | |
510 // TODO(ajohnsen): Make this transformer reusable. | |
511 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { | |
512 final _WebSocketImpl webSocket; | |
513 EventSink _eventSink; | |
514 | |
515 _WebSocketOutgoingTransformer(this.webSocket); | |
516 | |
517 Stream bind(Stream stream) { | |
518 return new Stream.eventTransformed( | |
519 stream, | |
520 (EventSink eventSink) { | |
521 if (_eventSink != null) { | |
522 throw new StateError("WebSocket transformer already used"); | |
523 } | |
524 _eventSink = eventSink; | |
525 return this; | |
526 }); | |
527 } | |
528 | |
529 void add(message) { | |
530 if (message is _WebSocketPong) { | |
531 addFrame(_WebSocketOpcode.PONG, message.payload); | |
532 return; | |
533 } | |
534 if (message is _WebSocketPing) { | |
535 addFrame(_WebSocketOpcode.PING, message.payload); | |
536 return; | |
537 } | |
538 List<int> data; | |
539 int opcode; | |
540 if (message != null) { | |
541 if (message is String) { | |
542 opcode = _WebSocketOpcode.TEXT; | |
543 data = UTF8.encode(message); | |
544 } else { | |
545 if (message is !List<int>) { | |
546 throw new ArgumentError(message); | |
547 } | |
548 opcode = _WebSocketOpcode.BINARY; | |
549 data = message; | |
550 } | |
551 } else { | |
552 opcode = _WebSocketOpcode.TEXT; | |
553 } | |
554 addFrame(opcode, data); | |
555 } | |
556 | |
557 void addError(Object error, [StackTrace stackTrace]) => | |
558 _eventSink.addError(error, stackTrace); | |
559 | |
560 void close() { | |
561 int code = webSocket._outCloseCode; | |
562 String reason = webSocket._outCloseReason; | |
563 List<int> data; | |
564 if (code != null) { | |
565 data = new List<int>(); | |
566 data.add((code >> 8) & 0xFF); | |
567 data.add(code & 0xFF); | |
568 if (reason != null) { | |
569 data.addAll(UTF8.encode(reason)); | |
570 } | |
571 } | |
572 addFrame(_WebSocketOpcode.CLOSE, data); | |
573 _eventSink.close(); | |
574 } | |
575 | |
576 void addFrame(int opcode, List<int> data) => | |
577 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); | |
578 | |
579 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { | |
580 bool mask = !serverSide; // Masking not implemented for server. | |
581 int dataLength = data == null ? 0 : data.length; | |
582 // Determine the header size. | |
583 int headerSize = (mask) ? 6 : 2; | |
584 if (dataLength > 65535) { | |
585 headerSize += 8; | |
586 } else if (dataLength > 125) { | |
587 headerSize += 2; | |
588 } | |
589 Uint8List header = new Uint8List(headerSize); | |
590 int index = 0; | |
591 // Set FIN and opcode. | |
592 header[index++] = 0x80 | opcode; | |
593 // Determine size and position of length field. | |
594 int lengthBytes = 1; | |
595 if (dataLength > 65535) { | |
596 header[index++] = 127; | |
597 lengthBytes = 8; | |
598 } else if (dataLength > 125) { | |
599 header[index++] = 126; | |
600 lengthBytes = 2; | |
601 } | |
602 // Write the length in network byte order into the header. | |
603 for (int i = 0; i < lengthBytes; i++) { | |
604 header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; | |
605 } | |
606 if (mask) { | |
607 header[1] |= 1 << 7; | |
608 var maskBytes = [_random.nextInt(256), _random.nextInt(256), | |
609 _random.nextInt(256), _random.nextInt(256)]; | |
610 header.setRange(index, index + 4, maskBytes); | |
611 index += 4; | |
612 if (data != null) { | |
613 Uint8List list; | |
614 // If this is a text message just do the masking inside the | |
615 // encoded data. | |
616 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { | |
617 list = data; | |
618 } else { | |
619 if (data is Uint8List) { | |
620 list = new Uint8List.fromList(data); | |
621 } else { | |
622 list = new Uint8List(data.length); | |
623 for (int i = 0; i < data.length; i++) { | |
624 if (data[i] < 0 || 255 < data[i]) { | |
625 throw new ArgumentError( | |
626 "List element is not a byte value " | |
627 "(value ${data[i]} at index $i)"); | |
628 } | |
629 list[i] = data[i]; | |
630 } | |
631 } | |
632 } | |
633 const int BLOCK_SIZE = 16; | |
634 int blockCount = list.length ~/ BLOCK_SIZE; | |
635 if (blockCount > 0) { | |
636 // Create mask block. | |
637 int mask = 0; | |
638 for (int i = 3; i >= 0; i--) { | |
639 mask = (mask << 8) | maskBytes[i]; | |
640 } | |
641 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); | |
642 Int32x4List blockBuffer = new Int32x4List.view( | |
643 list.buffer, 0, blockCount); | |
644 for (int i = 0; i < blockBuffer.length; i++) { | |
645 blockBuffer[i] ^= blockMask; | |
646 } | |
647 } | |
648 // Handle end. | |
649 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { | |
650 list[i] ^= maskBytes[i & 3]; | |
651 } | |
652 data = list; | |
653 } | |
654 } | |
655 assert(index == headerSize); | |
656 if (data == null) { | |
657 return [header]; | |
658 } else { | |
659 return [header, data]; | |
660 } | |
661 } | |
662 } | |
663 | |
664 | |
665 class _WebSocketConsumer implements StreamConsumer { | |
666 final _WebSocketImpl webSocket; | |
667 final StreamSink<List<int>> sink; | |
668 StreamController _controller; | |
669 StreamSubscription _subscription; | |
670 bool _issuedPause = false; | |
671 bool _closed = false; | |
672 Completer _closeCompleter = new Completer(); | |
673 Completer _completer; | |
674 | |
675 _WebSocketConsumer(this.webSocket, this.sink); | |
676 | |
677 void _onListen() { | |
678 if (_subscription != null) { | |
679 _subscription.cancel(); | |
680 } | |
681 } | |
682 | |
683 void _onPause() { | |
684 if (_subscription != null) { | |
685 _subscription.pause(); | |
686 } else { | |
687 _issuedPause = true; | |
688 } | |
689 } | |
690 | |
691 void _onResume() { | |
692 if (_subscription != null) { | |
693 _subscription.resume(); | |
694 } else { | |
695 _issuedPause = false; | |
696 } | |
697 } | |
698 | |
699 void _cancel() { | |
700 if (_subscription != null) { | |
701 var subscription = _subscription; | |
702 _subscription = null; | |
703 subscription.cancel(); | |
704 } | |
705 } | |
706 | |
707 _ensureController() { | |
708 if (_controller != null) return; | |
709 _controller = new StreamController(sync: true, | |
710 onPause: _onPause, | |
711 onResume: _onResume, | |
712 onCancel: _onListen); | |
713 var stream = _controller.stream.transform( | |
714 new _WebSocketOutgoingTransformer(webSocket)); | |
715 sink.addStream(stream) | |
716 .then((_) { | |
717 _done(); | |
718 _closeCompleter.complete(webSocket); | |
719 }, onError: (error, StackTrace stackTrace) { | |
720 _closed = true; | |
721 _cancel(); | |
722 if (error is ArgumentError) { | |
723 if (!_done(error, stackTrace)) { | |
724 _closeCompleter.completeError(error, stackTrace); | |
725 } | |
726 } else { | |
727 _done(); | |
728 _closeCompleter.complete(webSocket); | |
729 } | |
730 }); | |
731 } | |
732 | |
733 bool _done([error, StackTrace stackTrace]) { | |
734 if (_completer == null) return false; | |
735 if (error != null) { | |
736 _completer.completeError(error, stackTrace); | |
737 } else { | |
738 _completer.complete(webSocket); | |
739 } | |
740 _completer = null; | |
741 return true; | |
742 } | |
743 | |
744 Future addStream(var stream) { | |
745 if (_closed) { | |
746 stream.listen(null).cancel(); | |
747 return new Future.value(webSocket); | |
748 } | |
749 _ensureController(); | |
750 _completer = new Completer(); | |
751 _subscription = stream.listen( | |
752 (data) { | |
753 _controller.add(data); | |
754 }, | |
755 onDone: _done, | |
756 onError: _done, | |
757 cancelOnError: true); | |
758 if (_issuedPause) { | |
759 _subscription.pause(); | |
760 _issuedPause = false; | |
761 } | |
762 return _completer.future; | |
763 } | |
764 | |
765 Future close() { | |
766 _ensureController(); | |
767 Future closeSocket() { | |
768 return sink.close().catchError((_) {}).then((_) => webSocket); | |
769 } | |
770 _controller.close(); | |
771 return _closeCompleter.future.then((_) => closeSocket()); | |
772 } | |
773 | |
774 void add(data) { | |
775 if (_closed) return; | |
776 _ensureController(); | |
777 _controller.add(data); | |
778 } | |
779 | |
780 void closeSocket() { | |
781 _closed = true; | |
782 _cancel(); | |
783 close(); | |
784 } | |
785 } | |
786 | |
787 | |
788 class _WebSocketImpl extends Stream implements CompatibleWebSocket { | |
789 StreamController _controller; | |
790 StreamSubscription _subscription; | |
791 StreamController _sink; | |
792 | |
793 final bool _serverSide; | |
794 int _readyState = _WebSocketState.CONNECTING; | |
795 bool _writeClosed = false; | |
796 int _closeCode; | |
797 String _closeReason; | |
798 Duration _pingInterval; | |
799 Timer _pingTimer; | |
800 _WebSocketConsumer _consumer; | |
801 | |
802 int _outCloseCode; | |
803 String _outCloseReason; | |
804 Timer _closeTimer; | |
805 | |
806 _WebSocketImpl._fromSocket(Stream<List<int>> stream, | |
807 StreamSink<List<int>> sink, [this._serverSide = false]) { | |
808 _consumer = new _WebSocketConsumer(this, sink); | |
809 _sink = new StreamController(); | |
810 _sink.stream.pipe(_consumer); | |
811 _readyState = _WebSocketState.OPEN; | |
812 | |
813 var transformer = new _WebSocketProtocolTransformer(_serverSide); | |
814 _subscription = stream.transform(transformer).listen( | |
815 (data) { | |
816 if (data is _WebSocketPing) { | |
817 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); | |
818 } else if (data is _WebSocketPong) { | |
819 // Simply set pingInterval, as it'll cancel any timers. | |
820 pingInterval = _pingInterval; | |
821 } else { | |
822 _controller.add(data); | |
823 } | |
824 }, | |
825 onError: (error) { | |
826 if (_closeTimer != null) _closeTimer.cancel(); | |
827 if (error is FormatException) { | |
828 _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); | |
829 } else { | |
830 _close(_WebSocketStatus.PROTOCOL_ERROR); | |
831 } | |
832 _controller.close(); | |
833 }, | |
834 onDone: () { | |
835 if (_closeTimer != null) _closeTimer.cancel(); | |
836 if (_readyState == _WebSocketState.OPEN) { | |
837 _readyState = _WebSocketState.CLOSING; | |
838 if (!_isReservedStatusCode(transformer.closeCode)) { | |
839 _close(transformer.closeCode); | |
840 } else { | |
841 _close(); | |
842 } | |
843 _readyState = _WebSocketState.CLOSED; | |
844 } | |
845 _closeCode = transformer.closeCode; | |
846 _closeReason = transformer.closeReason; | |
847 _controller.close(); | |
848 }, | |
849 cancelOnError: true); | |
850 _subscription.pause(); | |
851 _controller = new StreamController(sync: true, | |
852 onListen: _subscription.resume, | |
853 onPause: _subscription.pause, | |
854 onResume: _subscription.resume); | |
855 } | |
856 | |
857 StreamSubscription listen(void onData(message), | |
858 {Function onError, | |
859 void onDone(), | |
860 bool cancelOnError}) { | |
861 return _controller.stream.listen(onData, | |
862 onError: onError, | |
863 onDone: onDone, | |
864 cancelOnError: cancelOnError); | |
865 } | |
866 | |
867 Duration get pingInterval => _pingInterval; | |
868 | |
869 void set pingInterval(Duration interval) { | |
870 if (_writeClosed) return; | |
871 if (_pingTimer != null) _pingTimer.cancel(); | |
872 _pingInterval = interval; | |
873 | |
874 if (_pingInterval == null) return; | |
875 | |
876 _pingTimer = new Timer(_pingInterval, () { | |
877 if (_writeClosed) return; | |
878 _consumer.add(new _WebSocketPing()); | |
879 _pingTimer = new Timer(_pingInterval, () { | |
880 // No pong received. | |
881 _close(_WebSocketStatus.GOING_AWAY); | |
882 }); | |
883 }); | |
884 } | |
885 | |
886 int get closeCode => _closeCode; | |
887 String get closeReason => _closeReason; | |
888 | |
889 void add(data) => _sink.add(data); | |
890 void addError(error, [StackTrace stackTrace]) => | |
891 _sink.addError(error, stackTrace); | |
892 Future addStream(Stream stream) => _sink.addStream(stream); | |
893 Future get done => _sink.done; | |
894 | |
895 Future close([int code, String reason]) { | |
896 if (_isReservedStatusCode(code)) { | |
897 throw new CompatibleWebSocketException("Reserved status code $code"); | |
898 } | |
899 if (_outCloseCode == null) { | |
900 _outCloseCode = code; | |
901 _outCloseReason = reason; | |
902 } | |
903 if (_closeTimer == null && !_controller.isClosed) { | |
904 // When closing the web-socket, we no longer accept data. | |
905 _closeTimer = new Timer(const Duration(seconds: 5), () { | |
906 _subscription.cancel(); | |
907 _controller.close(); | |
908 }); | |
909 } | |
910 return _sink.close(); | |
911 } | |
912 | |
913 void _close([int code, String reason]) { | |
914 if (_writeClosed) return; | |
915 if (_outCloseCode == null) { | |
916 _outCloseCode = code; | |
917 _outCloseReason = reason; | |
918 } | |
919 _writeClosed = true; | |
920 _consumer.closeSocket(); | |
921 } | |
922 | |
923 static bool _isReservedStatusCode(int code) { | |
924 return code != null && | |
925 (code < _WebSocketStatus.NORMAL_CLOSURE || | |
926 code == _WebSocketStatus.RESERVED_1004 || | |
927 code == _WebSocketStatus.NO_STATUS_RECEIVED || | |
928 code == _WebSocketStatus.ABNORMAL_CLOSURE || | |
929 (code > _WebSocketStatus.INTERNAL_SERVER_ERROR && | |
930 code < _WebSocketStatus.RESERVED_1015) || | |
931 (code >= _WebSocketStatus.RESERVED_1015 && | |
932 code < 3000)); | |
933 } | |
934 } | |
935 | |
OLD | NEW |