Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(830)

Side by Side Diff: third_party/WebKit/Source/modules/fetch/BytesConsumer.cpp

Issue 2269953004: Implment BytesConsumer::tee (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "modules/fetch/BytesConsumer.h" 5 #include "modules/fetch/BytesConsumer.h"
6 6
7 #include "modules/fetch/BytesConsumerForDataConsumerHandle.h"
8 #include "modules/fetch/FetchBlobDataConsumerHandle.h"
9 #include "platform/blob/BlobData.h"
10 #include "wtf/RefPtr.h"
7 #include <algorithm> 11 #include <algorithm>
8 #include <string.h> 12 #include <string.h>
9 13
10 namespace blink { 14 namespace blink {
11 15
16 namespace {
17
18 class NoopClient final : public GarbageCollectedFinalized<NoopClient>, public By tesConsumer::Client {
19 USING_GARBAGE_COLLECTED_MIXIN(NoopClient);
20 public:
21 void onStateChange() override {}
22 };
23
24 class Tee final : public GarbageCollectedFinalized<Tee>, public BytesConsumer::C lient {
25 USING_GARBAGE_COLLECTED_MIXIN(Tee);
26 public:
27 explicit Tee(BytesConsumer* consumer)
28 : m_src(consumer)
29 , m_destination1(new Destination(this))
30 , m_destination2(new Destination(this))
31 {
32 consumer->setClient(this);
33 // As no client is set to either destinations, Destination::notify() is
34 // no-op in this function.
35 onStateChange();
36 }
37
38 void onStateChange() override
39 {
40 bool destination1WasEmpty = m_destination1->isEmpty();
41 bool destination2WasEmpty = m_destination2->isEmpty();
42 bool hasEnqueued = false;
43
44 while (true) {
45 const char* buffer = nullptr;
46 size_t available = 0;
47 Result result = m_src->beginRead(&buffer, &available);
48 if (result == Result::ShouldWait) {
49 if (hasEnqueued && destination1WasEmpty)
50 m_destination1->notify();
51 if (hasEnqueued && destination2WasEmpty)
52 m_destination2->notify();
53 return;
54 }
55 if (result == Result::Done) {
56 if (destination1WasEmpty)
57 m_destination1->notify();
58 if (destination2WasEmpty)
59 m_destination2->notify();
60 return;
61 }
62 if (result == Result::Error) {
63 clearAndNotify();
64 return;
65 }
66 DCHECK_EQ(Result::Ok, result);
67 Bytes* bytes = new Bytes(buffer, available);
68 if (m_src->endRead(available) != Result::Ok) {
69 clearAndNotify();
70 return;
71 }
72 m_destination1->enqueue(bytes);
73 m_destination2->enqueue(bytes);
74 hasEnqueued = true;
75 }
76 }
77
78 BytesConsumer::PublicState getPublicState() const
79 {
80 return m_src->getPublicState();
81 }
82
83 BytesConsumer::Error getError() const
84 {
85 return m_src->getError();
86 }
87
88 void cancel()
89 {
90 if (!m_destination1->isCancelled() || !m_destination2->isCancelled())
91 return;
92 m_src->cancel();
93 }
94
95 BytesConsumer* destination1() const { return m_destination1; }
96 BytesConsumer* destination2() const { return m_destination2; }
97
98 DEFINE_INLINE_TRACE()
99 {
100 visitor->trace(m_src);
101 visitor->trace(m_destination1);
102 visitor->trace(m_destination2);
103 BytesConsumer::Client::trace(visitor);
104 }
105
106 private:
107 using Result = BytesConsumer::Result;
108 class Bytes final : public GarbageCollectedFinalized<Bytes> {
109 public:
110 Bytes(const char* data, size_t size)
111 {
112 m_buffer.reserveCapacity(size);
113 m_buffer.append(data, size);
114 }
115 const char* data() const { return m_buffer.data(); }
116 size_t size() const { return m_buffer.size(); }
117
118 DEFINE_INLINE_TRACE() {}
119
120 private:
121 Vector<char> m_buffer;
122 };
123
124 class Destination final : public BytesConsumer {
125 public:
126 explicit Destination(Tee* tee)
127 : m_tee(tee)
128 {
129 }
130
131 Result beginRead(const char** buffer, size_t* available) override
132 {
133 *buffer = nullptr;
134 *available = 0;
135 if (m_isCancelled)
136 return Result::Done;
137 if (m_chunks.isEmpty()) {
hiroshige 2016/08/30 07:42:47 How about turning this if statement into |if (!m_c
yhirano 2016/09/02 11:17:19 Done.
138 switch (m_tee->getPublicState()) {
139 case BytesConsumer::PublicState::ReadableOrWaiting:
140 return Result::ShouldWait;
141 case BytesConsumer::PublicState::Closed:
142 clearClientIfClosedOrErrored();
hiroshige 2016/08/30 07:42:47 Is clearClient() sufficient?
yhirano 2016/09/02 11:17:19 Done.
143 return Result::Done;
144 case BytesConsumer::PublicState::Errored:
145 clearClientIfClosedOrErrored();
hiroshige 2016/08/30 07:42:47 ditto.
yhirano 2016/09/02 11:17:19 Done.
146 return Result::Error;
147 }
148 }
149 Bytes* bytes = m_chunks[0];
150 DCHECK_LE(m_offset, bytes->size());
151 *buffer = bytes->data() + m_offset;
152 *available = bytes->size() - m_offset;
153 return Result::Ok;
154 }
155
156 Result endRead(size_t read) override
157 {
158 DCHECK(!m_chunks.isEmpty());
hiroshige 2016/08/30 07:47:38 This DCHECK means clearChunks() and thus onStateCh
yhirano 2016/09/02 11:17:19 Done.
159 Bytes* bytes = m_chunks[0];
160 DCHECK_LE(m_offset + read, bytes->size());
161 if (bytes->size() == m_offset + read) {
162 m_offset = 0;
163 m_chunks.removeFirst();
164 return Result::Ok;
165 }
166 m_offset += read;
167 return Result::Ok;
168 }
169
170 void setClient(BytesConsumer::Client* client) override
171 {
172 DCHECK(!m_client);
173 DCHECK(client);
174 auto state = getPublicState();
175 if (state == PublicState::Closed || state == PublicState::Errored)
176 return;
177 m_client = client;
178 }
179
180 void clearClient() override
181 {
182 DCHECK(m_client || getPublicState() == PublicState::Closed || getPub licState() == PublicState::Errored);
183 m_client = nullptr;
184 }
185
186 void cancel() override
187 {
188 auto state = getPublicState();
189 if (state == PublicState::Closed || state == PublicState::Errored)
hiroshige 2016/08/30 07:42:47 When |m_isCancelled| is false and |m_chunks.isEmpt
yhirano 2016/09/02 11:17:19 Done.
190 return;
191 m_isCancelled = true;
192 m_chunks.clear();
193 clearClient();
194 m_tee->cancel();
195 }
196
197 PublicState getPublicState() const override
198 {
199 if (m_isCancelled)
200 return PublicState::Closed;
201 if (!m_chunks.isEmpty())
202 return PublicState::ReadableOrWaiting;
203 return m_tee->getPublicState();
204 }
205
206 Error getError() const override { return m_tee->getError(); }
207
208 String debugName() const override { return "Tee::Destination"; }
209
210 void enqueue(Bytes* chunk)
211 {
212 if (m_isCancelled)
213 return;
214 m_chunks.append(chunk);
215 }
216
217 bool isEmpty() const { return m_chunks.isEmpty(); }
218
219 void clearChunks()
220 {
221 m_chunks.clear();
222 m_offset = 0;
223 }
224
225 void notify()
226 {
227 if (m_client) {
228 DCHECK(!m_isCancelled);
229 m_client->onStateChange();
230 clearClientIfClosedOrErrored();
231 }
232 }
233
234 bool isCancelled() const { return m_isCancelled; }
235
236 DEFINE_INLINE_TRACE()
237 {
238 visitor->trace(m_tee);
239 visitor->trace(m_client);
240 visitor->trace(m_chunks);
241 BytesConsumer::trace(visitor);
242 }
243
244 private:
245 void clearClientIfClosedOrErrored()
246 {
247 auto state = getPublicState();
248 if (state == PublicState::Closed || state == PublicState::Errored)
249 clearClient();
250 }
251
252 Member<Tee> m_tee;
253 Member<BytesConsumer::Client> m_client;
254 HeapDeque<Member<Bytes>> m_chunks;
255 size_t m_offset = 0;
256 bool m_isCancelled = false;
257 };
258
259
260 void clearAndNotify()
261 {
262 m_destination1->clearChunks();
263 m_destination2->clearChunks();
264 m_destination1->notify();
265 m_destination2->notify();
266 }
267
268 Member<BytesConsumer> m_src;
269 Member<Destination> m_destination1;
270 Member<Destination> m_destination2;
271 };
272
273 } // namespace
274
12 BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea dSize) 275 BytesConsumer::Result BytesConsumer::read(char* buffer, size_t size, size_t* rea dSize)
13 { 276 {
14 *readSize = 0; 277 *readSize = 0;
15 const char* src = nullptr; 278 const char* src = nullptr;
16 size_t available; 279 size_t available;
17 Result r = beginRead(&src, &available); 280 Result r = beginRead(&src, &available);
18 if (r != Result::Ok) 281 if (r != Result::Ok)
19 return r; 282 return r;
20 *readSize = std::min(available, size); 283 *readSize = std::min(available, size);
21 memcpy(buffer, src, *readSize); 284 memcpy(buffer, src, *readSize);
22 return endRead(*readSize); 285 return endRead(*readSize);
23 } 286 }
24 287
288 void BytesConsumer::tee(ExecutionContext* executionContext, BytesConsumer* src, BytesConsumer** dest1, BytesConsumer** dest2)
289 {
290 RefPtr<BlobDataHandle> blobDataHandle = src->drainAsBlobDataHandle(BlobSizeP olicy::AllowBlobWithInvalidSize);
291 if (blobDataHandle) {
292 // Register a client in order to be consistent.
293 src->setClient(new NoopClient);
294 // TODO(yhirano): Do not use FetchBlobDataConsumerHandle.
295 *dest1 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHan dle::create(executionContext, blobDataHandle));
296 *dest2 = new BytesConsumerForDataConsumerHandle(FetchBlobDataConsumerHan dle::create(executionContext, blobDataHandle));
297 return;
298 }
299
300 Tee* tee = new Tee(src);
301 *dest1 = tee->destination1();
302 *dest2 = tee->destination2();
303 }
304
25 } // namespace blink 305 } // namespace blink
OLDNEW
« no previous file with comments | « third_party/WebKit/Source/modules/fetch/BytesConsumer.h ('k') | third_party/WebKit/Source/modules/fetch/BytesConsumerTest.cpp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698