Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1595)

Side by Side Diff: sdk/lib/io/websocket_impl.dart

Issue 12610006: Renamed StreamSink to EventSink. Renamed signalError to addError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Changed inheritance back! Now create StreamSink instead of EventSink where we create them. Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 7 const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
8 8
9 class _WebSocketMessageType { 9 class _WebSocketMessageType {
10 static const int NONE = 0; 10 static const int NONE = 0;
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
51 static const int FAILURE = 6; 51 static const int FAILURE = 6;
52 52
53 _WebSocketProtocolTransformer() { 53 _WebSocketProtocolTransformer() {
54 _prepareForNextFrame(); 54 _prepareForNextFrame();
55 _currentMessageType = _WebSocketMessageType.NONE; 55 _currentMessageType = _WebSocketMessageType.NONE;
56 } 56 }
57 57
58 /** 58 /**
59 * Process data received from the underlying communication channel. 59 * Process data received from the underlying communication channel.
60 */ 60 */
61 void handleData(List<int> buffer, StreamSink sink) { 61 void handleData(List<int> buffer, EventSink sink) {
62 int count = buffer.length; 62 int count = buffer.length;
63 int index = 0; 63 int index = 0;
64 int lastIndex = count; 64 int lastIndex = count;
65 try { 65 try {
66 if (_state == CLOSED) { 66 if (_state == CLOSED) {
67 throw new WebSocketException("Data on closed connection"); 67 throw new WebSocketException("Data on closed connection");
68 } 68 }
69 if (_state == FAILURE) { 69 if (_state == FAILURE) {
70 throw new WebSocketException("Data on failed connection"); 70 throw new WebSocketException("Data on failed connection");
71 } 71 }
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
209 // Hack - as we always do index++ below. 209 // Hack - as we always do index++ below.
210 index--; 210 index--;
211 break; 211 break;
212 } 212 }
213 213
214 // Move to the next byte. 214 // Move to the next byte.
215 index++; 215 index++;
216 } 216 }
217 } catch (e) { 217 } catch (e) {
218 _state = FAILURE; 218 _state = FAILURE;
219 sink.signalError(e); 219 sink.addError(e);
220 } 220 }
221 } 221 }
222 222
223 void _lengthDone(StreamSink sink) { 223 void _lengthDone(EventSink sink) {
224 if (_masked) { 224 if (_masked) {
225 _state = MASK; 225 _state = MASK;
226 _remainingMaskingKeyBytes = 4; 226 _remainingMaskingKeyBytes = 4;
227 } else { 227 } else {
228 _remainingPayloadBytes = _len; 228 _remainingPayloadBytes = _len;
229 _startPayload(sink); 229 _startPayload(sink);
230 } 230 }
231 } 231 }
232 232
233 void _maskDone(StreamSink sink) { 233 void _maskDone(EventSink sink) {
234 _remainingPayloadBytes = _len; 234 _remainingPayloadBytes = _len;
235 _startPayload(sink); 235 _startPayload(sink);
236 } 236 }
237 237
238 void _startPayload(StreamSink sink) { 238 void _startPayload(EventSink sink) {
239 // If there is no actual payload perform perform callbacks without 239 // If there is no actual payload perform perform callbacks without
240 // going through the PAYLOAD state. 240 // going through the PAYLOAD state.
241 if (_remainingPayloadBytes == 0) { 241 if (_remainingPayloadBytes == 0) {
242 if (_isControlFrame()) { 242 if (_isControlFrame()) {
243 switch (_opcode) { 243 switch (_opcode) {
244 case _WebSocketOpcode.CLOSE: 244 case _WebSocketOpcode.CLOSE:
245 _state = CLOSED; 245 _state = CLOSED;
246 sink.close(); 246 sink.close();
247 break; 247 break;
248 case _WebSocketOpcode.PING: 248 case _WebSocketOpcode.PING:
249 // TODO(ajohnsen): Handle ping. 249 // TODO(ajohnsen): Handle ping.
250 break; 250 break;
251 case _WebSocketOpcode.PONG: 251 case _WebSocketOpcode.PONG:
252 // TODO(ajohnsen): Handle pong. 252 // TODO(ajohnsen): Handle pong.
253 break; 253 break;
254 } 254 }
255 _prepareForNextFrame(); 255 _prepareForNextFrame();
256 } else { 256 } else {
257 _messageFrameEnd(sink); 257 _messageFrameEnd(sink);
258 } 258 }
259 } else { 259 } else {
260 _state = PAYLOAD; 260 _state = PAYLOAD;
261 } 261 }
262 } 262 }
263 263
264 void _messageFrameEnd(StreamSink sink) { 264 void _messageFrameEnd(EventSink sink) {
265 if (_fin) { 265 if (_fin) {
266 switch (_currentMessageType) { 266 switch (_currentMessageType) {
267 case _WebSocketMessageType.TEXT: 267 case _WebSocketMessageType.TEXT:
268 sink.add(_buffer.toString()); 268 sink.add(_buffer.toString());
269 break; 269 break;
270 case _WebSocketMessageType.BINARY: 270 case _WebSocketMessageType.BINARY:
271 if (_buffer.length == 0) { 271 if (_buffer.length == 0) {
272 sink.add(const []); 272 sink.add(const []);
273 } else { 273 } else {
274 sink.add(_buffer.readBytes(_buffer.length)); 274 sink.add(_buffer.readBytes(_buffer.length));
275 } 275 }
276 break; 276 break;
277 } 277 }
278 _buffer = null; 278 _buffer = null;
279 _currentMessageType = _WebSocketMessageType.NONE; 279 _currentMessageType = _WebSocketMessageType.NONE;
280 } 280 }
281 _prepareForNextFrame(); 281 _prepareForNextFrame();
282 } 282 }
283 283
284 void _controlFrameEnd(StreamSink sink) { 284 void _controlFrameEnd(EventSink sink) {
285 switch (_opcode) { 285 switch (_opcode) {
286 case _WebSocketOpcode.CLOSE: 286 case _WebSocketOpcode.CLOSE:
287 closeCode = WebSocketStatus.NO_STATUS_RECEIVED; 287 closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
288 if (_controlPayload.length > 0) { 288 if (_controlPayload.length > 0) {
289 if (_controlPayload.length == 1) { 289 if (_controlPayload.length == 1) {
290 throw new WebSocketException("Protocol error"); 290 throw new WebSocketException("Protocol error");
291 } 291 }
292 closeCode = _controlPayload[0] << 8 | _controlPayload[1]; 292 closeCode = _controlPayload[0] << 8 | _controlPayload[1];
293 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { 293 if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
294 throw new WebSocketException("Protocol error"); 294 throw new WebSocketException("Protocol error");
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
354 354
355 355
356 class _WebSocketTransformerImpl implements WebSocketTransformer { 356 class _WebSocketTransformerImpl implements WebSocketTransformer {
357 final StreamController<WebSocket> _controller = 357 final StreamController<WebSocket> _controller =
358 new StreamController<WebSocket>(); 358 new StreamController<WebSocket>();
359 359
360 Stream<WebSocket> bind(Stream<HttpRequest> stream) { 360 Stream<WebSocket> bind(Stream<HttpRequest> stream) {
361 stream.listen((request) { 361 stream.listen((request) {
362 _upgrade(request) 362 _upgrade(request)
363 .then((WebSocket webSocket) => _controller.add(webSocket)) 363 .then((WebSocket webSocket) => _controller.add(webSocket))
364 .catchError((error) => _controller.signalError(error)); 364 .catchError((error) => _controller.addError(error));
365 }); 365 });
366 366
367 return _controller.stream; 367 return _controller.stream;
368 } 368 }
369 369
370 static Future<WebSocket> _upgrade(HttpRequest request) { 370 static Future<WebSocket> _upgrade(HttpRequest request) {
371 var response = request.response; 371 var response = request.response;
372 if (!_isUpgradeRequest(request)) { 372 if (!_isUpgradeRequest(request)) {
373 // Send error response and drain the request. 373 // Send error response and drain the request.
374 request.listen((_) {}, onDone: () { 374 request.listen((_) {}, onDone: () {
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after
509 509
510 bool closed = false; 510 bool closed = false;
511 var transformer = new _WebSocketProtocolTransformer(); 511 var transformer = new _WebSocketProtocolTransformer();
512 _socket.transform(transformer).listen( 512 _socket.transform(transformer).listen(
513 (data) { 513 (data) {
514 _controller.add(data); 514 _controller.add(data);
515 }, 515 },
516 onError: (error) { 516 onError: (error) {
517 if (closed) return; 517 if (closed) return;
518 closed = true; 518 closed = true;
519 _controller.signalError(error); 519 _controller.addError(error);
520 _controller.close(); 520 _controller.close();
521 }, 521 },
522 onDone: () { 522 onDone: () {
523 if (closed) return; 523 if (closed) return;
524 closed = true; 524 closed = true;
525 if (_readyState == WebSocket.OPEN) { 525 if (_readyState == WebSocket.OPEN) {
526 _readyState = WebSocket.CLOSING; 526 _readyState = WebSocket.CLOSING;
527 if (transformer.closeCode != WebSocketStatus.NO_STATUS_RECEIVED) { 527 if (transformer.closeCode != WebSocketStatus.NO_STATUS_RECEIVED) {
528 _close(transformer.closeCode); 528 _close(transformer.closeCode);
529 } else { 529 } else {
530 _close(); 530 _close();
531 } 531 }
532 _readyState = WebSocket.CLOSED; 532 _readyState = WebSocket.CLOSED;
533 } 533 }
534 _closeCode = transformer.closeCode; 534 _closeCode = transformer.closeCode;
535 _closeReason = transformer.closeReason; 535 _closeReason = transformer.closeReason;
536 _controller.close(); 536 _controller.close();
537 if (_writeClosed) _socket.destroy(); 537 if (_writeClosed) _socket.destroy();
538 }, 538 },
539 unsubscribeOnError: true); 539 unsubscribeOnError: true);
540 540
541 _socket.done 541 _socket.done
542 .catchError((error) { 542 .catchError((error) {
543 if (closed) return; 543 if (closed) return;
544 closed = true; 544 closed = true;
545 _readyState = WebSocket.CLOSED; 545 _readyState = WebSocket.CLOSED;
546 _closeCode = WebSocketStatus.ABNORMAL_CLOSURE; 546 _closeCode = WebSocketStatus.ABNORMAL_CLOSURE;
547 _controller.signalError(error); 547 _controller.addError(error);
548 _controller.close(); 548 _controller.close();
549 }) 549 })
550 .whenComplete(() { 550 .whenComplete(() {
551 _writeClosed = true; 551 _writeClosed = true;
552 }); 552 });
553 } 553 }
554 554
555 StreamSubscription listen(void onData(message), 555 StreamSubscription listen(void onData(message),
556 {void onError(AsyncError error), 556 {void onError(AsyncError error),
557 void onDone(), 557 void onDone(),
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after
663 if (data != null) { 663 if (data != null) {
664 _socket.add(data); 664 _socket.add(data);
665 } 665 }
666 } catch (_) { 666 } catch (_) {
667 // The socket can be closed before _socket.done have a chance 667 // The socket can be closed before _socket.done have a chance
668 // to complete. 668 // to complete.
669 _writeClosed = true; 669 _writeClosed = true;
670 } 670 }
671 } 671 }
672 } 672 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698