Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(652)

Side by Side Diff: Source/modules/fetch/BodyStreamBuffer.h

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: (temp) alternative to calling didGetReadable in sync. Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef BodyStreamBuffer_h 5 #ifndef BodyStreamBuffer_h
6 #define BodyStreamBuffer_h 6 #define BodyStreamBuffer_h
7 7
8 #include "core/dom/DOMException.h" 8 #include "core/dom/DOMException.h"
9 #include "modules/ModulesExport.h" 9 #include "modules/ModulesExport.h"
10 #include "modules/fetch/DataConsumerHandleUtil.h"
11 #include "modules/fetch/FetchDataConsumerHandle.h"
12 #include "modules/fetch/FetchDataLoader.h"
10 #include "platform/blob/BlobData.h" 13 #include "platform/blob/BlobData.h"
11 #include "platform/heap/Heap.h" 14 #include "platform/heap/Heap.h"
12 #include "public/platform/WebDataConsumerHandle.h" 15 #include "public/platform/WebDataConsumerHandle.h"
13 #include "wtf/Deque.h" 16 #include "wtf/Deque.h"
14 #include "wtf/RefPtr.h" 17 #include "wtf/RefPtr.h"
15 #include "wtf/text/WTFString.h" 18 #include "wtf/text/WTFString.h"
16 19
17 namespace blink { 20 namespace blink {
18 21
19 class DOMArrayBuffer; 22 class DOMArrayBuffer;
20 23
24 /*
25 // #define createDebugHandleAlways(handle) FetchDebugDataConsumerHandle::create( handle, __FILE__, __LINE__)
yhirano 2015/07/03 04:42:28 Can we delete this block?
hiroshige 2015/07/05 07:30:25 Done.
26 #define createDebugHandleAlways(handle) (handle)
27 #define createDebugHandle(handle) (handle)
28
29 class FetchDebugDataConsumerHandle : public FetchDataConsumerHandle {
30 public:
31 static PassOwnPtr<FetchDataConsumerHandle> create(PassOwnPtr<FetchDataConsum erHandle> handle, const char* file, int line) { return adoptPtr(new FetchDebugDa taConsumerHandle(handle, file, line)); }
32 private:
33 FetchDebugDataConsumerHandle(PassOwnPtr<FetchDataConsumerHandle> handle, con st char *file, int line)
34 : m_handle(handle)
35 , m_name(m_handle->debugName()) { }
36
37 class ReaderImpl final : public FetchDataConsumerHandle::Reader {
38 public:
39 ReaderImpl(FetchDebugDataConsumerHandle* handle, PassOwnPtr<FetchDataCon sumerHandle::Reader> reader) : m_handle(handle), m_name(handle->debugName()), m_ reader(reader), m_isInTwoPhaseRead(false) { }
40 ~ReaderImpl()
41 {
42 ASSERT(!m_isInTwoPhaseRead);
43 print("dtor\n", (long long)currentThread(), this);
44 }
45 Result read(void* data, size_t size, Flags flags, size_t* readSize) over ride
46 {
47 ASSERT(!m_isInTwoPhaseRead);
48 print("read(%lld)\n", (long long)size);
49 Result result = m_reader->read(data, size, flags, readSize);
50 print("read(%lld) -> %d (%lld)\n", (long long)size, result, (long lo ng)*readSize);
51 return result;
52 }
53
54 Result beginRead(const void** buffer, Flags flags, size_t* available) ov erride
55 {
56 ASSERT(!m_isInTwoPhaseRead);
57 print("beginRead()\n");
58 Result result = m_reader->beginRead(buffer, flags, available);
59 print("beginRead() -> %d (%lld)\n", result, (long long)*available);
60 if (result == Ok)
61 m_isInTwoPhaseRead = true;
62 return result;
63 }
64 Result endRead(size_t readSize) override
65 {
66 ASSERT(m_isInTwoPhaseRead);
67 m_isInTwoPhaseRead = false;
68 print("endRead(%lld)\n", (long long)readSize);
69 Result result = m_reader->endRead(readSize);
70 print("endRead(%lld) -> %d\n", (long long)readSize, result);
71 return result;
72 }
73 PassRefPtr<BlobDataHandle> drainAsBlobDataHandle(bool allowInvalidSize) override
74 {
75 print("drainAsBlobDataHandle(%d)\n", (int)allowInvalidSize);
76 RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHan dle(allowInvalidSize);
77 print("drainAsBlobDataHandle() -> %p\n", blobDataHandle.get());
78 return blobDataHandle.release();
79 }
80 void print(const char* format, ...)
81 {
82 fprintf(stderr, "[Thread:%lld] [Handle:%p (%s)] [Reader:%p] [TargetR eader:%p]: ",
83 (long long)currentThread(),
84 m_handle,
85 m_name.c_str(),
86 this,
87 m_reader.get());
88
89 va_list args;
90 va_start(args, format);
91 vfprintf(stderr, format, args);
92 va_end(args);
93 }
94 private:
95 FetchDebugDataConsumerHandle* m_handle; // debugging only
96 std::string m_name;
97 OwnPtr<FetchDataConsumerHandle::Reader> m_reader;
98 bool m_isInTwoPhaseRead;
99 };
100 class ClientWrapper : public Client {
101 public:
102 ClientWrapper(FetchDebugDataConsumerHandle* handle, Client* client) : m_ handle(handle), m_name(handle->debugName()), m_reader(nullptr), m_client(client) { }
103 void didGetReadable() override
104 {
105 print("didGetReadable\n");
106 if (m_client)
107 m_client->didGetReadable();
108 print("didGetReadable done\n");
109 }
110 void setReader(Reader* reader) { m_reader = reader; }
111
112 void print(const char* format, ...)
113 {
114 fprintf(stderr, "[Thread:%lld] [Handle:%p (%s)] [Reader:%p] [Client: %p] [TargetClient:%p]: ",
115 (long long)currentThread(),
116 m_handle,
117 m_name.c_str(),
118 m_reader,
119 this,
120 m_client);
121
122 va_list args;
123 va_start(args, format);
124 vfprintf(stderr, format, args);
125 va_end(args);
126 }
127 private:
128 FetchDebugDataConsumerHandle* m_handle; // debugging only
129 std::string m_name;
130 Reader* m_reader; // logging only
131
132 Client* m_client;
133 };
134 Reader* obtainReaderInternal(Client* client) override
135 {
136 print("obtainReaderInternal(Client=%p)\n", client);
137 ClientWrapper* clientWrapper = new ClientWrapper(this, client);
138 Reader* reader = new ReaderImpl(this, m_handle->obtainReader(clientWrapp er)); // FIXME: Leaking
139 clientWrapper->setReader(reader);
140 print("obtainReaderInternal(Client=%p) -> %p\n", client, reader);
141 return reader;
142 }
143 void print(const char* format, ...)
144 {
145 fprintf(stderr, "[Thread:%lld] [Handle:%p (%s)]: ",
146 (long long)currentThread(),
147 this,
148 debugName());
149
150 va_list args;
151 va_start(args, format);
152 vfprintf(stderr, format, args);
153 va_end(args);
154 }
155
156 const char* debugName() const override { return m_name.c_str(); }
157
158 OwnPtr<FetchDataConsumerHandle> m_handle;
159 std::string m_name;
160 };
161 */
162
163 class DrainingBodyStreamBuffer;
164
21 class MODULES_EXPORT BodyStreamBuffer final : public GarbageCollectedFinalized<B odyStreamBuffer> { 165 class MODULES_EXPORT BodyStreamBuffer final : public GarbageCollectedFinalized<B odyStreamBuffer> {
22 public: 166 public:
23 class Observer : public GarbageCollectedFinalized<Observer> { 167 static BodyStreamBuffer* create(PassOwnPtr<FetchDataConsumerHandle> handle) { return new BodyStreamBuffer(handle); }
168 static BodyStreamBuffer* createEmpty();
169
170 FetchDataConsumerHandle* handle() const;
171 PassOwnPtr<FetchDataConsumerHandle> releaseHandle();
172
173 class DrainingStreamNotificationClient : public GarbageCollectedMixin {
24 public: 174 public:
25 virtual ~Observer() { } 175 virtual ~DrainingStreamNotificationClient() { }
26 virtual void onWrite() = 0; 176 // Called after FetchDataLoader::Client methods.
27 virtual void onClose() = 0; 177 virtual void didFetchDataLoadFinishedFromDrainingStream() = 0;
28 virtual void onError() = 0;
29 DEFINE_INLINE_VIRTUAL_TRACE() { }
30 }; 178 };
31 179
32 class Canceller : public GarbageCollected<Canceller> { 180 DEFINE_INLINE_TRACE()
33 public: 181 {
34 virtual void cancel() = 0; 182 visitor->trace(m_fetchDataLoader);
35 DEFINE_INLINE_VIRTUAL_TRACE() { } 183 visitor->trace(m_drainingStreamNotificationClient);
36 }; 184 }
37 185
38 class BlobHandleCreatorClient : public GarbageCollectedFinalized<BlobHandleC reatorClient> { 186 void didFetchDataLoadFinished();
39 public:
40 virtual ~BlobHandleCreatorClient() { }
41 virtual void didCreateBlobHandle(PassRefPtr<BlobDataHandle>) = 0;
42 virtual void didFail(DOMException*) = 0;
43 DEFINE_INLINE_VIRTUAL_TRACE() { }
44 };
45 explicit BodyStreamBuffer(Canceller*);
46 ~BodyStreamBuffer() { }
47
48 PassRefPtr<DOMArrayBuffer> read();
49 bool isClosed() const { return m_isClosed; }
50 bool hasError() const { return m_exception; }
51 DOMException* exception() const { return m_exception; }
52
53 // Can't call after close() or error() was called.
54 void write(PassRefPtr<DOMArrayBuffer>);
55 // Can't call after close() or error() was called.
56 void close();
57 // Can't call after close() or error() was called.
58 void error(DOMException*);
59 void cancel() { m_canceller->cancel(); }
60
61 // This function registers an observer so it fails and returns false when an
62 // observer was already registered.
63 bool readAllAndCreateBlobHandle(const String& contentType, BlobHandleCreator Client*);
64
65 // This function registers an observer so it fails and returns false when an
66 // observer was already registered.
67 bool startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2);
68
69 // When an observer was registered this function fails and returns false.
70 bool registerObserver(Observer*);
71 void unregisterObserver();
72 bool isObserverRegistered() const { return m_observer.get(); }
73 DECLARE_TRACE();
74
75 // Creates a BodyStreamBuffer from |handle| as the source.
76 // On failure, BodyStreamBuffer::error() is called with a NetworkError
77 // with |failureMessage|.
78 static BodyStreamBuffer* create(PassOwnPtr<WebDataConsumerHandle> /* handle */, const String& failureMessage);
79 187
80 private: 188 private:
81 Deque<RefPtr<DOMArrayBuffer>> m_queue; 189 explicit BodyStreamBuffer(PassOwnPtr<FetchDataConsumerHandle> handle) : m_ha ndle(handle) { }
82 bool m_isClosed; 190
83 Member<DOMException> m_exception; 191 void setDrainingStreamNotificationClient(DrainingStreamNotificationClient*);
84 Member<Observer> m_observer; 192
85 Member<Canceller> m_canceller; 193 void startLoading(FetchDataLoader*, FetchDataLoader::Client*);
194 // Call DrainingStreamNotificationClient.
195 void doDrainingStreamNotification();
196 // Clear DrainingStreamNotificationClient without calling.
197 void clearDrainingStreamNotification();
198
199 friend class DrainingBodyStreamBuffer;
200
201 OwnPtr<FetchDataConsumerHandle> m_handle;
202 Member<FetchDataLoader> m_fetchDataLoader;
203 Member<DrainingStreamNotificationClient> m_drainingStreamNotificationClient;
86 }; 204 };
87 205
206 // DrainingBodyStreamBuffer wraps BodyStreamBuffer returned from
207 // Body::createDrainingStream() and calls DrainingStreamNotificationClient
208 // callbacks unless leak() is called:
209 // - If startLoading() is called, the callback is called after loading finished.
210 // - If drainAsBlobDataHandle() is called, the callback is called immediately.
211 // - If leak() is called, the callback is no longer called.
212 // Any calls to DrainingBodyStreamBuffer methods after a call to either of
213 // methods above is no-op.
214 // After calling one of the methods above, we don't have to keep
215 // DrainingBodyStreamBuffer alive.
216 // If DrainingBodyStreamBuffer is destructed before any of above is called,
217 // the callback is called at destruction.
218 class MODULES_EXPORT DrainingBodyStreamBuffer final {
219 public:
220 static PassOwnPtr<DrainingBodyStreamBuffer> create(BodyStreamBuffer* buffer, BodyStreamBuffer::DrainingStreamNotificationClient* client)
221 {
222 return adoptPtr(new DrainingBodyStreamBuffer(buffer, client));
223 }
224 ~DrainingBodyStreamBuffer();
225 void startLoading(FetchDataLoader*, FetchDataLoader::Client*);
226 BodyStreamBuffer* leak();
yhirano 2015/07/03 04:42:28 |leakBuffer| or |releaseBuffer| might be a better
hiroshige 2015/07/05 07:30:25 Done.
227 PassRefPtr<BlobDataHandle> drainAsBlobDataHandle(FetchDataConsumerHandle::Re ader::BlobSizePolicy);
228
229 private:
230 explicit DrainingBodyStreamBuffer(BodyStreamBuffer*, BodyStreamBuffer::Drain ingStreamNotificationClient*);
yhirano 2015/07/03 04:42:28 -explicit
hiroshige 2015/07/05 07:30:25 Done.
231
232 Persistent<BodyStreamBuffer> m_buffer;
233 };
234
88 } // namespace blink 235 } // namespace blink
89 236
90 #endif // BodyStreamBuffer_h 237 #endif // BodyStreamBuffer_h
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698