Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(690)

Side by Side Diff: Source/core/streams/ReadableStream.cpp

Issue 1004623007: Streams Implementation Update: async read (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@stream-reader-read
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "core/streams/ReadableStream.h" 6 #include "core/streams/ReadableStream.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptFunction.h" 9 #include "bindings/core/v8/ScriptFunction.h"
10 #include "bindings/core/v8/ScriptPromiseResolver.h" 10 #include "bindings/core/v8/ScriptPromiseResolver.h"
(...skipping 16 matching lines...) Expand all
27 } 27 }
28 28
29 private: 29 private:
30 explicit ConstUndefined(ScriptState* scriptState) : ScriptFunction(scriptSta te) { } 30 explicit ConstUndefined(ScriptState* scriptState) : ScriptFunction(scriptSta te) { }
31 ScriptValue call(ScriptValue value) override 31 ScriptValue call(ScriptValue value) override
32 { 32 {
33 return ScriptValue(scriptState(), v8::Undefined(scriptState()->isolate() )); 33 return ScriptValue(scriptState(), v8::Undefined(scriptState()->isolate() ));
34 } 34 }
35 }; 35 };
36 36
37 class ResolveWithReady : public ScriptFunction {
38 public:
39 static v8::Handle<v8::Function> create(ScriptState* scriptState, ReadableStr eam* stream)
40 {
41 return (new ResolveWithReady(scriptState, stream))->bindToV8Function();
42 }
43
44 DEFINE_INLINE_TRACE()
45 {
46 visitor->trace(m_stream);
47 ScriptFunction::trace(visitor);
48 }
49
50 private:
51 ResolveWithReady(ScriptState* scriptState, ReadableStream* stream)
52 : ScriptFunction(scriptState)
53 , m_stream(stream) { }
54
55 ScriptValue call(ScriptValue value) override
56 {
57 return ScriptValue(scriptState(), m_stream->ready(scriptState()).v8Value ());
58 }
59
60 Member<ReadableStream> m_stream;
61 };
62
63 } // namespace 37 } // namespace
64 38
65 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSou rce* source) 39 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSou rce* source)
66 : ActiveDOMObject(executionContext) 40 : ActiveDOMObject(executionContext)
67 , m_source(source) 41 , m_source(source)
68 , m_isStarted(false) 42 , m_isStarted(false)
69 , m_isDraining(false) 43 , m_isDraining(false)
70 , m_isPulling(false) 44 , m_isPulling(false)
71 , m_state(Waiting) 45 , m_state(Readable)
72 , m_ready(new WaitPromise(executionContext, this, WaitPromise::Ready))
73 , m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed))
74 { 46 {
75 suspendIfNeeded(); 47 suspendIfNeeded();
76 } 48 }
77 49
78 ReadableStream::~ReadableStream() 50 ReadableStream::~ReadableStream()
79 { 51 {
80 } 52 }
81 53
82 String ReadableStream::stateString() const
83 {
84 if (m_reader)
85 return "waiting";
86
87 return stateToString(m_state);
88 }
89
90 bool ReadableStream::enqueuePreliminaryCheck() 54 bool ReadableStream::enqueuePreliminaryCheck()
91 { 55 {
92 // This is a bit different from what spec says: it says we should throw 56 // This is a bit different from what spec says: it says we should throw
93 // an exception here. But sometimes a caller is not in any JavaScript 57 // an exception here. But sometimes a caller is not in any JavaScript
94 // context, and we don't want to throw an exception in such a case. 58 // context, and we don't want to throw an exception in such a case.
95 if (m_state == Errored || m_state == Closed || m_isDraining) 59 if (m_state == Errored || m_state == Closed || m_isDraining)
96 return false; 60 return false;
97 61
98 return true; 62 return true;
99 } 63 }
100 64
101 bool ReadableStream::enqueuePostAction() 65 bool ReadableStream::enqueuePostAction()
102 { 66 {
103 m_isPulling = false; 67 m_isPulling = false;
104 68
105 bool shouldApplyBackpressure = this->shouldApplyBackpressure(); 69 bool shouldApplyBackpressure = this->shouldApplyBackpressure();
106 // this->shouldApplyBackpressure may call this->error(). 70 // this->shouldApplyBackpressure may call this->error().
107 if (m_state == Errored) 71 if (m_state == Errored)
108 return false; 72 return false;
109 73
110 if (m_state == Waiting) {
111 // ReadableStream::hasPendingActivity return value gets false when
112 // |m_state| is changed to Closed or Errored from Waiting or Readable.
113 // On the other hand, the wrappers should be kept alive when |m_ready|
114 // and |m_close| resolution and rejection are called. Hence we call
115 // ScriptPromiseProperty::resolve and ScriptPromiseProperty::reject
116 // *before* changing state, no matter if the state change actually
117 // changes hasPendingActivity return value.
118 m_ready->resolve(ToV8UndefinedGenerator());
119 m_state = Readable;
120 }
121
122 return !shouldApplyBackpressure; 74 return !shouldApplyBackpressure;
123 } 75 }
124 76
125 void ReadableStream::close() 77 void ReadableStream::close()
126 { 78 {
127 if (m_state == Waiting) { 79 if (m_state != Readable)
128 m_ready->resolve(ToV8UndefinedGenerator()); 80 return;
129 m_closed->resolve(ToV8UndefinedGenerator()); 81
82 if (isQueueEmpty()) {
83 resolveAllPendingReadsAsDone();
tyoshino (SeeGerritForStatus) 2015/03/18 07:39:10 L83-L86 and L95-L99 are the same.
yhirano 2015/03/18 08:07:49 Done.
84 m_state = Closed;
130 if (m_reader) 85 if (m_reader)
131 m_reader->releaseLock(); 86 m_reader->releaseLock();
132 m_state = Closed; 87 } else {
133 } else if (m_state == Readable) {
134 m_isDraining = true; 88 m_isDraining = true;
135 } 89 }
136 } 90 }
137 91
138 void ReadableStream::readInternalPreliminaryCheck(ExceptionState& exceptionState )
139 {
140 if (m_state == Waiting) {
141 exceptionState.throwTypeError("read is called while state is waiting");
142 return;
143 }
144 if (m_state == Closed) {
145 exceptionState.throwTypeError("read is called while state is closed");
146 return;
147 }
148 if (m_state == Errored) {
149 exceptionState.throwDOMException(m_exception->code(), m_exception->messa ge());
150 return;
151 }
152 }
153
154 void ReadableStream::readInternalPostAction() 92 void ReadableStream::readInternalPostAction()
155 { 93 {
156 ASSERT(m_state == Readable); 94 ASSERT(m_state == Readable);
157 if (isQueueEmpty()) { 95 if (isQueueEmpty() && m_isDraining) {
158 if (m_isDraining) { 96 m_state = Closed;
tyoshino (SeeGerritForStatus) 2015/03/18 07:39:10 this should be done after resolving the promises?
yhirano 2015/03/18 08:07:49 We don't have to care about it, because it is Scri
159 m_state = Closed; 97 resolveAllPendingReadsAsDone();
160 m_closed->resolve(ToV8UndefinedGenerator()); 98 if (m_reader)
161 if (m_reader) 99 m_reader->releaseLock();
162 m_reader->releaseLock();
163 } else {
164 m_ready->reset();
165 m_state = Waiting;
166 }
167 } 100 }
168 callPullIfNeeded(); 101 callPullIfNeeded();
169 } 102 }
170 103
171 ScriptValue ReadableStream::read(ScriptState* scriptState, ExceptionState& excep tionState)
172 {
173 if (m_reader) {
174 exceptionState.throwTypeError("this stream is locked to a ReadableStream Reader");
175 return ScriptValue();
176 }
177 return readInternal(scriptState, exceptionState);
178 }
179
180 ScriptPromise ReadableStream::ready(ScriptState* scriptState)
181 {
182 if (m_reader) {
183 return m_reader->released(scriptState).then(ResolveWithReady::create(scr iptState, this));
184 }
185
186 if (m_state == Waiting) {
187 return readyInternal(scriptState).then(ResolveWithReady::create(scriptSt ate, this));
188 }
189 return readyInternal(scriptState);
190 }
191
192 ScriptPromise ReadableStream::readyInternal(ScriptState* scriptState)
193 {
194 return m_ready->promise(scriptState->world());
195 }
196
197 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reaso n) 104 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reaso n)
198 { 105 {
199 if (m_reader) 106 if (m_reader)
200 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "this stream is locked to a ReadableStreamReader")); 107 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "this stream is locked to a ReadableStreamReader"));
201 if (m_state == Closed) 108 if (m_state == Closed)
202 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te())); 109 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
203 if (m_state == Errored) 110 if (m_state == Errored)
204 return ScriptPromise::rejectWithDOMException(scriptState, m_exception); 111 return ScriptPromise::rejectWithDOMException(scriptState, m_exception);
205 112
206 ASSERT(m_state == Readable || m_state == Waiting); 113 ASSERT(m_state == Readable);
207 if (m_state == Waiting)
208 m_ready->resolve(ToV8UndefinedGenerator());
209 clearQueue(); 114 clearQueue();
210 m_closed->resolve(ToV8UndefinedGenerator());
211 m_state = Closed; 115 m_state = Closed;
116 resolveAllPendingReadsAsDone();
117
212 return m_source->cancelSource(scriptState, reason).then(ConstUndefined::crea te(scriptState)); 118 return m_source->cancelSource(scriptState, reason).then(ConstUndefined::crea te(scriptState));
213 } 119 }
214 120
215 ScriptPromise ReadableStream::closed(ScriptState* scriptState)
216 {
217 return m_closed->promise(scriptState->world());
218 }
219
220 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception) 121 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
221 { 122 {
222 switch (m_state) { 123 if (m_state != ReadableStream::Readable)
223 case Waiting: 124 return;
224 m_exception = exception; 125
225 m_ready->reject(m_exception); 126 m_exception = exception;
226 m_closed->reject(m_exception); 127 clearQueue();
227 m_state = Errored; 128 rejectAllPendingReads(m_exception);
228 if (m_reader) 129 m_state = Errored;
229 m_reader->releaseLock(); 130 if (m_reader)
230 break; 131 m_reader->releaseLock();
231 case Readable:
232 clearQueue();
233 m_exception = exception;
234 m_ready->reset();
235 m_ready->reject(m_exception);
236 m_closed->reject(m_exception);
237 m_state = Errored;
238 if (m_reader)
239 m_reader->releaseLock();
240 break;
241 default:
242 break;
243 }
244 } 132 }
245 133
246 void ReadableStream::didSourceStart() 134 void ReadableStream::didSourceStart()
247 { 135 {
248 m_isStarted = true; 136 m_isStarted = true;
249 callPullIfNeeded(); 137 callPullIfNeeded();
250 } 138 }
251 139
252 ReadableStreamReader* ReadableStream::getReader(ExceptionState& exceptionState) 140 ReadableStreamReader* ReadableStream::getReader(ExceptionState& exceptionState)
253 { 141 {
254 if (m_state == Closed) {
255 exceptionState.throwTypeError("this stream is already closed");
256 return nullptr;
257 }
258 if (m_state == Errored) {
259 exceptionState.throwDOMException(m_exception->code(), m_exception->messa ge());
260 return nullptr;
261 }
262 if (m_reader) { 142 if (m_reader) {
263 exceptionState.throwTypeError("already locked to a ReadableStreamReader" ); 143 exceptionState.throwTypeError("already locked to a ReadableStreamReader" );
264 return nullptr; 144 return nullptr;
265 } 145 }
266 return new ReadableStreamReader(this); 146 return new ReadableStreamReader(this);
267 } 147 }
268 148
269 void ReadableStream::setReader(ReadableStreamReader* reader) 149 void ReadableStream::setReader(ReadableStreamReader* reader)
270 { 150 {
271 ASSERT((reader && !m_reader) || (!reader && m_reader)); 151 ASSERT((reader && !m_reader) || (!reader && m_reader));
272 m_reader = reader; 152 m_reader = reader;
273 } 153 }
274 154
275 void ReadableStream::callPullIfNeeded() 155 void ReadableStream::callPullIfNeeded()
276 { 156 {
277 if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_st ate == Errored) 157 if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_st ate == Errored)
278 return; 158 return;
279 159
280 bool shouldApplyBackpressure = this->shouldApplyBackpressure(); 160 bool shouldApplyBackpressure = this->shouldApplyBackpressure();
281 // this->shouldApplyBackpressure may call this->error(). 161 // this->shouldApplyBackpressure may call this->error().
282 if (shouldApplyBackpressure || m_state == Errored) 162 if (shouldApplyBackpressure || m_state == Errored)
283 return; 163 return;
284 m_isPulling = true; 164 m_isPulling = true;
285 m_source->pullSource(); 165 m_source->pullSource();
286 } 166 }
287 167
288 bool ReadableStream::hasPendingActivity() const 168 bool ReadableStream::hasPendingActivity() const
289 { 169 {
290 return m_state == Waiting || m_state == Readable; 170 return m_state == Readable;
291 } 171 }
292 172
293 void ReadableStream::stop() 173 void ReadableStream::stop()
294 { 174 {
295 error(DOMException::create(AbortError, "execution context is stopped")); 175 error(DOMException::create(AbortError, "execution context is stopped"));
296 ActiveDOMObject::stop(); 176 ActiveDOMObject::stop();
297 } 177 }
298 178
299 DEFINE_TRACE(ReadableStream) 179 DEFINE_TRACE(ReadableStream)
300 { 180 {
301 visitor->trace(m_source); 181 visitor->trace(m_source);
302 visitor->trace(m_ready);
303 visitor->trace(m_closed);
304 visitor->trace(m_exception); 182 visitor->trace(m_exception);
305 visitor->trace(m_reader); 183 visitor->trace(m_reader);
306 ActiveDOMObject::trace(visitor); 184 ActiveDOMObject::trace(visitor);
307 } 185 }
308 186
309 String ReadableStream::stateToString(State state)
310 {
311 switch (state) {
312 case Readable:
313 return "readable";
314 case Waiting:
315 return "waiting";
316 case Closed:
317 return "closed";
318 case Errored:
319 return "errored";
320 }
321 ASSERT(false);
322 return String();
323 }
324
325 } // namespace blink 187 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698