| Index: net/websockets/websocket_deflate_stream.cc
|
| diff --git a/net/websockets/websocket_deflate_stream.cc b/net/websockets/websocket_deflate_stream.cc
|
| index 9e8a95a7339e3caf2bb5897f091b99ac3dfd33ab..1121452e9cc68d6039b371f89c377edad1e4eb9e 100644
|
| --- a/net/websockets/websocket_deflate_stream.cc
|
| +++ b/net/websockets/websocket_deflate_stream.cc
|
| @@ -15,6 +15,7 @@
|
| #include "net/base/completion_callback.h"
|
| #include "net/base/io_buffer.h"
|
| #include "net/base/net_errors.h"
|
| +#include "net/websockets/websocket_deflate_predictor.h"
|
| #include "net/websockets/websocket_deflater.h"
|
| #include "net/websockets/websocket_errors.h"
|
| #include "net/websockets/websocket_frame.h"
|
| @@ -34,14 +35,16 @@ const size_t kChunkSize = 4 * 1024;
|
|
|
| WebSocketDeflateStream::WebSocketDeflateStream(
|
| scoped_ptr<WebSocketStream> stream,
|
| - WebSocketDeflater::ContextTakeOverMode mode)
|
| + WebSocketDeflater::ContextTakeOverMode mode,
|
| + scoped_ptr<WebSocketDeflatePredictor> predictor)
|
| : stream_(stream.Pass()),
|
| deflater_(mode),
|
| inflater_(kChunkSize, kChunkSize),
|
| reading_state_(NOT_READING),
|
| writing_state_(NOT_WRITING),
|
| current_reading_opcode_(WebSocketFrameHeader::kOpCodeText),
|
| - current_writing_opcode_(WebSocketFrameHeader::kOpCodeText) {
|
| + current_writing_opcode_(WebSocketFrameHeader::kOpCodeText),
|
| + predictor_(predictor.Pass()) {
|
| DCHECK(stream_);
|
| deflater_.Initialize(kWindowBits);
|
| inflater_.Initialize(kWindowBits);
|
| @@ -100,31 +103,30 @@ void WebSocketDeflateStream::OnReadComplete(
|
|
|
| int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) {
|
| ScopedVector<WebSocketFrame> frames_to_write;
|
| + // Store frames of the currently processed message if writing_state_ equals to
|
| + // WRITING_POSSIBLY_COMPRESSED_MESSAGE.
|
| + ScopedVector<WebSocketFrame> frames_of_message;
|
| for (size_t i = 0; i < frames->size(); ++i) {
|
| - scoped_ptr<WebSocketFrame> frame((*frames)[i]);
|
| - (*frames)[i] = NULL;
|
| - DCHECK(!frame->header.reserved1);
|
| - if (!WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode)) {
|
| - frames_to_write.push_back(frame.release());
|
| + DCHECK(!(*frames)[i]->header.reserved1);
|
| + if (!WebSocketFrameHeader::IsKnownDataOpCode((*frames)[i]->header.opcode)) {
|
| + frames_to_write.push_back((*frames)[i]);
|
| + (*frames)[i] = NULL;
|
| continue;
|
| }
|
| + if (writing_state_ == NOT_WRITING)
|
| + OnMessageStart(*frames, i);
|
| +
|
| + scoped_ptr<WebSocketFrame> frame((*frames)[i]);
|
| + (*frames)[i] = NULL;
|
| + predictor_->RecordInputDataFrame(frame.get());
|
|
|
| - if (writing_state_ == NOT_WRITING) {
|
| - current_writing_opcode_ = frame->header.opcode;
|
| - DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText ||
|
| - current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary);
|
| - // TODO(yhirano): For now, we unconditionally compress data messages.
|
| - // Further optimization is needed.
|
| - // http://crbug.com/163882
|
| - writing_state_ = WRITING_COMPRESSED_MESSAGE;
|
| - }
|
| if (writing_state_ == WRITING_UNCOMPRESSED_MESSAGE) {
|
| if (frame->header.final)
|
| writing_state_ = NOT_WRITING;
|
| + predictor_->RecordWrittenDataFrame(frame.get());
|
| frames_to_write.push_back(frame.release());
|
| current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
|
| } else {
|
| - DCHECK_EQ(WRITING_COMPRESSED_MESSAGE, writing_state_);
|
| if (frame->data && !deflater_.AddBytes(frame->data->data(),
|
| frame->header.payload_length)) {
|
| DVLOG(1) << "WebSocket protocol error. "
|
| @@ -136,35 +138,134 @@ int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) {
|
| << "deflater_.Finish() returns an error.";
|
| return ERR_WS_PROTOCOL_ERROR;
|
| }
|
| - if (deflater_.CurrentOutputSize() >= kChunkSize || frame->header.final) {
|
| - const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
|
| - scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
|
| - scoped_refptr<IOBufferWithSize> data =
|
| - deflater_.GetOutput(deflater_.CurrentOutputSize());
|
| - if (!data) {
|
| - DVLOG(1) << "WebSocket protocol error. "
|
| - << "deflater_.GetOutput() returns an error.";
|
| - return ERR_WS_PROTOCOL_ERROR;
|
| +
|
| + if (writing_state_ == WRITING_COMPRESSED_MESSAGE) {
|
| + if (deflater_.CurrentOutputSize() >= kChunkSize ||
|
| + frame->header.final) {
|
| + int result = AppendCompressedFrame(frame->header, &frames_to_write);
|
| + if (result != OK)
|
| + return result;
|
| + }
|
| + if (frame->header.final)
|
| + writing_state_ = NOT_WRITING;
|
| + } else {
|
| + DCHECK_EQ(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_);
|
| + bool final = frame->header.final;
|
| + frames_of_message.push_back(frame.release());
|
| + if (final) {
|
| + int result = AppendPossiblyCompressedMessage(&frames_of_message,
|
| + &frames_to_write);
|
| + if (result != OK)
|
| + return result;
|
| + frames_of_message.clear();
|
| + writing_state_ = NOT_WRITING;
|
| }
|
| - compressed->header.CopyFrom(frame->header);
|
| - compressed->header.opcode = opcode;
|
| - compressed->header.final = frame->header.final;
|
| - compressed->header.reserved1 =
|
| - (opcode != WebSocketFrameHeader::kOpCodeContinuation);
|
| - compressed->data = data;
|
| - compressed->header.payload_length = data->size();
|
| -
|
| - current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
|
| - frames_to_write.push_back(compressed.release());
|
| }
|
| - if (frame->header.final)
|
| - writing_state_ = NOT_WRITING;
|
| }
|
| }
|
| + DCHECK_NE(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_);
|
| frames->swap(frames_to_write);
|
| return OK;
|
| }
|
|
|
| +void WebSocketDeflateStream::OnMessageStart(
|
| + const ScopedVector<WebSocketFrame>& frames, size_t index) {
|
| + WebSocketFrame* frame = frames[index];
|
| + current_writing_opcode_ = frame->header.opcode;
|
| + DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText ||
|
| + current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary);
|
| + WebSocketDeflatePredictor::Result prediction =
|
| + predictor_->Predict(frames, index);
|
| +
|
| + switch (prediction) {
|
| + case WebSocketDeflatePredictor::DEFLATE:
|
| + writing_state_ = WRITING_COMPRESSED_MESSAGE;
|
| + return;
|
| + case WebSocketDeflatePredictor::DO_NOT_DEFLATE:
|
| + writing_state_ = WRITING_UNCOMPRESSED_MESSAGE;
|
| + return;
|
| + case WebSocketDeflatePredictor::TRY_DEFLATE:
|
| + writing_state_ = WRITING_POSSIBLY_COMPRESSED_MESSAGE;
|
| + return;
|
| + }
|
| + NOTREACHED();
|
| +}
|
| +
|
| +int WebSocketDeflateStream::AppendCompressedFrame(
|
| + const WebSocketFrameHeader& header,
|
| + ScopedVector<WebSocketFrame>* frames_to_write) {
|
| + const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
|
| + scoped_refptr<IOBufferWithSize> data =
|
| + deflater_.GetOutput(deflater_.CurrentOutputSize());
|
| + if (!data) {
|
| + DVLOG(1) << "WebSocket protocol error. "
|
| + << "deflater_.GetOutput() returns an error.";
|
| + return ERR_WS_PROTOCOL_ERROR;
|
| + }
|
| + scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
|
| + compressed->header.CopyFrom(header);
|
| + compressed->header.opcode = opcode;
|
| + compressed->header.final = header.final;
|
| + compressed->header.reserved1 =
|
| + (opcode != WebSocketFrameHeader::kOpCodeContinuation);
|
| + compressed->data = data;
|
| + compressed->header.payload_length = data->size();
|
| +
|
| + current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
|
| + predictor_->RecordWrittenDataFrame(compressed.get());
|
| + frames_to_write->push_back(compressed.release());
|
| + return OK;
|
| +}
|
| +
|
| +int WebSocketDeflateStream::AppendPossiblyCompressedMessage(
|
| + ScopedVector<WebSocketFrame>* frames,
|
| + ScopedVector<WebSocketFrame>* frames_to_write) {
|
| + DCHECK(!frames->empty());
|
| +
|
| + const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_;
|
| + scoped_refptr<IOBufferWithSize> data =
|
| + deflater_.GetOutput(deflater_.CurrentOutputSize());
|
| + if (!data) {
|
| + DVLOG(1) << "WebSocket protocol error. "
|
| + << "deflater_.GetOutput() returns an error.";
|
| + return ERR_WS_PROTOCOL_ERROR;
|
| + }
|
| +
|
| + uint64 original_payload_length = 0;
|
| + for (size_t i = 0; i < frames->size(); ++i) {
|
| + WebSocketFrame* frame = (*frames)[i];
|
| + // Asserts checking that frames represent one whole data message.
|
| + DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode));
|
| + DCHECK_EQ(i == 0,
|
| + WebSocketFrameHeader::kOpCodeContinuation !=
|
| + frame->header.opcode);
|
| + DCHECK_EQ(i == frames->size() - 1, frame->header.final);
|
| + original_payload_length += frame->header.payload_length;
|
| + }
|
| + if (original_payload_length <= static_cast<uint64>(data->size())) {
|
| + // Compression is not effective. Use the original frames.
|
| + for (size_t i = 0; i < frames->size(); ++i) {
|
| + WebSocketFrame* frame = (*frames)[i];
|
| + frames_to_write->push_back(frame);
|
| + predictor_->RecordWrittenDataFrame(frame);
|
| + (*frames)[i] = NULL;
|
| + }
|
| + frames->weak_clear();
|
| + return OK;
|
| + }
|
| + scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode));
|
| + compressed->header.CopyFrom((*frames)[0]->header);
|
| + compressed->header.opcode = opcode;
|
| + compressed->header.final = true;
|
| + compressed->header.reserved1 = true;
|
| + compressed->data = data;
|
| + compressed->header.payload_length = data->size();
|
| +
|
| + predictor_->RecordWrittenDataFrame(compressed.get());
|
| + frames_to_write->push_back(compressed.release());
|
| + return OK;
|
| +}
|
| +
|
| int WebSocketDeflateStream::Inflate(ScopedVector<WebSocketFrame>* frames) {
|
| ScopedVector<WebSocketFrame> frames_to_output;
|
| ScopedVector<WebSocketFrame> frames_passed;
|
|
|