Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(771)

Unified Diff: Source/modules/fetch/BodyStreamBuffer.h

Issue 1192913007: Change BodyStreamBuffer to be FetchDataConsumerHandle-based and enable backpressure in Fetch API (Closed) Base URL: https://chromium.googlesource.com/chromium/blink.git@master
Patch Set: Rebase. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: Source/modules/fetch/BodyStreamBuffer.h
diff --git a/Source/modules/fetch/BodyStreamBuffer.h b/Source/modules/fetch/BodyStreamBuffer.h
index cdb47baab116261c68167786d96f4c924f8e15d1..30922d7eb3ba07188b3ed6541692eb6acf5b0002 100644
--- a/Source/modules/fetch/BodyStreamBuffer.h
+++ b/Source/modules/fetch/BodyStreamBuffer.h
@@ -7,6 +7,9 @@
#include "core/dom/DOMException.h"
#include "modules/ModulesExport.h"
+#include "modules/fetch/DataConsumerHandleUtil.h"
+#include "modules/fetch/FetchDataConsumerHandle.h"
+#include "modules/fetch/FetchDataLoader.h"
#include "platform/blob/BlobData.h"
#include "platform/heap/Heap.h"
#include "public/platform/WebDataConsumerHandle.h"
@@ -58,14 +61,6 @@ public:
void error(DOMException*);
void cancel() { m_canceller->cancel(); }
- // This function registers an observer so it fails and returns false when an
- // observer was already registered.
- bool readAllAndCreateBlobHandle(const String& contentType, BlobHandleCreatorClient*);
-
- // This function registers an observer so it fails and returns false when an
- // observer was already registered.
- bool startTee(BodyStreamBuffer* out1, BodyStreamBuffer* out2);
-
// When an observer was registered this function fails and returns false.
bool registerObserver(Observer*);
void unregisterObserver();
@@ -85,6 +80,167 @@ private:
Member<Canceller> m_canceller;
};
+/*
+// #define createDebugHandleAlways(handle) FetchDebugDataConsumerHandle::create(handle, __FILE__, __LINE__)
+#define createDebugHandleAlways(handle) (handle)
+#define createDebugHandle(handle) (handle)
+
+class FetchDebugDataConsumerHandle : public FetchDataConsumerHandle {
+public:
+ static PassOwnPtr<FetchDataConsumerHandle> create(PassOwnPtr<FetchDataConsumerHandle> handle, const char* file, int line) { return adoptPtr(new FetchDebugDataConsumerHandle(handle, file, line)); }
+private:
+ FetchDebugDataConsumerHandle(PassOwnPtr<FetchDataConsumerHandle> handle, const char *file, int line)
+ : m_handle(handle)
+ , m_name(m_handle->debugName()) { }
+
+ class ReaderImpl final : public FetchDataConsumerHandle::Reader {
+ public:
+ ReaderImpl(FetchDebugDataConsumerHandle* handle, PassOwnPtr<FetchDataConsumerHandle::Reader> reader) : m_handle(handle), m_name(handle->debugName()), m_reader(reader), m_isInTwoPhaseRead(false) { }
+ ~ReaderImpl()
+ {
+ ASSERT(!m_isInTwoPhaseRead);
+ print("dtor\n", (long long)currentThread(), this);
+ }
+ Result read(void* data, size_t size, Flags flags, size_t* readSize) override
+ {
+ ASSERT(!m_isInTwoPhaseRead);
+ print("read(%lld)\n", (long long)size);
+ Result result = m_reader->read(data, size, flags, readSize);
+ print("read(%lld) -> %d (%lld)\n", (long long)size, result, (long long)*readSize);
+ return result;
+ }
+
+ Result beginRead(const void** buffer, Flags flags, size_t* available) override
+ {
+ ASSERT(!m_isInTwoPhaseRead);
+ print("beginRead()\n");
+ Result result = m_reader->beginRead(buffer, flags, available);
+ print("beginRead() -> %d (%lld)\n", result, (long long)*available);
+ if (result == Ok)
+ m_isInTwoPhaseRead = true;
+ return result;
+ }
+ Result endRead(size_t readSize) override
+ {
+ ASSERT(m_isInTwoPhaseRead);
+ m_isInTwoPhaseRead = false;
+ print("endRead(%lld)\n", (long long)readSize);
+ Result result = m_reader->endRead(readSize);
+ print("endRead(%lld) -> %d\n", (long long)readSize, result);
+ return result;
+ }
+ PassRefPtr<BlobDataHandle> drainAsBlobDataHandle() override
+ {
+ print("drainAsBlobDataHandle()\n");
+ RefPtr<BlobDataHandle> blobDataHandle = m_reader->drainAsBlobDataHandle();
+ print("drainAsBlobDataHandle() -> %p\n", blobDataHandle.get());
+ return blobDataHandle.release();
+ }
+ void print(const char* format, ...)
+ {
+ fprintf(stderr, "[Thread:%lld] [Handle:%p (%s)] [Reader:%p] [TargetReader:%p]: ",
+ (long long)currentThread(),
+ m_handle,
+ m_name.c_str(),
+ this,
+ m_reader.get());
+
+ va_list args;
+ va_start(args, format);
+ vfprintf(stderr, format, args);
+ va_end(args);
+ }
+ private:
+ FetchDebugDataConsumerHandle* m_handle; // debugging only
+ std::string m_name;
+ OwnPtr<FetchDataConsumerHandle::Reader> m_reader;
+ bool m_isInTwoPhaseRead;
+ };
+ class ClientWrapper : public Client {
+ public:
+ ClientWrapper(FetchDebugDataConsumerHandle* handle, Client* client) : m_handle(handle), m_name(handle->debugName()), m_reader(nullptr), m_client(client) { }
+ void didGetReadable() override
+ {
+ print("didGetReadable\n");
+ if (m_client)
+ m_client->didGetReadable();
+ print("didGetReadable done\n");
+ }
+ void setReader(Reader* reader) { m_reader = reader; }
+
+ void print(const char* format, ...)
+ {
+ fprintf(stderr, "[Thread:%lld] [Handle:%p (%s)] [Reader:%p] [Client:%p] [TargetClient:%p]: ",
+ (long long)currentThread(),
+ m_handle,
+ m_name.c_str(),
+ m_reader,
+ this,
+ m_client);
+
+ va_list args;
+ va_start(args, format);
+ vfprintf(stderr, format, args);
+ va_end(args);
+ }
+ private:
+ FetchDebugDataConsumerHandle* m_handle; // debugging only
+ std::string m_name;
+ Reader* m_reader; // logging only
+
+ Client* m_client;
+ };
+ Reader* obtainReaderInternal(Client* client) override
+ {
+ print("obtainReaderInternal(Client=%p)\n", client);
+ ClientWrapper* clientWrapper = new ClientWrapper(this, client);
+ Reader* reader = new ReaderImpl(this, m_handle->obtainReader(clientWrapper)); // FIXME: Leaking
+ clientWrapper->setReader(reader);
+ print("obtainReaderInternal(Client=%p) -> %p\n", client, reader);
+ return reader;
+ }
+ void print(const char* format, ...)
+ {
+ fprintf(stderr, "[Thread:%lld] [Handle:%p (%s)]: ",
+ (long long)currentThread(),
+ this,
+ debugName());
+
+ va_list args;
+ va_start(args, format);
+ vfprintf(stderr, format, args);
+ va_end(args);
+ }
+
+ const char* debugName() const override { return m_name.c_str(); }
+
+ OwnPtr<FetchDataConsumerHandle> m_handle;
+ std::string m_name;
+};
+*/
+
+class MODULES_EXPORT BodyStreamBuffer2 final : public GarbageCollectedFinalized<BodyStreamBuffer2> {
+public:
+ static BodyStreamBuffer2* create(PassOwnPtr<FetchDataConsumerHandle> handle) { return new BodyStreamBuffer2(handle); }
+ static BodyStreamBuffer2* createEmpty() { return BodyStreamBuffer2::create(createFetchDataConsumerHandleFromWebHandle(createDoneDataConsumerHandle())); }
+
+ FetchDataConsumerHandle* handle() const { return m_handle.get(); }
+ PassOwnPtr<FetchDataConsumerHandle> releaseHandle() { return m_handle.release(); }
+
+ void registerLoader(FetchDataLoader* loader) { ASSERT(!m_fetchDataLoader); m_fetchDataLoader = loader; }
+ void unregisterLoader() { m_fetchDataLoader.clear(); }
+ DEFINE_INLINE_TRACE()
+ {
+ visitor->trace(m_fetchDataLoader);
+ }
+
+private:
+ explicit BodyStreamBuffer2(PassOwnPtr<FetchDataConsumerHandle> handle) : m_handle(handle) { }
+
+ OwnPtr<FetchDataConsumerHandle> m_handle;
+ Member<FetchDataLoader> m_fetchDataLoader;
+};
+
} // namespace blink
#endif // BodyStreamBuffer_h

Powered by Google App Engine
This is Rietveld 408576698