OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef ReadableStreamImpl_h | 5 #ifndef ReadableStreamImpl_h |
6 #define ReadableStreamImpl_h | 6 #define ReadableStreamImpl_h |
7 | 7 |
8 #include "bindings/core/v8/ExceptionState.h" | 8 #include "bindings/core/v8/ExceptionState.h" |
| 9 #include "bindings/core/v8/ScriptPromise.h" |
| 10 #include "bindings/core/v8/ScriptPromiseResolver.h" |
9 #include "bindings/core/v8/ScriptState.h" | 11 #include "bindings/core/v8/ScriptState.h" |
10 #include "bindings/core/v8/ScriptValue.h" | 12 #include "bindings/core/v8/ScriptValue.h" |
11 #include "bindings/core/v8/V8ArrayBuffer.h" | 13 #include "bindings/core/v8/V8ArrayBuffer.h" |
12 #include "bindings/core/v8/V8Binding.h" | 14 #include "bindings/core/v8/V8Binding.h" |
| 15 #include "bindings/core/v8/V8IteratorResultValue.h" |
13 #include "core/dom/DOMArrayBuffer.h" | 16 #include "core/dom/DOMArrayBuffer.h" |
14 #include "core/streams/ReadableStream.h" | 17 #include "core/streams/ReadableStream.h" |
15 #include "wtf/Deque.h" | 18 #include "wtf/Deque.h" |
16 #include "wtf/RefPtr.h" | 19 #include "wtf/RefPtr.h" |
17 #include "wtf/text/WTFString.h" | 20 #include "wtf/text/WTFString.h" |
18 #include <utility> | 21 #include <utility> |
19 | 22 |
20 namespace blink { | 23 namespace blink { |
21 | 24 |
22 // We define the default ChunkTypeTraits for frequently used types. | 25 // We define the default ChunkTypeTraits for frequently used types. |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
75 | 78 |
76 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou
rce) | 79 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou
rce) |
77 : ReadableStreamImpl(executionContext, source, new DefaultStrategy) { } | 80 : ReadableStreamImpl(executionContext, source, new DefaultStrategy) { } |
78 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou
rce, Strategy* strategy) | 81 ReadableStreamImpl(ExecutionContext* executionContext, UnderlyingSource* sou
rce, Strategy* strategy) |
79 : ReadableStream(executionContext, source) | 82 : ReadableStream(executionContext, source) |
80 , m_strategy(strategy) | 83 , m_strategy(strategy) |
81 , m_totalQueueSize(0) { } | 84 , m_totalQueueSize(0) { } |
82 ~ReadableStreamImpl() override { } | 85 ~ReadableStreamImpl() override { } |
83 | 86 |
84 // ReadableStream methods | 87 // ReadableStream methods |
85 ScriptValue readInternal(ScriptState*, ExceptionState&) override; | 88 ScriptPromise read(ScriptState*) override; |
86 | 89 |
87 bool enqueue(typename ChunkTypeTraits::PassType); | 90 bool enqueue(typename ChunkTypeTraits::PassType); |
88 | 91 |
89 // This function is intended to be used by internal code to withdraw | 92 // This function is intended to be used by internal code to withdraw |
90 // queued data. This pulls all data from this stream's queue, but | 93 // queued data. This pulls all data from this stream's queue, but |
91 // ReadableStream public APIs can work with the behavior (i.e. it behaves | 94 // ReadableStream public APIs can work with the behavior (i.e. it behaves |
92 // as if multiple read-one-buffer calls were made). | 95 // as if multiple read-one-buffer calls were made). |
93 void readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t
>>& queue); | 96 void readInternal(Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t
>>& queue); |
94 | 97 |
95 DEFINE_INLINE_VIRTUAL_TRACE() | 98 DEFINE_INLINE_VIRTUAL_TRACE() |
96 { | 99 { |
97 visitor->trace(m_strategy); | 100 visitor->trace(m_strategy); |
| 101 #if ENABLE(OILPAN) |
| 102 visitor->trace(m_pendingReads); |
| 103 #endif |
98 ReadableStream::trace(visitor); | 104 ReadableStream::trace(visitor); |
99 } | 105 } |
100 | 106 |
101 private: | 107 private: |
| 108 #if ENABLE(OILPAN) |
| 109 using PendingReads = HeapDeque<Member<ScriptPromiseResolver>>; |
| 110 #else |
| 111 using PendingReads = Deque<RefPtr<ScriptPromiseResolver>>; |
| 112 #endif |
| 113 |
102 // ReadableStream methods | 114 // ReadableStream methods |
103 bool isQueueEmpty() const override { return m_queue.isEmpty(); } | 115 bool isQueueEmpty() const override { return m_queue.isEmpty(); } |
104 void clearQueue() override | 116 void clearQueue() override |
105 { | 117 { |
106 m_queue.clear(); | 118 m_queue.clear(); |
107 m_totalQueueSize = 0; | 119 m_totalQueueSize = 0; |
108 } | 120 } |
| 121 |
| 122 void resolveAllPendingReadsAsDone() override |
| 123 { |
| 124 for (auto& resolver : m_pendingReads) { |
| 125 ScriptState::Scope scope(resolver->scriptState()); |
| 126 resolver->resolve(v8IteratorResultDone(resolver->scriptState())); |
| 127 } |
| 128 m_pendingReads.clear(); |
| 129 } |
| 130 |
| 131 void rejectAllPendingReads(PassRefPtrWillBeRawPtr<DOMException> r) override |
| 132 { |
| 133 RefPtrWillBeRawPtr<DOMException> reason = r; |
| 134 for (auto& resolver : m_pendingReads) |
| 135 resolver->reject(reason); |
| 136 m_pendingReads.clear(); |
| 137 } |
| 138 |
109 bool shouldApplyBackpressure() override | 139 bool shouldApplyBackpressure() override |
110 { | 140 { |
111 return m_strategy->shouldApplyBackpressure(m_totalQueueSize, this); | 141 return m_strategy->shouldApplyBackpressure(m_totalQueueSize, this); |
112 } | 142 } |
| 143 bool hasPendingReads() const override { return !m_pendingReads.isEmpty(); } |
113 | 144 |
114 Member<Strategy> m_strategy; | 145 Member<Strategy> m_strategy; |
115 Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>> m_queue; | 146 Deque<std::pair<typename ChunkTypeTraits::HoldType, size_t>> m_queue; |
| 147 PendingReads m_pendingReads; |
116 size_t m_totalQueueSize; | 148 size_t m_totalQueueSize; |
117 }; | 149 }; |
118 | 150 |
119 template <typename ChunkTypeTraits> | 151 template <typename ChunkTypeTraits> |
120 bool ReadableStreamImpl<ChunkTypeTraits>::enqueue(typename ChunkTypeTraits::Pass
Type chunk) | 152 bool ReadableStreamImpl<ChunkTypeTraits>::enqueue(typename ChunkTypeTraits::Pass
Type chunk) |
121 { | 153 { |
122 size_t size = m_strategy->size(chunk, this); | 154 size_t size = m_strategy->size(chunk, this); |
123 if (!enqueuePreliminaryCheck()) | 155 if (!enqueuePreliminaryCheck()) |
124 return false; | 156 return false; |
125 m_queue.append(std::make_pair(chunk, size)); | 157 |
126 m_totalQueueSize += size; | 158 if (m_pendingReads.isEmpty()) { |
| 159 m_queue.append(std::make_pair(chunk, size)); |
| 160 m_totalQueueSize += size; |
| 161 return enqueuePostAction(); |
| 162 } |
| 163 |
| 164 RefPtrWillBeRawPtr<ScriptPromiseResolver> resolver = m_pendingReads.takeFirs
t(); |
| 165 ScriptState* scriptState = resolver->scriptState(); |
| 166 ScriptState::Scope scope(scriptState); |
| 167 resolver->resolve(v8IteratorResult(scriptState, chunk)); |
127 return enqueuePostAction(); | 168 return enqueuePostAction(); |
128 } | 169 } |
129 | 170 |
130 template <typename ChunkTypeTraits> | 171 template <typename ChunkTypeTraits> |
131 ScriptValue ReadableStreamImpl<ChunkTypeTraits>::readInternal(ScriptState* scrip
tState, ExceptionState& exceptionState) | 172 ScriptPromise ReadableStreamImpl<ChunkTypeTraits>::read(ScriptState* scriptState
) |
132 { | 173 { |
133 readInternalPreliminaryCheck(exceptionState); | |
134 if (exceptionState.hadException()) | |
135 return ScriptValue(); | |
136 ASSERT(stateInternal() == Readable); | 174 ASSERT(stateInternal() == Readable); |
137 ASSERT(!m_queue.isEmpty()); | 175 if (m_queue.isEmpty()) { |
| 176 m_pendingReads.append(ScriptPromiseResolver::create(scriptState)); |
| 177 return m_pendingReads.last()->promise(); |
| 178 } |
| 179 |
138 auto pair = m_queue.takeFirst(); | 180 auto pair = m_queue.takeFirst(); |
139 typename ChunkTypeTraits::HoldType chunk = pair.first; | 181 typename ChunkTypeTraits::HoldType chunk = pair.first; |
140 size_t size = pair.second; | 182 size_t size = pair.second; |
141 ASSERT(m_totalQueueSize >= size); | 183 ASSERT(m_totalQueueSize >= size); |
142 m_totalQueueSize -= size; | 184 m_totalQueueSize -= size; |
143 readInternalPostAction(); | 185 readInternalPostAction(); |
144 return ChunkTypeTraits::toScriptValue(scriptState, chunk); | 186 |
| 187 return ScriptPromise::cast(scriptState, v8IteratorResult(scriptState, chunk)
); |
145 } | 188 } |
146 | 189 |
147 template <typename ChunkTypeTraits> | 190 template <typename ChunkTypeTraits> |
148 void ReadableStreamImpl<ChunkTypeTraits>::readInternal(Deque<std::pair<typename
ChunkTypeTraits::HoldType, size_t>>& queue) | 191 void ReadableStreamImpl<ChunkTypeTraits>::readInternal(Deque<std::pair<typename
ChunkTypeTraits::HoldType, size_t>>& queue) |
149 { | 192 { |
150 // We omit the preliminary check. Check it by yourself. | 193 // We omit the preliminary check. Check it by yourself. |
151 ASSERT(stateInternal() == Readable); | 194 ASSERT(stateInternal() == Readable); |
152 ASSERT(!m_queue.isEmpty()); | 195 ASSERT(m_pendingReads.isEmpty()); |
153 ASSERT(queue.isEmpty()); | 196 ASSERT(queue.isEmpty()); |
154 | 197 |
155 queue.swap(m_queue); | 198 queue.swap(m_queue); |
156 m_totalQueueSize = 0; | 199 m_totalQueueSize = 0; |
157 readInternalPostAction(); | 200 readInternalPostAction(); |
158 } | 201 } |
159 | 202 |
160 } // namespace blink | 203 } // namespace blink |
161 | 204 |
162 #endif // ReadableStreamImpl_h | 205 #endif // ReadableStreamImpl_h |
OLD | NEW |