Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: lib/src/sink_completer.dart

Issue 1756613002: Add an IO implementation of WebSocketChannel. (Closed) Base URL: git@github.com:dart-lang/web_socket_channel.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import 'dart:async';
6
7 import 'channel.dart';
8
9 /// A [WebSocketSink] where the destination is provided later.
10 ///
11 /// This is like a [StreamSinkCompleter], except that it properly forwards
12 /// paramters to [WebSocketSink.close].
13 class WebSocketSinkCompleter {
14 /// The sink for this completer.
15 ///
16 /// When a destination sink is provided, events that have been passed to the
17 /// sink will be forwarded to the destination.
18 ///
19 /// Events can be added to the sink either before or after a destination sink
20 /// is set.
21 final WebSocketSink sink = new _CompleterSink();
22
23 /// Returns [sink] typed as a [_CompleterSink].
24 _CompleterSink get _sink => sink;
25
26 /// Sets a sink as the destination for events from the
27 /// [WebSocketSinkCompleter]'s [sink].
28 ///
29 /// The completer's [sink] will act exactly as [destinationSink].
30 ///
31 /// If the destination sink is set before events are added to [sink], further
32 /// events are forwarded directly to [destinationSink].
33 ///
34 /// If events are added to [sink] before setting the destination sink, they're
35 /// buffered until the destination is available.
36 ///
37 /// A destination sink may be set at most once.
38 void setDestinationSink(WebSocketSink destinationSink) {
39 if (_sink._destinationSink != null) {
40 throw new StateError("Destination sink already set");
41 }
42 _sink._setDestinationSink(destinationSink);
43 }
44 }
45
46 /// [WebSocketSink] completed by [WebSocketSinkCompleter].
47 class _CompleterSink implements WebSocketSink {
48 /// Controller for an intermediate sink.
49 ///
50 /// Created if the user adds events to this sink before the destination sink
51 /// is set.
52 StreamController _controller;
53
54 /// Completer for [done].
55 ///
56 /// Created if the user requests the [done] future before the destination sink
57 /// is set.
58 Completer _doneCompleter;
59
60 /// Destination sink for the events added to this sink.
61 ///
62 /// Set when [WebSocketSinkCompleter.setDestinationSink] is called.
63 WebSocketSink _destinationSink;
64
65 /// The close code passed to [close].
66 int _closeCode;
67
68 /// The close reason passed to [close].
69 String _closeReason;
70
71 /// Whether events should be sent directly to [_destinationSink], as opposed
72 /// to going through [_controller].
73 bool get _canSendDirectly => _controller == null && _destinationSink != null;
74
75 Future get done {
76 if (_doneCompleter != null) return _doneCompleter.future;
77 if (_destinationSink == null) {
78 _doneCompleter = new Completer.sync();
79 return _doneCompleter.future;
80 }
81 return _destinationSink.done;
82 }
83
84 void add(event) {
85 if (_canSendDirectly) {
86 _destinationSink.add(event);
87 } else {
88 _ensureController();
89 _controller.add(event);
90 }
91 }
92
93 void addError(error, [StackTrace stackTrace]) {
94 if (_canSendDirectly) {
95 _destinationSink.addError(error, stackTrace);
96 } else {
97 _ensureController();
98 _controller.addError(error, stackTrace);
99 }
100 }
101
102 Future addStream(Stream stream) {
103 if (_canSendDirectly) return _destinationSink.addStream(stream);
104
105 _ensureController();
106 return _controller.addStream(stream, cancelOnError: false);
107 }
108
109 Future close([int closeCode, String closeReason]) {
110 if (_canSendDirectly) {
111 _destinationSink.close(closeCode, closeReason);
112 } else {
113 _closeCode = closeCode;
114 _closeReason = closeReason;
115 _ensureController();
116 _controller.close();
117 }
118 return done;
119 }
120
121 /// Create [_controller] if it doesn't yet exist.
122 void _ensureController() {
123 if (_controller == null) _controller = new StreamController(sync: true);
124 }
125
126 /// Sets the destination sink to which events from this sink will be provided.
127 ///
128 /// If set before the user adds events, events will be added directly to the
129 /// destination sink. If the user adds events earlier, an intermediate sink is
130 /// created using a stream controller, and the destination sink is linked to
131 /// it later.
132 void _setDestinationSink(WebSocketSink sink) {
133 assert(_destinationSink == null);
134 _destinationSink = sink;
135
136 // If the user has already added data, it's buffered in the controller, so
137 // we add it to the sink.
138 if (_controller != null) {
139 // Catch any error that may come from [addStream] or [sink.close]. They'll
140 // be reported through [done] anyway.
141 sink
142 .addStream(_controller.stream)
143 .whenComplete(() => sink.close(_closeCode, _closeReason))
144 .catchError((_) {});
145 }
146
147 // If the user has already asked when the sink is done, connect the sink's
148 // done callback to that completer.
149 if (_doneCompleter != null) {
150 _doneCompleter.complete(sink.done);
151 }
152 }
153 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698