Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(125)

Side by Side Diff: Source/core/streams/ReadableStream.cpp

Issue 712043002: Make ReadableStream implementation up to date. (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Created 6 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "config.h" 5 #include "config.h"
6 #include "core/streams/ReadableStream.h" 6 #include "core/streams/ReadableStream.h"
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptFunction.h" 9 #include "bindings/core/v8/ScriptFunction.h"
10 #include "bindings/core/v8/ScriptPromiseResolver.h" 10 #include "bindings/core/v8/ScriptPromiseResolver.h"
11 #include "bindings/core/v8/V8Binding.h" 11 #include "bindings/core/v8/V8Binding.h"
12 #include "core/dom/DOMException.h" 12 #include "core/dom/DOMException.h"
13 #include "core/dom/ExceptionCode.h" 13 #include "core/dom/ExceptionCode.h"
14 #include "core/dom/ExecutionContext.h" 14 #include "core/dom/ExecutionContext.h"
15 #include "core/streams/UnderlyingSource.h" 15 #include "core/streams/UnderlyingSource.h"
16 16
17 namespace blink { 17 namespace blink {
18 18
19 namespace {
20
21 class ConstUndefined : public ScriptFunction {
22 public:
23 static v8::Handle<v8::Function> create(ScriptState* scriptState)
24 {
25 return (new ConstUndefined(scriptState))->bindToV8Function();
26 }
27
28 private:
29 explicit ConstUndefined(ScriptState* scriptState) : ScriptFunction(scriptSta te) { }
30 ScriptValue call(ScriptValue value) override
31 {
32 return ScriptValue(scriptState(), v8::Undefined(scriptState()->isolate() ));
33 }
34 };
35
36 } // namespace
37
19 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSou rce* source) 38 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSou rce* source)
20 : m_source(source) 39 : m_source(source)
21 , m_isStarted(false) 40 , m_isStarted(false)
22 , m_isDraining(false) 41 , m_isDraining(false)
23 , m_isPulling(false) 42 , m_isPulling(false)
24 , m_isSchedulingPull(false)
25 , m_state(Waiting) 43 , m_state(Waiting)
26 , m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready)) 44 , m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready))
27 , m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed)) 45 , m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed))
28 { 46 {
29 } 47 }
30 48
31 ReadableStream::~ReadableStream() 49 ReadableStream::~ReadableStream()
32 { 50 {
33 } 51 }
34 52
(...skipping 11 matching lines...) Expand all
46 } 64 }
47 ASSERT(false); 65 ASSERT(false);
48 return String(); 66 return String();
49 } 67 }
50 68
51 bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize) 69 bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize)
52 { 70 {
53 if (m_state == Errored || m_state == Closed || m_isDraining) 71 if (m_state == Errored || m_state == Closed || m_isDraining)
54 return false; 72 return false;
55 73
56 // FIXME: Query strategy.
57 return true; 74 return true;
tyoshino (SeeGerritForStatus) 2014/11/11 12:43:32 FYI, there was a change about exception throwing f
yhirano 2014/11/12 04:20:29 I see, but I don't want to throw an exception (unl
tyoshino (SeeGerritForStatus) 2014/11/12 04:27:16 I see. OK.
58 } 75 }
59 76
60 bool ReadableStream::enqueuePostAction(size_t totalQueueSize) 77 bool ReadableStream::enqueuePostAction(size_t totalQueueSize)
61 { 78 {
62 m_isPulling = false; 79 m_isPulling = false;
63 80
64 // FIXME: Set needsMore correctly. 81 // FIXME: Set |shouldApplyBackpressure| correctly.
65 bool needsMore = true; 82 bool shouldApplyBackpressure = false;
66 83
67 if (m_state == Waiting) { 84 if (m_state == Waiting) {
68 m_state = Readable; 85 m_state = Readable;
69 m_wait->resolve(V8UndefinedType()); 86 m_wait->resolve(V8UndefinedType());
70 } 87 }
71 88
72 return needsMore; 89 return !shouldApplyBackpressure;
73 } 90 }
74 91
75 void ReadableStream::close() 92 void ReadableStream::close()
76 { 93 {
77 if (m_state == Waiting) { 94 if (m_state == Waiting) {
78 m_wait->resolve(V8UndefinedType()); 95 m_wait->resolve(V8UndefinedType());
79 m_closed->resolve(V8UndefinedType()); 96 m_closed->resolve(V8UndefinedType());
80 m_state = Closed; 97 m_state = Closed;
81 } else if (m_state == Readable) { 98 } else if (m_state == Readable) {
82 m_isDraining = true; 99 m_isDraining = true;
(...skipping 15 matching lines...) Expand all
98 return; 115 return;
99 } 116 }
100 } 117 }
101 118
102 void ReadableStream::readPostAction() 119 void ReadableStream::readPostAction()
103 { 120 {
104 ASSERT(m_state == Readable); 121 ASSERT(m_state == Readable);
105 if (isQueueEmpty()) { 122 if (isQueueEmpty()) {
106 if (m_isDraining) { 123 if (m_isDraining) {
107 m_state = Closed; 124 m_state = Closed;
108 m_wait->reset();
109 m_wait->resolve(V8UndefinedType());
110 m_closed->resolve(V8UndefinedType()); 125 m_closed->resolve(V8UndefinedType());
111 } else { 126 } else {
112 m_state = Waiting; 127 m_state = Waiting;
113 m_wait->reset(); 128 m_wait->reset();
114 callOrSchedulePull();
115 } 129 }
116 } 130 }
131 callPullIfNeeded();
117 } 132 }
118 133
119 ScriptPromise ReadableStream::wait(ScriptState* scriptState) 134 ScriptPromise ReadableStream::wait(ScriptState* scriptState)
120 { 135 {
121 if (m_state == Waiting)
122 callOrSchedulePull();
123 return m_wait->promise(scriptState->world()); 136 return m_wait->promise(scriptState->world());
124 } 137 }
125 138
126 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reaso n) 139 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reaso n)
127 { 140 {
128 if (m_state == Errored) {
129 RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(s criptState);
130 ScriptPromise promise = resolver->promise();
131 resolver->reject(m_exception);
132 return promise;
133 }
134 if (m_state == Closed) 141 if (m_state == Closed)
135 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te())); 142 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isola te()));
143 if (m_state == Errored)
144 return ScriptPromise::rejectWithDOMException(scriptState, m_exception);
136 145
137 if (m_state == Waiting) { 146 ASSERT(m_state == Readable || m_state == Waiting);
147 if (m_state == Waiting)
138 m_wait->resolve(V8UndefinedType()); 148 m_wait->resolve(V8UndefinedType());
139 } else {
140 ASSERT(m_state == Readable);
141 m_wait->reset();
142 m_wait->resolve(V8UndefinedType());
143 }
144
145 clearQueue(); 149 clearQueue();
146 m_state = Closed; 150 m_state = Closed;
147 m_closed->resolve(V8UndefinedType()); 151 m_closed->resolve(V8UndefinedType());
148 return m_source->cancelSource(scriptState, reason); 152 return m_source->cancelSource(scriptState, reason).then(ConstUndefined::crea te(scriptState));
149 } 153 }
150 154
151 ScriptPromise ReadableStream::closed(ScriptState* scriptState) 155 ScriptPromise ReadableStream::closed(ScriptState* scriptState)
152 { 156 {
153 return m_closed->promise(scriptState->world()); 157 return m_closed->promise(scriptState->world());
154 } 158 }
155 159
156 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception) 160 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
157 { 161 {
158 if (m_state == Readable) { 162 switch (m_state) {
159 clearQueue(); 163 case Waiting:
160 m_wait->reset();
161 }
162
163 if (m_state == Waiting || m_state == Readable) {
164 m_state = Errored; 164 m_state = Errored;
165 m_exception = exception; 165 m_exception = exception;
166 if (m_wait->state() == m_wait->Pending) 166 m_wait->reject(m_exception);
167 m_wait->reject(m_exception);
168 m_closed->reject(m_exception); 167 m_closed->reject(m_exception);
168 break;
169 case Readable:
170 clearQueue();
171 m_state = Errored;
172 m_exception = exception;
173 m_wait->reset();
174 m_wait->reject(m_exception);
175 m_closed->reject(m_exception);
176 break;
177 default:
178 break;
169 } 179 }
170 } 180 }
171 181
172 void ReadableStream::didSourceStart() 182 void ReadableStream::didSourceStart()
173 { 183 {
174 m_isStarted = true; 184 m_isStarted = true;
175 if (m_isSchedulingPull) 185 callPullIfNeeded();
176 m_source->pullSource();
177 } 186 }
178 187
179 void ReadableStream::callOrSchedulePull() 188 void ReadableStream::callPullIfNeeded()
180 { 189 {
181 if (m_isPulling) 190 if (m_isPulling || m_isDraining || !m_isStarted || m_state == Closed || m_st ate == Errored)
191 return;
192 // FIXME: Set shouldApplyBackpressure correctly.
193 bool shouldApplyBackpressure = false;
194 if (shouldApplyBackpressure)
182 return; 195 return;
183 m_isPulling = true; 196 m_isPulling = true;
184 if (m_isStarted) 197 m_source->pullSource();
185 m_source->pullSource();
186 else
187 m_isSchedulingPull = true;
188 } 198 }
189 199
190 void ReadableStream::trace(Visitor* visitor) 200 void ReadableStream::trace(Visitor* visitor)
191 { 201 {
192 visitor->trace(m_source); 202 visitor->trace(m_source);
193 visitor->trace(m_wait); 203 visitor->trace(m_wait);
194 visitor->trace(m_closed); 204 visitor->trace(m_closed);
195 visitor->trace(m_exception); 205 visitor->trace(m_exception);
196 } 206 }
197 207
198 } // namespace blink 208 } // namespace blink
199 209
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698