Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: Source/core/streams/ReadableStream.cpp

Issue 1004623007: Streams Implementation Update: async read (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@stream-reader-read
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/core/streams/ReadableStream.h ('k') | Source/core/streams/ReadableStream.idl » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "core/streams/ReadableStream.h" 6 #include "core/streams/ReadableStream.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptFunction.h" 9 #include "bindings/core/v8/ScriptFunction.h"
10 #include "bindings/core/v8/ScriptPromiseResolver.h" 10 #include "bindings/core/v8/ScriptPromiseResolver.h"
(...skipping 16 matching lines...) Expand all
27 } 27 }
28 28
29 private: 29 private:
30 explicit ConstUndefined(ScriptState* scriptState) : ScriptFunction(scriptSta te) { } 30 explicit ConstUndefined(ScriptState* scriptState) : ScriptFunction(scriptSta te) { }
31 ScriptValue call(ScriptValue value) override 31 ScriptValue call(ScriptValue value) override
32 { 32 {
33 return ScriptValue(scriptState(), v8::Undefined(scriptState()->isolate() )); 33 return ScriptValue(scriptState(), v8::Undefined(scriptState()->isolate() ));
34 } 34 }
35 }; 35 };
36 36
37 class ResolveWithReady : public ScriptFunction {
38 public:
39 static v8::Handle<v8::Function> create(ScriptState* scriptState, ReadableStr eam* stream)
40 {
41 return (new ResolveWithReady(scriptState, stream))->bindToV8Function();
42 }
43
44 DEFINE_INLINE_TRACE()
45 {
46 visitor->trace(m_stream);
47 ScriptFunction::trace(visitor);
48 }
49
50 private:
51 ResolveWithReady(ScriptState* scriptState, ReadableStream* stream)
52 : ScriptFunction(scriptState)
53 , m_stream(stream) { }
54
55 ScriptValue call(ScriptValue value) override
56 {
57 return ScriptValue(scriptState(), m_stream->ready(scriptState()).v8Value ());
58 }
59
60 Member<ReadableStream> m_stream;
61 };
62
63 } // namespace 37 } // namespace
64 38
65 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSou rce* source) 39 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSou rce* source)
66 : ActiveDOMObject(executionContext) 40 : ActiveDOMObject(executionContext)
67 , m_source(source) 41 , m_source(source)
68 , m_isStarted(false) 42 , m_isStarted(false)
69 , m_isDraining(false) 43 , m_isDraining(false)
70 , m_isPulling(false) 44 , m_isPulling(false)
71 , m_state(Waiting) 45 , m_state(Readable)
72 , m_ready(new WaitPromise(executionContext, this, WaitPromise::Ready))
73 , m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed))
74 { 46 {
75 suspendIfNeeded(); 47 suspendIfNeeded();
76 } 48 }
77 49
78 ReadableStream::~ReadableStream() 50 ReadableStream::~ReadableStream()
79 { 51 {
80 } 52 }
81 53
82 String ReadableStream::stateString() const
83 {
84 if (m_reader)
85 return "waiting";
86
87 return stateToString(m_state);
88 }
89
90 bool ReadableStream::enqueuePreliminaryCheck() 54 bool ReadableStream::enqueuePreliminaryCheck()
91 { 55 {
92 // This is a bit different from what spec says: it says we should throw 56 // This is a bit different from what spec says: it says we should throw
93 // an exception here. But sometimes a caller is not in any JavaScript 57 // an exception here. But sometimes a caller is not in any JavaScript
94 // context, and we don't want to throw an exception in such a case. 58 // context, and we don't want to throw an exception in such a case.
95 if (m_state == Errored || m_state == Closed || m_isDraining) 59 if (m_state == Errored || m_state == Closed || m_isDraining)
96 return false; 60 return false;
97 61
98 return true; 62 return true;
99 } 63 }
100 64
101 bool ReadableStream::enqueuePostAction() 65 bool ReadableStream::enqueuePostAction()
102 { 66 {
103 m_isPulling = false; 67 m_isPulling = false;
104 68
105 bool shouldApplyBackpressure = this->shouldApplyBackpressure(); 69 bool shouldApplyBackpressure = this->shouldApplyBackpressure();
106 // this->shouldApplyBackpressure may call this->error(). 70 // this->shouldApplyBackpressure may call this->error().
107 if (m_state == Errored) 71 if (m_state == Errored)
108 return false; 72 return false;
109 73
110 if (m_state == Waiting) {
111 // ReadableStream::hasPendingActivity return value gets false when
112 // |m_state| is changed to Closed or Errored from Waiting or Readable.
113 // On the other hand, the wrappers should be kept alive when |m_ready|
114 // and |m_close| resolution and rejection are called. Hence we call
115 // ScriptPromiseProperty::resolve and ScriptPromiseProperty::reject
116 // *before* changing state, no matter if the state change actually
117 // changes hasPendingActivity return value.
118 m_ready->resolve(ToV8UndefinedGenerator());
119 m_state = Readable;
120 }
121
122 return !shouldApplyBackpressure; 74 return !shouldApplyBackpressure;
123 } 75 }
124 76
125 void ReadableStream::close() 77 void ReadableStream::close()
126 { 78 {
127 if (m_state == Waiting) { 79 if (m_state != Readable)
128 m_ready->resolve(ToV8UndefinedGenerator()); 80 return;
129 m_closed->resolve(ToV8UndefinedGenerator()); 81
130 if (m_reader) 82 if (isQueueEmpty())
131 m_reader->releaseLock(); 83 closeInternal();
132 m_state = Closed; 84 else
133 } else if (m_state == Readable) {
134 m_isDraining = true; 85 m_isDraining = true;
135 }
136 }
137
138 void ReadableStream::readInternalPreliminaryCheck(ExceptionState& exceptionState )
139 {
140 if (m_state == Waiting) {
141 exceptionState.throwTypeError("read is called while state is waiting");
142 return;
143 }
144 if (m_state == Closed) {
145 exceptionState.throwTypeError("read is called while state is closed");
146 return;
147 }
148 if (m_state == Errored) {
149 exceptionState.throwDOMException(m_exception->code(), m_exception->messa ge());
150 return;
151 }
152 } 86 }
153 87
154 void ReadableStream::readInternalPostAction() 88 void ReadableStream::readInternalPostAction()
155 { 89 {
156 ASSERT(m_state == Readable); 90 ASSERT(m_state == Readable);
157 if (isQueueEmpty()) { 91 if (isQueueEmpty() && m_isDraining)
158 if (m_isDraining) { 92 closeInternal();
159 m_state = Closed;
160 m_closed->resolve(ToV8UndefinedGenerator());
161 if (m_reader)
162 m_reader->releaseLock();
163 } else {
164 m_ready->reset();
165 m_state = Waiting;
166 }
167 }
168 callPullIfNeeded(); 93 callPullIfNeeded();
169 } 94 }
170 95
171 ScriptValue ReadableStream::read(ScriptState* scriptState, ExceptionState& excep tionState)
172 {
173 if (m_reader) {
174 exceptionState.throwTypeError("this stream is locked to a ReadableStream Reader");
175 return ScriptValue();
176 }
177 return readInternal(scriptState, exceptionState);
178 }
179
180 ScriptPromise ReadableStream::ready(ScriptState* scriptState)
181 {
182 if (m_reader) {
183 return m_reader->released(scriptState).then(ResolveWithReady::create(scr iptState, this));
184 }
185
186 if (m_state == Waiting) {
187 return readyInternal(scriptState).then(ResolveWithReady::create(scriptSt ate, this));
188 }
189 return readyInternal(scriptState);
190 }
191
192 ScriptPromise ReadableStream::readyInternal(ScriptState* scriptState)
193 {
194 return m_ready->promise(scriptState->world());
195 }
196
197 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reaso n) 96 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reaso n)
198 { 97 {
199 if (m_reader) 98 if (m_reader)
200 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "this stream is locked to a ReadableStreamReader")); 99 return ScriptPromise::reject(scriptState, V8ThrowException::createTypeEr ror(scriptState->isolate(), "this stream is locked to a ReadableStreamReader"));
201 if (m_state == Closed) 100 if (m_state == Closed)
202 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te())); 101 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
203 if (m_state == Errored) 102 if (m_state == Errored)
204 return ScriptPromise::rejectWithDOMException(scriptState, m_exception); 103 return ScriptPromise::rejectWithDOMException(scriptState, m_exception);
205 104
206 ASSERT(m_state == Readable || m_state == Waiting); 105 return cancelInternal(scriptState, reason);
207 if (m_state == Waiting)
208 m_ready->resolve(ToV8UndefinedGenerator());
209 clearQueue();
210 m_closed->resolve(ToV8UndefinedGenerator());
211 m_state = Closed;
212 return m_source->cancelSource(scriptState, reason).then(ConstUndefined::crea te(scriptState));
213 } 106 }
214 107
215 ScriptPromise ReadableStream::closed(ScriptState* scriptState) 108 ScriptPromise ReadableStream::cancelInternal(ScriptState* scriptState, ScriptVal ue reason)
216 { 109 {
217 return m_closed->promise(scriptState->world()); 110 closeInternal();
111 return m_source->cancelSource(scriptState, reason).then(ConstUndefined::crea te(scriptState));
218 } 112 }
219 113
220 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception) 114 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
221 { 115 {
222 switch (m_state) { 116 if (m_state != ReadableStream::Readable)
223 case Waiting: 117 return;
224 m_exception = exception; 118
225 m_ready->reject(m_exception); 119 m_exception = exception;
226 m_closed->reject(m_exception); 120 clearQueue();
227 m_state = Errored; 121 rejectAllPendingReads(m_exception);
228 if (m_reader) 122 m_state = Errored;
229 m_reader->releaseLock(); 123 if (m_reader)
230 break; 124 m_reader->releaseLock();
231 case Readable:
232 clearQueue();
233 m_exception = exception;
234 m_ready->reset();
235 m_ready->reject(m_exception);
236 m_closed->reject(m_exception);
237 m_state = Errored;
238 if (m_reader)
239 m_reader->releaseLock();
240 break;
241 default:
242 break;
243 }
244 } 125 }
245 126
246 void ReadableStream::didSourceStart() 127 void ReadableStream::didSourceStart()
247 { 128 {
248 m_isStarted = true; 129 m_isStarted = true;
249 callPullIfNeeded(); 130 callPullIfNeeded();
250 } 131 }
251 132
252 ReadableStreamReader* ReadableStream::getReader(ExceptionState& exceptionState) 133 ReadableStreamReader* ReadableStream::getReader(ExceptionState& exceptionState)
253 { 134 {
254 if (m_state == Closed) {
255 exceptionState.throwTypeError("this stream is already closed");
256 return nullptr;
257 }
258 if (m_state == Errored) {
259 exceptionState.throwDOMException(m_exception->code(), m_exception->messa ge());
260 return nullptr;
261 }
262 if (m_reader) { 135 if (m_reader) {
263 exceptionState.throwTypeError("already locked to a ReadableStreamReader" ); 136 exceptionState.throwTypeError("already locked to a ReadableStreamReader" );
264 return nullptr; 137 return nullptr;
265 } 138 }
266 return new ReadableStreamReader(this); 139 return new ReadableStreamReader(this);
267 } 140 }
268 141
269 void ReadableStream::setReader(ReadableStreamReader* reader) 142 void ReadableStream::setReader(ReadableStreamReader* reader)
270 { 143 {
271 ASSERT((reader && !m_reader) || (!reader && m_reader)); 144 ASSERT((reader && !m_reader) || (!reader && m_reader));
272 m_reader = reader; 145 m_reader = reader;
273 } 146 }
274 147
275 void ReadableStream::callPullIfNeeded() 148 void ReadableStream::callPullIfNeeded()
276 { 149 {
277 if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_st ate == Errored) 150 if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_st ate == Errored)
278 return; 151 return;
279 152
280 bool shouldApplyBackpressure = this->shouldApplyBackpressure(); 153 bool shouldApplyBackpressure = this->shouldApplyBackpressure();
281 // this->shouldApplyBackpressure may call this->error(). 154 // this->shouldApplyBackpressure may call this->error().
282 if (shouldApplyBackpressure || m_state == Errored) 155 if (shouldApplyBackpressure || m_state == Errored)
283 return; 156 return;
284 m_isPulling = true; 157 m_isPulling = true;
285 m_source->pullSource(); 158 m_source->pullSource();
286 } 159 }
287 160
161 void ReadableStream::closeInternal()
162 {
163 ASSERT(m_state == Readable);
164 m_state = Closed;
165 resolveAllPendingReadsAsDone();
166 clearQueue();
167 if (m_reader)
168 m_reader->releaseLock();
169 }
170
288 bool ReadableStream::hasPendingActivity() const 171 bool ReadableStream::hasPendingActivity() const
289 { 172 {
290 return m_state == Waiting || m_state == Readable; 173 return m_state == Readable;
291 } 174 }
292 175
293 void ReadableStream::stop() 176 void ReadableStream::stop()
294 { 177 {
295 error(DOMException::create(AbortError, "execution context is stopped")); 178 error(DOMException::create(AbortError, "execution context is stopped"));
296 ActiveDOMObject::stop(); 179 ActiveDOMObject::stop();
297 } 180 }
298 181
299 DEFINE_TRACE(ReadableStream) 182 DEFINE_TRACE(ReadableStream)
300 { 183 {
301 visitor->trace(m_source); 184 visitor->trace(m_source);
302 visitor->trace(m_ready);
303 visitor->trace(m_closed);
304 visitor->trace(m_exception); 185 visitor->trace(m_exception);
305 visitor->trace(m_reader); 186 visitor->trace(m_reader);
306 ActiveDOMObject::trace(visitor); 187 ActiveDOMObject::trace(visitor);
307 } 188 }
308 189
309 String ReadableStream::stateToString(State state)
310 {
311 switch (state) {
312 case Readable:
313 return "readable";
314 case Waiting:
315 return "waiting";
316 case Closed:
317 return "closed";
318 case Errored:
319 return "errored";
320 }
321 ASSERT(false);
322 return String();
323 }
324
325 } // namespace blink 190 } // namespace blink
OLDNEW
« no previous file with comments | « Source/core/streams/ReadableStream.h ('k') | Source/core/streams/ReadableStream.idl » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698