Index: sdk/lib/io/websocket_impl.dart |
diff --git a/sdk/lib/io/websocket_impl.dart b/sdk/lib/io/websocket_impl.dart |
index aa2a1e52d3eb0b5deeeaaabcfa66b8718a270537..a73dbf587a7d308c4c8ee60df188122d210ee288 100644 |
--- a/sdk/lib/io/websocket_impl.dart |
+++ b/sdk/lib/io/websocket_impl.dart |
@@ -64,8 +64,10 @@ class _CompressionMaxWindowBits { |
*/ |
// TODO(ajohnsen): make this transformer reusable? |
class _WebSocketProtocolTransformer |
- implements EventSink<List<int>>, StreamTransformer< |
- List<int>, dynamic/*List<int>|_WebSocketPing|_WebSocketPong>*/> { |
+ implements |
+ EventSink<List<int>>, |
+ StreamTransformer<List<int>, |
+ dynamic /*List<int>|_WebSocketPing|_WebSocketPong>*/ > { |
static const int START = 0; |
static const int LEN_FIRST = 1; |
static const int LEN_REST = 2; |
@@ -93,7 +95,7 @@ class _WebSocketProtocolTransformer |
int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; |
String closeReason = ""; |
- EventSink<dynamic/*List<int>|_WebSocketPing|_WebSocketPong>*/> _eventSink; |
+ EventSink<dynamic /*List<int>|_WebSocketPing|_WebSocketPong>*/ > _eventSink; |
final bool _serverSide; |
final List _maskingBytes = new List(4); |
@@ -102,7 +104,7 @@ class _WebSocketProtocolTransformer |
_WebSocketPerMessageDeflate _deflate; |
_WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); |
- Stream<dynamic/*List<int>|_WebSocketPing|_WebSocketPong>*/> bind( |
+ Stream<dynamic /*List<int>|_WebSocketPing|_WebSocketPong>*/ > bind( |
Stream<List<int>> stream) { |
return new Stream.eventTransformed(stream, (EventSink eventSink) { |
if (_eventSink != null) { |
@@ -117,7 +119,9 @@ class _WebSocketProtocolTransformer |
_eventSink.addError(error, stackTrace); |
} |
- void close() { _eventSink.close(); } |
+ void close() { |
+ _eventSink.close(); |
+ } |
/** |
* Process data received from the underlying communication channel. |
@@ -138,7 +142,7 @@ class _WebSocketProtocolTransformer |
if (_state == START) { |
_fin = (byte & FIN) != 0; |
- if((byte & (RSV2 | RSV3)) != 0) { |
+ if ((byte & (RSV2 | RSV3)) != 0) { |
// The RSV2, RSV3 bits must both be zero. |
throw new WebSocketException("Protocol error"); |
} |
@@ -495,10 +499,10 @@ class _WebSocketTransformerImpl implements WebSocketTransformer { |
response.headers.add("Sec-WebSocket-Extensions", info.headerValue); |
var serverNoContextTakeover = |
(hv.parameters.containsKey(_serverNoContextTakeover) && |
- compression.serverNoContextTakeover); |
+ compression.serverNoContextTakeover); |
var clientNoContextTakeover = |
(hv.parameters.containsKey(_clientNoContextTakeover) && |
- compression.clientNoContextTakeover); |
+ compression.clientNoContextTakeover); |
var deflate = new _WebSocketPerMessageDeflate( |
serverNoContextTakeover: serverNoContextTakeover, |
clientNoContextTakeover: clientNoContextTakeover, |
@@ -552,10 +556,10 @@ class _WebSocketPerMessageDeflate { |
_WebSocketPerMessageDeflate( |
{this.clientMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, |
- this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, |
- this.serverNoContextTakeover: false, |
- this.clientNoContextTakeover: false, |
- this.serverSide: false}); |
+ this.serverMaxWindowBits: _WebSocketImpl.DEFAULT_WINDOW_BITS, |
+ this.serverNoContextTakeover: false, |
+ this.clientNoContextTakeover: false, |
+ this.serverSide: false}); |
void _ensureDecoder() { |
if (decoder == null) { |
@@ -650,8 +654,8 @@ class _WebSocketOutgoingTransformer |
} |
Stream<List<int>> bind(Stream stream) { |
- return new Stream<List<int>>.eventTransformed( |
- stream, (EventSink<List<int>> eventSink) { |
+ return new Stream<List<int>>.eventTransformed(stream, |
+ (EventSink<List<int>> eventSink) { |
if (_eventSink != null) { |
throw new StateError("WebSocket transformer already used"); |
} |
@@ -721,8 +725,9 @@ class _WebSocketOutgoingTransformer |
webSocket._serverSide, |
_deflateHelper != null && |
(opcode == _WebSocketOpcode.TEXT || |
- opcode == _WebSocketOpcode.BINARY)) |
- .forEach((e) { _eventSink.add(e); }); |
+ opcode == _WebSocketOpcode.BINARY)).forEach((e) { |
+ _eventSink.add(e); |
+ }); |
} |
static Iterable<List<int>> createFrame( |
@@ -740,9 +745,9 @@ class _WebSocketOutgoingTransformer |
int index = 0; |
// Set FIN and opcode. |
- var hoc = _WebSocketProtocolTransformer.FIN |
- | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) |
- | (opcode & _WebSocketProtocolTransformer.OPCODE); |
+ var hoc = _WebSocketProtocolTransformer.FIN | |
+ (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) | |
+ (opcode & _WebSocketProtocolTransformer.OPCODE); |
header[index++] = hoc; |
// Determine size and position of length field. |
@@ -915,6 +920,7 @@ class _WebSocketConsumer implements StreamConsumer { |
Future closeSocket() { |
return socket.close().catchError((_) {}).then((_) => webSocket); |
} |
+ |
_controller.close(); |
return _closeCompleter.future.then((_) => closeSocket()); |
} |
@@ -1013,7 +1019,6 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
return request.close(); |
}).then((response) { |
- |
void error(String message) { |
// Flush data. |
response.detachSocket().then((socket) { |
@@ -1139,10 +1144,14 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
}, cancelOnError: true); |
_subscription.pause(); |
_controller = new StreamController( |
- sync: true, onListen: _subscription.resume, onCancel: () { |
- _subscription.cancel(); |
- _subscription = null; |
- }, onPause: _subscription.pause, onResume: _subscription.resume); |
+ sync: true, |
+ onListen: _subscription.resume, |
+ onCancel: () { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ }, |
+ onPause: _subscription.pause, |
+ onResume: _subscription.resume); |
_webSockets[_serviceId] = this; |
try { |
@@ -1181,16 +1190,21 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
int get closeCode => _closeCode; |
String get closeReason => _closeReason; |
- void add(data) { _sink.add(data); } |
+ void add(data) { |
+ _sink.add(data); |
+ } |
+ |
void addUtf8Text(List<int> bytes) { |
if (bytes is! List<int>) { |
throw new ArgumentError.value(bytes, "bytes", "Is not a list of bytes"); |
} |
_sink.add(new _EncodedString(bytes)); |
} |
+ |
void addError(error, [StackTrace stackTrace]) { |
_sink.addError(error, stackTrace); |
} |
+ |
Future addStream(Stream stream) => _sink.addStream(stream); |
Future get done => _sink.done; |