OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "config.h" | |
6 #include "modules/fetch/ReadableStreamDataConsumerHandle.h" | |
7 | |
8 #include "bindings/core/v8/ExceptionState.h" | |
9 #include "bindings/core/v8/ReadableStreamOperations.h" | |
10 #include "bindings/core/v8/ScriptFunction.h" | |
11 #include "bindings/core/v8/ScriptState.h" | |
12 #include "bindings/core/v8/ScriptValue.h" | |
13 #include "bindings/core/v8/V8BindingMacros.h" | |
14 #include "bindings/core/v8/V8IteratorResultValue.h" | |
15 #include "bindings/core/v8/V8RecursionScope.h" | |
16 #include "bindings/core/v8/V8Uint8Array.h" | |
17 #include "core/dom/DOMTypedArray.h" | |
18 #include "public/platform/Platform.h" | |
19 #include "public/platform/WebTaskRunner.h" | |
20 #include "public/platform/WebThread.h" | |
21 #include "public/platform/WebTraceLocation.h" | |
22 #include "wtf/Assertions.h" | |
23 #include "wtf/Functional.h" | |
24 #include "wtf/RefCounted.h" | |
25 #include <algorithm> | |
26 #include <string.h> | |
27 #include <v8.h> | |
28 | |
29 namespace blink { | |
30 | |
31 using Result = WebDataConsumerHandle::Result; | |
32 using Flags = WebDataConsumerHandle::Flags; | |
33 | |
34 // This context is not yet thread-safe. | |
35 class ReadableStreamDataConsumerHandle::ReadingContext final : public RefCounted
<ReadingContext> { | |
36 WTF_MAKE_NONCOPYABLE(ReadingContext); | |
37 public: | |
38 class OnFulfilled final : public ScriptFunction { | |
39 public: | |
40 static v8::Local<v8::Function> createFunction(ScriptState* scriptState,
PassRefPtr<ReadingContext> context) | |
41 { | |
42 return (new OnFulfilled(scriptState, context))->bindToV8Function(); | |
43 } | |
44 | |
45 ScriptValue call(ScriptValue v) override | |
46 { | |
47 bool done; | |
48 v8::Local<v8::Value> item = v.v8Value(); | |
49 ASSERT(item->IsObject()); | |
50 v8::Local<v8::Value> value = v8CallOrCrash(v8UnpackIteratorResult(v.
scriptState(), item.As<v8::Object>(), &done)); | |
51 if (done) { | |
52 m_readingContext->onReadDone(); | |
53 return v; | |
54 } | |
55 if (!V8Uint8Array::hasInstance(value, v.isolate())) { | |
56 m_readingContext->onRejected(); | |
57 return ScriptValue(); | |
58 } | |
59 m_readingContext->onRead(V8Uint8Array::toImpl(value.As<v8::Object>()
)); | |
60 return v; | |
61 } | |
62 | |
63 private: | |
64 OnFulfilled(ScriptState* scriptState, PassRefPtr<ReadingContext> context
) | |
65 : ScriptFunction(scriptState), m_readingContext(context) {} | |
66 | |
67 RefPtr<ReadingContext> m_readingContext; | |
68 }; | |
69 | |
70 class OnRejected final : public ScriptFunction { | |
71 public: | |
72 static v8::Local<v8::Function> createFunction(ScriptState* scriptState,
PassRefPtr<ReadingContext> context) | |
73 { | |
74 return (new OnRejected(scriptState, context))->bindToV8Function(); | |
75 } | |
76 | |
77 ScriptValue call(ScriptValue v) override | |
78 { | |
79 m_readingContext->onRejected(); | |
80 return v; | |
81 } | |
82 | |
83 private: | |
84 OnRejected(ScriptState* scriptState, PassRefPtr<ReadingContext> context) | |
85 : ScriptFunction(scriptState), m_readingContext(context) {} | |
86 | |
87 RefPtr<ReadingContext> m_readingContext; | |
88 }; | |
89 | |
90 class ReaderImpl final : public FetchDataConsumerHandle::Reader { | |
91 public: | |
92 ReaderImpl(PassRefPtr<ReadingContext> context, Client* client) | |
93 : m_readingContext(context) | |
94 { | |
95 m_readingContext->attachReader(client); | |
96 } | |
97 ~ReaderImpl() override | |
98 { | |
99 m_readingContext->detachReader(); | |
100 } | |
101 | |
102 Result read(void* buffer, size_t size, Flags flags, size_t* readSize) ov
erride | |
103 { | |
104 *readSize = 0; | |
105 const void* src = nullptr; | |
106 size_t available; | |
107 Result r = beginRead(&src, flags, &available); | |
108 if (r != WebDataConsumerHandle::Ok) | |
109 return r; | |
110 *readSize = std::min(available, size); | |
111 memcpy(buffer, src, *readSize); | |
112 return endRead(*readSize); | |
113 } | |
114 | |
115 Result beginRead(const void** buffer, Flags, size_t* available) override | |
116 { | |
117 return m_readingContext->beginRead(buffer, available); | |
118 } | |
119 | |
120 Result endRead(size_t readSize) override | |
121 { | |
122 return m_readingContext->endRead(readSize); | |
123 } | |
124 | |
125 private: | |
126 RefPtr<ReadingContext> m_readingContext; | |
127 }; | |
128 | |
129 static PassRefPtr<ReadingContext> create(ScriptState* scriptState, v8::Local
<v8::Value> stream) | |
130 { | |
131 return adoptRef(new ReadingContext(scriptState, stream)); | |
132 } | |
133 | |
134 void attachReader(WebDataConsumerHandle::Client* client) | |
135 { | |
136 m_client = client; | |
137 notifyLater(); | |
138 } | |
139 | |
140 void detachReader() | |
141 { | |
142 m_client = nullptr; | |
143 } | |
144 | |
145 Result beginRead(const void** buffer, size_t* available) | |
146 { | |
147 *buffer = nullptr; | |
148 *available = 0; | |
149 if (m_hasError) | |
150 return WebDataConsumerHandle::UnexpectedError; | |
151 if (m_isDone) | |
152 return WebDataConsumerHandle::Done; | |
153 | |
154 if (m_pendingBuffer) { | |
155 ASSERT(m_pendingOffset < m_pendingBuffer->length()); | |
156 *buffer = m_pendingBuffer->data() + m_pendingOffset; | |
157 *available = m_pendingBuffer->length() - m_pendingOffset; | |
158 return WebDataConsumerHandle::Ok; | |
159 } | |
160 ASSERT(!m_reader.isEmpty()); | |
161 m_isInRecursion = true; | |
162 if (!m_isReading) { | |
163 m_isReading = true; | |
164 ScriptState::Scope scope(m_reader.scriptState()); | |
165 V8RecursionScope recursionScope(m_reader.isolate()); | |
166 ReadableStreamOperations::read(m_reader.scriptState(), m_reader.v8Va
lue()).then( | |
167 OnFulfilled::createFunction(m_reader.scriptState(), this), | |
168 OnRejected::createFunction(m_reader.scriptState(), this)); | |
169 // Note: Microtasks may run here. | |
170 } | |
171 m_isInRecursion = false; | |
172 return WebDataConsumerHandle::ShouldWait; | |
173 } | |
174 | |
175 Result endRead(size_t readSize) | |
176 { | |
177 ASSERT(m_pendingBuffer); | |
178 ASSERT(m_pendingOffset + readSize <= m_pendingBuffer->length()); | |
179 m_pendingOffset += readSize; | |
180 if (m_pendingOffset == m_pendingBuffer->length()) { | |
181 m_pendingBuffer = nullptr; | |
182 m_pendingOffset = 0; | |
183 } | |
184 return WebDataConsumerHandle::Ok; | |
185 } | |
186 | |
187 void onRead(DOMUint8Array* buffer) | |
188 { | |
189 ASSERT(m_isReading); | |
190 ASSERT(buffer); | |
191 ASSERT(!m_pendingBuffer); | |
192 ASSERT(!m_pendingOffset); | |
193 m_isReading = false; | |
194 m_pendingBuffer = buffer; | |
195 notify(); | |
196 } | |
197 | |
198 void onReadDone() | |
199 { | |
200 ASSERT(m_isReading); | |
201 ASSERT(!m_pendingBuffer); | |
202 m_isReading = false; | |
203 m_isDone = true; | |
204 m_reader.clear(); | |
205 notify(); | |
206 } | |
207 | |
208 void onRejected() | |
209 { | |
210 ASSERT(m_isReading); | |
211 ASSERT(!m_pendingBuffer); | |
212 m_hasError = true; | |
213 m_isReading = false; | |
214 m_reader.clear(); | |
215 notify(); | |
216 } | |
217 | |
218 void notify() | |
219 { | |
220 if (!m_client) | |
221 return; | |
222 if (m_isInRecursion) { | |
223 notifyLater(); | |
224 return; | |
225 } | |
226 m_client->didGetReadable(); | |
227 } | |
228 | |
229 void notifyLater() | |
230 { | |
231 ASSERT(m_client); | |
232 Platform::current()->currentThread()->taskRunner()->postTask(BLINK_FROM_
HERE, bind(&ReadingContext::notify, PassRefPtr<ReadingContext>(this))); | |
233 } | |
234 | |
235 private: | |
236 ReadingContext(ScriptState* scriptState, v8::Local<v8::Value> stream) | |
237 : m_client(nullptr) | |
238 , m_pendingOffset(0) | |
239 , m_isReading(false) | |
240 , m_isDone(false) | |
241 , m_hasError(false) | |
242 , m_isInRecursion(false) | |
243 { | |
244 if (!ReadableStreamOperations::isLocked(scriptState, stream)) { | |
245 // Here the stream implementation must not throw an exception. | |
246 NonThrowableExceptionState es; | |
247 m_reader = ReadableStreamOperations::getReader(scriptState, stream,
es); | |
248 } | |
249 m_hasError = m_reader.isEmpty(); | |
250 } | |
251 | |
252 // This ScriptValue is leaky because it stores a strong reference to a | |
253 // JavaScript object. | |
254 // TODO(yhirano): Fix it. | |
255 // | |
256 // Holding a ScriptValue here is safe in terms of cross-world wrapper | |
257 // leakage because we read only Uint8Array chunks from the reader. | |
258 ScriptValue m_reader; | |
259 WebDataConsumerHandle::Client* m_client; | |
260 RefPtr<DOMUint8Array> m_pendingBuffer; | |
261 size_t m_pendingOffset; | |
262 bool m_isReading; | |
263 bool m_isDone; | |
264 bool m_hasError; | |
265 bool m_isInRecursion; | |
266 }; | |
267 | |
268 ReadableStreamDataConsumerHandle::ReadableStreamDataConsumerHandle(ScriptState*
scriptState, v8::Local<v8::Value> stream) | |
269 : m_readingContext(ReadingContext::create(scriptState, stream)) | |
270 { | |
271 } | |
272 ReadableStreamDataConsumerHandle::~ReadableStreamDataConsumerHandle() = default; | |
273 | |
274 FetchDataConsumerHandle::Reader* ReadableStreamDataConsumerHandle::obtainReaderI
nternal(Client* client) | |
275 { | |
276 return new ReadingContext::ReaderImpl(m_readingContext, client); | |
277 } | |
278 | |
279 } // namespace blink | |
280 | |
OLD | NEW |