Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(117)

Side by Side Diff: Source/core/streams/ReadableStreamImpl.h

Issue 1004623007: Streams Implementation Update: async read (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@stream-reader-read
Patch Set: Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Source/core/streams/ReadableStream.idl ('k') | Source/core/streams/ReadableStreamReader.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef ReadableStreamImpl_h 5 #ifndef ReadableStreamImpl_h
6 #define ReadableStreamImpl_h 6 #define ReadableStreamImpl_h
7 7
8 #include "bindings/core/v8/ExceptionState.h" 8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromise.h"
10 #include "bindings/core/v8/ScriptPromiseResolver.h"
9 #include "bindings/core/v8/ScriptState.h" 11 #include "bindings/core/v8/ScriptState.h"
10 #include "bindings/core/v8/ScriptValue.h" 12 #include "bindings/core/v8/ScriptValue.h"
11 #include "bindings/core/v8/V8ArrayBuffer.h" 13 #include "bindings/core/v8/V8ArrayBuffer.h"
12 #include "bindings/core/v8/V8Binding.h" 14 #include "bindings/core/v8/V8Binding.h"
15 #include "bindings/core/v8/V8IteratorResultValue.h"
13 #include "core/dom/DOMArrayBuffer.h" 16 #include "core/dom/DOMArrayBuffer.h"
17 #include "core/dom/DOMException.h"
14 #include "core/streams/ReadableStream.h" 18 #include "core/streams/ReadableStream.h"
15 #include "wtf/Deque.h" 19 #include "wtf/Deque.h"
16 #include "wtf/RefPtr.h" 20 #include "wtf/RefPtr.h"
17 #include "wtf/text/WTFString.h" 21 #include "wtf/text/WTFString.h"
18 #include <utility> 22 #include <utility>
19 23
20 namespace blink { 24 namespace blink {
21 25
22 // We define the default ChunkTypeTraits for frequently used types. 26 // We define the default ChunkTypeTraits for frequently used types.
23 template<typename ChunkType> 27 template<typename ChunkType>
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
75 79
76 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce) 80 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce)
77 : ReadableStreamImpl(executionContext, source, new DefaultStrategy) { } 81 : ReadableStreamImpl(executionContext, source, new DefaultStrategy) { }
78 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce, Strategy* strategy) 82 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou rce, Strategy* strategy)
79 : ReadableStream(executionContext, source) 83 : ReadableStream(executionContext, source)
80 , m_strategy(strategy) 84 , m_strategy(strategy)
81 , m_totalQueueSize(0) { } 85 , m_totalQueueSize(0) { }
82 ~ReadableStreamImpl() override { } 86 ~ReadableStreamImpl() override { }
83 87
84 // ReadableStream methods 88 // ReadableStream methods
85 ScriptValue readInternal(ScriptState*, ExceptionState&) override; 89 ScriptPromise read(ScriptState*) override;
86 90
87 bool enqueue(typename ChunkTypeTraits::PassType); 91 bool enqueue(typename ChunkTypeTraits::PassType);
88 92
89 // This function is intended to be used by internal code to withdraw 93 // This function is intended to be used by internal code to withdraw
90 // queued data. This pulls all data from this stream's queue, but 94 // queued data. This pulls all data from this stream's queue, but
91 // ReadableStream public APIs can work with the behavior (i.e. it behaves 95 // ReadableStream public APIs can work with the behavior (i.e. it behaves
92 // as if multiple read-one-buffer calls were made). 96 // as if multiple read-one-buffer calls were made).
93 void readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t >>& queue); 97 void readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t >>& queue);
94 98
95 DEFINE_INLINE_VIRTUAL_TRACE() 99 DEFINE_INLINE_VIRTUAL_TRACE()
96 { 100 {
97 visitor->trace(m_strategy); 101 visitor->trace(m_strategy);
102 #if ENABLE(OILPAN)
103 visitor->trace(m_pendingReads);
104 #endif
98 ReadableStream::trace(visitor); 105 ReadableStream::trace(visitor);
99 } 106 }
100 107
101 private: 108 private:
109 #if ENABLE(OILPAN)
110 using PendingReads = HeapDeque<Member<ScriptPromiseResolver>>;
111 #else
112 using PendingReads = Deque<RefPtr<ScriptPromiseResolver>>;
113 #endif
114
102 // ReadableStream methods 115 // ReadableStream methods
103 bool isQueueEmpty() const override { return m_queue.isEmpty(); } 116 bool isQueueEmpty() const override { return m_queue.isEmpty(); }
104 void clearQueue() override 117 void clearQueue() override
105 { 118 {
106 m_queue.clear(); 119 m_queue.clear();
107 m_totalQueueSize = 0; 120 m_totalQueueSize = 0;
108 } 121 }
122
123 void resolveAllPendingReadsAsDone() override
124 {
125 for (auto& resolver : m_pendingReads) {
126 ScriptState::Scope scope(resolver->scriptState());
127 resolver->resolve(v8IteratorResultDone(resolver->scriptState()));
128 }
129 m_pendingReads.clear();
130 }
131
132 void rejectAllPendingReads(PassRefPtrWillBeRawPtr<DOMException> r) override
133 {
134 RefPtrWillBeRawPtr<DOMException> reason = r;
135 for (auto& resolver : m_pendingReads)
136 resolver->reject(reason);
137 m_pendingReads.clear();
138 }
139
109 bool shouldApplyBackpressure() override 140 bool shouldApplyBackpressure() override
110 { 141 {
111 return m_strategy->shouldApplyBackpressure(m_totalQueueSize, this); 142 return m_strategy->shouldApplyBackpressure(m_totalQueueSize, this);
112 } 143 }
144 bool hasPendingReads() const override { return !m_pendingReads.isEmpty(); }
113 145
114 Member<Strategy> m_strategy; 146 Member<Strategy> m_strategy;
115 Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>> m_queue; 147 Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>> m_queue;
148 PendingReads m_pendingReads;
116 size_t m_totalQueueSize; 149 size_t m_totalQueueSize;
117 }; 150 };
118 151
119 template <typename ChunkTypeTraits> 152 template <typename ChunkTypeTraits>
120 bool ReadableStreamImpl<ChunkTypeTraits>::enqueue(typename ChunkTypeTraits::Pass Type chunk) 153 bool ReadableStreamImpl<ChunkTypeTraits>::enqueue(typename ChunkTypeTraits::Pass Type chunk)
121 { 154 {
122 size_t size = m_strategy->size(chunk, this); 155 size_t size = m_strategy->size(chunk, this);
123 if (!enqueuePreliminaryCheck()) 156 if (!enqueuePreliminaryCheck())
124 return false; 157 return false;
125 m_queue.append(std::make_pair(chunk, size)); 158
126 m_totalQueueSize += size; 159 if (m_pendingReads.isEmpty()) {
160 m_queue.append(std::make_pair(chunk, size));
161 m_totalQueueSize += size;
162 return enqueuePostAction();
163 }
164
165 RefPtrWillBeRawPtr<ScriptPromiseResolver> resolver = m_pendingReads.takeFirs t();
166 ScriptState* scriptState = resolver->scriptState();
167 ScriptState::Scope scope(scriptState);
168 resolver->resolve(v8IteratorResult(scriptState, chunk));
127 return enqueuePostAction(); 169 return enqueuePostAction();
128 } 170 }
129 171
130 template <typename ChunkTypeTraits> 172 template <typename ChunkTypeTraits>
131 ScriptValue ReadableStreamImpl<ChunkTypeTraits>::readInternal(ScriptState* scrip tState, ExceptionState& exceptionState) 173 ScriptPromise ReadableStreamImpl<ChunkTypeTraits>::read(ScriptState* scriptState )
132 { 174 {
133 readInternalPreliminaryCheck(exceptionState);
134 if (exceptionState.hadException())
135 return ScriptValue();
136 ASSERT(stateInternal() == Readable); 175 ASSERT(stateInternal() == Readable);
137 ASSERT(!m_queue.isEmpty()); 176 if (m_queue.isEmpty()) {
177 m_pendingReads.append(ScriptPromiseResolver::create(scriptState));
178 return m_pendingReads.last()->promise();
179 }
180
138 auto pair = m_queue.takeFirst(); 181 auto pair = m_queue.takeFirst();
139 typename ChunkTypeTraits::HoldType chunk = pair.first; 182 typename ChunkTypeTraits::HoldType chunk = pair.first;
140 size_t size = pair.second; 183 size_t size = pair.second;
141 ASSERT(m_totalQueueSize >= size); 184 ASSERT(m_totalQueueSize >= size);
142 m_totalQueueSize -= size; 185 m_totalQueueSize -= size;
143 readInternalPostAction(); 186 readInternalPostAction();
144 return ChunkTypeTraits::toScriptValue(scriptState, chunk); 187
188 return ScriptPromise::cast(scriptState, v8IteratorResult(scriptState, chunk) );
145 } 189 }
146 190
147 template <typename ChunkTypeTraits> 191 template <typename ChunkTypeTraits>
148 void ReadableStreamImpl<ChunkTypeTraits>::readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>>& queue) 192 void ReadableStreamImpl<ChunkTypeTraits>::readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>>& queue)
149 { 193 {
150 // We omit the preliminary check. Check it by yourself. 194 // We omit the preliminary check. Check it by yourself.
151 ASSERT(stateInternal() == Readable); 195 ASSERT(stateInternal() == Readable);
152 ASSERT(!m_queue.isEmpty()); 196 ASSERT(m_pendingReads.isEmpty());
153 ASSERT(queue.isEmpty()); 197 ASSERT(queue.isEmpty());
154 198
155 queue.swap(m_queue); 199 queue.swap(m_queue);
156 m_totalQueueSize = 0; 200 m_totalQueueSize = 0;
157 readInternalPostAction(); 201 readInternalPostAction();
158 } 202 }
159 203
160 } // namespace blink 204 } // namespace blink
161 205
162 #endif // ReadableStreamImpl_h 206 #endif // ReadableStreamImpl_h
OLDNEW
« no previous file with comments | « Source/core/streams/ReadableStream.idl ('k') | Source/core/streams/ReadableStreamReader.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698