Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(295)

Unified Diff: net/filter/sdch_stream_source.cc

Issue 1662763002: [ON HOLD] Implement pull-based design for content decoding (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/filter/sdch_stream_source.cc
diff --git a/net/filter/sdch_stream_source.cc b/net/filter/sdch_stream_source.cc
new file mode 100644
index 0000000000000000000000000000000000000000..2ed1f9776a5b91de9d6096f76f7c3764ff0842c2
--- /dev/null
+++ b/net/filter/sdch_stream_source.cc
@@ -0,0 +1,294 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/auto_reset.h"
+#include "base/bind.h"
+#include "net/filter/block_buffer.h"
+#include "net/filter/sdch_stream_source.h"
+#include "sdch/open-vcdiff/src/google/vcdecoder.h"
+
+namespace {
+
+const size_t kServerIdLength = 9;
+
+} // namespace
+
+namespace net {
+
+SdchStreamSource::SdchStreamSource(scoped_ptr<StreamSource> previous,
+ SdchStreamSourceDelegate* delegate)
+ : buffer_(new BlockBuffer),
+ previous_(std::move(previous)),
+ decoder_(new open_vcdiff::VCDiffStreamingDecoder),
+ delegate_(delegate),
+ output_replaced_(false),
+ passthrough_(false),
+ dictionary_tried_load_(false),
+ total_bytes_output_(0) {}
+
+SdchStreamSource::~SdchStreamSource() {
+ // In unit tests, delegate_ may be null.
+ if (delegate_)
+ delegate_->NotifyStreamDone();
+}
+
+bool SdchStreamSource::Init() {
+ return true;
+}
+
+void SdchStreamSource::StopDecoding() {
+ DCHECK(in_delegate_handler_);
+ passthrough_ = true;
+}
+
+void SdchStreamSource::ReplaceOutput(const char* data, size_t size) {
+ DCHECK(in_delegate_handler_);
+ buffered_output_.assign(data, size);
+}
+
+net::Error SdchStreamSource::Read(
+ IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read,
+ const StreamSource::OnReadCompleteCallback& callback) {
+ *bytes_read = 0;
+
+ Error error;
+ do {
+ // If there is no loaded dictionary, try loading a dictionary ID from
+ // |buffer_| and then loading a dictionary. If anything goes wrong in
+ // LoadDictionary, the delegate's HandleDictionaryError() call is expected
+ // to recover, either by calling StopDecoding() or ReplaceOutput(). Both of
+ // these are handled by Decompress (as is having no dictionary loaded). If
+ // the delegate's HandleDictionaryError cannot recover, this function
+ // returns a read error.
+ if (!dictionary_tried_load_ && !LoadDictionary())
+ return ERR_FAILED;
+
+ // Run the synchronous decompressor.
+ SdchStreamState state = Decompress(dest_buffer, buffer_size, bytes_read);
+
+ // If decompression failed, call HandleDecodingError on the delegate, then
+ // rerun the decompressor. The decompressor shouldn't actually decompress
+ // this time, since the delegate should have called either Passthrough or
+ // ReplaceOutput.
+ if (state == SDCH_STREAM_ERROR) {
+ bool handled = AskDelegateToHandleDecodingError();
+ if (!handled)
+ return ERR_FAILED;
+ state = Decompress(dest_buffer, buffer_size, bytes_read);
+ }
+
+ if (*bytes_read > 0) {
+ total_bytes_output_ += *bytes_read;
+ return OK;
+ }
+
+ DCHECK(state == SDCH_STREAM_MORE_INPUT);
+ DCHECK(!buffer_->HasMoreBytes());
+
+ // If someone called ReplaceOutput to replace this stream's output,
+ // Decompress will return all the buffered output; if it did not do so, this
+ // stream is at EOF.
+ if (output_replaced_)
+ return OK;
+
+ size_t previous_bytes_read;
+ error = previous_->Read(
+ buffer_->buffer(), buffer_->size(), &previous_bytes_read,
+ base::Bind(&SdchStreamSource::OnReadComplete, base::Unretained(this),
+ callback, base::Unretained(dest_buffer), buffer_size));
+
+ // Synchronous EOF from previous source.
+ if (error == OK && previous_bytes_read == 0)
+ return OK;
+
+ // Synchronous success from previous source.
+ if (error == OK)
+ buffer_->WasRefilled(previous_bytes_read);
+ } while (error == net::OK);
+
+ return error;
+}
+
+void SdchStreamSource::OnReadComplete(
+ const StreamSource::OnReadCompleteCallback& callback,
+ IOBuffer* dest_buffer,
+ size_t dest_buffer_size,
+ Error error,
+ size_t bytes_read) {
+ DCHECK(!buffer_->HasMoreBytes());
+
+ // Previous source Read failed. Fail this read too.
+ if (error != OK) {
+ callback.Run(error, bytes_read);
+ return;
+ }
+
+ if (bytes_read == 0) {
+ callback.Run(OK, 0);
+ }
+
+ buffer_->WasRefilled(bytes_read);
+ error = Read(dest_buffer, dest_buffer_size, &bytes_read, callback);
+ if (error != ERR_IO_PENDING) {
+ if (error == OK)
+ total_bytes_output_ += bytes_read;
+ callback.Run(error, bytes_read);
+ }
+}
+
+// Synchronous decompressor core.
+SdchStreamSource::SdchStreamState SdchStreamSource::Decompress(
+ IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read) {
+ // If output has been replaced via ReplaceOutput, short-circuit any other
+ // processing, and emit as much of the buffered output as possible. If there's
+ // no buffered output left, return EOF.
+ if (output_replaced_)
+ return Flush(dest_buffer, buffer_size, bytes_read);
+
+ // If this stream is no longer decoding, pass any input through.
+ if (passthrough_)
+ return Passthrough(dest_buffer, buffer_size, bytes_read);
+
+ // Can't do any decoding until a dictionary is loaded, which requires more
+ // input.
+ if (!dictionary_tried_load_)
+ return SDCH_STREAM_MORE_INPUT;
+
+ // If there's any data left in the output buffer, try flushing some of it.
+ if (Flush(dest_buffer, buffer_size, bytes_read) != SDCH_STREAM_MORE_INPUT)
+ return SDCH_STREAM_MORE_OUTPUT_SPACE;
+
+ // This is awkward. In principle, this function is called only when the input
+ // buffer has more bytes, since vcdiff always consumes input; unfortunately,
+ // the dictionary-loading code earlier in the Read() loop can consume all the
+ // currently buffered input. If that happened, Decompress returns early here.
+ if (!buffer_->HasMoreBytes())
+ return SDCH_STREAM_MORE_INPUT;
+
+ // Now there is no data left in the output buffer and there is data in the
+ // input buffer, so run the vcdiff decoder.
+ DCHECK(buffered_output_.empty());
+ DCHECK(buffer_->HasMoreBytes());
+
+ // Calls to DecodeChunk always consume all their input, so this always drains
+ // the entire buffer.
+ bool ok = decoder_->DecodeChunk(buffer_->bytes(), buffer_->bytes_left(),
+ &buffered_output_);
+ if (ok) {
+ buffer_->WasDrained(buffer_->bytes_left());
+ } else {
+ if (!AskDelegateToHandleDecodingError())
+ return SDCH_STREAM_ERROR;
+ }
+
+ return Flush(dest_buffer, buffer_size, bytes_read);
+}
+
+SdchStreamSource::SdchStreamState SdchStreamSource::Passthrough(
+ IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read) {
+ SdchStreamState state = Flush(dest_buffer, buffer_size, bytes_read);
+ if (state == SDCH_STREAM_MORE_OUTPUT_SPACE)
+ return state;
+
+ // No buffered data to flush. See if there's any input.
+ DCHECK(buffered_output_.empty());
+ if (!buffer_->HasMoreBytes())
+ return SDCH_STREAM_MORE_INPUT;
+
+ size_t to_copy = buffer_size;
+ if (to_copy > buffer_->bytes_left())
+ to_copy = buffer_->bytes_left();
+ memcpy(dest_buffer->data(), buffer_->bytes(), to_copy);
+ buffer_->WasDrained(to_copy);
+ if (buffer_->HasMoreBytes())
+ return SDCH_STREAM_MORE_OUTPUT_SPACE;
+ else
+ return SDCH_STREAM_MORE_INPUT;
+}
+
+SdchStreamSource::SdchStreamState SdchStreamSource::Flush(IOBuffer* dest_buffer,
+ size_t buffer_size,
+ size_t* bytes_read) {
+ size_t to_copy = buffer_size;
+ if (to_copy > buffered_output_.length())
+ to_copy = buffered_output_.length();
+ if (to_copy == 0)
+ return SDCH_STREAM_MORE_INPUT;
+ memcpy(dest_buffer->data(), buffered_output_.data(), to_copy);
+ buffered_output_.erase(0, to_copy);
+ *bytes_read = to_copy;
+ return SDCH_STREAM_MORE_OUTPUT_SPACE;
+}
+
+// Attempts to find a dictionary ID at the start of the input stream and load
+// the dictionary with that identifier. Returns true if no error happened or an
+// error happened but the delegate repaired it, and false if an error happened
+// and the delegate did not repair it.
+bool SdchStreamSource::LoadDictionary() {
+ size_t to_copy = kServerIdLength - dictionary_id_.length();
+ const std::string* dictionary_text = nullptr;
+ if (to_copy > buffer_->bytes_left())
+ to_copy = buffer_->bytes_left();
+ dictionary_id_.append(buffer_->bytes(), to_copy);
+ buffer_->WasDrained(to_copy);
+
+ // Not enough bytes for a dictionary ID accumulated yet.
+ if (dictionary_id_.length() != kServerIdLength)
+ return true;
+
+ dictionary_tried_load_ = true;
+
+ // If the dictionary ID looks bogus, or the delegate can't find a dictionary
+ // with that identifier, then call HandleDictionaryError and bail.
+ // Also, stick the dictionary ID into the output buffer here; LoadDictionary
+ // will consume it assuming it's a dictionary ID, so if it's really not, the
+ // "dictionary ID" should appear in the output stream.
+ //
+ // To avoid passing a std::string with a null terminator into GetDictionary(),
+ // |stem_id| here removes the last byte blindly, and this method only calls
+ // GetDictionary with it if CouldBeDictionaryId returns true.
+ std::string stem_id = dictionary_id_.substr(0, kServerIdLength - 1);
+ if (!CouldBeDictionaryId(dictionary_id_) ||
+ !delegate_->GetDictionary(stem_id, &dictionary_text)) {
+ buffered_output_.append(dictionary_id_);
+ return AskDelegateToHandleDictionaryError();
+ }
+
+ decoder_->StartDecoding(dictionary_text->data(), dictionary_text->length());
+ return true;
+}
+
+bool SdchStreamSource::CouldBeDictionaryId(const std::string& id) const {
+ std::string kBase64UrlChars =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-";
+ for (size_t i = 0; i < kServerIdLength - 1; i++) {
+ if (kBase64UrlChars.find(id[i]) == std::string::npos)
+ return false;
+ }
+ if (id[kServerIdLength - 1] != '\0')
+ return false;
+ return true;
+}
+
+size_t SdchStreamSource::GetBytesOutput() const {
+ return total_bytes_output_;
+}
+
+bool SdchStreamSource::AskDelegateToHandleDictionaryError() {
+ base::AutoReset<bool> resetter(&in_delegate_handler_, true);
+ return delegate_->HandleDictionaryError(this);
+}
+
+bool SdchStreamSource::AskDelegateToHandleDecodingError() {
+ base::AutoReset<bool> resetter(&in_delegate_handler_, true);
+ return delegate_->HandleDecodingError(this);
+}
+
+} // namespace net

Powered by Google App Engine
This is Rietveld 408576698