Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2357)

Unified Diff: sdk/lib/io/websocket_impl.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/utf/utf_stream.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/io/websocket_impl.dart
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart
index 4162ca1f0f2745db4b395e75a2246528049feb5c..cb6d099d154f35f658621f214bd519058fb35ad1 100644
--- a/sdk/lib/io/websocket_impl.dart
+++ b/sdk/lib/io/websocket_impl.dart
@@ -41,7 +41,8 @@ class _WebSocketOpcode {
* socket will be closed when the processer encounter an error. Not using it
* will lead to undefined behaviour.
*/
-class _WebSocketProtocolTransformer extends StreamEventTransformer {
+// TODO(ajohnsen): make this transformer reusable?
+class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
static const int START = 0;
static const int LEN_FIRST = 1;
static const int LEN_REST = 2;
@@ -51,16 +52,35 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
static const int FAILURE = 6;
bool _serverSide;
+ EventSink _eventSink;
_WebSocketProtocolTransformer([bool this._serverSide = false]) {
_prepareForNextFrame();
_currentMessageType = _WebSocketMessageType.NONE;
}
+ Stream bind(Stream stream) {
+ return new Stream.eventTransformed(
+ stream,
+ (EventSink eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used.");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
+ }
+
+ void addError(Object error, [StackTrace stackTrace]) {
+ _eventSink.addError(error, stackTrace);
+ }
+
+ void close() => _eventSink.close();
+
/**
* Process data received from the underlying communication channel.
*/
- void handleData(Uint8List buffer, EventSink sink) {
+ void add(Uint8List buffer) {
int count = buffer.length;
int index = 0;
int lastIndex = count;
@@ -98,8 +118,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
.transform(UTF8.decoder)
.fold(new StringBuffer(), (buffer, str) => buffer..write(str))
.then((buffer) {
- sink.add(buffer.toString());
- }, onError: sink.addError);
+ _eventSink.add(buffer.toString());
+ }, onError: _eventSink.addError);
break;
case _WebSocketOpcode.BINARY:
@@ -111,8 +131,8 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
_controller.stream
.fold(new BytesBuilder(), (buffer, data) => buffer..add(data))
.then((buffer) {
- sink.add(buffer.takeBytes());
- }, onError: sink.addError);
+ _eventSink.add(buffer.takeBytes());
+ }, onError: _eventSink.addError);
break;
case _WebSocketOpcode.CLOSE:
@@ -135,7 +155,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
throw new WebSocketException("Protocol error");
}
if (_len < 126) {
- _lengthDone(sink);
+ _lengthDone();
} else if (_len == 126) {
_len = 0;
_remainingLenBytes = 2;
@@ -151,7 +171,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
_len = _len << 8 | byte;
_remainingLenBytes--;
if (_remainingLenBytes == 0) {
- _lengthDone(sink);
+ _lengthDone();
}
break;
@@ -159,7 +179,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
_maskingKey = _maskingKey << 8 | byte;
_remainingMaskingKeyBytes--;
if (_remainingMaskingKeyBytes == 0) {
- _maskDone(sink);
+ _maskDone();
}
break;
@@ -195,7 +215,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
}
if (_remainingPayloadBytes == 0) {
- _controlFrameEnd(sink);
+ _controlFrameEnd();
}
} else {
if (_currentMessageType != _WebSocketMessageType.TEXT &&
@@ -206,7 +226,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
new Uint8List.view(buffer.buffer, index, payload));
index += payload;
if (_remainingPayloadBytes == 0) {
- _messageFrameEnd(sink);
+ _messageFrameEnd();
}
}
@@ -218,13 +238,13 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
// Move to the next byte.
index++;
}
- } catch (e) {
+ } catch (e, stackTrace) {
_state = FAILURE;
- sink.addError(e);
+ _eventSink.addError(e, stackTrace);
}
}
- void _lengthDone(EventSink sink) {
+ void _lengthDone() {
if (_masked) {
if (!_serverSide) {
throw new WebSocketException("Received masked frame from server");
@@ -236,16 +256,16 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
throw new WebSocketException("Received unmasked frame from client");
}
_remainingPayloadBytes = _len;
- _startPayload(sink);
+ _startPayload();
}
}
- void _maskDone(EventSink sink) {
+ void _maskDone() {
_remainingPayloadBytes = _len;
- _startPayload(sink);
+ _startPayload();
}
- void _startPayload(EventSink sink) {
+ void _startPayload() {
// If there is no actual payload perform perform callbacks without
// going through the PAYLOAD state.
if (_remainingPayloadBytes == 0) {
@@ -253,25 +273,25 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
switch (_opcode) {
case _WebSocketOpcode.CLOSE:
_state = CLOSED;
- sink.close();
+ _eventSink.close();
break;
case _WebSocketOpcode.PING:
- sink.add(new _WebSocketPing());
+ _eventSink.add(new _WebSocketPing());
break;
case _WebSocketOpcode.PONG:
- sink.add(new _WebSocketPong());
+ _eventSink.add(new _WebSocketPong());
break;
}
_prepareForNextFrame();
} else {
- _messageFrameEnd(sink);
+ _messageFrameEnd();
}
} else {
_state = PAYLOAD;
}
}
- void _messageFrameEnd(EventSink sink) {
+ void _messageFrameEnd() {
if (_fin) {
switch (_currentMessageType) {
case _WebSocketMessageType.TEXT:
@@ -287,7 +307,7 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
_prepareForNextFrame();
}
- void _controlFrameEnd(EventSink sink) {
+ void _controlFrameEnd() {
switch (_opcode) {
case _WebSocketOpcode.CLOSE:
closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
@@ -304,15 +324,15 @@ class _WebSocketProtocolTransformer extends StreamEventTransformer {
}
}
_state = CLOSED;
- sink.close();
+ _eventSink.close();
break;
case _WebSocketOpcode.PING:
- sink.add(new _WebSocketPing(_controlPayload));
+ _eventSink.add(new _WebSocketPing(_controlPayload));
break;
case _WebSocketOpcode.PONG:
- sink.add(new _WebSocketPong(_controlPayload));
+ _eventSink.add(new _WebSocketPong(_controlPayload));
break;
}
_prepareForNextFrame();
@@ -440,18 +460,32 @@ class _WebSocketTransformerImpl implements WebSocketTransformer {
}
-class _WebSocketOutgoingTransformer extends StreamEventTransformer {
+// TODO(ajohnsen): Make this transformer reusable.
+class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
final _WebSocketImpl webSocket;
+ EventSink _eventSink;
_WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket);
- void handleData(message, EventSink<List<int>> sink) {
+ Stream bind(Stream stream) {
+ return new Stream.eventTransformed(
+ stream,
+ (EventSink eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
+ }
+
+ void add(message) {
if (message is _WebSocketPong) {
- addFrame(_WebSocketOpcode.PONG, message.payload, sink);
+ addFrame(_WebSocketOpcode.PONG, message.payload);
return;
}
if (message is _WebSocketPing) {
- addFrame(_WebSocketOpcode.PING, message.payload, sink);
+ addFrame(_WebSocketOpcode.PING, message.payload);
return;
}
List<int> data;
@@ -470,10 +504,14 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer {
} else {
opcode = _WebSocketOpcode.TEXT;
}
- addFrame(opcode, data, sink);
+ addFrame(opcode, data);
+ }
+
+ void addError(Object error, [StackTrace stackTrace]) {
+ _eventSink.addError(error, stackTrace);
}
- void handleDone(EventSink<List<int>> sink) {
+ void close() {
int code = webSocket._outCloseCode;
String reason = webSocket._outCloseReason;
List<int> data;
@@ -485,12 +523,12 @@ class _WebSocketOutgoingTransformer extends StreamEventTransformer {
data.addAll(UTF8.encode(reason));
}
}
- addFrame(_WebSocketOpcode.CLOSE, data, sink);
- sink.close();
+ addFrame(_WebSocketOpcode.CLOSE, data);
+ _eventSink.close();
}
- void addFrame(int opcode, List<int> data, EventSink<List<int>> sink) {
- createFrame(opcode, data, webSocket._serverSide).forEach(sink.add);
+ void addFrame(int opcode, List<int> data) {
+ createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
}
static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
« no previous file with comments | « sdk/lib/io/http_impl.dart ('k') | sdk/lib/utf/utf_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698