OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; | 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
8 | 8 |
9 class _WebSocketMessageType { | 9 class _WebSocketMessageType { |
10 static const int NONE = 0; | 10 static const int NONE = 0; |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
51 static const int FAILURE = 6; | 51 static const int FAILURE = 6; |
52 | 52 |
53 _WebSocketProtocolTransformer() { | 53 _WebSocketProtocolTransformer() { |
54 _prepareForNextFrame(); | 54 _prepareForNextFrame(); |
55 _currentMessageType = _WebSocketMessageType.NONE; | 55 _currentMessageType = _WebSocketMessageType.NONE; |
56 } | 56 } |
57 | 57 |
58 /** | 58 /** |
59 * Process data received from the underlying communication channel. | 59 * Process data received from the underlying communication channel. |
60 */ | 60 */ |
61 void handleData(List<int> buffer, StreamSink sink) { | 61 void handleData(List<int> buffer, EventSink sink) { |
62 int count = buffer.length; | 62 int count = buffer.length; |
63 int index = 0; | 63 int index = 0; |
64 int lastIndex = count; | 64 int lastIndex = count; |
65 try { | 65 try { |
66 if (_state == CLOSED) { | 66 if (_state == CLOSED) { |
67 throw new WebSocketException("Data on closed connection"); | 67 throw new WebSocketException("Data on closed connection"); |
68 } | 68 } |
69 if (_state == FAILURE) { | 69 if (_state == FAILURE) { |
70 throw new WebSocketException("Data on failed connection"); | 70 throw new WebSocketException("Data on failed connection"); |
71 } | 71 } |
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
209 // Hack - as we always do index++ below. | 209 // Hack - as we always do index++ below. |
210 index--; | 210 index--; |
211 break; | 211 break; |
212 } | 212 } |
213 | 213 |
214 // Move to the next byte. | 214 // Move to the next byte. |
215 index++; | 215 index++; |
216 } | 216 } |
217 } catch (e) { | 217 } catch (e) { |
218 _state = FAILURE; | 218 _state = FAILURE; |
219 sink.signalError(e); | 219 sink.addError(e); |
220 } | 220 } |
221 } | 221 } |
222 | 222 |
223 void _lengthDone(StreamSink sink) { | 223 void _lengthDone(EventSink sink) { |
224 if (_masked) { | 224 if (_masked) { |
225 _state = MASK; | 225 _state = MASK; |
226 _remainingMaskingKeyBytes = 4; | 226 _remainingMaskingKeyBytes = 4; |
227 } else { | 227 } else { |
228 _remainingPayloadBytes = _len; | 228 _remainingPayloadBytes = _len; |
229 _startPayload(sink); | 229 _startPayload(sink); |
230 } | 230 } |
231 } | 231 } |
232 | 232 |
233 void _maskDone(StreamSink sink) { | 233 void _maskDone(EventSink sink) { |
234 _remainingPayloadBytes = _len; | 234 _remainingPayloadBytes = _len; |
235 _startPayload(sink); | 235 _startPayload(sink); |
236 } | 236 } |
237 | 237 |
238 void _startPayload(StreamSink sink) { | 238 void _startPayload(EventSink sink) { |
239 // If there is no actual payload perform perform callbacks without | 239 // If there is no actual payload perform perform callbacks without |
240 // going through the PAYLOAD state. | 240 // going through the PAYLOAD state. |
241 if (_remainingPayloadBytes == 0) { | 241 if (_remainingPayloadBytes == 0) { |
242 if (_isControlFrame()) { | 242 if (_isControlFrame()) { |
243 switch (_opcode) { | 243 switch (_opcode) { |
244 case _WebSocketOpcode.CLOSE: | 244 case _WebSocketOpcode.CLOSE: |
245 _state = CLOSED; | 245 _state = CLOSED; |
246 sink.close(); | 246 sink.close(); |
247 break; | 247 break; |
248 case _WebSocketOpcode.PING: | 248 case _WebSocketOpcode.PING: |
249 // TODO(ajohnsen): Handle ping. | 249 // TODO(ajohnsen): Handle ping. |
250 break; | 250 break; |
251 case _WebSocketOpcode.PONG: | 251 case _WebSocketOpcode.PONG: |
252 // TODO(ajohnsen): Handle pong. | 252 // TODO(ajohnsen): Handle pong. |
253 break; | 253 break; |
254 } | 254 } |
255 _prepareForNextFrame(); | 255 _prepareForNextFrame(); |
256 } else { | 256 } else { |
257 _messageFrameEnd(sink); | 257 _messageFrameEnd(sink); |
258 } | 258 } |
259 } else { | 259 } else { |
260 _state = PAYLOAD; | 260 _state = PAYLOAD; |
261 } | 261 } |
262 } | 262 } |
263 | 263 |
264 void _messageFrameEnd(StreamSink sink) { | 264 void _messageFrameEnd(EventSink sink) { |
265 if (_fin) { | 265 if (_fin) { |
266 switch (_currentMessageType) { | 266 switch (_currentMessageType) { |
267 case _WebSocketMessageType.TEXT: | 267 case _WebSocketMessageType.TEXT: |
268 sink.add(_buffer.toString()); | 268 sink.add(_buffer.toString()); |
269 break; | 269 break; |
270 case _WebSocketMessageType.BINARY: | 270 case _WebSocketMessageType.BINARY: |
271 if (_buffer.length == 0) { | 271 if (_buffer.length == 0) { |
272 sink.add(const []); | 272 sink.add(const []); |
273 } else { | 273 } else { |
274 sink.add(_buffer.readBytes(_buffer.length)); | 274 sink.add(_buffer.readBytes(_buffer.length)); |
275 } | 275 } |
276 break; | 276 break; |
277 } | 277 } |
278 _buffer = null; | 278 _buffer = null; |
279 _currentMessageType = _WebSocketMessageType.NONE; | 279 _currentMessageType = _WebSocketMessageType.NONE; |
280 } | 280 } |
281 _prepareForNextFrame(); | 281 _prepareForNextFrame(); |
282 } | 282 } |
283 | 283 |
284 void _controlFrameEnd(StreamSink sink) { | 284 void _controlFrameEnd(EventSink sink) { |
285 switch (_opcode) { | 285 switch (_opcode) { |
286 case _WebSocketOpcode.CLOSE: | 286 case _WebSocketOpcode.CLOSE: |
287 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; | 287 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
288 if (_controlPayload.length > 0) { | 288 if (_controlPayload.length > 0) { |
289 if (_controlPayload.length == 1) { | 289 if (_controlPayload.length == 1) { |
290 throw new WebSocketException("Protocol error"); | 290 throw new WebSocketException("Protocol error"); |
291 } | 291 } |
292 closeCode = _controlPayload[0] << 8 | _controlPayload[1]; | 292 closeCode = _controlPayload[0] << 8 | _controlPayload[1]; |
293 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { | 293 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { |
294 throw new WebSocketException("Protocol error"); | 294 throw new WebSocketException("Protocol error"); |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
354 | 354 |
355 | 355 |
356 class _WebSocketTransformerImpl implements WebSocketTransformer { | 356 class _WebSocketTransformerImpl implements WebSocketTransformer { |
357 final StreamController<WebSocket> _controller = | 357 final StreamController<WebSocket> _controller = |
358 new StreamController<WebSocket>(); | 358 new StreamController<WebSocket>(); |
359 | 359 |
360 Stream<WebSocket> bind(Stream<HttpRequest> stream) { | 360 Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
361 stream.listen((request) { | 361 stream.listen((request) { |
362 _upgrade(request) | 362 _upgrade(request) |
363 .then((WebSocket webSocket) => _controller.add(webSocket)) | 363 .then((WebSocket webSocket) => _controller.add(webSocket)) |
364 .catchError((error) => _controller.signalError(error)); | 364 .catchError((error) => _controller.addError(error)); |
365 }); | 365 }); |
366 | 366 |
367 return _controller.stream; | 367 return _controller.stream; |
368 } | 368 } |
369 | 369 |
370 static Future<WebSocket> _upgrade(HttpRequest request) { | 370 static Future<WebSocket> _upgrade(HttpRequest request) { |
371 var response = request.response; | 371 var response = request.response; |
372 if (!_isUpgradeRequest(request)) { | 372 if (!_isUpgradeRequest(request)) { |
373 // Send error response and drain the request. | 373 // Send error response and drain the request. |
374 request.listen((_) {}, onDone: () { | 374 request.listen((_) {}, onDone: () { |
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
509 | 509 |
510 bool closed = false; | 510 bool closed = false; |
511 var transformer = new _WebSocketProtocolTransformer(); | 511 var transformer = new _WebSocketProtocolTransformer(); |
512 _socket.transform(transformer).listen( | 512 _socket.transform(transformer).listen( |
513 (data) { | 513 (data) { |
514 _controller.add(data); | 514 _controller.add(data); |
515 }, | 515 }, |
516 onError: (error) { | 516 onError: (error) { |
517 if (closed) return; | 517 if (closed) return; |
518 closed = true; | 518 closed = true; |
519 _controller.signalError(error); | 519 _controller.addError(error); |
520 _controller.close(); | 520 _controller.close(); |
521 }, | 521 }, |
522 onDone: () { | 522 onDone: () { |
523 if (closed) return; | 523 if (closed) return; |
524 closed = true; | 524 closed = true; |
525 if (_readyState == WebSocket.OPEN) { | 525 if (_readyState == WebSocket.OPEN) { |
526 _readyState = WebSocket.CLOSING; | 526 _readyState = WebSocket.CLOSING; |
527 if (transformer.closeCode != WebSocketStatus.NO_STATUS_RECEIVED) { | 527 if (transformer.closeCode != WebSocketStatus.NO_STATUS_RECEIVED) { |
528 _close(transformer.closeCode); | 528 _close(transformer.closeCode); |
529 } else { | 529 } else { |
530 _close(); | 530 _close(); |
531 } | 531 } |
532 _readyState = WebSocket.CLOSED; | 532 _readyState = WebSocket.CLOSED; |
533 } | 533 } |
534 _closeCode = transformer.closeCode; | 534 _closeCode = transformer.closeCode; |
535 _closeReason = transformer.closeReason; | 535 _closeReason = transformer.closeReason; |
536 _controller.close(); | 536 _controller.close(); |
537 if (_writeClosed) _socket.destroy(); | 537 if (_writeClosed) _socket.destroy(); |
538 }, | 538 }, |
539 unsubscribeOnError: true); | 539 unsubscribeOnError: true); |
540 | 540 |
541 _socket.done | 541 _socket.done |
542 .catchError((error) { | 542 .catchError((error) { |
543 if (closed) return; | 543 if (closed) return; |
544 closed = true; | 544 closed = true; |
545 _readyState = WebSocket.CLOSED; | 545 _readyState = WebSocket.CLOSED; |
546 _closeCode = WebSocketStatus.ABNORMAL_CLOSURE; | 546 _closeCode = WebSocketStatus.ABNORMAL_CLOSURE; |
547 _controller.signalError(error); | 547 _controller.addError(error); |
548 _controller.close(); | 548 _controller.close(); |
549 }) | 549 }) |
550 .whenComplete(() { | 550 .whenComplete(() { |
551 _writeClosed = true; | 551 _writeClosed = true; |
552 }); | 552 }); |
553 } | 553 } |
554 | 554 |
555 StreamSubscription listen(void onData(message), | 555 StreamSubscription listen(void onData(message), |
556 {void onError(AsyncError error), | 556 {void onError(AsyncError error), |
557 void onDone(), | 557 void onDone(), |
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
663 if (data != null) { | 663 if (data != null) { |
664 _socket.add(data); | 664 _socket.add(data); |
665 } | 665 } |
666 } catch (_) { | 666 } catch (_) { |
667 // The socket can be closed before _socket.done have a chance | 667 // The socket can be closed before _socket.done have a chance |
668 // to complete. | 668 // to complete. |
669 _writeClosed = true; | 669 _writeClosed = true; |
670 } | 670 } |
671 } | 671 } |
672 } | 672 } |
OLD | NEW |