Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(524)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 25354003: Redo StreamTransformers so they work with Stack traces. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Small fixes and tests. Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 class _WebSocketMessageType { 9 class _WebSocketMessageType {
10 static const int NONE = 0; 10 static const int NONE = 0;
(...skipping 23 matching lines...) Expand all
34 34
35 /** 35 /**
36 * The web socket protocol transformer handles the protocol byte stream 36 * The web socket protocol transformer handles the protocol byte stream
37 * which is supplied through the [:handleData:]. As the protocol is processed, 37 * which is supplied through the [:handleData:]. As the protocol is processed,
38 * it'll output frame data as either a List<int> or String. 38 * it'll output frame data as either a List<int> or String.
39 * 39 *
40 * Important infomation about usage: Be sure you use cancelOnError, so the 40 * Important infomation about usage: Be sure you use cancelOnError, so the
41 * socket will be closed when the processer encounter an error. Not using it 41 * socket will be closed when the processer encounter an error. Not using it
42 * will lead to undefined behaviour. 42 * will lead to undefined behaviour.
43 */ 43 */
44 class _WebSocketProtocolTransformer extends StreamEventTransformer { 44 class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
45 static const int START = 0; 45 static const int START = 0;
46 static const int LEN_FIRST = 1; 46 static const int LEN_FIRST = 1;
47 static const int LEN_REST = 2; 47 static const int LEN_REST = 2;
48 static const int MASK = 3; 48 static const int MASK = 3;
49 static const int PAYLOAD = 4; 49 static const int PAYLOAD = 4;
50 static const int CLOSED = 5; 50 static const int CLOSED = 5;
51 static const int FAILURE = 6; 51 static const int FAILURE = 6;
52 52
53 bool _serverSide; 53 bool _serverSide;
54 EventSink _eventSink;
54 55
55 _WebSocketProtocolTransformer([bool this._serverSide = false]) { 56 _WebSocketProtocolTransformer([bool this._serverSide = false]) {
56 _prepareForNextFrame(); 57 _prepareForNextFrame();
57 _currentMessageType = _WebSocketMessageType.NONE; 58 _currentMessageType = _WebSocketMessageType.NONE;
58 } 59 }
59 60
61 Stream bind(Stream stream) {
62 return new Stream.eventTransformed(
63 stream,
64 (EventSink eventSink) {
65 if (_eventSink != null) {
66 throw new StateError("WebSocket transformer already used.");
67 }
68 _eventSink = eventSink;
69 return this;
70 });
71 }
72
73 void addError(Object error, [StackTrace stackTrace]) {
74 _eventSink.addError(error, stackTrace);
75 }
76
77 void close() => _eventSink.close();
78
60 /** 79 /**
61 * Process data received from the underlying communication channel. 80 * Process data received from the underlying communication channel.
62 */ 81 */
63 void handleData(Uint8List buffer, EventSink sink) { 82 void add(Uint8List buffer) {
64 int count = buffer.length; 83 int count = buffer.length;
65 int index = 0; 84 int index = 0;
66 int lastIndex = count; 85 int lastIndex = count;
67 try { 86 try {
68 if (_state == CLOSED) { 87 if (_state == CLOSED) {
69 throw new WebSocketException("Data on closed connection"); 88 throw new WebSocketException("Data on closed connection");
70 } 89 }
71 if (_state == FAILURE) { 90 if (_state == FAILURE) {
72 throw new WebSocketException("Data on failed connection"); 91 throw new WebSocketException("Data on failed connection");
73 } 92 }
(...skipping 17 matching lines...) Expand all
91 case _WebSocketOpcode.TEXT: 110 case _WebSocketOpcode.TEXT:
92 if (_currentMessageType != _WebSocketMessageType.NONE) { 111 if (_currentMessageType != _WebSocketMessageType.NONE) {
93 throw new WebSocketException("Protocol error"); 112 throw new WebSocketException("Protocol error");
94 } 113 }
95 _currentMessageType = _WebSocketMessageType.TEXT; 114 _currentMessageType = _WebSocketMessageType.TEXT;
96 _controller = new StreamController(sync: true); 115 _controller = new StreamController(sync: true);
97 _controller.stream 116 _controller.stream
98 .transform(UTF8.decoder) 117 .transform(UTF8.decoder)
99 .fold(new StringBuffer(), (buffer, str) => buffer..write(str)) 118 .fold(new StringBuffer(), (buffer, str) => buffer..write(str))
100 .then((buffer) { 119 .then((buffer) {
101 sink.add(buffer.toString()); 120 _eventSink.add(buffer.toString());
102 }, onError: sink.addError); 121 }, onError: _eventSink.addError);
103 break; 122 break;
104 123
105 case _WebSocketOpcode.BINARY: 124 case _WebSocketOpcode.BINARY:
106 if (_currentMessageType != _WebSocketMessageType.NONE) { 125 if (_currentMessageType != _WebSocketMessageType.NONE) {
107 throw new WebSocketException("Protocol error"); 126 throw new WebSocketException("Protocol error");
108 } 127 }
109 _currentMessageType = _WebSocketMessageType.BINARY; 128 _currentMessageType = _WebSocketMessageType.BINARY;
110 _controller = new StreamController(sync: true); 129 _controller = new StreamController(sync: true);
111 _controller.stream 130 _controller.stream
112 .fold(new BytesBuilder(), (buffer, data) => buffer..add(data)) 131 .fold(new BytesBuilder(), (buffer, data) => buffer..add(data))
113 .then((buffer) { 132 .then((buffer) {
114 sink.add(buffer.takeBytes()); 133 _eventSink.add(buffer.takeBytes());
115 }, onError: sink.addError); 134 }, onError: _eventSink.addError);
116 break; 135 break;
117 136
118 case _WebSocketOpcode.CLOSE: 137 case _WebSocketOpcode.CLOSE:
119 case _WebSocketOpcode.PING: 138 case _WebSocketOpcode.PING:
120 case _WebSocketOpcode.PONG: 139 case _WebSocketOpcode.PONG:
121 // Control frames cannot be fragmented. 140 // Control frames cannot be fragmented.
122 if (!_fin) throw new WebSocketException("Protocol error"); 141 if (!_fin) throw new WebSocketException("Protocol error");
123 break; 142 break;
124 143
125 default: 144 default:
126 throw new WebSocketException("Protocol error"); 145 throw new WebSocketException("Protocol error");
127 } 146 }
128 _state = LEN_FIRST; 147 _state = LEN_FIRST;
129 break; 148 break;
130 149
131 case LEN_FIRST: 150 case LEN_FIRST:
132 _masked = (byte & 0x80) != 0; 151 _masked = (byte & 0x80) != 0;
133 _len = byte & 0x7F; 152 _len = byte & 0x7F;
134 if (_isControlFrame() && _len > 125) { 153 if (_isControlFrame() && _len > 125) {
135 throw new WebSocketException("Protocol error"); 154 throw new WebSocketException("Protocol error");
136 } 155 }
137 if (_len < 126) { 156 if (_len < 126) {
138 _lengthDone(sink); 157 _lengthDone();
139 } else if (_len == 126) { 158 } else if (_len == 126) {
140 _len = 0; 159 _len = 0;
141 _remainingLenBytes = 2; 160 _remainingLenBytes = 2;
142 _state = LEN_REST; 161 _state = LEN_REST;
143 } else if (_len == 127) { 162 } else if (_len == 127) {
144 _len = 0; 163 _len = 0;
145 _remainingLenBytes = 8; 164 _remainingLenBytes = 8;
146 _state = LEN_REST; 165 _state = LEN_REST;
147 } 166 }
148 break; 167 break;
149 168
150 case LEN_REST: 169 case LEN_REST:
151 _len = _len << 8 | byte; 170 _len = _len << 8 | byte;
152 _remainingLenBytes--; 171 _remainingLenBytes--;
153 if (_remainingLenBytes == 0) { 172 if (_remainingLenBytes == 0) {
154 _lengthDone(sink); 173 _lengthDone();
155 } 174 }
156 break; 175 break;
157 176
158 case MASK: 177 case MASK:
159 _maskingKey = _maskingKey << 8 | byte; 178 _maskingKey = _maskingKey << 8 | byte;
160 _remainingMaskingKeyBytes--; 179 _remainingMaskingKeyBytes--;
161 if (_remainingMaskingKeyBytes == 0) { 180 if (_remainingMaskingKeyBytes == 0) {
162 _maskDone(sink); 181 _maskDone();
163 } 182 }
164 break; 183 break;
165 184
166 case PAYLOAD: 185 case PAYLOAD:
167 // The payload is not handled one byte at a time but in blocks. 186 // The payload is not handled one byte at a time but in blocks.
168 int payload; 187 int payload;
169 if (lastIndex - index <= _remainingPayloadBytes) { 188 if (lastIndex - index <= _remainingPayloadBytes) {
170 payload = lastIndex - index; 189 payload = lastIndex - index;
171 } else { 190 } else {
172 payload = _remainingPayloadBytes; 191 payload = _remainingPayloadBytes;
(...skipping 15 matching lines...) Expand all
188 // Allocate a buffer for collecting the control frame 207 // Allocate a buffer for collecting the control frame
189 // payload if any. 208 // payload if any.
190 if (_controlPayload == null) { 209 if (_controlPayload == null) {
191 _controlPayload = new List<int>(); 210 _controlPayload = new List<int>();
192 } 211 }
193 _controlPayload.addAll(buffer.sublist(index, index + payload)); 212 _controlPayload.addAll(buffer.sublist(index, index + payload));
194 index += payload; 213 index += payload;
195 } 214 }
196 215
197 if (_remainingPayloadBytes == 0) { 216 if (_remainingPayloadBytes == 0) {
198 _controlFrameEnd(sink); 217 _controlFrameEnd();
199 } 218 }
200 } else { 219 } else {
201 if (_currentMessageType != _WebSocketMessageType.TEXT && 220 if (_currentMessageType != _WebSocketMessageType.TEXT &&
202 _currentMessageType != _WebSocketMessageType.BINARY) { 221 _currentMessageType != _WebSocketMessageType.BINARY) {
203 throw new WebSocketException("Protocol error"); 222 throw new WebSocketException("Protocol error");
204 } 223 }
205 _controller.add( 224 _controller.add(
206 new Uint8List.view(buffer.buffer, index, payload)); 225 new Uint8List.view(buffer.buffer, index, payload));
207 index += payload; 226 index += payload;
208 if (_remainingPayloadBytes == 0) { 227 if (_remainingPayloadBytes == 0) {
209 _messageFrameEnd(sink); 228 _messageFrameEnd();
210 } 229 }
211 } 230 }
212 231
213 // Hack - as we always do index++ below. 232 // Hack - as we always do index++ below.
214 index--; 233 index--;
215 break; 234 break;
216 } 235 }
217 236
218 // Move to the next byte. 237 // Move to the next byte.
219 index++; 238 index++;
220 } 239 }
221 } catch (e) { 240 } catch (e, stackTrace) {
222 _state = FAILURE; 241 _state = FAILURE;
223 sink.addError(e); 242 _eventSink.addError(e, stackTrace);
224 } 243 }
225 } 244 }
226 245
227 void _lengthDone(EventSink sink) { 246 void _lengthDone() {
228 if (_masked) { 247 if (_masked) {
229 if (!_serverSide) { 248 if (!_serverSide) {
230 throw new WebSocketException("Received masked frame from server"); 249 throw new WebSocketException("Received masked frame from server");
231 } 250 }
232 _state = MASK; 251 _state = MASK;
233 _remainingMaskingKeyBytes = 4; 252 _remainingMaskingKeyBytes = 4;
234 } else { 253 } else {
235 if (_serverSide) { 254 if (_serverSide) {
236 throw new WebSocketException("Received unmasked frame from client"); 255 throw new WebSocketException("Received unmasked frame from client");
237 } 256 }
238 _remainingPayloadBytes = _len; 257 _remainingPayloadBytes = _len;
239 _startPayload(sink); 258 _startPayload();
240 } 259 }
241 } 260 }
242 261
243 void _maskDone(EventSink sink) { 262 void _maskDone() {
244 _remainingPayloadBytes = _len; 263 _remainingPayloadBytes = _len;
245 _startPayload(sink); 264 _startPayload();
246 } 265 }
247 266
248 void _startPayload(EventSink sink) { 267 void _startPayload() {
249 // If there is no actual payload perform perform callbacks without 268 // If there is no actual payload perform perform callbacks without
250 // going through the PAYLOAD state. 269 // going through the PAYLOAD state.
251 if (_remainingPayloadBytes == 0) { 270 if (_remainingPayloadBytes == 0) {
252 if (_isControlFrame()) { 271 if (_isControlFrame()) {
253 switch (_opcode) { 272 switch (_opcode) {
254 case _WebSocketOpcode.CLOSE: 273 case _WebSocketOpcode.CLOSE:
255 _state = CLOSED; 274 _state = CLOSED;
256 sink.close(); 275 _eventSink.close();
257 break; 276 break;
258 case _WebSocketOpcode.PING: 277 case _WebSocketOpcode.PING:
259 sink.add(new _WebSocketPing()); 278 _eventSink.add(new _WebSocketPing());
260 break; 279 break;
261 case _WebSocketOpcode.PONG: 280 case _WebSocketOpcode.PONG:
262 sink.add(new _WebSocketPong()); 281 _eventSink.add(new _WebSocketPong());
263 break; 282 break;
264 } 283 }
265 _prepareForNextFrame(); 284 _prepareForNextFrame();
266 } else { 285 } else {
267 _messageFrameEnd(sink); 286 _messageFrameEnd();
268 } 287 }
269 } else { 288 } else {
270 _state = PAYLOAD; 289 _state = PAYLOAD;
271 } 290 }
272 } 291 }
273 292
274 void _messageFrameEnd(EventSink sink) { 293 void _messageFrameEnd() {
275 if (_fin) { 294 if (_fin) {
276 switch (_currentMessageType) { 295 switch (_currentMessageType) {
277 case _WebSocketMessageType.TEXT: 296 case _WebSocketMessageType.TEXT:
278 _controller.close(); 297 _controller.close();
279 break; 298 break;
280 case _WebSocketMessageType.BINARY: 299 case _WebSocketMessageType.BINARY:
281 _controller.close(); 300 _controller.close();
282 break; 301 break;
283 } 302 }
284 _controller = null; 303 _controller = null;
285 _currentMessageType = _WebSocketMessageType.NONE; 304 _currentMessageType = _WebSocketMessageType.NONE;
286 } 305 }
287 _prepareForNextFrame(); 306 _prepareForNextFrame();
288 } 307 }
289 308
290 void _controlFrameEnd(EventSink sink) { 309 void _controlFrameEnd() {
291 switch (_opcode) { 310 switch (_opcode) {
292 case _WebSocketOpcode.CLOSE: 311 case _WebSocketOpcode.CLOSE:
293 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 312 closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
294 if (_controlPayload.length > 0) { 313 if (_controlPayload.length > 0) {
295 if (_controlPayload.length == 1) { 314 if (_controlPayload.length == 1) {
296 throw new WebSocketException("Protocol error"); 315 throw new WebSocketException("Protocol error");
297 } 316 }
298 closeCode = _controlPayload[0] << 8 | _controlPayload[1]; 317 closeCode = _controlPayload[0] << 8 | _controlPayload[1];
299 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { 318 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
300 throw new WebSocketException("Protocol error"); 319 throw new WebSocketException("Protocol error");
301 } 320 }
302 if (_controlPayload.length > 2) { 321 if (_controlPayload.length > 2) {
303 closeReason = UTF8.decode(_controlPayload.sublist(2)); 322 closeReason = UTF8.decode(_controlPayload.sublist(2));
304 } 323 }
305 } 324 }
306 _state = CLOSED; 325 _state = CLOSED;
307 sink.close(); 326 _eventSink.close();
308 break; 327 break;
309 328
310 case _WebSocketOpcode.PING: 329 case _WebSocketOpcode.PING:
311 sink.add(new _WebSocketPing(_controlPayload)); 330 _eventSink.add(new _WebSocketPing(_controlPayload));
312 break; 331 break;
313 332
314 case _WebSocketOpcode.PONG: 333 case _WebSocketOpcode.PONG:
315 sink.add(new _WebSocketPong(_controlPayload)); 334 _eventSink.add(new _WebSocketPong(_controlPayload));
316 break; 335 break;
317 } 336 }
318 _prepareForNextFrame(); 337 _prepareForNextFrame();
319 } 338 }
320 339
321 bool _isControlFrame() { 340 bool _isControlFrame() {
322 return _opcode == _WebSocketOpcode.CLOSE || 341 return _opcode == _WebSocketOpcode.CLOSE ||
323 _opcode == _WebSocketOpcode.PING || 342 _opcode == _WebSocketOpcode.PING ||
324 _opcode == _WebSocketOpcode.PONG; 343 _opcode == _WebSocketOpcode.PONG;
325 } 344 }
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
433 } 452 }
434 String key = request.headers.value("Sec-WebSocket-Key"); 453 String key = request.headers.value("Sec-WebSocket-Key");
435 if (key == null) { 454 if (key == null) {
436 return false; 455 return false;
437 } 456 }
438 return true; 457 return true;
439 } 458 }
440 } 459 }
441 460
442 461
443 class _WebSocketOutgoingTransformer extends StreamEventTransformer { 462 class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
444 final _WebSocketImpl webSocket; 463 final _WebSocketImpl webSocket;
464 EventSink _eventSink;
445 465
446 _WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket); 466 _WebSocketOutgoingTransformer(_WebSocketImpl this.webSocket);
447 467
448 void handleData(message, EventSink<List<int>> sink) { 468 Stream bind(Stream stream) {
469 return new Stream.eventTransformed(
470 stream,
471 (EventSink eventSink) {
472 if (_eventSink != null) {
473 throw new StateError("WebSocket transformer already used");
Lasse Reichstein Nielsen 2013/10/07 11:47:00 If it didn't try to put everything into one class,
floitsch 2013/10/10 15:39:57 Agreed. Added a TODO. I just wanted to keep the ch
474 }
475 _eventSink = eventSink;
476 return this;
477 });
478 }
479
480 void add(message) {
449 if (message is _WebSocketPong) { 481 if (message is _WebSocketPong) {
450 addFrame(_WebSocketOpcode.PONG, message.payload, sink); 482 addFrame(_WebSocketOpcode.PONG, message.payload);
451 return; 483 return;
452 } 484 }
453 if (message is _WebSocketPing) { 485 if (message is _WebSocketPing) {
454 addFrame(_WebSocketOpcode.PING, message.payload, sink); 486 addFrame(_WebSocketOpcode.PING, message.payload);
455 return; 487 return;
456 } 488 }
457 List<int> data; 489 List<int> data;
458 int opcode; 490 int opcode;
459 if (message != null) { 491 if (message != null) {
460 if (message is String) { 492 if (message is String) {
461 opcode = _WebSocketOpcode.TEXT; 493 opcode = _WebSocketOpcode.TEXT;
462 data = UTF8.encode(message); 494 data = UTF8.encode(message);
463 } else { 495 } else {
464 if (message is !List<int>) { 496 if (message is !List<int>) {
465 throw new ArgumentError(message); 497 throw new ArgumentError(message);
466 } 498 }
467 opcode = _WebSocketOpcode.BINARY; 499 opcode = _WebSocketOpcode.BINARY;
468 data = message; 500 data = message;
469 } 501 }
470 } else { 502 } else {
471 opcode = _WebSocketOpcode.TEXT; 503 opcode = _WebSocketOpcode.TEXT;
472 } 504 }
473 addFrame(opcode, data, sink); 505 addFrame(opcode, data);
474 } 506 }
475 507
476 void handleDone(EventSink<List<int>> sink) { 508 void addError(Object error, [StackTrace stackTrace]) {
509 _eventSink.addError(error, stackTrace);
510 }
511
512 void close() {
477 int code = webSocket._outCloseCode; 513 int code = webSocket._outCloseCode;
478 String reason = webSocket._outCloseReason; 514 String reason = webSocket._outCloseReason;
479 List<int> data; 515 List<int> data;
480 if (code != null) { 516 if (code != null) {
481 data = new List<int>(); 517 data = new List<int>();
482 data.add((code >> 8) & 0xFF); 518 data.add((code >> 8) & 0xFF);
483 data.add(code & 0xFF); 519 data.add(code & 0xFF);
484 if (reason != null) { 520 if (reason != null) {
485 data.addAll(UTF8.encode(reason)); 521 data.addAll(UTF8.encode(reason));
486 } 522 }
487 } 523 }
488 addFrame(_WebSocketOpcode.CLOSE, data, sink); 524 addFrame(_WebSocketOpcode.CLOSE, data);
489 sink.close(); 525 _eventSink.close();
490 } 526 }
491 527
492 void addFrame(int opcode, List<int> data, EventSink<List<int>> sink) { 528 void addFrame(int opcode, List<int> data) {
493 createFrame(opcode, data, webSocket._serverSide).forEach(sink.add); 529 createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
494 } 530 }
495 531
496 static Iterable createFrame(int opcode, List<int> data, bool serverSide) { 532 static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
497 bool mask = !serverSide; // Masking not implemented for server. 533 bool mask = !serverSide; // Masking not implemented for server.
498 int dataLength = data == null ? 0 : data.length; 534 int dataLength = data == null ? 0 : data.length;
499 // Determine the header size. 535 // Determine the header size.
500 int headerSize = (mask) ? 6 : 2; 536 int headerSize = (mask) ? 6 : 2;
501 if (dataLength > 65535) { 537 if (dataLength > 65535) {
502 headerSize += 8; 538 headerSize += 8;
503 } else if (dataLength > 125) { 539 } else if (dataLength > 125) {
(...skipping 386 matching lines...) Expand 10 before | Expand all | Expand 10 after
890 (code < WebSocketStatus.NORMAL_CLOSURE || 926 (code < WebSocketStatus.NORMAL_CLOSURE ||
891 code == WebSocketStatus.RESERVED_1004 || 927 code == WebSocketStatus.RESERVED_1004 ||
892 code == WebSocketStatus.NO_STATUS_RECEIVED || 928 code == WebSocketStatus.NO_STATUS_RECEIVED ||
893 code == WebSocketStatus.ABNORMAL_CLOSURE || 929 code == WebSocketStatus.ABNORMAL_CLOSURE ||
894 (code > WebSocketStatus.INTERNAL_SERVER_ERROR && 930 (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
895 code < WebSocketStatus.RESERVED_1015) || 931 code < WebSocketStatus.RESERVED_1015) ||
896 (code >= WebSocketStatus.RESERVED_1015 && 932 (code >= WebSocketStatus.RESERVED_1015 &&
897 code < 3000)); 933 code < 3000));
898 } 934 }
899 } 935 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698