Chromium Code Reviews| Index: net/websockets/websocket_deflate_stream.cc |
| diff --git a/net/websockets/websocket_deflate_stream.cc b/net/websockets/websocket_deflate_stream.cc |
| index 9e8a95a7339e3caf2bb5897f091b99ac3dfd33ab..79f0bf5b159016082516e29608b1bd958c1843d8 100644 |
| --- a/net/websockets/websocket_deflate_stream.cc |
| +++ b/net/websockets/websocket_deflate_stream.cc |
| @@ -15,6 +15,7 @@ |
| #include "net/base/completion_callback.h" |
| #include "net/base/io_buffer.h" |
| #include "net/base/net_errors.h" |
| +#include "net/websockets/websocket_deflate_predictor.h" |
| #include "net/websockets/websocket_deflater.h" |
| #include "net/websockets/websocket_errors.h" |
| #include "net/websockets/websocket_frame.h" |
| @@ -34,14 +35,16 @@ const size_t kChunkSize = 4 * 1024; |
| WebSocketDeflateStream::WebSocketDeflateStream( |
| scoped_ptr<WebSocketStream> stream, |
| - WebSocketDeflater::ContextTakeOverMode mode) |
| + WebSocketDeflater::ContextTakeOverMode mode, |
| + scoped_ptr<WebSocketDeflatePredictor> predictor) |
| : stream_(stream.Pass()), |
| deflater_(mode), |
| inflater_(kChunkSize, kChunkSize), |
| reading_state_(NOT_READING), |
| writing_state_(NOT_WRITING), |
| current_reading_opcode_(WebSocketFrameHeader::kOpCodeText), |
| - current_writing_opcode_(WebSocketFrameHeader::kOpCodeText) { |
| + current_writing_opcode_(WebSocketFrameHeader::kOpCodeText), |
| + predictor_(predictor.Pass()) { |
| DCHECK(stream_); |
| deflater_.Initialize(kWindowBits); |
| inflater_.Initialize(kWindowBits); |
| @@ -100,31 +103,30 @@ void WebSocketDeflateStream::OnReadComplete( |
| int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) { |
| ScopedVector<WebSocketFrame> frames_to_write; |
| + // Store frames of the currently processed message if writing_state_ equals to |
| + // WRITING_POSSIBLY_COMPRESSED_MESSAGE. |
| + ScopedVector<WebSocketFrame> frames_of_message; |
| for (size_t i = 0; i < frames->size(); ++i) { |
| - scoped_ptr<WebSocketFrame> frame((*frames)[i]); |
| - (*frames)[i] = NULL; |
| - DCHECK(!frame->header.reserved1); |
| - if (!WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode)) { |
| - frames_to_write.push_back(frame.release()); |
| + DCHECK(!(*frames)[i]->header.reserved1); |
| + if (!WebSocketFrameHeader::IsKnownDataOpCode((*frames)[i]->header.opcode)) { |
| + frames_to_write.push_back((*frames)[i]); |
| + (*frames)[i] = NULL; |
| continue; |
| } |
| + if (writing_state_ == NOT_WRITING) |
| + OnMessageStart(*frames, i); |
| + |
| + scoped_ptr<WebSocketFrame> frame((*frames)[i]); |
| + (*frames)[i] = NULL; |
| + predictor_->RecordProcessedDataFrame(frame.get()); |
| - if (writing_state_ == NOT_WRITING) { |
| - current_writing_opcode_ = frame->header.opcode; |
| - DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText || |
| - current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary); |
| - // TODO(yhirano): For now, we unconditionally compress data messages. |
| - // Further optimization is needed. |
| - // http://crbug.com/163882 |
| - writing_state_ = WRITING_COMPRESSED_MESSAGE; |
| - } |
| if (writing_state_ == WRITING_UNCOMPRESSED_MESSAGE) { |
| if (frame->header.final) |
| writing_state_ = NOT_WRITING; |
| + predictor_->RecordSentDataFrame(frame.get()); |
| frames_to_write.push_back(frame.release()); |
| current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation; |
| } else { |
| - DCHECK_EQ(WRITING_COMPRESSED_MESSAGE, writing_state_); |
| if (frame->data && !deflater_.AddBytes(frame->data->data(), |
| frame->header.payload_length)) { |
| DVLOG(1) << "WebSocket protocol error. " |
| @@ -136,35 +138,142 @@ int WebSocketDeflateStream::Deflate(ScopedVector<WebSocketFrame>* frames) { |
| << "deflater_.Finish() returns an error."; |
| return ERR_WS_PROTOCOL_ERROR; |
| } |
| - if (deflater_.CurrentOutputSize() >= kChunkSize || frame->header.final) { |
| - const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_; |
| - scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode)); |
| - scoped_refptr<IOBufferWithSize> data = |
| - deflater_.GetOutput(deflater_.CurrentOutputSize()); |
| - if (!data) { |
| - DVLOG(1) << "WebSocket protocol error. " |
| - << "deflater_.GetOutput() returns an error."; |
| - return ERR_WS_PROTOCOL_ERROR; |
| + |
| + if (writing_state_ == WRITING_COMPRESSED_MESSAGE) { |
| + if (deflater_.CurrentOutputSize() >= kChunkSize || |
| + frame->header.final) { |
| + int result = AppendCompressedFrame(frame->header, &frames_to_write); |
| + if (result != OK) |
| + return result; |
| + } |
| + if (frame->header.final) |
| + writing_state_ = NOT_WRITING; |
| + } else { |
| + DCHECK_EQ(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_); |
| + bool final = frame->header.final; |
| + frames_of_message.push_back(frame.release()); |
| + if (final) { |
| + int result = AppendPossiblyCompressedMessage(&frames_of_message, |
| + &frames_to_write); |
| + if (result != OK) |
| + return result; |
| + frames_of_message.clear(); |
| + writing_state_ = NOT_WRITING; |
| } |
| - compressed->header.CopyFrom(frame->header); |
| - compressed->header.opcode = opcode; |
| - compressed->header.final = frame->header.final; |
| - compressed->header.reserved1 = |
| - (opcode != WebSocketFrameHeader::kOpCodeContinuation); |
| - compressed->data = data; |
| - compressed->header.payload_length = data->size(); |
| - |
| - current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation; |
| - frames_to_write.push_back(compressed.release()); |
| } |
| - if (frame->header.final) |
| - writing_state_ = NOT_WRITING; |
| } |
|
Adam Rice
2013/10/29 04:11:42
It appears to me that if writing_state_ is WRITING
yhirano
2013/10/29 06:11:06
Yes, definitely.
So the comment of WebSocketDeflat
|
| } |
| + DCHECK_NE(WRITING_POSSIBLY_COMPRESSED_MESSAGE, writing_state_); |
| frames->swap(frames_to_write); |
| return OK; |
| } |
| +void WebSocketDeflateStream::OnMessageStart( |
| + const ScopedVector<WebSocketFrame>& frames, size_t index) { |
| + WebSocketFrame* frame = frames[index]; |
| + current_writing_opcode_ = frame->header.opcode; |
| + DCHECK(current_writing_opcode_ == WebSocketFrameHeader::kOpCodeText || |
| + current_writing_opcode_ == WebSocketFrameHeader::kOpCodeBinary); |
| + WebSocketDeflatePredictor::Result prediction = |
| + predictor_->Predict(frames, index); |
| + |
| + switch (prediction) { |
| + case WebSocketDeflatePredictor::DEFLATE: |
| + writing_state_ = WRITING_COMPRESSED_MESSAGE; |
| + return; |
| + case WebSocketDeflatePredictor::DO_NOT_DEFLATE: |
| + writing_state_ = WRITING_UNCOMPRESSED_MESSAGE; |
| + return; |
| + case WebSocketDeflatePredictor::TRY_DEFLATE: |
| + writing_state_ = WRITING_POSSIBLY_COMPRESSED_MESSAGE; |
| + return; |
| + } |
| + NOTREACHED(); |
| +} |
| + |
| +int WebSocketDeflateStream::AppendCompressedFrame( |
| + const WebSocketFrameHeader& header, |
| + ScopedVector<WebSocketFrame>* frames_to_write) { |
| + const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_; |
| + scoped_refptr<IOBufferWithSize> data = |
| + deflater_.GetOutput(deflater_.CurrentOutputSize()); |
| + if (!data) { |
| + DVLOG(1) << "WebSocket protocol error. " |
| + << "deflater_.GetOutput() returns an error."; |
| + return ERR_WS_PROTOCOL_ERROR; |
| + } |
| + scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode)); |
| + compressed->header.CopyFrom(header); |
| + compressed->header.opcode = opcode; |
|
Adam Rice
2013/10/29 04:11:42
"opcode" has been set by the constructor already!
yhirano
2013/10/29 06:11:06
and overwritten by CopyFrom function.
Adam Rice
2013/10/30 00:28:07
Sorry!
|
| + compressed->header.final = header.final; |
| + compressed->header.reserved1 = |
| + (opcode != WebSocketFrameHeader::kOpCodeContinuation); |
| + compressed->data = data; |
| + compressed->header.payload_length = data->size(); |
| + |
| + current_writing_opcode_ = WebSocketFrameHeader::kOpCodeContinuation; |
| + predictor_->RecordSentDataFrame(compressed.get()); |
| + frames_to_write->push_back(compressed.release()); |
| + return OK; |
| +} |
| + |
| +int WebSocketDeflateStream::AppendPossiblyCompressedMessage( |
| + ScopedVector<WebSocketFrame>* frames, |
| + ScopedVector<WebSocketFrame>* frames_to_write) { |
| + DCHECK(!frames->empty()); |
| + |
| + const WebSocketFrameHeader::OpCode opcode = current_writing_opcode_; |
| + scoped_refptr<IOBufferWithSize> data = |
| + deflater_.GetOutput(deflater_.CurrentOutputSize()); |
| + if (!data) { |
| + DVLOG(1) << "WebSocket protocol error. " |
| + << "deflater_.GetOutput() returns an error."; |
| + return ERR_WS_PROTOCOL_ERROR; |
| + } |
| + |
| + uint64 original_payload_length = 0; |
| + for (size_t i = 0; i < frames->size(); ++i) { |
| + WebSocketFrame* frame = (*frames)[i]; |
| + // Many asserts checking that frames represent one data message. |
|
Adam Rice
2013/10/29 04:11:42
"one whole data message"
yhirano
2013/10/29 06:11:06
Done.
|
| + DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(frame->header.opcode)); |
| + if (i == 0) { |
| + DCHECK_NE(WebSocketFrameHeader::kOpCodeContinuation, |
|
Adam Rice
2013/10/29 04:11:42
It's a bit untidy to have an if() statement that d
yhirano
2013/10/29 06:11:06
Done.
|
| + frame->header.opcode); |
| + } else { |
| + DCHECK_EQ(WebSocketFrameHeader::kOpCodeContinuation, |
| + frame->header.opcode); |
| + } |
| + if (i == frames->size() - 1) |
| + DCHECK(frame->header.final); |
| + else |
| + DCHECK(!frame->header.final); |
| + |
| + original_payload_length += frame->header.payload_length; |
| + } |
| + if (original_payload_length <= static_cast<uint64>(data->size())) { |
| + // Compression is not effective. Use the original frames. |
| + for (size_t i = 0; i < frames->size(); ++i) { |
| + WebSocketFrame* frame = (*frames)[i]; |
| + frames_to_write->push_back(frame); |
| + predictor_->RecordSentDataFrame(frame); |
| + (*frames)[i] = NULL; |
| + } |
| + frames->weak_clear(); |
| + return OK; |
| + } |
| + scoped_ptr<WebSocketFrame> compressed(new WebSocketFrame(opcode)); |
| + compressed->header.CopyFrom((*frames)[0]->header); |
| + compressed->header.opcode = opcode; |
| + compressed->header.final = true; |
| + compressed->header.reserved1 = true; |
| + compressed->data = data; |
| + compressed->header.payload_length = data->size(); |
| + |
| + predictor_->RecordSentDataFrame(compressed.get()); |
| + frames_to_write->push_back(compressed.release()); |
| + return OK; |
| +} |
| + |
| int WebSocketDeflateStream::Inflate(ScopedVector<WebSocketFrame>* frames) { |
| ScopedVector<WebSocketFrame> frames_to_output; |
| ScopedVector<WebSocketFrame> frames_passed; |