| Index: sdk/lib/io/websocket_impl.dart
|
| diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart
|
| index 4162ca1f0f2745db4b395e75a2246528049feb5c..cb6d099d154f35f658621f214bd519058fb35ad1 100644
|
| --- a/sdk/lib/io/websocket_impl.dart
|
| +++ b/sdk/lib/io/websocket_impl.dart
|
| @@ -41,7 +41,8 @@ class _WebSocketOpcode {
|
| * socket will be closed when the processer encounter an error. Not using it
|
| * will lead to undefined behaviour.
|
| */
|
| -class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| +// TODO(ajohnsen): make this transformer reusable?
|
| +class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
|
| static const int START = 0;
|
| static const int LEN_FIRST = 1;
|
| static const int LEN_REST = 2;
|
| @@ -51,16 +52,35 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| static const int FAILURE = 6;
|
|
|
| bool _serverSide;
|
| + EventSink _eventSink;
|
|
|
| _WebSocketProtocolTransformer([bool this._serverSide = false]) {
|
| _prepareForNextFrame();
|
| _currentMessageType = _WebSocketMessageType.NONE;
|
| }
|
|
|
| + Stream bind(Stream stream) {
|
| + return new Stream.eventTransformed(
|
| + stream,
|
| + (EventSink eventSink) {
|
| + if (_eventSink != null) {
|
| + throw new StateError("WebSocket transformer already used.");
|
| + }
|
| + _eventSink = eventSink;
|
| + return this;
|
| + });
|
| + }
|
| +
|
| + void addError(Object error, [StackTrace stackTrace]) {
|
| + _eventSink.addError(error, stackTrace);
|
| + }
|
| +
|
| + void close() => _eventSink.close();
|
| +
|
| /**
|
| * Process data received from the underlying communication channel.
|
| */
|
| - void handleData(Uint8List buffer, EventSink sink) {
|
| + void add(Uint8List buffer) {
|
| int count = buffer.length;
|
| int index = 0;
|
| int lastIndex = count;
|
| @@ -98,8 +118,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| .transform(UTF8.decoder)
|
| .fold(new StringBuffer(), (buffer, str) => buffer..write(str))
|
| .then((buffer) {
|
| - sink.add(buffer.toString());
|
| - }, onError: sink.addError);
|
| + _eventSink.add(buffer.toString());
|
| + }, onError: _eventSink.addError);
|
| break;
|
|
|
| case _WebSocketOpcode.BINARY:
|
| @@ -111,8 +131,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| _controller.stream
|
| .fold(new BytesBuilder(), (buffer, data) => buffer..add(data))
|
| .then((buffer) {
|
| - sink.add(buffer.takeBytes());
|
| - }, onError: sink.addError);
|
| + _eventSink.add(buffer.takeBytes());
|
| + }, onError: _eventSink.addError);
|
| break;
|
|
|
| case _WebSocketOpcode.CLOSE:
|
| @@ -135,7 +155,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| throw new WebSocketException("Protocol error");
|
| }
|
| if (_len < 126) {
|
| - _lengthDone(sink);
|
| + _lengthDone();
|
| } else if (_len == 126) {
|
| _len = 0;
|
| _remainingLenBytes = 2;
|
| @@ -151,7 +171,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| _len = _len << 8 | byte;
|
| _remainingLenBytes--;
|
| if (_remainingLenBytes == 0) {
|
| - _lengthDone(sink);
|
| + _lengthDone();
|
| }
|
| break;
|
|
|
| @@ -159,7 +179,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| _maskingKey = _maskingKey << 8 | byte;
|
| _remainingMaskingKeyBytes--;
|
| if (_remainingMaskingKeyBytes == 0) {
|
| - _maskDone(sink);
|
| + _maskDone();
|
| }
|
| break;
|
|
|
| @@ -195,7 +215,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| }
|
|
|
| if (_remainingPayloadBytes == 0) {
|
| - _controlFrameEnd(sink);
|
| + _controlFrameEnd();
|
| }
|
| } else {
|
| if (_currentMessageType != _WebSocketMessageType.TEXT &&
|
| @@ -206,7 +226,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| new Uint8List.view(buffer.buffer, index, payload));
|
| index += payload;
|
| if (_remainingPayloadBytes == 0) {
|
| - _messageFrameEnd(sink);
|
| + _messageFrameEnd();
|
| }
|
| }
|
|
|
| @@ -218,13 +238,13 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| // Move to the next byte.
|
| index++;
|
| }
|
| - } catch (e) {
|
| + } catch (e, stackTrace) {
|
| _state = FAILURE;
|
| - sink.addError(e);
|
| + _eventSink.addError(e, stackTrace);
|
| }
|
| }
|
|
|
| - void _lengthDone(EventSink sink) {
|
| + void _lengthDone() {
|
| if (_masked) {
|
| if (!_serverSide) {
|
| throw new WebSocketException("Received masked frame from server");
|
| @@ -236,16 +256,16 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| throw new WebSocketException("Received unmasked frame from client");
|
| }
|
| _remainingPayloadBytes = _len;
|
| - _startPayload(sink);
|
| + _startPayload();
|
| }
|
| }
|
|
|
| - void _maskDone(EventSink sink) {
|
| + void _maskDone() {
|
| _remainingPayloadBytes = _len;
|
| - _startPayload(sink);
|
| + _startPayload();
|
| }
|
|
|
| - void _startPayload(EventSink sink) {
|
| + void _startPayload() {
|
| // If there is no actual payload perform perform callbacks without
|
| // going through the PAYLOAD state.
|
| if (_remainingPayloadBytes == 0) {
|
| @@ -253,25 +273,25 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| switch (_opcode) {
|
| case _WebSocketOpcode.CLOSE:
|
| _state = CLOSED;
|
| - sink.close();
|
| + _eventSink.close();
|
| break;
|
| case _WebSocketOpcode.PING:
|
| - sink.add(new _WebSocketPing());
|
| + _eventSink.add(new _WebSocketPing());
|
| break;
|
| case _WebSocketOpcode.PONG:
|
| - sink.add(new _WebSocketPong());
|
| + _eventSink.add(new _WebSocketPong());
|
| break;
|
| }
|
| _prepareForNextFrame();
|
| } else {
|
| - _messageFrameEnd(sink);
|
| + _messageFrameEnd();
|
| }
|
| } else {
|
| _state = PAYLOAD;
|
| }
|
| }
|
|
|
| - void _messageFrameEnd(EventSink sink) {
|
| + void _messageFrameEnd() {
|
| if (_fin) {
|
| switch (_currentMessageType) {
|
| case _WebSocketMessageType.TEXT:
|
| @@ -287,7 +307,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| _prepareForNextFrame();
|
| }
|
|
|
| - void _controlFrameEnd(EventSink sink) {
|
| + void _controlFrameEnd() {
|
| switch (_opcode) {
|
| case _WebSocketOpcode.CLOSE:
|
| closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
|
| @@ -304,15 +324,15 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
|
| }
|
| }
|
| _state = CLOSED;
|
| - sink.close();
|
| + _eventSink.close();
|
| break;
|
|
|
| case _WebSocketOpcode.PING:
|
| - sink.add(new _WebSocketPing(_controlPayload));
|
| + _eventSink.add(new _WebSocketPing(_controlPayload));
|
| break;
|
|
|
| case _WebSocketOpcode.PONG:
|
| - sink.add(new _WebSocketPong(_controlPayload));
|
| + _eventSink.add(new _WebSocketPong(_controlPayload));
|
| break;
|
| }
|
| _prepareForNextFrame();
|
| @@ -440,18 +460,32 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
|
| }
|
|
|
|
|
| -class _WebSocketOutgoingTransformer extends StreamEventTransformer {
|
| +// TODO(ajohnsen): Make this transformer reusable.
|
| +class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
|
| final _WebSocketImpl webSocket;
|
| + EventSink _eventSink;
|
|
|
| _WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket);
|
|
|
| - void handleData(message, EventSink<List<int>> sink) {
|
| + Stream bind(Stream stream) {
|
| + return new Stream.eventTransformed(
|
| + stream,
|
| + (EventSink eventSink) {
|
| + if (_eventSink != null) {
|
| + throw new StateError("WebSocket transformer already used");
|
| + }
|
| + _eventSink = eventSink;
|
| + return this;
|
| + });
|
| + }
|
| +
|
| + void add(message) {
|
| if (message is _WebSocketPong) {
|
| - addFrame(_WebSocketOpcode.PONG, message.payload, sink);
|
| + addFrame(_WebSocketOpcode.PONG, message.payload);
|
| return;
|
| }
|
| if (message is _WebSocketPing) {
|
| - addFrame(_WebSocketOpcode.PING, message.payload, sink);
|
| + addFrame(_WebSocketOpcode.PING, message.payload);
|
| return;
|
| }
|
| List<int> data;
|
| @@ -470,10 +504,14 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer {
|
| } else {
|
| opcode = _WebSocketOpcode.TEXT;
|
| }
|
| - addFrame(opcode, data, sink);
|
| + addFrame(opcode, data);
|
| + }
|
| +
|
| + void addError(Object error, [StackTrace stackTrace]) {
|
| + _eventSink.addError(error, stackTrace);
|
| }
|
|
|
| - void handleDone(EventSink<List<int>> sink) {
|
| + void close() {
|
| int code = webSocket._outCloseCode;
|
| String reason = webSocket._outCloseReason;
|
| List<int> data;
|
| @@ -485,12 +523,12 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer {
|
| data.addAll(UTF8.encode(reason));
|
| }
|
| }
|
| - addFrame(_WebSocketOpcode.CLOSE, data, sink);
|
| - sink.close();
|
| + addFrame(_WebSocketOpcode.CLOSE, data);
|
| + _eventSink.close();
|
| }
|
|
|
| - void addFrame(int opcode, List<int> data, EventSink<List<int>> sink) {
|
| - createFrame(opcode, data, webSocket._serverSide).forEach(sink.add);
|
| + void addFrame(int opcode, List<int> data) {
|
| + createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
|
| }
|
|
|
| static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
|
|
|