Index: sdk/lib/io/websocket_impl.dart |
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
index 4162ca1f0f2745db4b395e75a2246528049feb5c..f101e79da6aee14a1d88ed67544f9602d401b19d 100644 |
--- a/sdk/lib/io/websocket_impl.dart |
+++ b/sdk/lib/io/websocket_impl.dart |
@@ -41,7 +41,7 @@ class _WebSocketOpcode { |
* socket will be closed when the processer encounter an error. Not using it |
* will lead to undefined behaviour. |
*/ |
-class _WebSocketProtocolTransformer extends StreamEventTransformer { |
+class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
static const int START = 0; |
static const int LEN_FIRST = 1; |
static const int LEN_REST = 2; |
@@ -51,16 +51,35 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
static const int FAILURE = 6; |
bool _serverSide; |
+ EventSink _eventSink; |
_WebSocketProtocolTransformer([bool this._serverSide = false]) { |
_prepareForNextFrame(); |
_currentMessageType = _WebSocketMessageType.NONE; |
} |
+ Stream bind(Stream stream) { |
+ return new Stream.eventTransformed( |
+ stream, |
+ (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used."); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
+ } |
+ |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _eventSink.addError(error, stackTrace); |
+ } |
+ |
+ void close() => _eventSink.close(); |
+ |
/** |
* Process data received from the underlying communication channel. |
*/ |
- void handleData(Uint8List buffer, EventSink sink) { |
+ void add(Uint8List buffer) { |
int count = buffer.length; |
int index = 0; |
int lastIndex = count; |
@@ -98,8 +117,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
.transform(UTF8.decoder) |
.fold(new StringBuffer(), (buffer, str) => buffer..write(str)) |
.then((buffer) { |
- sink.add(buffer.toString()); |
- }, onError: sink.addError); |
+ _eventSink.add(buffer.toString()); |
+ }, onError: _eventSink.addError); |
break; |
case _WebSocketOpcode.BINARY: |
@@ -111,8 +130,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_controller.stream |
.fold(new BytesBuilder(), (buffer, data) => buffer..add(data)) |
.then((buffer) { |
- sink.add(buffer.takeBytes()); |
- }, onError: sink.addError); |
+ _eventSink.add(buffer.takeBytes()); |
+ }, onError: _eventSink.addError); |
break; |
case _WebSocketOpcode.CLOSE: |
@@ -135,7 +154,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
throw new WebSocketException("Protocol error"); |
} |
if (_len < 126) { |
- _lengthDone(sink); |
+ _lengthDone(); |
} else if (_len == 126) { |
_len = 0; |
_remainingLenBytes = 2; |
@@ -151,7 +170,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_len = _len << 8 | byte; |
_remainingLenBytes--; |
if (_remainingLenBytes == 0) { |
- _lengthDone(sink); |
+ _lengthDone(); |
} |
break; |
@@ -159,7 +178,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_maskingKey = _maskingKey << 8 | byte; |
_remainingMaskingKeyBytes--; |
if (_remainingMaskingKeyBytes == 0) { |
- _maskDone(sink); |
+ _maskDone(); |
} |
break; |
@@ -195,7 +214,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
} |
if (_remainingPayloadBytes == 0) { |
- _controlFrameEnd(sink); |
+ _controlFrameEnd(); |
} |
} else { |
if (_currentMessageType != _WebSocketMessageType.TEXT && |
@@ -206,7 +225,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
new Uint8List.view(buffer.buffer, index, payload)); |
index += payload; |
if (_remainingPayloadBytes == 0) { |
- _messageFrameEnd(sink); |
+ _messageFrameEnd(); |
} |
} |
@@ -218,13 +237,13 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
// Move to the next byte. |
index++; |
} |
- } catch (e) { |
+ } catch (e, stackTrace) { |
_state = FAILURE; |
- sink.addError(e); |
+ _eventSink.addError(e, stackTrace); |
} |
} |
- void _lengthDone(EventSink sink) { |
+ void _lengthDone() { |
if (_masked) { |
if (!_serverSide) { |
throw new WebSocketException("Received masked frame from server"); |
@@ -236,16 +255,16 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
throw new WebSocketException("Received unmasked frame from client"); |
} |
_remainingPayloadBytes = _len; |
- _startPayload(sink); |
+ _startPayload(); |
} |
} |
- void _maskDone(EventSink sink) { |
+ void _maskDone() { |
_remainingPayloadBytes = _len; |
- _startPayload(sink); |
+ _startPayload(); |
} |
- void _startPayload(EventSink sink) { |
+ void _startPayload() { |
// If there is no actual payload perform perform callbacks without |
// going through the PAYLOAD state. |
if (_remainingPayloadBytes == 0) { |
@@ -253,25 +272,25 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
switch (_opcode) { |
case _WebSocketOpcode.CLOSE: |
_state = CLOSED; |
- sink.close(); |
+ _eventSink.close(); |
break; |
case _WebSocketOpcode.PING: |
- sink.add(new _WebSocketPing()); |
+ _eventSink.add(new _WebSocketPing()); |
break; |
case _WebSocketOpcode.PONG: |
- sink.add(new _WebSocketPong()); |
+ _eventSink.add(new _WebSocketPong()); |
break; |
} |
_prepareForNextFrame(); |
} else { |
- _messageFrameEnd(sink); |
+ _messageFrameEnd(); |
} |
} else { |
_state = PAYLOAD; |
} |
} |
- void _messageFrameEnd(EventSink sink) { |
+ void _messageFrameEnd() { |
if (_fin) { |
switch (_currentMessageType) { |
case _WebSocketMessageType.TEXT: |
@@ -287,7 +306,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
_prepareForNextFrame(); |
} |
- void _controlFrameEnd(EventSink sink) { |
+ void _controlFrameEnd() { |
switch (_opcode) { |
case _WebSocketOpcode.CLOSE: |
closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
@@ -304,15 +323,15 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer { |
} |
} |
_state = CLOSED; |
- sink.close(); |
+ _eventSink.close(); |
break; |
case _WebSocketOpcode.PING: |
- sink.add(new _WebSocketPing(_controlPayload)); |
+ _eventSink.add(new _WebSocketPing(_controlPayload)); |
break; |
case _WebSocketOpcode.PONG: |
- sink.add(new _WebSocketPong(_controlPayload)); |
+ _eventSink.add(new _WebSocketPong(_controlPayload)); |
break; |
} |
_prepareForNextFrame(); |
@@ -440,18 +459,31 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
} |
-class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
+class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
final _WebSocketImpl webSocket; |
+ EventSink _eventSink; |
_WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket); |
- void handleData(message, EventSink<List<int>> sink) { |
+ Stream bind(Stream stream) { |
+ return new Stream.eventTransformed( |
+ stream, |
+ (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used"); |
Lasse Reichstein Nielsen
2013/10/07 11:47:00
If it didn't try to put everything into one class,
floitsch
2013/10/10 15:39:57
Agreed.
Added a TODO. I just wanted to keep the ch
|
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
+ } |
+ |
+ void add(message) { |
if (message is _WebSocketPong) { |
- addFrame(_WebSocketOpcode.PONG, message.payload, sink); |
+ addFrame(_WebSocketOpcode.PONG, message.payload); |
return; |
} |
if (message is _WebSocketPing) { |
- addFrame(_WebSocketOpcode.PING, message.payload, sink); |
+ addFrame(_WebSocketOpcode.PING, message.payload); |
return; |
} |
List<int> data; |
@@ -470,10 +502,14 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
} else { |
opcode = _WebSocketOpcode.TEXT; |
} |
- addFrame(opcode, data, sink); |
+ addFrame(opcode, data); |
+ } |
+ |
+ void addError(Object error, [StackTrace stackTrace]) { |
+ _eventSink.addError(error, stackTrace); |
} |
- void handleDone(EventSink<List<int>> sink) { |
+ void close() { |
int code = webSocket._outCloseCode; |
String reason = webSocket._outCloseReason; |
List<int> data; |
@@ -485,12 +521,12 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer { |
data.addAll(UTF8.encode(reason)); |
} |
} |
- addFrame(_WebSocketOpcode.CLOSE, data, sink); |
- sink.close(); |
+ addFrame(_WebSocketOpcode.CLOSE, data); |
+ _eventSink.close(); |
} |
- void addFrame(int opcode, List<int> data, EventSink<List<int>> sink) { |
- createFrame(opcode, data, webSocket._serverSide).forEach(sink.add); |
+ void addFrame(int opcode, List<int> data) { |
+ createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
} |
static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |