Index: sdk/lib/io/websocket_impl.dart |
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
index 4162ca1f0f2745db4b395e75a2246528049feb5c..cb6d099d154f35f658621f214bd519058fb35ad1 100644 |
--- a/sdk/lib/io/websocket_impl.dart |
+++ b/sdk/lib/io/websocket_impl.dart |
@@ -41,7 +41,8 @@ class _WebSocketOpcode { |
* socket will be closed when the processer encounter an error. Not using it |
* will lead to undefined behaviour. |
*/ |
-class _WebSocketProtocolTransformer extends StreamEventTransformer { |
+// TODO(ajohnsen): make this transformer reusable? |
+class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
static const int START = 0; |
static const int LEN_FIRST = 1; |
static const int LEN_REST = 2; |
@@ -51,16 +52,35 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
static const int FAILURE = 6; |
bool _serverSide; |
+ EventSink _eventSink; |
_WebSocketProtocolTransformer([bool this._serverSide = false]) { |
_prepareForNextFrame(); |
_currentMessageType = _WebSocketMessageType.NONE; |
} |
+ Stream bind(Stream stream) { |
+ return new Stream.eventTransformed( |
+ stream, |
+ (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used."); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
+ } |
+ |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _eventSink.addError(error, stackTrace); |
+ } |
+ |
+ void close() => _eventSink.close(); |
+ |
/** |
* Process data received from the underlying communication channel. |
*/ |
- void handleData(Uint8List buffer, EventSink sink) { |
+ void add(Uint8List buffer) { |
int count = buffer.length; |
int index = 0; |
int lastIndex = count; |
@@ -98,8 +118,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
.transform(UTF8.decoder) |
.fold(new StringBuffer(), (buffer, str) => buffer..write(str)) |
.then((buffer) { |
- sink.add(buffer.toString()); |
- }, onError: sink.addError); |
+ _eventSink.add(buffer.toString()); |
+ }, onError: _eventSink.addError); |
break; |
case _WebSocketOpcode.BINARY: |
@@ -111,8 +131,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_controller.stream |
.fold(new BytesBuilder(), (buffer, data) => buffer..add(data)) |
.then((buffer) { |
- sink.add(buffer.takeBytes()); |
- }, onError: sink.addError); |
+ _eventSink.add(buffer.takeBytes()); |
+ }, onError: _eventSink.addError); |
break; |
case _WebSocketOpcode.CLOSE: |
@@ -135,7 +155,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
throw new WebSocketException("Protocol error"); |
} |
if (_len < 126) { |
- _lengthDone(sink); |
+ _lengthDone(); |
} else if (_len == 126) { |
_len = 0; |
_remainingLenBytes = 2; |
@@ -151,7 +171,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_len = _len << 8 | byte; |
_remainingLenBytes--; |
if (_remainingLenBytes == 0) { |
- _lengthDone(sink); |
+ _lengthDone(); |
} |
break; |
@@ -159,7 +179,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_maskingKey = _maskingKey << 8 | byte; |
_remainingMaskingKeyBytes--; |
if (_remainingMaskingKeyBytes == 0) { |
- _maskDone(sink); |
+ _maskDone(); |
} |
break; |
@@ -195,7 +215,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
} |
if (_remainingPayloadBytes == 0) { |
- _controlFrameEnd(sink); |
+ _controlFrameEnd(); |
} |
} else { |
if (_currentMessageType != _WebSocketMessageType.TEXT && |
@@ -206,7 +226,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
new Uint8List.view(buffer.buffer, index, payload)); |
index += payload; |
if (_remainingPayloadBytes == 0) { |
- _messageFrameEnd(sink); |
+ _messageFrameEnd(); |
} |
} |
@@ -218,13 +238,13 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
// Move to the next byte. |
index++; |
} |
- } catch (e) { |
+ } catch (e, stackTrace) { |
_state = FAILURE; |
- sink.addError(e); |
+ _eventSink.addError(e, stackTrace); |
} |
} |
- void _lengthDone(EventSink sink) { |
+ void _lengthDone() { |
if (_masked) { |
if (!_serverSide) { |
throw new WebSocketException("Received masked frame from server"); |
@@ -236,16 +256,16 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
throw new WebSocketException("Received unmasked frame from client"); |
} |
_remainingPayloadBytes = _len; |
- _startPayload(sink); |
+ _startPayload(); |
} |
} |
- void _maskDone(EventSink sink) { |
+ void _maskDone() { |
_remainingPayloadBytes = _len; |
- _startPayload(sink); |
+ _startPayload(); |
} |
- void _startPayload(EventSink sink) { |
+ void _startPayload() { |
// If there is no actual payload perform perform callbacks without |
// going through the PAYLOAD state. |
if (_remainingPayloadBytes == 0) { |
@@ -253,25 +273,25 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
switch (_opcode) { |
case _WebSocketOpcode.CLOSE: |
_state = CLOSED; |
- sink.close(); |
+ _eventSink.close(); |
break; |
case _WebSocketOpcode.PING: |
- sink.add(new _WebSocketPing()); |
+ _eventSink.add(new _WebSocketPing()); |
break; |
case _WebSocketOpcode.PONG: |
- sink.add(new _WebSocketPong()); |
+ _eventSink.add(new _WebSocketPong()); |
break; |
} |
_prepareForNextFrame(); |
} else { |
- _messageFrameEnd(sink); |
+ _messageFrameEnd(); |
} |
} else { |
_state = PAYLOAD; |
} |
} |
- void _messageFrameEnd(EventSink sink) { |
+ void _messageFrameEnd() { |
if (_fin) { |
switch (_currentMessageType) { |
case _WebSocketMessageType.TEXT: |
@@ -287,7 +307,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_prepareForNextFrame(); |
} |
- void _controlFrameEnd(EventSink sink) { |
+ void _controlFrameEnd() { |
switch (_opcode) { |
case _WebSocketOpcode.CLOSE: |
closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
@@ -304,15 +324,15 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
} |
} |
_state = CLOSED; |
- sink.close(); |
+ _eventSink.close(); |
break; |
case _WebSocketOpcode.PING: |
- sink.add(new _WebSocketPing(_controlPayload)); |
+ _eventSink.add(new _WebSocketPing(_controlPayload)); |
break; |
case _WebSocketOpcode.PONG: |
- sink.add(new _WebSocketPong(_controlPayload)); |
+ _eventSink.add(new _WebSocketPong(_controlPayload)); |
break; |
} |
_prepareForNextFrame(); |
@@ -440,18 +460,32 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
} |
-class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
+// TODO(ajohnsen): Make this transformer reusable. |
+class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
final _WebSocketImpl webSocket; |
+ EventSink _eventSink; |
_WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket); |
- void handleData(message, EventSink<List<int>> sink) { |
+ Stream bind(Stream stream) { |
+ return new Stream.eventTransformed( |
+ stream, |
+ (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used"); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
+ } |
+ |
+ void add(message) { |
if (message is _WebSocketPong) { |
- addFrame(_WebSocketOpcode.PONG, message.payload, sink); |
+ addFrame(_WebSocketOpcode.PONG, message.payload); |
return; |
} |
if (message is _WebSocketPing) { |
- addFrame(_WebSocketOpcode.PING, message.payload, sink); |
+ addFrame(_WebSocketOpcode.PING, message.payload); |
return; |
} |
List<int> data; |
@@ -470,10 +504,14 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
} else { |
opcode = _WebSocketOpcode.TEXT; |
} |
- addFrame(opcode, data, sink); |
+ addFrame(opcode, data); |
+ } |
+ |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _eventSink.addError(error, stackTrace); |
} |
- void handleDone(EventSink<List<int>> sink) { |
+ void close() { |
int code = webSocket._outCloseCode; |
String reason = webSocket._outCloseReason; |
List<int> data; |
@@ -485,12 +523,12 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
data.addAll(UTF8.encode(reason)); |
} |
} |
- addFrame(_WebSocketOpcode.CLOSE, data, sink); |
- sink.close(); |
+ addFrame(_WebSocketOpcode.CLOSE, data); |
+ _eventSink.close(); |
} |
- void addFrame(int opcode, List<int> data, EventSink<List<int>> sink) { |
- createFrame(opcode, data, webSocket._serverSide).forEach(sink.add); |
+ void addFrame(int opcode, List<int> data) { |
+ createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
} |
static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |