Index: net/websockets/websocket_deflate_stream.cc |
diff --git a/net/websockets/websocket_deflate_stream.cc b/net/websockets/websocket_deflate_stream.cc |
index 9e8a95a7339e3caf2bb5897f091b99ac3dfd33ab..1121452e9cc68d6039b371f89c377edad1e4eb9e 100644 |
--- a/net/websockets/websocket_deflate_stream.cc |
+++ b/net/websockets/websocket_deflate_stream.cc |
@@ -15,6 +15,7 @@ |
#include "net/base/completion_callback.h" |
#include "net/base/io_buffer.h" |
#include "net/base/net_errors.h" |
+#include "net/websockets/websocket_deflate_predictor.h" |
#include "net/websockets/websocket_deflater.h" |
#include "net/websockets/websocket_errors.h" |
#include "net/websockets/websocket_frame.h" |
@@ -34,14 +35,16 @@ const size_t kChunkSize = 4 * 1024; |
WebSocketDeflateStream::WebSocketDeflateStream( |
scoped_ptr<WebSocketStream> stream, |
- WebSocketDeflater::ContextTakeOverMode mode) |
+ WebSocketDeflater::ContextTakeOverMode mode, |
+ scoped_ptr<WebSocketDeflatePredictor> predictor) |
: stream_(stream.Pass()), |
deflater_(mode), |
inflater_(kChunkSize, kChunkSize), |
reading_state_(NOT_READING), |
writing_state_(NOT_WRITING), |
current_reading_opcode_(WebSocketFrameHeader::kOpCodeText), |
- current_writing_opcode_(WebSocketFrameHeader::kOpCodeText) { |
+ current_writing_opcode_(WebSocketFrameHeader::kOpCodeText), |
+ predictor_(predictor.Pass()) { |
DCHECK(stream_); |
deflater_.Initialize(kWindowBits); |
inflater_.Initialize(kWindowBits); |
@@ -100,31 +103,30 @@ void WebSocketDeflateStream::OnReadComplete( |
int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) { |
ScopedVector<WebSocketFrame> frames_to_write; |
+ // Store frames of the currently processed message if writing_state_ equals to |
+ // WRITING_POSSIBLY_COMPRESSED_MESSAGE. |
+ ScopedVector<WebSocketFrame> frames_of_message; |
tyoshino (SeeGerritForStatus)
2013/10/30 06:15:04
this postpones sending the initial frame of the me
yhirano
2013/10/30 09:32:17
This is not specific to WRITING_POSSIBLY_COMPRESSE
|
for (size_t i = 0; i < frames->size(); ++i) { |
- scoped_ptr<WebSocketFrame> frame((*frames)[i]); |
- (*frames)[i] = NULL; |
- DCHECK(!frame->header.reserved1); |
- if (!WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode)) { |
- frames_to_write.push_back(frame.release()); |
+ DCHECK(!(*frames)[i]->header.reserved1); |
+ if (!WebSocketFrameHeader::IsKnownDataOpCode((*frames)[i]->header.opcode)) { |
+ frames_to_write.push_back((*frames)[i]); |
+ (*frames)[i] = NULL; |
continue; |
} |
+ if (writing_state_ == NOT_WRITING) |
+ OnMessageStart(*frames, i); |
+ |
+ scoped_ptr<WebSocketFrame> frame((*frames)[i]); |
+ (*frames)[i] = NULL; |
+ predictor_->RecordInputDataFrame(frame.get()); |
- if (writing_state_ == NOT_WRITING) { |
- current_writing_opcode_ = frame->header.opcode; |
- DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText || |
- current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary); |
- // TODO(yhirano): For now, we unconditionally compress data messages. |
- // Further optimization is needed. |
- // http://crbug.com/163882 |
- writing_state_ = WRITING_COMPRESSED_MESSAGE; |
- } |
if (writing_state_ == WRITING_UNCOMPRESSED_MESSAGE) { |
if (frame->header.final) |
writing_state_ = NOT_WRITING; |
+ predictor_->RecordWrittenDataFrame(frame.get()); |
frames_to_write.push_back(frame.release()); |
current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation; |
} else { |
- DCHECK_EQ(WRITING_COMPRESSED_MESSAGE, writing_state_); |
if (frame->data && !deflater_.AddBytes(frame->data->data(), |
frame->header.payload_length)) { |
DVLOG(1) << "WebSocket protocol error. " |
@@ -136,35 +138,134 @@ int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) { |
<< "deflater_.Finish() returns an error."; |
return ERR_WS_PROTOCOL_ERROR; |
} |
- if (deflater_.CurrentOutputSize() >= kChunkSize || frame->header.final) { |
- const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_; |
- scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode)); |
- scoped_refptr<IOBufferWithSize> data = |
- deflater_.GetOutput(deflater_.CurrentOutputSize()); |
- if (!data) { |
- DVLOG(1) << "WebSocket protocol error. " |
- << "deflater_.GetOutput() returns an error."; |
- return ERR_WS_PROTOCOL_ERROR; |
+ |
+ if (writing_state_ == WRITING_COMPRESSED_MESSAGE) { |
+ if (deflater_.CurrentOutputSize() >= kChunkSize || |
+ frame->header.final) { |
+ int result = AppendCompressedFrame(frame->header, &frames_to_write); |
+ if (result != OK) |
+ return result; |
+ } |
+ if (frame->header.final) |
+ writing_state_ = NOT_WRITING; |
+ } else { |
+ DCHECK_EQ(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_); |
+ bool final = frame->header.final; |
+ frames_of_message.push_back(frame.release()); |
+ if (final) { |
+ int result = AppendPossiblyCompressedMessage(&frames_of_message, |
+ &frames_to_write); |
+ if (result != OK) |
+ return result; |
+ frames_of_message.clear(); |
+ writing_state_ = NOT_WRITING; |
} |
- compressed->header.CopyFrom(frame->header); |
- compressed->header.opcode = opcode; |
- compressed->header.final = frame->header.final; |
- compressed->header.reserved1 = |
- (opcode != WebSocketFrameHeader::kOpCodeContinuation); |
- compressed->data = data; |
- compressed->header.payload_length = data->size(); |
- |
- current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation; |
- frames_to_write.push_back(compressed.release()); |
} |
- if (frame->header.final) |
- writing_state_ = NOT_WRITING; |
} |
} |
+ DCHECK_NE(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_); |
frames->swap(frames_to_write); |
return OK; |
} |
+void WebSocketDeflateStream::OnMessageStart( |
+ const ScopedVector<WebSocketFrame>& frames, size_t index) { |
+ WebSocketFrame* frame = frames[index]; |
+ current_writing_opcode_ = frame->header.opcode; |
+ DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText || |
+ current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary); |
+ WebSocketDeflatePredictor::Result prediction = |
+ predictor_->Predict(frames, index); |
+ |
+ switch (prediction) { |
+ case WebSocketDeflatePredictor::DEFLATE: |
+ writing_state_ = WRITING_COMPRESSED_MESSAGE; |
+ return; |
+ case WebSocketDeflatePredictor::DO_NOT_DEFLATE: |
+ writing_state_ = WRITING_UNCOMPRESSED_MESSAGE; |
+ return; |
+ case WebSocketDeflatePredictor::TRY_DEFLATE: |
+ writing_state_ = WRITING_POSSIBLY_COMPRESSED_MESSAGE; |
+ return; |
+ } |
+ NOTREACHED(); |
+} |
+ |
+int WebSocketDeflateStream::AppendCompressedFrame( |
+ const WebSocketFrameHeader& header, |
+ ScopedVector<WebSocketFrame>* frames_to_write) { |
+ const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_; |
+ scoped_refptr<IOBufferWithSize> data = |
tyoshino (SeeGerritForStatus)
2013/10/30 06:15:04
s/data/compressed_payload/
yhirano
2013/10/30 09:32:17
Done.
|
+ deflater_.GetOutput(deflater_.CurrentOutputSize()); |
+ if (!data) { |
+ DVLOG(1) << "WebSocket protocol error. " |
+ << "deflater_.GetOutput() returns an error."; |
+ return ERR_WS_PROTOCOL_ERROR; |
+ } |
+ scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode)); |
+ compressed->header.CopyFrom(header); |
+ compressed->header.opcode = opcode; |
+ compressed->header.final = header.final; |
+ compressed->header.reserved1 = |
+ (opcode != WebSocketFrameHeader::kOpCodeContinuation); |
+ compressed->data = data; |
+ compressed->header.payload_length = data->size(); |
+ |
+ current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation; |
+ predictor_->RecordWrittenDataFrame(compressed.get()); |
+ frames_to_write->push_back(compressed.release()); |
+ return OK; |
+} |
+ |
+int WebSocketDeflateStream::AppendPossiblyCompressedMessage( |
+ ScopedVector<WebSocketFrame>* frames, |
+ ScopedVector<WebSocketFrame>* frames_to_write) { |
+ DCHECK(!frames->empty()); |
+ |
+ const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_; |
+ scoped_refptr<IOBufferWithSize> data = |
tyoshino (SeeGerritForStatus)
2013/10/30 06:15:04
s/data/compressed_payload/
yhirano
2013/10/30 09:32:17
Done.
|
+ deflater_.GetOutput(deflater_.CurrentOutputSize()); |
+ if (!data) { |
+ DVLOG(1) << "WebSocket protocol error. " |
+ << "deflater_.GetOutput() returns an error."; |
+ return ERR_WS_PROTOCOL_ERROR; |
+ } |
+ |
+ uint64 original_payload_length = 0; |
+ for (size_t i = 0; i < frames->size(); ++i) { |
+ WebSocketFrame* frame = (*frames)[i]; |
+ // Asserts checking that frames represent one whole data message. |
+ DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode)); |
+ DCHECK_EQ(i == 0, |
+ WebSocketFrameHeader::kOpCodeContinuation != |
+ frame->header.opcode); |
+ DCHECK_EQ(i == frames->size() - 1, frame->header.final); |
+ original_payload_length += frame->header.payload_length; |
+ } |
+ if (original_payload_length <= static_cast<uint64>(data->size())) { |
+ // Compression is not effective. Use the original frames. |
+ for (size_t i = 0; i < frames->size(); ++i) { |
+ WebSocketFrame* frame = (*frames)[i]; |
+ frames_to_write->push_back(frame); |
+ predictor_->RecordWrittenDataFrame(frame); |
+ (*frames)[i] = NULL; |
+ } |
+ frames->weak_clear(); |
+ return OK; |
+ } |
+ scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode)); |
+ compressed->header.CopyFrom((*frames)[0]->header); |
+ compressed->header.opcode = opcode; |
+ compressed->header.final = true; |
+ compressed->header.reserved1 = true; |
+ compressed->data = data; |
+ compressed->header.payload_length = data->size(); |
+ |
+ predictor_->RecordWrittenDataFrame(compressed.get()); |
+ frames_to_write->push_back(compressed.release()); |
+ return OK; |
+} |
+ |
int WebSocketDeflateStream::Inflate(ScopedVector<WebSocketFrame>* frames) { |
ScopedVector<WebSocketFrame> frames_to_output; |
ScopedVector<WebSocketFrame> frames_passed; |