Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: sdk/lib/io/http_parser.dart

Issue 141553014: Optmize detached socket to not contain a extra stream controller/subscription, but simly forward ca… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 // Global constants. 7 // Global constants.
8 class _Const { 8 class _Const {
9 // Bytes for "HTTP". 9 // Bytes for "HTTP".
10 static const HTTP = const [72, 84, 84, 80]; 10 static const HTTP = const [72, 84, 84, 80];
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
89 static const int HTTP11 = 2; 89 static const int HTTP11 = 2;
90 } 90 }
91 91
92 // States of the HTTP parser state machine. 92 // States of the HTTP parser state machine.
93 class _MessageType { 93 class _MessageType {
94 static const int UNDETERMINED = 0; 94 static const int UNDETERMINED = 0;
95 static const int REQUEST = 1; 95 static const int REQUEST = 1;
96 static const int RESPONSE = 0; 96 static const int RESPONSE = 0;
97 } 97 }
98 98
99 class _HttpDetachedIncoming extends Stream<List<int>> {
100 StreamController<List<int>> controller;
101 final StreamSubscription subscription;
102 99
Søren Gjesse 2014/02/04 10:17:24 Please add a short description of this subscriptio
Anders Johnsen 2014/02/04 10:24:51 Done.
103 List<int> bufferedData; 100 class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
104 bool paused; 101 StreamSubscription<List<int>> _subscription;
Søren Gjesse 2014/02/04 10:17:24 Can we find a better name that _data? It is pretty
Anders Johnsen 2014/02/04 10:24:51 Done.
102 List<int> _data;
103 bool _isCanceled = false;
104 int _pauseCount = 1;
105 Function _userOnData;
106 bool _scheduled = false;
105 107
106 Completer resumeCompleter; 108 _HttpDetachedStreamSubscription(this._subscription,
109 this._data,
110 this._userOnData);
107 111
108 _HttpDetachedIncoming(this.subscription, this.bufferedData) { 112 bool get isPaused => _subscription.isPaused;
109 controller = new StreamController<List<int>>( 113
110 sync: true, 114 Future asFuture([futureValue]) => _subscription.asFuture(futureValue);
111 onListen: resume, 115
112 onPause: pause, 116 Future cancel() {
113 onResume: resume, 117 _isCanceled = true;
114 onCancel: () => subscription.cancel()); 118 _data = null;
115 if (subscription == null) { 119 return _subscription.cancel();
116 // Socket was already closed. 120 }
117 if (bufferedData != null) controller.add(bufferedData); 121
118 controller.close(); 122 void onData(void handleData(List<int> data)) {
123 _userOnData = handleData;
124 _subscription.onData(handleData);
125 }
126
127 void onDone(void handleDone()) {
128 _subscription.onDone(handleDone);
129 }
130
131 void onError(Function handleError) {
132 _subscription.onError(handleError);
133 }
134
135 void pause([Future resumeSignal]) {
136 if (_data == null) {
137 _subscription.pause(resumeSignal);
119 } else { 138 } else {
120 pause(); 139 _pauseCount++;
121 subscription 140 if (resumeSignal != null) {
122 ..resume() 141 resumeSignal.whenComplete(resume);
123 ..onData(controller.add) 142 }
124 ..onDone(controller.close)
125 ..onError(controller.addError);
126 } 143 }
127 } 144 }
128 145
146 void resume() {
147 if (_data == null) {
148 _subscription.resume();
149 } else {
150 _pauseCount--;
151 _scheduleData();
152 }
153 }
154
155 void _scheduleData() {
Søren Gjesse 2014/02/04 10:17:24 This method does not necessarily schedule the data
Anders Johnsen 2014/02/04 10:24:51 Done.
156 if (_scheduled) return;
157 if (_pauseCount != 0) return;
158 _scheduled = true;
159 scheduleMicrotask(() {
160 _scheduled = false;
161 if (_pauseCount > 0 || _isCanceled) return;
162 var data = _data;
163 _data = null;
Søren Gjesse 2014/02/04 10:17:24 Please add a comment here on why the underlying su
Anders Johnsen 2014/02/04 10:24:51 Done.
164 _subscription.resume();
165 if (_userOnData != null) {
166 _userOnData(data);
167 }
168 });
169 }
170 }
171
172
173 class _HttpDetachedIncoming extends Stream<List<int>> {
174 final StreamSubscription subscription;
175 final List<int> bufferedData;
176
177 _HttpDetachedIncoming(this.subscription, this.bufferedData);
178
129 StreamSubscription<List<int>> listen(void onData(List<int> event), 179 StreamSubscription<List<int>> listen(void onData(List<int> event),
130 {Function onError, 180 {Function onError,
131 void onDone(), 181 void onDone(),
132 bool cancelOnError}) { 182 bool cancelOnError}) {
133 return controller.stream.listen( 183 if (subscription != null) {
134 onData, 184 subscription
135 onError: onError, 185 ..onData(onData)
136 onDone: onDone, 186 ..onError(onError)
137 cancelOnError: cancelOnError); 187 ..onDone(onDone);
138 } 188 if (bufferedData == null) {
139 189 return subscription..resume();
140 void resume() { 190 }
141 paused = false; 191 return new _HttpDetachedStreamSubscription(subscription,
142 if (bufferedData != null) { 192 bufferedData,
143 var data = bufferedData; 193 onData)
144 bufferedData = null; 194 ..resume();
145 controller.add(data); 195 } else {
146 // If the consumer pauses again after the carry-over data, we'll not 196 return new Stream.fromIterable(bufferedData)
147 // continue our subscriber until the next resume. 197 .listen(onData,
148 if (paused) return; 198 onError: onError,
149 } 199 onDone: onDone,
150 if (resumeCompleter != null) { 200 cancelOnError: cancelOnError);
151 resumeCompleter.complete();
152 resumeCompleter = null;
153 }
154 }
155
156 void pause() {
157 paused = true;
158 if (resumeCompleter == null) {
159 resumeCompleter = new Completer();
160 subscription.pause(resumeCompleter.future);
161 } 201 }
162 } 202 }
163 } 203 }
164 204
165 205
166 /** 206 /**
167 * HTTP parser which parses the data stream given to [consume]. 207 * HTTP parser which parses the data stream given to [consume].
168 * 208 *
169 * If an HTTP parser error occours, the parser will signal an error to either 209 * If an HTTP parser error occours, the parser will signal an error to either
170 * the current _HttpIncoming or the _parser itself. 210 * the current _HttpIncoming or the _parser itself.
(...skipping 814 matching lines...) Expand 10 before | Expand all | Expand 10 after
985 } 1025 }
986 } 1026 }
987 1027
988 void _reportError(error, [stackTrace]) { 1028 void _reportError(error, [stackTrace]) {
989 if (_socketSubscription != null) _socketSubscription.cancel(); 1029 if (_socketSubscription != null) _socketSubscription.cancel();
990 _state = _State.FAILURE; 1030 _state = _State.FAILURE;
991 _controller.addError(error, stackTrace); 1031 _controller.addError(error, stackTrace);
992 _controller.close(); 1032 _controller.close();
993 } 1033 }
994 } 1034 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698