Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(325)

Side by Side Diff: lib/src/copy/web_socket_impl.dart

Issue 1947683006: Bring in the latest version of the SDK's WebSocket impl. (Closed) Base URL: git@github.com:dart-lang/web_socket_channel.git@master
Patch Set: Merge again Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // The following code is copied from sdk/lib/io/websocket_impl.dart. The 5 // The following code is copied from sdk/lib/io/websocket_impl.dart. The
6 // "dart:io" implementation isn't used directly to support non-"dart:io" 6 // "dart:io" implementation isn't used directly to support non-"dart:io"
7 // applications. 7 // applications.
8 // 8 //
9 // Because it's copied directly, only modifications necessary to support the 9 // Because it's copied directly, only modifications necessary to support the
10 // desired public API and to remove "dart:io" dependencies have been made. 10 // desired public API and to remove "dart:io" dependencies have been made.
11 // 11 //
12 // This is up-to-date as of sdk revision 12 // This is up-to-date as of sdk revision
13 // 86227840d75d974feb238f8b3c59c038b99c05cf. 13 // e41fb4cafd6052157dbc1490d437045240f4773f.
14
14 import 'dart:async'; 15 import 'dart:async';
15 import 'dart:convert'; 16 import 'dart:convert';
16 import 'dart:math'; 17 import 'dart:math';
17 import 'dart:typed_data'; 18 import 'dart:typed_data';
18 19
19 import '../exception.dart'; 20 import '../exception.dart';
20 import 'bytes_builder.dart'; 21 import 'bytes_builder.dart';
21 import 'io_sink.dart'; 22 import 'io_sink.dart';
22 import 'web_socket.dart'; 23 import 'web_socket.dart';
23 24
24 const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 25 const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
26 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
27 const String _clientNoContextTakeover = "client_no_context_takeover";
28 const String _serverNoContextTakeover = "server_no_context_takeover";
29 const String _clientMaxWindowBits = "client_max_window_bits";
30 const String _serverMaxWindowBits = "server_max_window_bits";
25 31
26 final _random = new Random(); 32 final _random = new Random();
27 33
28 // Matches _WebSocketOpcode. 34 // Matches _WebSocketOpcode.
29 class _WebSocketMessageType { 35 class _WebSocketMessageType {
30 static const int NONE = 0; 36 static const int NONE = 0;
31 static const int TEXT = 1; 37 static const int TEXT = 1;
32 static const int BINARY = 2; 38 static const int BINARY = 2;
33 } 39 }
34 40
35
36 class _WebSocketOpcode { 41 class _WebSocketOpcode {
37 static const int CONTINUATION = 0; 42 static const int CONTINUATION = 0;
38 static const int TEXT = 1; 43 static const int TEXT = 1;
39 static const int BINARY = 2; 44 static const int BINARY = 2;
40 static const int RESERVED_3 = 3; 45 static const int RESERVED_3 = 3;
41 static const int RESERVED_4 = 4; 46 static const int RESERVED_4 = 4;
42 static const int RESERVED_5 = 5; 47 static const int RESERVED_5 = 5;
43 static const int RESERVED_6 = 6; 48 static const int RESERVED_6 = 6;
44 static const int RESERVED_7 = 7; 49 static const int RESERVED_7 = 7;
45 static const int CLOSE = 8; 50 static const int CLOSE = 8;
46 static const int PING = 9; 51 static const int PING = 9;
47 static const int PONG = 10; 52 static const int PONG = 10;
48 static const int RESERVED_B = 11; 53 static const int RESERVED_B = 11;
49 static const int RESERVED_C = 12; 54 static const int RESERVED_C = 12;
50 static const int RESERVED_D = 13; 55 static const int RESERVED_D = 13;
51 static const int RESERVED_E = 14; 56 static const int RESERVED_E = 14;
52 static const int RESERVED_F = 15; 57 static const int RESERVED_F = 15;
53 } 58 }
54 59
55 /** 60 /**
61 * Stores the header and integer value derived from negotiation of
62 * client_max_window_bits and server_max_window_bits. headerValue will be
63 * set in the Websocket response headers.
64 */
65 class _CompressionMaxWindowBits {
66 String headerValue;
67 int maxWindowBits;
68 _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]);
69 String toString() => headerValue;
70 }
71
72 /**
56 * The web socket protocol transformer handles the protocol byte stream 73 * The web socket protocol transformer handles the protocol byte stream
57 * which is supplied through the [:handleData:]. As the protocol is processed, 74 * which is supplied through the [:handleData:]. As the protocol is processed,
58 * it'll output frame data as either a List<int> or String. 75 * it'll output frame data as either a List<int> or String.
59 * 76 *
60 * Important infomation about usage: Be sure you use cancelOnError, so the 77 * Important information about usage: Be sure you use cancelOnError, so the
61 * socket will be closed when the processer encounter an error. Not using it 78 * socket will be closed when the processor encounter an error. Not using it
62 * will lead to undefined behaviour. 79 * will lead to undefined behaviour.
63 */ 80 */
64 // TODO(ajohnsen): make this transformer reusable? 81 // TODO(ajohnsen): make this transformer reusable?
65 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { 82 class _WebSocketProtocolTransformer
83 implements StreamTransformer<List<int>, dynamic>, EventSink<List<int>> {
66 static const int START = 0; 84 static const int START = 0;
67 static const int LEN_FIRST = 1; 85 static const int LEN_FIRST = 1;
68 static const int LEN_REST = 2; 86 static const int LEN_REST = 2;
69 static const int MASK = 3; 87 static const int MASK = 3;
70 static const int PAYLOAD = 4; 88 static const int PAYLOAD = 4;
71 static const int CLOSED = 5; 89 static const int CLOSED = 5;
72 static const int FAILURE = 6; 90 static const int FAILURE = 6;
91 static const int FIN = 0x80;
92 static const int RSV1 = 0x40;
93 static const int RSV2 = 0x20;
94 static const int RSV3 = 0x10;
95 static const int OPCODE = 0xF;
73 96
74 int _state = START; 97 int _state = START;
75 bool _fin = false; 98 bool _fin = false;
99 bool _compressed = false;
76 int _opcode = -1; 100 int _opcode = -1;
77 int _len = -1; 101 int _len = -1;
78 bool _masked = false; 102 bool _masked = false;
79 int _remainingLenBytes = -1; 103 int _remainingLenBytes = -1;
80 int _remainingMaskingKeyBytes = 4; 104 int _remainingMaskingKeyBytes = 4;
81 int _remainingPayloadBytes = -1; 105 int _remainingPayloadBytes = -1;
82 int _unmaskingIndex = 0; 106 int _unmaskingIndex = 0;
83 int _currentMessageType = _WebSocketMessageType.NONE; 107 int _currentMessageType = _WebSocketMessageType.NONE;
84 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 108 int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
85 String closeReason = ""; 109 String closeReason = "";
86 110
87 EventSink _eventSink; 111 EventSink _eventSink;
88 112
89 final bool _serverSide; 113 final bool _serverSide;
90 final List _maskingBytes = new List(4); 114 final List _maskingBytes = new List(4);
91 final BytesBuilder _payload = new BytesBuilder(copy: false); 115 final BytesBuilder _payload = new BytesBuilder(copy: false);
92 116
93 _WebSocketProtocolTransformer([this._serverSide = false]); 117 _WebSocketProtocolTransformer([this._serverSide = false]);
94 118
95 Stream bind(Stream stream) { 119 Stream bind(Stream stream) {
96 return new Stream.eventTransformed( 120 return new Stream.eventTransformed(stream, (EventSink eventSink) {
97 stream, 121 if (_eventSink != null) {
98 (EventSink eventSink) { 122 throw new StateError("WebSocket transformer already used.");
99 if (_eventSink != null) { 123 }
100 throw new StateError("WebSocket transformer already used."); 124 _eventSink = eventSink;
101 } 125 return this;
102 _eventSink = eventSink; 126 });
103 return this;
104 });
105 } 127 }
106 128
107 void addError(Object error, [StackTrace stackTrace]) => 129 void addError(Object error, [StackTrace stackTrace]) {
108 _eventSink.addError(error, stackTrace); 130 _eventSink.addError(error, stackTrace);
131 }
109 132
110 void close() => _eventSink.close(); 133 void close() { _eventSink.close(); }
111 134
112 /** 135 /**
113 * Process data received from the underlying communication channel. 136 * Process data received from the underlying communication channel.
114 */ 137 */
115 void add(Uint8List buffer) { 138 void add(List<int> bytes) {
116 int count = buffer.length; 139 var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes);
117 int index = 0; 140 int index = 0;
118 int lastIndex = count; 141 int lastIndex = buffer.length;
119 if (_state == CLOSED) { 142 if (_state == CLOSED) {
120 throw new WebSocketChannelException("Data on closed connection"); 143 throw new WebSocketChannelException("Data on closed connection");
121 } 144 }
122 if (_state == FAILURE) { 145 if (_state == FAILURE) {
123 throw new WebSocketChannelException("Data on failed connection"); 146 throw new WebSocketChannelException("Data on failed connection");
124 } 147 }
125 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { 148 while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
126 int byte = buffer[index]; 149 int byte = buffer[index];
127 if (_state <= LEN_REST) { 150 if (_state <= LEN_REST) {
128 if (_state == START) { 151 if (_state == START) {
129 _fin = (byte & 0x80) != 0; 152 _fin = (byte & FIN) != 0;
130 if ((byte & 0x70) != 0) { 153
131 // The RSV1, RSV2 bits RSV3 must be all zero. 154 if((byte & (RSV2 | RSV3)) != 0) {
155 // The RSV2, RSV3 bits must both be zero.
132 throw new WebSocketChannelException("Protocol error"); 156 throw new WebSocketChannelException("Protocol error");
133 } 157 }
134 _opcode = (byte & 0xF); 158
159 _opcode = (byte & OPCODE);
160
161 if (_opcode != _WebSocketOpcode.CONTINUATION) {
162 if ((byte & RSV1) != 0) {
163 _compressed = true;
164 } else {
165 _compressed = false;
166 }
167 }
168
135 if (_opcode <= _WebSocketOpcode.BINARY) { 169 if (_opcode <= _WebSocketOpcode.BINARY) {
136 if (_opcode == _WebSocketOpcode.CONTINUATION) { 170 if (_opcode == _WebSocketOpcode.CONTINUATION) {
137 if (_currentMessageType == _WebSocketMessageType.NONE) { 171 if (_currentMessageType == _WebSocketMessageType.NONE) {
138 throw new WebSocketChannelException("Protocol error"); 172 throw new WebSocketChannelException("Protocol error");
139 } 173 }
140 } else { 174 } else {
141 assert(_opcode == _WebSocketOpcode.TEXT || 175 assert(_opcode == _WebSocketOpcode.TEXT ||
142 _opcode == _WebSocketOpcode.BINARY); 176 _opcode == _WebSocketOpcode.BINARY);
143 if (_currentMessageType != _WebSocketMessageType.NONE) { 177 if (_currentMessageType != _WebSocketMessageType.NONE) {
144 throw new WebSocketChannelException("Protocol error"); 178 throw new WebSocketChannelException("Protocol error");
145 } 179 }
146 _currentMessageType = _opcode; 180 _currentMessageType = _opcode;
147 } 181 }
148 } else if (_opcode >= _WebSocketOpcode.CLOSE && 182 } else if (_opcode >= _WebSocketOpcode.CLOSE &&
149 _opcode <= _WebSocketOpcode.PONG) { 183 _opcode <= _WebSocketOpcode.PONG) {
150 // Control frames cannot be fragmented. 184 // Control frames cannot be fragmented.
151 if (!_fin) throw new WebSocketChannelException("Protocol error"); 185 if (!_fin) throw new WebSocketChannelException("Protocol error");
152 } else { 186 } else {
153 throw new WebSocketChannelException("Protocol error"); 187 throw new WebSocketChannelException("Protocol error");
154 } 188 }
155 _state = LEN_FIRST; 189 _state = LEN_FIRST;
156 } else if (_state == LEN_FIRST) { 190 } else if (_state == LEN_FIRST) {
157 _masked = (byte & 0x80) != 0; 191 _masked = (byte & 0x80) != 0;
158 _len = byte & 0x7F; 192 _len = byte & 0x7F;
159 if (_isControlFrame() && _len > 125) { 193 if (_isControlFrame() && _len > 125) {
(...skipping 28 matching lines...) Expand all
188 } else { 222 } else {
189 assert(_state == PAYLOAD); 223 assert(_state == PAYLOAD);
190 // The payload is not handled one byte at a time but in blocks. 224 // The payload is not handled one byte at a time but in blocks.
191 int payloadLength = min(lastIndex - index, _remainingPayloadBytes); 225 int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
192 _remainingPayloadBytes -= payloadLength; 226 _remainingPayloadBytes -= payloadLength;
193 // Unmask payload if masked. 227 // Unmask payload if masked.
194 if (_masked) { 228 if (_masked) {
195 _unmask(index, payloadLength, buffer); 229 _unmask(index, payloadLength, buffer);
196 } 230 }
197 // Control frame and data frame share _payloads. 231 // Control frame and data frame share _payloads.
198 _payload.add( 232 _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
199 new Uint8List.view(buffer.buffer, index, payloadLength));
200 index += payloadLength; 233 index += payloadLength;
201 if (_isControlFrame()) { 234 if (_isControlFrame()) {
202 if (_remainingPayloadBytes == 0) _controlFrameEnd(); 235 if (_remainingPayloadBytes == 0) _controlFrameEnd();
203 } else { 236 } else {
204 if (_currentMessageType != _WebSocketMessageType.TEXT && 237 if (_currentMessageType != _WebSocketMessageType.TEXT &&
205 _currentMessageType != _WebSocketMessageType.BINARY) { 238 _currentMessageType != _WebSocketMessageType.BINARY) {
206 throw new WebSocketChannelException("Protocol error"); 239 throw new WebSocketChannelException("Protocol error");
207 } 240 }
208 if (_remainingPayloadBytes == 0) _messageFrameEnd(); 241 if (_remainingPayloadBytes == 0) _messageFrameEnd();
209 } 242 }
210 243
211 // Hack - as we always do index++ below. 244 // Hack - as we always do index++ below.
212 index--; 245 index--;
213 } 246 }
214 } 247 }
215 248
216 // Move to the next byte. 249 // Move to the next byte.
(...skipping 14 matching lines...) Expand all
231 index += startOffset; 264 index += startOffset;
232 length -= startOffset; 265 length -= startOffset;
233 final int blockCount = length ~/ BLOCK_SIZE; 266 final int blockCount = length ~/ BLOCK_SIZE;
234 if (blockCount > 0) { 267 if (blockCount > 0) {
235 // Create mask block. 268 // Create mask block.
236 int mask = 0; 269 int mask = 0;
237 for (int i = 3; i >= 0; i--) { 270 for (int i = 3; i >= 0; i--) {
238 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; 271 mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
239 } 272 }
240 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 273 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
241 Int32x4List blockBuffer = new Int32x4List.view( 274 Int32x4List blockBuffer =
242 buffer.buffer, index, blockCount); 275 new Int32x4List.view(buffer.buffer, index, blockCount);
243 for (int i = 0; i < blockBuffer.length; i++) { 276 for (int i = 0; i < blockBuffer.length; i++) {
244 blockBuffer[i] ^= blockMask; 277 blockBuffer[i] ^= blockMask;
245 } 278 }
246 final int bytes = blockCount * BLOCK_SIZE; 279 final int bytes = blockCount * BLOCK_SIZE;
247 index += bytes; 280 index += bytes;
248 length -= bytes; 281 length -= bytes;
249 } 282 }
250 } 283 }
251 // Handle end. 284 // Handle end.
252 final int end = index + length; 285 final int end = index + length;
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 } else { 331 } else {
299 _messageFrameEnd(); 332 _messageFrameEnd();
300 } 333 }
301 } else { 334 } else {
302 _state = PAYLOAD; 335 _state = PAYLOAD;
303 } 336 }
304 } 337 }
305 338
306 void _messageFrameEnd() { 339 void _messageFrameEnd() {
307 if (_fin) { 340 if (_fin) {
341 var bytes = _payload.takeBytes();
342
308 switch (_currentMessageType) { 343 switch (_currentMessageType) {
309 case _WebSocketMessageType.TEXT: 344 case _WebSocketMessageType.TEXT:
310 _eventSink.add(UTF8.decode(_payload.takeBytes())); 345 _eventSink.add(UTF8.decode(bytes));
311 break; 346 break;
312 case _WebSocketMessageType.BINARY: 347 case _WebSocketMessageType.BINARY:
313 _eventSink.add(_payload.takeBytes()); 348 _eventSink.add(bytes);
314 break; 349 break;
315 } 350 }
316 _currentMessageType = _WebSocketMessageType.NONE; 351 _currentMessageType = _WebSocketMessageType.NONE;
317 } 352 }
318 _prepareForNextFrame(); 353 _prepareForNextFrame();
319 } 354 }
320 355
321 void _controlFrameEnd() { 356 void _controlFrameEnd() {
322 switch (_opcode) { 357 switch (_opcode) {
323 case _WebSocketOpcode.CLOSE: 358 case _WebSocketOpcode.CLOSE:
(...skipping 21 matching lines...) Expand all
345 380
346 case _WebSocketOpcode.PONG: 381 case _WebSocketOpcode.PONG:
347 _eventSink.add(new _WebSocketPong(_payload.takeBytes())); 382 _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
348 break; 383 break;
349 } 384 }
350 _prepareForNextFrame(); 385 _prepareForNextFrame();
351 } 386 }
352 387
353 bool _isControlFrame() { 388 bool _isControlFrame() {
354 return _opcode == _WebSocketOpcode.CLOSE || 389 return _opcode == _WebSocketOpcode.CLOSE ||
355 _opcode == _WebSocketOpcode.PING || 390 _opcode == _WebSocketOpcode.PING ||
356 _opcode == _WebSocketOpcode.PONG; 391 _opcode == _WebSocketOpcode.PONG;
357 } 392 }
358 393
359 void _prepareForNextFrame() { 394 void _prepareForNextFrame() {
360 if (_state != CLOSED && _state != FAILURE) _state = START; 395 if (_state != CLOSED && _state != FAILURE) _state = START;
361 _fin = false; 396 _fin = false;
362 _opcode = -1; 397 _opcode = -1;
363 _len = -1; 398 _len = -1;
364 _remainingLenBytes = -1; 399 _remainingLenBytes = -1;
365 _remainingMaskingKeyBytes = 4; 400 _remainingMaskingKeyBytes = 4;
366 _remainingPayloadBytes = -1; 401 _remainingPayloadBytes = -1;
367 _unmaskingIndex = 0; 402 _unmaskingIndex = 0;
368 } 403 }
369 } 404 }
370 405
371
372 class _WebSocketPing { 406 class _WebSocketPing {
373 final List<int> payload; 407 final List<int> payload;
374 _WebSocketPing([this.payload = null]); 408 _WebSocketPing([this.payload = null]);
375 } 409 }
376 410
377
378 class _WebSocketPong { 411 class _WebSocketPong {
379 final List<int> payload; 412 final List<int> payload;
380 _WebSocketPong([this.payload = null]); 413 _WebSocketPong([this.payload = null]);
381 } 414 }
382 415
383 // TODO(ajohnsen): Make this transformer reusable. 416 // TODO(ajohnsen): Make this transformer reusable.
384 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { 417 class _WebSocketOutgoingTransformer
418 implements StreamTransformer<dynamic, List<int>>, EventSink {
385 final WebSocketImpl webSocket; 419 final WebSocketImpl webSocket;
386 EventSink _eventSink; 420 EventSink<List<int>> _eventSink;
387 421
388 _WebSocketOutgoingTransformer(this.webSocket); 422 _WebSocketOutgoingTransformer(this.webSocket);
389 423
390 Stream bind(Stream stream) { 424 Stream<List<int>> bind(Stream stream) {
391 return new Stream.eventTransformed( 425 return new Stream.eventTransformed(stream, (eventSink) {
392 stream, 426 if (_eventSink != null) {
393 (EventSink eventSink) { 427 throw new StateError("WebSocket transformer already used");
394 if (_eventSink != null) { 428 }
395 throw new StateError("WebSocket transformer already used"); 429 _eventSink = eventSink;
396 } 430 return this;
397 _eventSink = eventSink; 431 });
398 return this;
399 });
400 } 432 }
401 433
402 void add(message) { 434 void add(message) {
403 if (message is _WebSocketPong) { 435 if (message is _WebSocketPong) {
404 addFrame(_WebSocketOpcode.PONG, message.payload); 436 addFrame(_WebSocketOpcode.PONG, message.payload);
405 return; 437 return;
406 } 438 }
407 if (message is _WebSocketPing) { 439 if (message is _WebSocketPing) {
408 addFrame(_WebSocketOpcode.PING, message.payload); 440 addFrame(_WebSocketOpcode.PING, message.payload);
409 return; 441 return;
410 } 442 }
411 List<int> data; 443 List<int> data;
412 int opcode; 444 int opcode;
413 if (message != null) { 445 if (message != null) {
414 if (message is String) { 446 if (message is String) {
415 opcode = _WebSocketOpcode.TEXT; 447 opcode = _WebSocketOpcode.TEXT;
416 data = UTF8.encode(message); 448 data = UTF8.encode(message);
417 } else { 449 } else {
418 if (message is !List<int>) { 450 if (message is List<int>) {
451 data = message;
452 opcode = _WebSocketOpcode.BINARY;
453 } else {
419 throw new ArgumentError(message); 454 throw new ArgumentError(message);
420 } 455 }
421 opcode = _WebSocketOpcode.BINARY;
422 data = message;
423 } 456 }
424 } else { 457 } else {
425 opcode = _WebSocketOpcode.TEXT; 458 opcode = _WebSocketOpcode.TEXT;
426 } 459 }
427 addFrame(opcode, data); 460 addFrame(opcode, data);
428 } 461 }
429 462
430 void addError(Object error, [StackTrace stackTrace]) => 463 void addError(Object error, [StackTrace stackTrace]) {
431 _eventSink.addError(error, stackTrace); 464 _eventSink.addError(error, stackTrace);
465 }
432 466
433 void close() { 467 void close() {
434 int code = webSocket._outCloseCode; 468 int code = webSocket._outCloseCode;
435 String reason = webSocket._outCloseReason; 469 String reason = webSocket._outCloseReason;
436 List<int> data; 470 List<int> data;
437 if (code != null) { 471 if (code != null) {
438 data = new List<int>(); 472 data = new List<int>();
439 data.add((code >> 8) & 0xFF); 473 data.add((code >> 8) & 0xFF);
440 data.add(code & 0xFF); 474 data.add(code & 0xFF);
441 if (reason != null) { 475 if (reason != null) {
442 data.addAll(UTF8.encode(reason)); 476 data.addAll(UTF8.encode(reason));
443 } 477 }
444 } 478 }
445 addFrame(_WebSocketOpcode.CLOSE, data); 479 addFrame(_WebSocketOpcode.CLOSE, data);
446 _eventSink.close(); 480 _eventSink.close();
447 } 481 }
448 482
449 void addFrame(int opcode, List<int> data) => 483 void addFrame(int opcode, List<int> data) => createFrame(
450 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); 484 opcode,
485 data,
486 webSocket._serverSide,
487 false).forEach((e) {
488 _eventSink.add(e);
489 });
451 490
452 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 491 static Iterable<List<int>> createFrame(
453 bool mask = !serverSide; // Masking not implemented for server. 492 int opcode, List<int> data, bool serverSide, bool compressed) {
493 bool mask = !serverSide; // Masking not implemented for server.
454 int dataLength = data == null ? 0 : data.length; 494 int dataLength = data == null ? 0 : data.length;
455 // Determine the header size. 495 // Determine the header size.
456 int headerSize = (mask) ? 6 : 2; 496 int headerSize = (mask) ? 6 : 2;
457 if (dataLength > 65535) { 497 if (dataLength > 65535) {
458 headerSize += 8; 498 headerSize += 8;
459 } else if (dataLength > 125) { 499 } else if (dataLength > 125) {
460 headerSize += 2; 500 headerSize += 2;
461 } 501 }
462 Uint8List header = new Uint8List(headerSize); 502 Uint8List header = new Uint8List(headerSize);
463 int index = 0; 503 int index = 0;
504
464 // Set FIN and opcode. 505 // Set FIN and opcode.
465 header[index++] = 0x80 | opcode; 506 var hoc = _WebSocketProtocolTransformer.FIN
507 | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
508 | (opcode & _WebSocketProtocolTransformer.OPCODE);
509
510 header[index++] = hoc;
466 // Determine size and position of length field. 511 // Determine size and position of length field.
467 int lengthBytes = 1; 512 int lengthBytes = 1;
468 if (dataLength > 65535) { 513 if (dataLength > 65535) {
469 header[index++] = 127; 514 header[index++] = 127;
470 lengthBytes = 8; 515 lengthBytes = 8;
471 } else if (dataLength > 125) { 516 } else if (dataLength > 125) {
472 header[index++] = 126; 517 header[index++] = 126;
473 lengthBytes = 2; 518 lengthBytes = 2;
474 } 519 }
475 // Write the length in network byte order into the header. 520 // Write the length in network byte order into the header.
(...skipping 12 matching lines...) Expand all
488 // encoded data. 533 // encoded data.
489 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { 534 if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
490 list = data; 535 list = data;
491 } else { 536 } else {
492 if (data is Uint8List) { 537 if (data is Uint8List) {
493 list = new Uint8List.fromList(data); 538 list = new Uint8List.fromList(data);
494 } else { 539 } else {
495 list = new Uint8List(data.length); 540 list = new Uint8List(data.length);
496 for (int i = 0; i < data.length; i++) { 541 for (int i = 0; i < data.length; i++) {
497 if (data[i] < 0 || 255 < data[i]) { 542 if (data[i] < 0 || 255 < data[i]) {
498 throw new ArgumentError( 543 throw new ArgumentError("List element is not a byte value "
499 "List element is not a byte value "
500 "(value ${data[i]} at index $i)"); 544 "(value ${data[i]} at index $i)");
501 } 545 }
502 list[i] = data[i]; 546 list[i] = data[i];
503 } 547 }
504 } 548 }
505 } 549 }
506 const int BLOCK_SIZE = 16; 550 const int BLOCK_SIZE = 16;
507 int blockCount = list.length ~/ BLOCK_SIZE; 551 int blockCount = list.length ~/ BLOCK_SIZE;
508 if (blockCount > 0) { 552 if (blockCount > 0) {
509 // Create mask block. 553 // Create mask block.
510 int mask = 0; 554 int mask = 0;
511 for (int i = 3; i >= 0; i--) { 555 for (int i = 3; i >= 0; i--) {
512 mask = (mask << 8) | maskBytes[i]; 556 mask = (mask << 8) | maskBytes[i];
513 } 557 }
514 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); 558 Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
515 Int32x4List blockBuffer = new Int32x4List.view( 559 Int32x4List blockBuffer =
516 list.buffer, 0, blockCount); 560 new Int32x4List.view(list.buffer, 0, blockCount);
517 for (int i = 0; i < blockBuffer.length; i++) { 561 for (int i = 0; i < blockBuffer.length; i++) {
518 blockBuffer[i] ^= blockMask; 562 blockBuffer[i] ^= blockMask;
519 } 563 }
520 } 564 }
521 // Handle end. 565 // Handle end.
522 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { 566 for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
523 list[i] ^= maskBytes[i & 3]; 567 list[i] ^= maskBytes[i & 3];
524 } 568 }
525 data = list; 569 data = list;
526 } 570 }
527 } 571 }
528 assert(index == headerSize); 572 assert(index == headerSize);
529 if (data == null) { 573 if (data == null) {
530 return [header]; 574 return [header];
531 } else { 575 } else {
532 return [header, data]; 576 return [header, data];
533 } 577 }
534 } 578 }
535 } 579 }
536 580
537
538 class _WebSocketConsumer implements StreamConsumer { 581 class _WebSocketConsumer implements StreamConsumer {
539 final WebSocketImpl webSocket; 582 final WebSocketImpl webSocket;
540 final StreamSink<List<int>> sink; 583 final StreamSink<List<int>> sink;
541 StreamController _controller; 584 StreamController _controller;
542 StreamSubscription _subscription; 585 StreamSubscription _subscription;
543 bool _issuedPause = false; 586 bool _issuedPause = false;
544 bool _closed = false; 587 bool _closed = false;
545 Completer _closeCompleter = new Completer(); 588 Completer _closeCompleter = new Completer();
546 Completer _completer; 589 Completer _completer;
547 590
(...skipping 24 matching lines...) Expand all
572 void _cancel() { 615 void _cancel() {
573 if (_subscription != null) { 616 if (_subscription != null) {
574 var subscription = _subscription; 617 var subscription = _subscription;
575 _subscription = null; 618 _subscription = null;
576 subscription.cancel(); 619 subscription.cancel();
577 } 620 }
578 } 621 }
579 622
580 _ensureController() { 623 _ensureController() {
581 if (_controller != null) return; 624 if (_controller != null) return;
582 _controller = new StreamController(sync: true, 625 _controller = new StreamController(
583 onPause: _onPause, 626 sync: true,
584 onResume: _onResume, 627 onPause: _onPause,
585 onCancel: _onListen); 628 onResume: _onResume,
586 var stream = _controller.stream.transform( 629 onCancel: _onListen);
587 new _WebSocketOutgoingTransformer(webSocket)); 630 var stream = _controller.stream
588 sink.addStream(stream) 631 .transform(new _WebSocketOutgoingTransformer(webSocket));
589 .then((_) { 632 sink.addStream(stream).then((_) {
590 _done(); 633 _done();
591 _closeCompleter.complete(webSocket); 634 _closeCompleter.complete(webSocket);
592 }, onError: (error, StackTrace stackTrace) { 635 }, onError: (error, StackTrace stackTrace) {
593 _closed = true; 636 _closed = true;
594 _cancel(); 637 _cancel();
595 if (error is ArgumentError) { 638 if (error is ArgumentError) {
596 if (!_done(error, stackTrace)) { 639 if (!_done(error, stackTrace)) {
597 _closeCompleter.completeError(error, stackTrace); 640 _closeCompleter.completeError(error, stackTrace);
598 } 641 }
599 } else { 642 } else {
600 _done(); 643 _done();
601 _closeCompleter.complete(webSocket); 644 _closeCompleter.complete(webSocket);
602 } 645 }
603 }); 646 });
604 } 647 }
605 648
606 bool _done([error, StackTrace stackTrace]) { 649 bool _done([error, StackTrace stackTrace]) {
607 if (_completer == null) return false; 650 if (_completer == null) return false;
608 if (error != null) { 651 if (error != null) {
609 _completer.completeError(error, stackTrace); 652 _completer.completeError(error, stackTrace);
610 } else { 653 } else {
611 _completer.complete(webSocket); 654 _completer.complete(webSocket);
612 } 655 }
613 _completer = null; 656 _completer = null;
614 return true; 657 return true;
615 } 658 }
616 659
617 Future addStream(var stream) { 660 Future addStream(var stream) {
618 if (_closed) { 661 if (_closed) {
619 stream.listen(null).cancel(); 662 stream.listen(null).cancel();
620 return new Future.value(webSocket); 663 return new Future.value(webSocket);
621 } 664 }
622 _ensureController(); 665 _ensureController();
623 _completer = new Completer(); 666 _completer = new Completer();
624 _subscription = stream.listen( 667 _subscription = stream.listen((data) {
625 (data) { 668 _controller.add(data);
626 _controller.add(data); 669 }, onDone: _done, onError: _done, cancelOnError: true);
627 },
628 onDone: _done,
629 onError: _done,
630 cancelOnError: true);
631 if (_issuedPause) { 670 if (_issuedPause) {
632 _subscription.pause(); 671 _subscription.pause();
633 _issuedPause = false; 672 _issuedPause = false;
634 } 673 }
635 return _completer.future; 674 return _completer.future;
636 } 675 }
637 676
638 Future close() { 677 Future close() {
639 _ensureController(); 678 _ensureController();
640 Future closeSocket() { 679 Future closeSocket() {
641 return sink.close().catchError((_) {}).then((_) => webSocket); 680 return sink.close().catchError((_) {}).then((_) => webSocket);
642 } 681 }
643 _controller.close(); 682 _controller.close();
644 return _closeCompleter.future.then((_) => closeSocket()); 683 return _closeCompleter.future.then((_) => closeSocket());
645 } 684 }
646 685
647 void add(data) { 686 void add(data) {
648 if (_closed) return; 687 if (_closed) return;
649 _ensureController(); 688 _ensureController();
650 _controller.add(data); 689 _controller.add(data);
651 } 690 }
652 691
653 void closeSocket() { 692 void closeSocket() {
654 _closed = true; 693 _closed = true;
655 _cancel(); 694 _cancel();
656 close(); 695 close();
657 } 696 }
658 } 697 }
659 698
660
661 class WebSocketImpl extends Stream with _ServiceObject implements StreamSink { 699 class WebSocketImpl extends Stream with _ServiceObject implements StreamSink {
662 // Use default Map so we keep order. 700 // Use default Map so we keep order.
663 static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>(); 701 static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>();
702 static const int DEFAULT_WINDOW_BITS = 15;
703 static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
664 704
665 final String protocol; 705 final String protocol;
666 706
667 StreamController _controller; 707 StreamController _controller;
668 StreamSubscription _subscription; 708 StreamSubscription _subscription;
669 StreamSink _sink; 709 StreamSink _sink;
670 710
671 final bool _serverSide; 711 final bool _serverSide;
672 int _readyState = WebSocket.CONNECTING; 712 int _readyState = WebSocket.CONNECTING;
673 bool _writeClosed = false; 713 bool _writeClosed = false;
674 int _closeCode; 714 int _closeCode;
675 String _closeReason; 715 String _closeReason;
676 Duration _pingInterval; 716 Duration _pingInterval;
677 Timer _pingTimer; 717 Timer _pingTimer;
678 _WebSocketConsumer _consumer; 718 _WebSocketConsumer _consumer;
679 719
680 int _outCloseCode; 720 int _outCloseCode;
681 String _outCloseReason; 721 String _outCloseReason;
682 Timer _closeTimer; 722 Timer _closeTimer;
683 723
684 WebSocketImpl.fromSocket(Stream<List<int>> stream, 724 WebSocketImpl.fromSocket(
685 StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) { 725 Stream<List<int>> stream, StreamSink<List<int>> sink, this.protocol,
726 [this._serverSide = false]) {
686 _consumer = new _WebSocketConsumer(this, sink); 727 _consumer = new _WebSocketConsumer(this, sink);
687 _sink = new StreamSinkImpl(_consumer); 728 _sink = new StreamSinkImpl(_consumer);
688 _readyState = WebSocket.OPEN; 729 _readyState = WebSocket.OPEN;
689 730
690 var transformer = new _WebSocketProtocolTransformer(_serverSide); 731 var transformer = new _WebSocketProtocolTransformer(_serverSide);
691 _subscription = stream.transform(transformer).listen( 732 _subscription = stream.transform(transformer).listen((data) {
692 (data) { 733 if (data is _WebSocketPing) {
693 if (data is _WebSocketPing) { 734 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
694 if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); 735 } else if (data is _WebSocketPong) {
695 } else if (data is _WebSocketPong) { 736 // Simply set pingInterval, as it'll cancel any timers.
696 // Simply set pingInterval, as it'll cancel any timers. 737 pingInterval = _pingInterval;
697 pingInterval = _pingInterval; 738 } else {
698 } else { 739 _controller.add(data);
699 _controller.add(data); 740 }
700 } 741 }, onError: (error, stackTrace) {
701 }, 742 if (_closeTimer != null) _closeTimer.cancel();
702 onError: (error) { 743 if (error is FormatException) {
703 if (_closeTimer != null) _closeTimer.cancel(); 744 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
704 if (error is FormatException) { 745 } else {
705 _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); 746 _close(WebSocketStatus.PROTOCOL_ERROR);
706 } else { 747 }
707 _close(WebSocketStatus.PROTOCOL_ERROR); 748 // An error happened, set the close code set above.
708 } 749 _closeCode = _outCloseCode;
709 // An error happened, set the close code set above. 750 _closeReason = _outCloseReason;
710 _closeCode = _outCloseCode; 751 _controller.close();
711 _closeReason = _outCloseReason; 752 }, onDone: () {
712 _controller.close(); 753 if (_closeTimer != null) _closeTimer.cancel();
713 }, 754 if (_readyState == WebSocket.OPEN) {
714 onDone: () { 755 _readyState = WebSocket.CLOSING;
715 if (_closeTimer != null) _closeTimer.cancel(); 756 if (!_isReservedStatusCode(transformer.closeCode)) {
716 if (_readyState == WebSocket.OPEN) { 757 _close(transformer.closeCode, transformer.closeReason);
717 _readyState = WebSocket.CLOSING; 758 } else {
718 if (!_isReservedStatusCode(transformer.closeCode)) { 759 _close();
719 _close(transformer.closeCode); 760 }
720 } else { 761 _readyState = WebSocket.CLOSED;
721 _close(); 762 }
722 } 763 // Protocol close, use close code from transformer.
723 _readyState = WebSocket.CLOSED; 764 _closeCode = transformer.closeCode;
724 } 765 _closeReason = transformer.closeReason;
725 // Protocol close, use close code from transformer. 766 _controller.close();
726 _closeCode = transformer.closeCode; 767 }, cancelOnError: true);
727 _closeReason = transformer.closeReason;
728 _controller.close();
729 },
730 cancelOnError: true);
731 _subscription.pause(); 768 _subscription.pause();
732 _controller = new StreamController(sync: true, 769 _controller = new StreamController(
733 onListen: () => _subscription.resume(), 770 sync: true, onListen: () => _subscription.resume(), onCancel: () {
734 onCancel: () { 771 _subscription.cancel();
735 _subscription.cancel(); 772 _subscription = null;
736 _subscription = null; 773 }, onPause: _subscription.pause, onResume: _subscription.resume);
737 },
738 onPause: _subscription.pause,
739 onResume: _subscription.resume);
740 774
741 _webSockets[_serviceId] = this; 775 _webSockets[_serviceId] = this;
742 } 776 }
743 777
744 StreamSubscription listen(void onData(message), 778 StreamSubscription listen(void onData(message),
745 {Function onError, 779 {Function onError, void onDone(), bool cancelOnError}) {
746 void onDone(),
747 bool cancelOnError}) {
748 return _controller.stream.listen(onData, 780 return _controller.stream.listen(onData,
749 onError: onError, 781 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
750 onDone: onDone,
751 cancelOnError: cancelOnError);
752 } 782 }
753 783
754 Duration get pingInterval => _pingInterval; 784 Duration get pingInterval => _pingInterval;
755 785
756 void set pingInterval(Duration interval) { 786 void set pingInterval(Duration interval) {
757 if (_writeClosed) return; 787 if (_writeClosed) return;
758 if (_pingTimer != null) _pingTimer.cancel(); 788 if (_pingTimer != null) _pingTimer.cancel();
759 _pingInterval = interval; 789 _pingInterval = interval;
760 790
761 if (_pingInterval == null) return; 791 if (_pingInterval == null) return;
762 792
763 _pingTimer = new Timer(_pingInterval, () { 793 _pingTimer = new Timer(_pingInterval, () {
764 if (_writeClosed) return; 794 if (_writeClosed) return;
765 _consumer.add(new _WebSocketPing()); 795 _consumer.add(new _WebSocketPing());
766 _pingTimer = new Timer(_pingInterval, () { 796 _pingTimer = new Timer(_pingInterval, () {
767 // No pong received. 797 // No pong received.
768 _close(WebSocketStatus.GOING_AWAY); 798 _close(WebSocketStatus.GOING_AWAY);
769 }); 799 });
770 }); 800 });
771 } 801 }
772 802
773 int get readyState => _readyState; 803 int get readyState => _readyState;
774 804
775 String get extensions => null; 805 String get extensions => null;
776 int get closeCode => _closeCode; 806 int get closeCode => _closeCode;
777 String get closeReason => _closeReason; 807 String get closeReason => _closeReason;
778 808
779 void add(data) => _sink.add(data); 809 void add(data) { _sink.add(data); }
780 void addError(error, [StackTrace stackTrace]) => 810 void addError(error, [StackTrace stackTrace]) {
781 _sink.addError(error, stackTrace); 811 _sink.addError(error, stackTrace);
812 }
782 Future addStream(Stream stream) => _sink.addStream(stream); 813 Future addStream(Stream stream) => _sink.addStream(stream);
783 Future get done => _sink.done; 814 Future get done => _sink.done;
784 815
785 Future close([int code, String reason]) { 816 Future close([int code, String reason]) {
786 if (_isReservedStatusCode(code)) { 817 if (_isReservedStatusCode(code)) {
787 throw new WebSocketChannelException("Reserved status code $code"); 818 throw new WebSocketChannelException("Reserved status code $code");
788 } 819 }
789 if (_outCloseCode == null) { 820 if (_outCloseCode == null) {
790 _outCloseCode = code; 821 _outCloseCode = code;
791 _outCloseReason = reason; 822 _outCloseReason = reason;
(...skipping 26 matching lines...) Expand all
818 if (_writeClosed) return; 849 if (_writeClosed) return;
819 if (_outCloseCode == null) { 850 if (_outCloseCode == null) {
820 _outCloseCode = code; 851 _outCloseCode = code;
821 _outCloseReason = reason; 852 _outCloseReason = reason;
822 } 853 }
823 _writeClosed = true; 854 _writeClosed = true;
824 _consumer.closeSocket(); 855 _consumer.closeSocket();
825 _webSockets.remove(_serviceId); 856 _webSockets.remove(_serviceId);
826 } 857 }
827 858
828 // The _toJSON, _serviceTypePath, and _serviceTypeName methods 859 // The _toJSON, _serviceTypePath, and _serviceTypeName methods have been
829 // have been deleted for http_parser. The methods were unused in WebSocket 860 // deleted for web_socket_channel. The methods were unused in WebSocket code
830 // code and produced warnings. 861 // and produced warnings.
831 862
832 static bool _isReservedStatusCode(int code) { 863 static bool _isReservedStatusCode(int code) {
833 return code != null && 864 return code != null &&
834 (code < WebSocketStatus.NORMAL_CLOSURE || 865 (code < WebSocketStatus.NORMAL_CLOSURE ||
835 code == WebSocketStatus.RESERVED_1004 || 866 code == WebSocketStatus.RESERVED_1004 ||
836 code == WebSocketStatus.NO_STATUS_RECEIVED || 867 code == WebSocketStatus.NO_STATUS_RECEIVED ||
837 code == WebSocketStatus.ABNORMAL_CLOSURE || 868 code == WebSocketStatus.ABNORMAL_CLOSURE ||
838 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 869 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
839 code < WebSocketStatus.RESERVED_1015) || 870 code < WebSocketStatus.RESERVED_1015) ||
840 (code >= WebSocketStatus.RESERVED_1015 && 871 (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
841 code < 3000));
842 } 872 }
843 } 873 }
844 874
845 // The following code is from sdk/lib/io/service_object.dart. 875 // The following code is from sdk/lib/io/service_object.dart.
846 876
847 int _nextServiceId = 1; 877 int _nextServiceId = 1;
848 878
849 // TODO(ajohnsen): Use other way of getting a uniq id. 879 // TODO(ajohnsen): Use other way of getting a uniq id.
850 abstract class _ServiceObject { 880 abstract class _ServiceObject {
851 int __serviceId = 0; 881 int __serviceId = 0;
852 int get _serviceId { 882 int get _serviceId {
853 if (__serviceId == 0) __serviceId = _nextServiceId++; 883 if (__serviceId == 0) __serviceId = _nextServiceId++;
854 return __serviceId; 884 return __serviceId;
855 } 885 }
856 886
857 // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and 887 // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and
858 // _serviceType methods have been deleted for http_parser. The methods were 888 // _serviceType methods have been deleted for http_parser. The methods were
859 // unused in WebSocket code and produced warnings. 889 // unused in WebSocket code and produced warnings.
860 } 890 }
OLDNEW
« no previous file with comments | « lib/src/copy/web_socket.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698