Index: lib/src/web_socket.dart |
diff --git a/lib/src/web_socket.dart b/lib/src/web_socket.dart |
index 968c70b0e13dd0606b2e75309425c41b84fc5cf6..4a7653c2f60f1bc6a760fd18efbc0700a3d74043 100644 |
--- a/lib/src/web_socket.dart |
+++ b/lib/src/web_socket.dart |
@@ -80,7 +80,7 @@ abstract class CompatibleWebSocket implements Stream, StreamSink { |
/// |
/// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 |
factory CompatibleWebSocket(Stream<List<int>> stream, |
- {StreamSink<List<int>> sink, bool serverSide: true}) { |
+ {StreamSink<List<int>> sink, bool serverSide: true}) { |
if (sink == null) { |
if (stream is! StreamSink) { |
throw new ArgumentError("If stream isn't also a StreamSink, sink must " |
@@ -110,8 +110,8 @@ class CompatibleWebSocketException implements Exception { |
CompatibleWebSocketException([this.message]); |
String toString() => message == null |
- ? "CompatibleWebSocketException" : |
- "CompatibleWebSocketException: $message"; |
+ ? "CompatibleWebSocketException" |
+ : "CompatibleWebSocketException: $message"; |
} |
// The following code is copied from sdk/lib/io/websocket_impl.dart. The |
@@ -130,7 +130,7 @@ abstract class _WebSocketStatus { |
static const int GOING_AWAY = 1001; |
static const int PROTOCOL_ERROR = 1002; |
static const int UNSUPPORTED_DATA = 1003; |
- static const int RESERVED_1004 = 1004; |
+ static const int RESERVED_1004 = 1004; |
static const int NO_STATUS_RECEIVED = 1005; |
static const int ABNORMAL_CLOSURE = 1006; |
static const int INVALID_FRAME_PAYLOAD_DATA = 1007; |
@@ -159,7 +159,6 @@ class _WebSocketMessageType { |
static const int BINARY = 2; |
} |
- |
class _WebSocketOpcode { |
static const int CONTINUATION = 0; |
static const int TEXT = 1; |
@@ -220,15 +219,13 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
_WebSocketProtocolTransformer([this._serverSide = false]); |
Stream bind(Stream stream) { |
- return new Stream.eventTransformed( |
- stream, |
- (EventSink eventSink) { |
- if (_eventSink != null) { |
- throw new StateError("WebSocket transformer already used."); |
- } |
- _eventSink = eventSink; |
- return this; |
- }); |
+ return new Stream.eventTransformed(stream, (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used."); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
} |
void addError(Object error, [StackTrace stackTrace]) => |
@@ -266,14 +263,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
} |
} else { |
assert(_opcode == _WebSocketOpcode.TEXT || |
- _opcode == _WebSocketOpcode.BINARY); |
+ _opcode == _WebSocketOpcode.BINARY); |
if (_currentMessageType != _WebSocketMessageType.NONE) { |
throw new CompatibleWebSocketException("Protocol error"); |
} |
_currentMessageType = _opcode; |
} |
} else if (_opcode >= _WebSocketOpcode.CLOSE && |
- _opcode <= _WebSocketOpcode.PONG) { |
+ _opcode <= _WebSocketOpcode.PONG) { |
// Control frames cannot be fragmented. |
if (!_fin) throw new CompatibleWebSocketException("Protocol error"); |
} else { |
@@ -322,15 +319,14 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
_unmask(index, payloadLength, buffer); |
} |
// Control frame and data frame share _payloads. |
- _payload.add( |
- new Uint8List.view(buffer.buffer, index, payloadLength)); |
+ _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength)); |
index += payloadLength; |
if (_isControlFrame()) { |
if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
} else { |
if (_currentMessageType != _WebSocketMessageType.TEXT && |
_currentMessageType != _WebSocketMessageType.BINARY) { |
- throw new CompatibleWebSocketException("Protocol error"); |
+ throw new CompatibleWebSocketException("Protocol error"); |
} |
if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
} |
@@ -365,8 +361,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
} |
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
- Int32x4List blockBuffer = new Int32x4List.view( |
- buffer.buffer, index, blockCount); |
+ Int32x4List blockBuffer = |
+ new Int32x4List.view(buffer.buffer, index, blockCount); |
for (int i = 0; i < blockBuffer.length; i++) { |
blockBuffer[i] ^= blockMask; |
} |
@@ -479,8 +475,8 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
bool _isControlFrame() { |
return _opcode == _WebSocketOpcode.CLOSE || |
- _opcode == _WebSocketOpcode.PING || |
- _opcode == _WebSocketOpcode.PONG; |
+ _opcode == _WebSocketOpcode.PING || |
+ _opcode == _WebSocketOpcode.PONG; |
} |
void _prepareForNextFrame() { |
@@ -495,13 +491,11 @@ class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { |
} |
} |
- |
class _WebSocketPing { |
final List<int> payload; |
_WebSocketPing([this.payload = null]); |
} |
- |
class _WebSocketPong { |
final List<int> payload; |
_WebSocketPong([this.payload = null]); |
@@ -515,15 +509,13 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
_WebSocketOutgoingTransformer(this.webSocket); |
Stream bind(Stream stream) { |
- return new Stream.eventTransformed( |
- stream, |
- (EventSink eventSink) { |
- if (_eventSink != null) { |
- throw new StateError("WebSocket transformer already used"); |
- } |
- _eventSink = eventSink; |
- return this; |
- }); |
+ return new Stream.eventTransformed(stream, (EventSink eventSink) { |
+ if (_eventSink != null) { |
+ throw new StateError("WebSocket transformer already used"); |
+ } |
+ _eventSink = eventSink; |
+ return this; |
+ }); |
} |
void add(message) { |
@@ -542,7 +534,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
opcode = _WebSocketOpcode.TEXT; |
data = UTF8.encode(message); |
} else { |
- if (message is !List<int>) { |
+ if (message is! List<int>) { |
throw new ArgumentError(message); |
} |
opcode = _WebSocketOpcode.BINARY; |
@@ -577,7 +569,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); |
static Iterable createFrame(int opcode, List<int> data, bool serverSide) { |
- bool mask = !serverSide; // Masking not implemented for server. |
+ bool mask = !serverSide; // Masking not implemented for server. |
int dataLength = data == null ? 0 : data.length; |
// Determine the header size. |
int headerSize = (mask) ? 6 : 2; |
@@ -605,8 +597,12 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
} |
if (mask) { |
header[1] |= 1 << 7; |
- var maskBytes = [_random.nextInt(256), _random.nextInt(256), |
- _random.nextInt(256), _random.nextInt(256)]; |
+ var maskBytes = [ |
+ _random.nextInt(256), |
+ _random.nextInt(256), |
+ _random.nextInt(256), |
+ _random.nextInt(256) |
+ ]; |
header.setRange(index, index + 4, maskBytes); |
index += 4; |
if (data != null) { |
@@ -622,8 +618,7 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
list = new Uint8List(data.length); |
for (int i = 0; i < data.length; i++) { |
if (data[i] < 0 || 255 < data[i]) { |
- throw new ArgumentError( |
- "List element is not a byte value " |
+ throw new ArgumentError("List element is not a byte value " |
"(value ${data[i]} at index $i)"); |
} |
list[i] = data[i]; |
@@ -639,8 +634,8 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
mask = (mask << 8) | maskBytes[i]; |
} |
Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); |
- Int32x4List blockBuffer = new Int32x4List.view( |
- list.buffer, 0, blockCount); |
+ Int32x4List blockBuffer = |
+ new Int32x4List.view(list.buffer, 0, blockCount); |
for (int i = 0; i < blockBuffer.length; i++) { |
blockBuffer[i] ^= blockMask; |
} |
@@ -661,7 +656,6 @@ class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { |
} |
} |
- |
class _WebSocketConsumer implements StreamConsumer { |
final _WebSocketImpl webSocket; |
final StreamSink<List<int>> sink; |
@@ -706,28 +700,28 @@ class _WebSocketConsumer implements StreamConsumer { |
_ensureController() { |
if (_controller != null) return; |
- _controller = new StreamController(sync: true, |
- onPause: _onPause, |
- onResume: _onResume, |
- onCancel: _onListen); |
- var stream = _controller.stream.transform( |
- new _WebSocketOutgoingTransformer(webSocket)); |
- sink.addStream(stream) |
- .then((_) { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- }, onError: (error, StackTrace stackTrace) { |
- _closed = true; |
- _cancel(); |
- if (error is ArgumentError) { |
- if (!_done(error, stackTrace)) { |
- _closeCompleter.completeError(error, stackTrace); |
- } |
- } else { |
- _done(); |
- _closeCompleter.complete(webSocket); |
- } |
- }); |
+ _controller = new StreamController( |
+ sync: true, |
+ onPause: _onPause, |
+ onResume: _onResume, |
+ onCancel: _onListen); |
+ var stream = _controller.stream |
+ .transform(new _WebSocketOutgoingTransformer(webSocket)); |
+ sink.addStream(stream).then((_) { |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ }, onError: (error, StackTrace stackTrace) { |
+ _closed = true; |
+ _cancel(); |
+ if (error is ArgumentError) { |
+ if (!_done(error, stackTrace)) { |
+ _closeCompleter.completeError(error, stackTrace); |
+ } |
+ } else { |
+ _done(); |
+ _closeCompleter.complete(webSocket); |
+ } |
+ }); |
} |
bool _done([error, StackTrace stackTrace]) { |
@@ -748,13 +742,9 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
_ensureController(); |
_completer = new Completer(); |
- _subscription = stream.listen( |
- (data) { |
- _controller.add(data); |
- }, |
- onDone: _done, |
- onError: _done, |
- cancelOnError: true); |
+ _subscription = stream.listen((data) { |
+ _controller.add(data); |
+ }, onDone: _done, onError: _done, cancelOnError: true); |
if (_issuedPause) { |
_subscription.pause(); |
_issuedPause = false; |
@@ -784,7 +774,6 @@ class _WebSocketConsumer implements StreamConsumer { |
} |
} |
- |
class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
StreamController _controller; |
StreamSubscription _subscription; |
@@ -803,65 +792,59 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
String _outCloseReason; |
Timer _closeTimer; |
- _WebSocketImpl._fromSocket(Stream<List<int>> stream, |
- StreamSink<List<int>> sink, [this._serverSide = false]) { |
+ _WebSocketImpl._fromSocket( |
+ Stream<List<int>> stream, StreamSink<List<int>> sink, |
+ [this._serverSide = false]) { |
_consumer = new _WebSocketConsumer(this, sink); |
_sink = new StreamController(); |
_sink.stream.pipe(_consumer); |
_readyState = _WebSocketState.OPEN; |
var transformer = new _WebSocketProtocolTransformer(_serverSide); |
- _subscription = stream.transform(transformer).listen( |
- (data) { |
- if (data is _WebSocketPing) { |
- if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
- } else if (data is _WebSocketPong) { |
- // Simply set pingInterval, as it'll cancel any timers. |
- pingInterval = _pingInterval; |
- } else { |
- _controller.add(data); |
- } |
- }, |
- onError: (error) { |
- if (_closeTimer != null) _closeTimer.cancel(); |
- if (error is FormatException) { |
- _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
- } else { |
- _close(_WebSocketStatus.PROTOCOL_ERROR); |
- } |
- _controller.close(); |
- }, |
- onDone: () { |
- if (_closeTimer != null) _closeTimer.cancel(); |
- if (_readyState == _WebSocketState.OPEN) { |
- _readyState = _WebSocketState.CLOSING; |
- if (!_isReservedStatusCode(transformer.closeCode)) { |
- _close(transformer.closeCode); |
- } else { |
- _close(); |
- } |
- _readyState = _WebSocketState.CLOSED; |
- } |
- _closeCode = transformer.closeCode; |
- _closeReason = transformer.closeReason; |
- _controller.close(); |
- }, |
- cancelOnError: true); |
+ _subscription = stream.transform(transformer).listen((data) { |
+ if (data is _WebSocketPing) { |
+ if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); |
+ } else if (data is _WebSocketPong) { |
+ // Simply set pingInterval, as it'll cancel any timers. |
+ pingInterval = _pingInterval; |
+ } else { |
+ _controller.add(data); |
+ } |
+ }, onError: (error) { |
+ if (_closeTimer != null) _closeTimer.cancel(); |
+ if (error is FormatException) { |
+ _close(_WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); |
+ } else { |
+ _close(_WebSocketStatus.PROTOCOL_ERROR); |
+ } |
+ _controller.close(); |
+ }, onDone: () { |
+ if (_closeTimer != null) _closeTimer.cancel(); |
+ if (_readyState == _WebSocketState.OPEN) { |
+ _readyState = _WebSocketState.CLOSING; |
+ if (!_isReservedStatusCode(transformer.closeCode)) { |
+ _close(transformer.closeCode); |
+ } else { |
+ _close(); |
+ } |
+ _readyState = _WebSocketState.CLOSED; |
+ } |
+ _closeCode = transformer.closeCode; |
+ _closeReason = transformer.closeReason; |
+ _controller.close(); |
+ }, cancelOnError: true); |
_subscription.pause(); |
- _controller = new StreamController(sync: true, |
- onListen: _subscription.resume, |
- onPause: _subscription.pause, |
- onResume: _subscription.resume); |
+ _controller = new StreamController( |
+ sync: true, |
+ onListen: _subscription.resume, |
+ onPause: _subscription.pause, |
+ onResume: _subscription.resume); |
} |
StreamSubscription listen(void onData(message), |
- {Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
return _controller.stream.listen(onData, |
- onError: onError, |
- onDone: onDone, |
- cancelOnError: cancelOnError); |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
} |
Duration get pingInterval => _pingInterval; |
@@ -922,14 +905,12 @@ class _WebSocketImpl extends Stream implements CompatibleWebSocket { |
static bool _isReservedStatusCode(int code) { |
return code != null && |
- (code < _WebSocketStatus.NORMAL_CLOSURE || |
+ (code < _WebSocketStatus.NORMAL_CLOSURE || |
code == _WebSocketStatus.RESERVED_1004 || |
code == _WebSocketStatus.NO_STATUS_RECEIVED || |
code == _WebSocketStatus.ABNORMAL_CLOSURE || |
(code > _WebSocketStatus.INTERNAL_SERVER_ERROR && |
- code < _WebSocketStatus.RESERVED_1015) || |
- (code >= _WebSocketStatus.RESERVED_1015 && |
- code < 3000)); |
+ code < _WebSocketStatus.RESERVED_1015) || |
+ (code >= _WebSocketStatus.RESERVED_1015 && code < 3000)); |
} |
} |
- |