Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1905)

Unified Diff: net/websockets/websocket_deflate_stream.cc

Issue 39193005: Introduce WebSocketDeflatePredictor. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/websockets/websocket_deflate_stream.cc
diff --git a/net/websockets/websocket_deflate_stream.cc b/net/websockets/websocket_deflate_stream.cc
index 9e8a95a7339e3caf2bb5897f091b99ac3dfd33ab..79f0bf5b159016082516e29608b1bd958c1843d8 100644
--- a/net/websockets/websocket_deflate_stream.cc
+++ b/net/websockets/websocket_deflate_stream.cc
@@ -15,6 +15,7 @@
#include "net/base/completion_callback.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
+#include "net/websockets/websocket_deflate_predictor.h"
#include "net/websockets/websocket_deflater.h"
#include "net/websockets/websocket_errors.h"
#include "net/websockets/websocket_frame.h"
@@ -34,14 +35,16 @@ const size_t kChunkSize = 4 * 1024;
WebSocketDeflateStream::WebSocketDeflateStream(
scoped_ptr<WebSocketStream> stream,
- WebSocketDeflater::ContextTakeOverMode mode)
+ WebSocketDeflater::ContextTakeOverMode mode,
+ scoped_ptr<WebSocketDeflatePredictor> predictor)
: stream_(stream.Pass()),
deflater_(mode),
inflater_(kChunkSize, kChunkSize),
reading_state_(NOT_READING),
writing_state_(NOT_WRITING),
current_reading_opcode_(WebSocketFrameHeader::kOpCodeText),
- current_writing_opcode_(WebSocketFrameHeader::kOpCodeText) {
+ current_writing_opcode_(WebSocketFrameHeader::kOpCodeText),
+ predictor_(predictor.Pass()) {
DCHECK(stream_);
deflater_.Initialize(kWindowBits);
inflater_.Initialize(kWindowBits);
@@ -100,31 +103,30 @@ void WebSocketDeflateStream::OnReadComplete(
int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) {
ScopedVector<WebSocketFrame> frames_to_write;
+ // Store frames of the currently processed message if writing_state_ equals to
+ // WRITING_POSSIBLY_COMPRESSED_MESSAGE.
+ ScopedVector<WebSocketFrame> frames_of_message;
for (size_t i = 0; i < frames->size(); ++i) {
- scoped_ptr<WebSocketFrame> frame((*frames)[i]);
- (*frames)[i] = NULL;
- DCHECK(!frame->header.reserved1);
- if (!WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode)) {
- frames_to_write.push_back(frame.release());
+ DCHECK(!(*frames)[i]->header.reserved1);
+ if (!WebSocketFrameHeader::IsKnownDataOpCode((*frames)[i]->header.opcode)) {
+ frames_to_write.push_back((*frames)[i]);
+ (*frames)[i] = NULL;
continue;
}
+ if (writing_state_ == NOT_WRITING)
+ OnMessageStart(*frames, i);
+
+ scoped_ptr<WebSocketFrame> frame((*frames)[i]);
+ (*frames)[i] = NULL;
+ predictor_->RecordProcessedDataFrame(frame.get());
- if (writing_state_ == NOT_WRITING) {
- current_writing_opcode_ = frame->header.opcode;
- DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText ||
- current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary);
- // TODO(yhirano): For now, we unconditionally compress data messages.
- // Further optimization is needed.
- // http://crbug.com/163882
- writing_state_ = WRITING_COMPRESSED_MESSAGE;
- }
if (writing_state_ == WRITING_UNCOMPRESSED_MESSAGE) {
if (frame->header.final)
writing_state_ = NOT_WRITING;
+ predictor_->RecordSentDataFrame(frame.get());
frames_to_write.push_back(frame.release());
current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
} else {
- DCHECK_EQ(WRITING_COMPRESSED_MESSAGE, writing_state_);
if (frame->data && !deflater_.AddBytes(frame->data->data(),
frame->header.payload_length)) {
DVLOG(1) << "WebSocket protocol error. "
@@ -136,35 +138,142 @@ int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) {
<< "deflater_.Finish() returns an error.";
return ERR_WS_PROTOCOL_ERROR;
}
- if (deflater_.CurrentOutputSize() >= kChunkSize || frame->header.final) {
- const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
- scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
- scoped_refptr<IOBufferWithSize> data =
- deflater_.GetOutput(deflater_.CurrentOutputSize());
- if (!data) {
- DVLOG(1) << "WebSocket protocol error. "
- << "deflater_.GetOutput() returns an error.";
- return ERR_WS_PROTOCOL_ERROR;
+
+ if (writing_state_ == WRITING_COMPRESSED_MESSAGE) {
+ if (deflater_.CurrentOutputSize() >= kChunkSize ||
+ frame->header.final) {
+ int result = AppendCompressedFrame(frame->header, &frames_to_write);
+ if (result != OK)
+ return result;
+ }
+ if (frame->header.final)
+ writing_state_ = NOT_WRITING;
+ } else {
+ DCHECK_EQ(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_);
+ bool final = frame->header.final;
+ frames_of_message.push_back(frame.release());
+ if (final) {
+ int result = AppendPossiblyCompressedMessage(&frames_of_message,
+ &frames_to_write);
+ if (result != OK)
+ return result;
+ frames_of_message.clear();
+ writing_state_ = NOT_WRITING;
}
- compressed->header.CopyFrom(frame->header);
- compressed->header.opcode = opcode;
- compressed->header.final = frame->header.final;
- compressed->header.reserved1 =
- (opcode != WebSocketFrameHeader::kOpCodeContinuation);
- compressed->data = data;
- compressed->header.payload_length = data->size();
-
- current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
- frames_to_write.push_back(compressed.release());
}
- if (frame->header.final)
- writing_state_ = NOT_WRITING;
}
}
+ DCHECK_NE(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_);
frames->swap(frames_to_write);
return OK;
}
+void WebSocketDeflateStream::OnMessageStart(
+ const ScopedVector<WebSocketFrame>& frames, size_t index) {
+ WebSocketFrame* frame = frames[index];
+ current_writing_opcode_ = frame->header.opcode;
+ DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText ||
+ current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary);
+ WebSocketDeflatePredictor::Result prediction =
+ predictor_->Predict(frames, index);
+
+ switch (prediction) {
+ case WebSocketDeflatePredictor::DEFLATE:
+ writing_state_ = WRITING_COMPRESSED_MESSAGE;
+ return;
+ case WebSocketDeflatePredictor::DO_NOT_DEFLATE:
+ writing_state_ = WRITING_UNCOMPRESSED_MESSAGE;
+ return;
+ case WebSocketDeflatePredictor::TRY_DEFLATE:
+ writing_state_ = WRITING_POSSIBLY_COMPRESSED_MESSAGE;
+ return;
+ }
+ NOTREACHED();
+}
+
+int WebSocketDeflateStream::AppendCompressedFrame(
+ const WebSocketFrameHeader& header,
+ ScopedVector<WebSocketFrame>* frames_to_write) {
+ const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
+ scoped_refptr<IOBufferWithSize> data =
+ deflater_.GetOutput(deflater_.CurrentOutputSize());
+ if (!data) {
+ DVLOG(1) << "WebSocket protocol error. "
+ << "deflater_.GetOutput() returns an error.";
+ return ERR_WS_PROTOCOL_ERROR;
+ }
+ scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
+ compressed->header.CopyFrom(header);
+ compressed->header.opcode = opcode;
+ compressed->header.final = header.final;
+ compressed->header.reserved1 =
+ (opcode != WebSocketFrameHeader::kOpCodeContinuation);
+ compressed->data = data;
+ compressed->header.payload_length = data->size();
+
+ current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
+ predictor_->RecordSentDataFrame(compressed.get());
+ frames_to_write->push_back(compressed.release());
+ return OK;
+}
+
+int WebSocketDeflateStream::AppendPossiblyCompressedMessage(
+ ScopedVector<WebSocketFrame>* frames,
+ ScopedVector<WebSocketFrame>* frames_to_write) {
+ DCHECK(!frames->empty());
+
+ const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
+ scoped_refptr<IOBufferWithSize> data =
+ deflater_.GetOutput(deflater_.CurrentOutputSize());
+ if (!data) {
+ DVLOG(1) << "WebSocket protocol error. "
+ << "deflater_.GetOutput() returns an error.";
+ return ERR_WS_PROTOCOL_ERROR;
+ }
+
+ uint64 original_payload_length = 0;
+ for (size_t i = 0; i < frames->size(); ++i) {
+ WebSocketFrame* frame = (*frames)[i];
+ // Many asserts checking that frames represent one data message.
+ DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode));
+ if (i == 0) {
+ DCHECK_NE(WebSocketFrameHeader::kOpCodeContinuation,
+ frame->header.opcode);
+ } else {
+ DCHECK_EQ(WebSocketFrameHeader::kOpCodeContinuation,
+ frame->header.opcode);
+ }
+ if (i == frames->size() - 1)
+ DCHECK(frame->header.final);
+ else
+ DCHECK(!frame->header.final);
+
+ original_payload_length += frame->header.payload_length;
+ }
+ if (original_payload_length <= static_cast<uint64>(data->size())) {
+ // Compression is not effective. Use the original frames.
+ for (size_t i = 0; i < frames->size(); ++i) {
+ WebSocketFrame* frame = (*frames)[i];
+ frames_to_write->push_back(frame);
+ predictor_->RecordSentDataFrame(frame);
+ (*frames)[i] = NULL;
+ }
+ frames->weak_clear();
+ return OK;
+ }
+ scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
+ compressed->header.CopyFrom((*frames)[0]->header);
+ compressed->header.opcode = opcode;
+ compressed->header.final = true;
+ compressed->header.reserved1 = true;
+ compressed->data = data;
+ compressed->header.payload_length = data->size();
+
+ predictor_->RecordSentDataFrame(compressed.get());
+ frames_to_write->push_back(compressed.release());
+ return OK;
+}
+
int WebSocketDeflateStream::Inflate(ScopedVector<WebSocketFrame>* frames) {
ScopedVector<WebSocketFrame> frames_to_output;
ScopedVector<WebSocketFrame> frames_passed;

Powered by Google App Engine
This is Rietveld 408576698