Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(698)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp

Issue 2269953004: Implment BytesConsumer::tee (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/BytesConsumer.h" 5 #include "modules/fetch/BytesConsumer.h"
6 6
7 #include "core/dom/ExecutionContext.h"
8 #include "core/dom/TaskRunnerHelper.h"
9 #include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
10 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
11 #include "platform/blob/BlobData.h"
12 #include "public/platform/WebTaskRunner.h"
13 #include "wtf/Functional.h"
14 #include "wtf/RefPtr.h"
7 #include <algorithm> 15 #include <algorithm>
8 #include <string.h> 16 #include <string.h>
9 17
10 namespace blink { 18 namespace blink {
11 19
20 namespace {
21
22 class NoopClient final : public GarbageCollectedFinalized<NoopClient>, public By tesConsumer::Client {
23 USING_GARBAGE_COLLECTED_MIXIN(NoopClient);
24 public:
25 void onStateChange() override {}
26 };
27
28 class Tee final : public GarbageCollectedFinalized<Tee>, public BytesConsumer::C lient {
29 USING_GARBAGE_COLLECTED_MIXIN(Tee);
30 public:
31 Tee(ExecutionContext* executionContext, BytesConsumer* consumer)
32 : m_src(consumer)
33 , m_destination1(new Destination(executionContext, this))
34 , m_destination2(new Destination(executionContext, this))
35 {
36 consumer->setClient(this);
37 // As no client is set to either destinations, Destination::notify() is
38 // no-op in this function.
39 onStateChange();
40 }
41
42 void onStateChange() override
43 {
44 bool destination1WasEmpty = m_destination1->isEmpty();
45 bool destination2WasEmpty = m_destination2->isEmpty();
46 bool hasEnqueued = false;
47
48 while (true) {
49 const char* buffer = nullptr;
50 size_t available = 0;
51 Result result = m_src->beginRead(&buffer, &available);
52 if (result == Result::ShouldWait) {
53 if (hasEnqueued && destination1WasEmpty)
54 m_destination1->notify();
55 if (hasEnqueued && destination2WasEmpty)
56 m_destination2->notify();
57 return;
58 }
59 if (result == Result::Done) {
60 if (destination1WasEmpty)
61 m_destination1->notify();
62 if (destination2WasEmpty)
63 m_destination2->notify();
64 return;
65 }
66 if (result == Result::Error) {
67 clearAndNotify();
68 return;
69 }
70 DCHECK_EQ(Result::Ok, result);
71 Bytes* bytes = new Bytes(buffer, available);
72 if (m_src->endRead(available) != Result::Ok) {
73 clearAndNotify();
74 return;
75 }
76 m_destination1->enqueue(bytes);
77 m_destination2->enqueue(bytes);
78 hasEnqueued = true;
79 }
80 }
81
82 BytesConsumer::PublicState getPublicState() const
83 {
84 return m_src->getPublicState();
85 }
86
87 BytesConsumer::Error getError() const
88 {
89 return m_src->getError();
90 }
91
92 void cancel()
93 {
94 if (!m_destination1->isCancelled() || !m_destination2->isCancelled())
95 return;
96 m_src->cancel();
97 }
98
99 BytesConsumer* destination1() const { return m_destination1; }
100 BytesConsumer* destination2() const { return m_destination2; }
101
102 DEFINE_INLINE_TRACE()
103 {
104 visitor->trace(m_src);
105 visitor->trace(m_destination1);
106 visitor->trace(m_destination2);
107 BytesConsumer::Client::trace(visitor);
108 }
109
110 private:
111 using Result = BytesConsumer::Result;
112 class Bytes final : public GarbageCollectedFinalized<Bytes> {
113 public:
114 Bytes(const char* data, size_t size)
115 {
116 m_buffer.reserveCapacity(size);
117 m_buffer.append(data, size);
118 }
119 const char* data() const { return m_buffer.data(); }
120 size_t size() const { return m_buffer.size(); }
121
122 DEFINE_INLINE_TRACE() {}
123
124 private:
125 Vector<char> m_buffer;
126 };
127
128 class Destination final : public BytesConsumer {
129 public:
130 Destination(ExecutionContext* executionContext, Tee* tee)
131 : m_executionContext(executionContext)
132 , m_tee(tee)
133 {
134 }
135
136 Result beginRead(const char** buffer, size_t* available) override
137 {
138 DCHECK(!m_chunkInUse);
139 *buffer = nullptr;
140 *available = 0;
141 if (m_isCancelled || m_isClosed)
142 return Result::Done;
143 if (!m_chunks.isEmpty()) {
144 Bytes* bytes = m_chunks[0];
hiroshige 2016/09/07 09:17:47 We use "bytes" and "chunks" for the same thing. Ca
yhirano 2016/09/07 10:51:34 Done.
145 DCHECK_LE(m_offset, bytes->size());
146 *buffer = bytes->data() + m_offset;
147 *available = bytes->size() - m_offset;
148 m_chunkInUse = bytes;
149 return Result::Ok;
150 }
151 switch (m_tee->getPublicState()) {
152 case PublicState::ReadableOrWaiting:
153 return Result::ShouldWait;
154 case PublicState::Closed:
155 m_isClosed = true;
156 clearClient();
157 return Result::Done;
158 case PublicState::Errored:
159 clearClient();
160 return Result::Error;
161 }
162 NOTREACHED();
163 return Result::Error;
164 }
165
166 Result endRead(size_t read) override
167 {
168 DCHECK(m_chunkInUse);
hiroshige 2016/09/07 09:17:47 optional: Adding DCHECK(m_chunkInUse == m_chunks[0
yhirano 2016/09/07 10:51:34 Done.
169 m_chunkInUse = nullptr;
170 if (m_chunks.isEmpty()) {
171 // This object becomes errored during the two-phase read.
172 DCHECK_EQ(PublicState::Errored, getPublicState());
173 return Result::Ok;
174 }
175 Bytes* bytes = m_chunks[0];
176 DCHECK_LE(m_offset + read, bytes->size());
177 m_offset += read;
178 if (bytes->size() == m_offset) {
179 m_offset = 0;
180 m_chunks.removeFirst();
181 }
182 if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Cl osed) {
183 // All data has been consumed.
184 TaskRunnerHelper::get(TaskType::Networking, m_executionContext)- >postTask(BLINK_FROM_HERE, WTF::bind(&Destination::close, wrapPersistent(this))) ;
185 }
186 return Result::Ok;
187 }
188
189 void setClient(BytesConsumer::Client* client) override
190 {
191 DCHECK(!m_client);
192 DCHECK(client);
193 auto state = getPublicState();
194 if (state == PublicState::Closed || state == PublicState::Errored)
195 return;
196 m_client = client;
197 }
198
199 void clearClient() override
200 {
201 m_client = nullptr;
202 }
203
204 void cancel() override
205 {
206 DCHECK(!m_chunkInUse);
207 auto state = getPublicState();
208 if (state == PublicState::Closed || state == PublicState::Errored)
209 return;
210 m_isCancelled = true;
211 m_chunks.clear();
212 clearClient();
213 m_tee->cancel();
214 }
215
216 PublicState getPublicState() const override
217 {
218 if (m_isCancelled || m_isClosed)
219 return PublicState::Closed;
220 auto state = m_tee->getPublicState();
221 // We don't say this object is closed unless m_isCancelled or
222 // m_isClosed is set.
223 return state == PublicState::Closed ? PublicState::ReadableOrWaiting : state;
224 }
225
226 Error getError() const override { return m_tee->getError(); }
227
228 String debugName() const override { return "Tee::Destination"; }
229
230 void enqueue(Bytes* chunk)
231 {
232 if (m_isCancelled)
233 return;
234 m_chunks.append(chunk);
235 }
236
237 bool isEmpty() const { return m_chunks.isEmpty(); }
238
239 void clearChunks()
240 {
241 m_chunks.clear();
242 m_offset = 0;
243 }
244
245 void notify()
246 {
247 if (m_isCancelled || m_isClosed)
248 return;
249 if (m_chunks.isEmpty() && m_tee->getPublicState() == PublicState::Cl osed) {
250 close();
251 return;
252 }
253 if (m_client) {
254 m_client->onStateChange();
255 if (getPublicState() == PublicState::Errored)
256 clearClient();
257 }
258 }
259
260 bool isCancelled() const { return m_isCancelled; }
261
262 DEFINE_INLINE_TRACE()
263 {
264 visitor->trace(m_executionContext);
265 visitor->trace(m_tee);
266 visitor->trace(m_client);
267 visitor->trace(m_chunks);
268 visitor->trace(m_chunkInUse);
269 BytesConsumer::trace(visitor);
270 }
271
272 private:
273 void close()
274 {
275 DCHECK_EQ(PublicState::Closed, m_tee->getPublicState());
276 DCHECK(m_chunks.isEmpty());
277 if (m_isClosed || m_isCancelled) {
278 // It's possible to reach here because this function can be
279 // called asynchronously.
280 return;
281 }
282 DCHECK_EQ(PublicState::ReadableOrWaiting, getPublicState());
283 m_isClosed = true;
284 if (m_client) {
285 m_client->onStateChange();
286 clearClient();
287 }
288 }
289
290 Member<ExecutionContext> m_executionContext;
291 Member<Tee> m_tee;
292 Member<BytesConsumer::Client> m_client;
293 HeapDeque<Member<Bytes>> m_chunks;
294 Member<Bytes> m_chunkInUse;
295 size_t m_offset = 0;
296 bool m_isCancelled = false;
297 bool m_isClosed = false;
298 };
299
300 void clearAndNotify()
301 {
302 m_destination1->clearChunks();
303 m_destination2->clearChunks();
304 m_destination1->notify();
305 m_destination2->notify();
306 }
307
308 Member<BytesConsumer> m_src;
309 Member<Destination> m_destination1;
310 Member<Destination> m_destination2;
311 };
312
313 } // namespace
314
12 BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea dSize) 315 BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea dSize)
13 { 316 {
14 *readSize = 0; 317 *readSize = 0;
15 const char* src = nullptr; 318 const char* src = nullptr;
16 size_t available; 319 size_t available;
17 Result r = beginRead(&src, &available); 320 Result r = beginRead(&src, &available);
18 if (r != Result::Ok) 321 if (r != Result::Ok)
19 return r; 322 return r;
20 *readSize = std::min(available, size); 323 *readSize = std::min(available, size);
21 memcpy(buffer, src, *readSize); 324 memcpy(buffer, src, *readSize);
22 return endRead(*readSize); 325 return endRead(*readSize);
23 } 326 }
24 327
328 void BytesConsumer::tee(ExecutionContext* executionContext, BytesConsumer* src, BytesConsumer** dest1, BytesConsumer** dest2)
329 {
330 RefPtr<BlobDataHandle> blobDataHandle = src->drainAsBlobDataHandle(BlobSizeP olicy::AllowBlobWithInvalidSize);
331 if (blobDataHandle) {
332 // Register a client in order to be consistent.
333 src->setClient(new NoopClient);
334 // TODO(yhirano): Do not use FetchBlobDataConsumerHandle.
335 *dest1 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHan dle::create(executionContext, blobDataHandle));
336 *dest2 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHan dle::create(executionContext, blobDataHandle));
337 return;
338 }
339
340 Tee* tee = new Tee(executionContext, src);
341 *dest1 = tee->destination1();
342 *dest2 = tee->destination2();
343 }
344
25 } // namespace blink 345 } // namespace blink
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698