Chromium Code Reviews| Index: sdk/lib/io/websocket_impl.dart |
| diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
| index 4162ca1f0f2745db4b395e75a2246528049feb5c..f101e79da6aee14a1d88ed67544f9602d401b19d 100644 |
| --- a/sdk/lib/io/websocket_impl.dart |
| +++ b/sdk/lib/io/websocket_impl.dart |
| @@ -41,7 +41,7 @@ class _WebSocketOpcode { |
| * socket will be closed when the processer encounter an error. Not using it |
| * will lead to undefined behaviour. |
| */ |
| -class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| +class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
| static const int START = 0; |
| static const int LEN_FIRST = 1; |
| static const int LEN_REST = 2; |
| @@ -51,16 +51,35 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| static const int FAILURE = 6; |
| bool _serverSide; |
| + EventSink _eventSink; |
| _WebSocketProtocolTransformer([bool this._serverSide = false]) { |
| _prepareForNextFrame(); |
| _currentMessageType = _WebSocketMessageType.NONE; |
| } |
| + Stream bind(Stream stream) { |
| + return new Stream.eventTransformed( |
| + stream, |
| + (EventSink eventSink) { |
| + if (_eventSink != null) { |
| + throw new StateError("WebSocket transformer already used."); |
| + } |
| + _eventSink = eventSink; |
| + return this; |
| + }); |
| + } |
| + |
| + void addError(Object error, [StackTrace stackTrace]) { |
| + _eventSink.addError(error, stackTrace); |
| + } |
| + |
| + void close() => _eventSink.close(); |
| + |
| /** |
| * Process data received from the underlying communication channel. |
| */ |
| - void handleData(Uint8List buffer, EventSink sink) { |
| + void add(Uint8List buffer) { |
| int count = buffer.length; |
| int index = 0; |
| int lastIndex = count; |
| @@ -98,8 +117,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| .transform(UTF8.decoder) |
| .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) |
| .then((buffer) { |
| - sink.add(buffer.toString()); |
| - }, onError: sink.addError); |
| + _eventSink.add(buffer.toString()); |
| + }, onError: _eventSink.addError); |
| break; |
| case _WebSocketOpcode.BINARY: |
| @@ -111,8 +130,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| _controller.stream |
| .fold(new BytesBuilder(), (buffer, data) => buffer..add(data)) |
| .then((buffer) { |
| - sink.add(buffer.takeBytes()); |
| - }, onError: sink.addError); |
| + _eventSink.add(buffer.takeBytes()); |
| + }, onError: _eventSink.addError); |
| break; |
| case _WebSocketOpcode.CLOSE: |
| @@ -135,7 +154,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| throw new WebSocketException("Protocol error"); |
| } |
| if (_len < 126) { |
| - _lengthDone(sink); |
| + _lengthDone(); |
| } else if (_len == 126) { |
| _len = 0; |
| _remainingLenBytes = 2; |
| @@ -151,7 +170,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| _len = _len << 8 | byte; |
| _remainingLenBytes--; |
| if (_remainingLenBytes == 0) { |
| - _lengthDone(sink); |
| + _lengthDone(); |
| } |
| break; |
| @@ -159,7 +178,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| _maskingKey = _maskingKey << 8 | byte; |
| _remainingMaskingKeyBytes--; |
| if (_remainingMaskingKeyBytes == 0) { |
| - _maskDone(sink); |
| + _maskDone(); |
| } |
| break; |
| @@ -195,7 +214,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| } |
| if (_remainingPayloadBytes == 0) { |
| - _controlFrameEnd(sink); |
| + _controlFrameEnd(); |
| } |
| } else { |
| if (_currentMessageType != _WebSocketMessageType.TEXT && |
| @@ -206,7 +225,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| new Uint8List.view(buffer.buffer, index, payload)); |
| index += payload; |
| if (_remainingPayloadBytes == 0) { |
| - _messageFrameEnd(sink); |
| + _messageFrameEnd(); |
| } |
| } |
| @@ -218,13 +237,13 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| // Move to the next byte. |
| index++; |
| } |
| - } catch (e) { |
| + } catch (e, stackTrace) { |
| _state = FAILURE; |
| - sink.addError(e); |
| + _eventSink.addError(e, stackTrace); |
| } |
| } |
| - void _lengthDone(EventSink sink) { |
| + void _lengthDone() { |
| if (_masked) { |
| if (!_serverSide) { |
| throw new WebSocketException("Received masked frame from server"); |
| @@ -236,16 +255,16 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| throw new WebSocketException("Received unmasked frame from client"); |
| } |
| _remainingPayloadBytes = _len; |
| - _startPayload(sink); |
| + _startPayload(); |
| } |
| } |
| - void _maskDone(EventSink sink) { |
| + void _maskDone() { |
| _remainingPayloadBytes = _len; |
| - _startPayload(sink); |
| + _startPayload(); |
| } |
| - void _startPayload(EventSink sink) { |
| + void _startPayload() { |
| // If there is no actual payload perform perform callbacks without |
| // going through the PAYLOAD state. |
| if (_remainingPayloadBytes == 0) { |
| @@ -253,25 +272,25 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| switch (_opcode) { |
| case _WebSocketOpcode.CLOSE: |
| _state = CLOSED; |
| - sink.close(); |
| + _eventSink.close(); |
| break; |
| case _WebSocketOpcode.PING: |
| - sink.add(new _WebSocketPing()); |
| + _eventSink.add(new _WebSocketPing()); |
| break; |
| case _WebSocketOpcode.PONG: |
| - sink.add(new _WebSocketPong()); |
| + _eventSink.add(new _WebSocketPong()); |
| break; |
| } |
| _prepareForNextFrame(); |
| } else { |
| - _messageFrameEnd(sink); |
| + _messageFrameEnd(); |
| } |
| } else { |
| _state = PAYLOAD; |
| } |
| } |
| - void _messageFrameEnd(EventSink sink) { |
| + void _messageFrameEnd() { |
| if (_fin) { |
| switch (_currentMessageType) { |
| case _WebSocketMessageType.TEXT: |
| @@ -287,7 +306,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| _prepareForNextFrame(); |
| } |
| - void _controlFrameEnd(EventSink sink) { |
| + void _controlFrameEnd() { |
| switch (_opcode) { |
| case _WebSocketOpcode.CLOSE: |
| closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
| @@ -304,15 +323,15 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
| } |
| } |
| _state = CLOSED; |
| - sink.close(); |
| + _eventSink.close(); |
| break; |
| case _WebSocketOpcode.PING: |
| - sink.add(new _WebSocketPing(_controlPayload)); |
| + _eventSink.add(new _WebSocketPing(_controlPayload)); |
| break; |
| case _WebSocketOpcode.PONG: |
| - sink.add(new _WebSocketPong(_controlPayload)); |
| + _eventSink.add(new _WebSocketPong(_controlPayload)); |
| break; |
| } |
| _prepareForNextFrame(); |
| @@ -440,18 +459,31 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
| } |
| -class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
| +class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
| final _WebSocketImpl webSocket; |
| + EventSink _eventSink; |
| _WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket); |
| - void handleData(message, EventSink<List<int>> sink) { |
| + Stream bind(Stream stream) { |
| + return new Stream.eventTransformed( |
| + stream, |
| + (EventSink eventSink) { |
| + if (_eventSink != null) { |
| + throw new StateError("WebSocket transformer already used"); |
|
Lasse Reichstein Nielsen
2013/10/07 11:47:00
If it didn't try to put everything into one class,
floitsch
2013/10/10 15:39:57
Agreed.
Added a TODO. I just wanted to keep the ch
|
| + } |
| + _eventSink = eventSink; |
| + return this; |
| + }); |
| + } |
| + |
| + void add(message) { |
| if (message is _WebSocketPong) { |
| - addFrame(_WebSocketOpcode.PONG, message.payload, sink); |
| + addFrame(_WebSocketOpcode.PONG, message.payload); |
| return; |
| } |
| if (message is _WebSocketPing) { |
| - addFrame(_WebSocketOpcode.PING, message.payload, sink); |
| + addFrame(_WebSocketOpcode.PING, message.payload); |
| return; |
| } |
| List<int> data; |
| @@ -470,10 +502,14 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
| } else { |
| opcode = _WebSocketOpcode.TEXT; |
| } |
| - addFrame(opcode, data, sink); |
| + addFrame(opcode, data); |
| + } |
| + |
| + void addError(Object error, [StackTrace stackTrace]) { |
| + _eventSink.addError(error, stackTrace); |
| } |
| - void handleDone(EventSink<List<int>> sink) { |
| + void close() { |
| int code = webSocket._outCloseCode; |
| String reason = webSocket._outCloseReason; |
| List<int> data; |
| @@ -485,12 +521,12 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
| data.addAll(UTF8.encode(reason)); |
| } |
| } |
| - addFrame(_WebSocketOpcode.CLOSE, data, sink); |
| - sink.close(); |
| + addFrame(_WebSocketOpcode.CLOSE, data); |
| + _eventSink.close(); |
| } |
| - void addFrame(int opcode, List<int> data, EventSink<List<int>> sink) { |
| - createFrame(opcode, data, webSocket._serverSide).forEach(sink.add); |
| + void addFrame(int opcode, List<int> data) { |
| + createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
| } |
| static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |