Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: Source/core/streams/ReadableStreamImpl.h

Issue 1004623007: Streams Implementation Update: async read (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@stream-reader-read
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef ReadableStreamImpl_h 5 #ifndef ReadableStreamImpl_h
6 #define ReadableStreamImpl_h 6 #define ReadableStreamImpl_h
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromise.h"
10 #include "bindings/core/v8/ScriptPromiseResolver.h"
9 #include "bindings/core/v8/ScriptState.h" 11 #include "bindings/core/v8/ScriptState.h"
10 #include "bindings/core/v8/ScriptValue.h" 12 #include "bindings/core/v8/ScriptValue.h"
11 #include "bindings/core/v8/V8ArrayBuffer.h" 13 #include "bindings/core/v8/V8ArrayBuffer.h"
12 #include "bindings/core/v8/V8Binding.h" 14 #include "bindings/core/v8/V8Binding.h"
15 #include "bindings/core/v8/V8IteratorResultValue.h"
13 #include "core/dom/DOMArrayBuffer.h" 16 #include "core/dom/DOMArrayBuffer.h"
14 #include "core/streams/ReadableStream.h" 17 #include "core/streams/ReadableStream.h"
15 #include "wtf/Deque.h" 18 #include "wtf/Deque.h"
16 #include "wtf/RefPtr.h" 19 #include "wtf/RefPtr.h"
17 #include "wtf/text/WTFString.h" 20 #include "wtf/text/WTFString.h"
18 #include <utility> 21 #include <utility>
19 22
20 namespace blink { 23 namespace blink {
21 24
22 // We define the default ChunkTypeTraits for frequently used types. 25 // We define the default ChunkTypeTraits for frequently used types.
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
75 78
76 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce) 79 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce)
77 : ReadableStreamImpl(executionContext, source, new DefaultStrategy) { } 80 : ReadableStreamImpl(executionContext, source, new DefaultStrategy) { }
78 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce, Strategy* strategy) 81 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce, Strategy* strategy)
79 : ReadableStream(executionContext, source) 82 : ReadableStream(executionContext, source)
80 , m_strategy(strategy) 83 , m_strategy(strategy)
81 , m_totalQueueSize(0) { } 84 , m_totalQueueSize(0) { }
82 ~ReadableStreamImpl() override { } 85 ~ReadableStreamImpl() override { }
83 86
84 // ReadableStream methods 87 // ReadableStream methods
85 ScriptValue readInternal(ScriptState*, ExceptionState&) override; 88 ScriptPromise read(ScriptState*) override;
86 89
87 bool enqueue(typename ChunkTypeTraits::PassType); 90 bool enqueue(typename ChunkTypeTraits::PassType);
88 91
89 // This function is intended to be used by internal code to withdraw 92 // This function is intended to be used by internal code to withdraw
90 // queued data. This pulls all data from this stream's queue, but 93 // queued data. This pulls all data from this stream's queue, but
91 // ReadableStream public APIs can work with the behavior (i.e. it behaves 94 // ReadableStream public APIs can work with the behavior (i.e. it behaves
92 // as if multiple read-one-buffer calls were made). 95 // as if multiple read-one-buffer calls were made).
93 void readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t >>& queue); 96 void readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t >>& queue);
94 97
95 DEFINE_INLINE_VIRTUAL_TRACE() 98 DEFINE_INLINE_VIRTUAL_TRACE()
96 { 99 {
97 visitor->trace(m_strategy); 100 visitor->trace(m_strategy);
98 ReadableStream::trace(visitor); 101 ReadableStream::trace(visitor);
99 } 102 }
100 103
101 private: 104 private:
105 #if ENABLE(OILPAN)
106 using PendingReads = HeapDeque<Member<ScriptPromiseResolver>>;
107 #else
108 using PendingReads = Deque<RefPtr<ScriptPromiseResolver>>;
109 #endif
110
102 // ReadableStream methods 111 // ReadableStream methods
103 bool isQueueEmpty() const override { return m_queue.isEmpty(); } 112 bool isQueueEmpty() const override { return m_queue.isEmpty(); }
104 void clearQueue() override 113 void clearQueue() override
105 { 114 {
106 m_queue.clear(); 115 m_queue.clear();
107 m_totalQueueSize = 0; 116 m_totalQueueSize = 0;
108 } 117 }
118
119 void resolveAllPendingReadsAsDone() override
120 {
121 for (auto& resolver : m_pendingReads) {
122 ScriptState::Scope scope(resolver->scriptState());
123 resolver->resolve(v8IteratorResultDone(resolver->scriptState()));
124 }
125 m_pendingReads.clear();
126 }
127
128 void rejectAllPendingReads(PassRefPtrWillBeRawPtr<DOMException> r) override
129 {
130 RefPtrWillBeRawPtr<DOMException> reason = r;
131 for (auto& resolver : m_pendingReads)
132 resolver->reject(reason);
133 m_pendingReads.clear();
134 }
135
109 bool shouldApplyBackpressure() override 136 bool shouldApplyBackpressure() override
110 { 137 {
111 return m_strategy->shouldApplyBackpressure(m_totalQueueSize, this); 138 return m_strategy->shouldApplyBackpressure(m_totalQueueSize, this);
112 } 139 }
140 bool hasPendingReads() const override { return !m_pendingReads.isEmpty(); }
113 141
114 Member<Strategy> m_strategy; 142 Member<Strategy> m_strategy;
115 Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>> m_queue; 143 Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>> m_queue;
144 PendingReads m_pendingReads;
116 size_t m_totalQueueSize; 145 size_t m_totalQueueSize;
117 }; 146 };
118 147
119 template <typename ChunkTypeTraits> 148 template <typename ChunkTypeTraits>
120 bool ReadableStreamImpl<ChunkTypeTraits>::enqueue(typename ChunkTypeTraits::Pass Type chunk) 149 bool ReadableStreamImpl<ChunkTypeTraits>::enqueue(typename ChunkTypeTraits::Pass Type chunk)
121 { 150 {
122 size_t size = m_strategy->size(chunk, this); 151 size_t size = m_strategy->size(chunk, this);
123 if (!enqueuePreliminaryCheck()) 152 if (!enqueuePreliminaryCheck())
124 return false; 153 return false;
125 m_queue.append(std::make_pair(chunk, size)); 154
126 m_totalQueueSize += size; 155 if (m_pendingReads.isEmpty()) {
156 m_queue.append(std::make_pair(chunk, size));
157 m_totalQueueSize += size;
158 return enqueuePostAction();
159 }
160
161 RefPtrWillBeRawPtr<ScriptPromiseResolver> resolver = m_pendingReads.takeFirs t();
162 ScriptState* scriptState = resolver->scriptState();
163 ScriptState::Scope scope(scriptState);
164 resolver->resolve(v8IteratorResult(scriptState, chunk));
127 return enqueuePostAction(); 165 return enqueuePostAction();
128 } 166 }
129 167
130 template <typename ChunkTypeTraits> 168 template <typename ChunkTypeTraits>
131 ScriptValue ReadableStreamImpl<ChunkTypeTraits>::readInternal(ScriptState* scrip tState, ExceptionState& exceptionState) 169 ScriptPromise ReadableStreamImpl<ChunkTypeTraits>::read(ScriptState* scriptState )
132 { 170 {
133 readInternalPreliminaryCheck(exceptionState);
134 if (exceptionState.hadException())
135 return ScriptValue();
136 ASSERT(stateInternal() == Readable); 171 ASSERT(stateInternal() == Readable);
137 ASSERT(!m_queue.isEmpty()); 172 if (m_queue.isEmpty()) {
173 m_pendingReads.append(ScriptPromiseResolver::create(scriptState));
174 return m_pendingReads.last()->promise();
175 }
176
138 auto pair = m_queue.takeFirst(); 177 auto pair = m_queue.takeFirst();
139 typename ChunkTypeTraits::HoldType chunk = pair.first; 178 typename ChunkTypeTraits::HoldType chunk = pair.first;
140 size_t size = pair.second; 179 size_t size = pair.second;
141 ASSERT(m_totalQueueSize >= size); 180 ASSERT(m_totalQueueSize >= size);
142 m_totalQueueSize -= size; 181 m_totalQueueSize -= size;
143 readInternalPostAction(); 182 readInternalPostAction();
144 return ChunkTypeTraits::toScriptValue(scriptState, chunk); 183
184 return ScriptPromise::cast(scriptState, v8IteratorResult(scriptState, chunk) );
145 } 185 }
146 186
147 template <typename ChunkTypeTraits> 187 template <typename ChunkTypeTraits>
148 void ReadableStreamImpl<ChunkTypeTraits>::readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>>& queue) 188 void ReadableStreamImpl<ChunkTypeTraits>::readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>>& queue)
149 { 189 {
150 // We omit the preliminary check. Check it by yourself. 190 // We omit the preliminary check. Check it by yourself.
151 ASSERT(stateInternal() == Readable); 191 ASSERT(stateInternal() == Readable);
152 ASSERT(!m_queue.isEmpty()); 192 ASSERT(m_pendingReads.isEmpty());
153 ASSERT(queue.isEmpty()); 193 ASSERT(queue.isEmpty());
154 194
155 queue.swap(m_queue); 195 queue.swap(m_queue);
156 m_totalQueueSize = 0; 196 m_totalQueueSize = 0;
157 readInternalPostAction(); 197 readInternalPostAction();
158 } 198 }
159 199
160 } // namespace blink 200 } // namespace blink
161 201
162 #endif // ReadableStreamImpl_h 202 #endif // ReadableStreamImpl_h
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698