Chromium Code Reviews| Index: google_apis/gcm/base/socket_stream.cc |
| diff --git a/google_apis/gcm/base/socket_stream.cc b/google_apis/gcm/base/socket_stream.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..0abe530be2cdb1c4fd5649b0e0e8900bdd2b895c |
| --- /dev/null |
| +++ b/google_apis/gcm/base/socket_stream.cc |
| @@ -0,0 +1,389 @@ |
| +// Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "google_apis/gcm/base/socket_stream.h" |
| + |
| +#include "base/callback.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "net/base/io_buffer.h" |
| +#include "net/base/net_errors.h" |
| +#include "net/socket/stream_socket.h" |
| + |
| +namespace gcm { |
| + |
| +// TODO(zea): consider having dynamically sized buffers if this becomes too |
| +// expensive. |
| +const uint32 kDefaultBufferSize = 8*1024; |
| + |
| +SocketInputStream::SocketInputStream(base::TimeDelta read_timeout, |
| + net::StreamSocket* socket) |
| + : socket_(socket), |
| + io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)), |
| + drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size())), |
| + buffer_read_pos_(0), |
| + buffer_write_pos_(0), |
| + limit_(0), |
| + backup_bytes_(0), |
| + skipped_bytes_(0), |
| + last_error_(net::OK), |
| + state_(EMPTY), |
| + read_timeout_(read_timeout), |
| + weak_ptr_factory_(this) { |
| + DCHECK(socket->IsConnected()); |
| +} |
| + |
| +SocketInputStream::~SocketInputStream() { |
| +} |
| + |
| +bool SocketInputStream::Next(const void** data, int* size) { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, READING); |
| + |
| + if (backup_bytes_ > 0) { |
| + *size = backup_bytes_; |
| + *data = io_buffer_->data() + buffer_read_pos_ - backup_bytes_; |
| + backup_bytes_ = 0; |
| + return true; |
| + } |
| + |
| + if (limit_ != 0 && buffer_read_pos_ >= limit_) { |
| + DVLOG(1) << "Reached buffer limit, ending read."; |
| + return false; |
| + } |
| + |
| + DCHECK_EQ(state_, READY) |
| + << " Input stream must have pending data before reading."; |
| + DCHECK_NE(buffer_write_pos_, buffer_read_pos_); |
| + *data = io_buffer_->data() + buffer_read_pos_; |
| + *size = buffer_write_pos_ - buffer_read_pos_; |
| + buffer_read_pos_ = buffer_write_pos_; |
| + state_ = EMPTY; |
| + DVLOG(1) << "Consuming " << *size << " bytes in input buffer."; |
| + return true; |
| +} |
| + |
| +void SocketInputStream::BackUp(int count) { |
| + DCHECK(state_ == READY || state_ == EMPTY); |
| + DCHECK_GE(backup_bytes_, 0); |
| + DCHECK_LE(backup_bytes_, buffer_read_pos_); |
| + DCHECK_EQ(skipped_bytes_, 0); |
| + |
| + backup_bytes_ += count; |
| + state_ = READY; |
| + DVLOG(1) << "Backing up " << count << " bytes in input buffer. " |
| + << "Current position now at " << buffer_read_pos_ - backup_bytes_ |
| + << " of " << buffer_write_pos_; |
| +} |
| + |
| +bool SocketInputStream::Skip(int count) { |
| + DCHECK_EQ(state_, READY); |
| + DCHECK_GT(count, 0); |
| + DVLOG(1) << "Skipping " << count << " bytes in stream."; |
| + |
| + if (backup_bytes_ >= count) { |
| + // We have more data left over than we're trying to skip. Just chop it. |
| + backup_bytes_ -= count; |
| + return true; |
| + } |
| + |
| + count -= backup_bytes_; |
| + backup_bytes_ = 0; |
| + skipped_bytes_ += count; |
| + state_ = EMPTY; |
| + |
| + return true; |
| +} |
| + |
| +int64 SocketInputStream::ByteCount() const { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, READING); |
| + return buffer_write_pos_; |
| +} |
| + |
| +void SocketInputStream::Refresh(const base::Closure& callback) { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, READING); |
| + |
| + if (buffer_read_pos_ - backup_bytes_ == io_buffer_->size()) { |
| + LOG(ERROR) << "Out of buffer space, closing input stream."; |
| + CloseStream(net::ERR_UNEXPECTED, callback); |
| + return; |
| + } |
| + |
| + if (!socket_->IsConnected()) { |
| + LOG(ERROR) << "Socket was disconnected, closing input stream"; |
| + CloseStream(net::ERR_CONNECTION_CLOSED, callback); |
| + return; |
| + } |
| + |
| + state_ = READING; |
| + int read_limit = |
| + limit_ == 0 ? |
| + drainable_io_buffer_->BytesRemaining() : |
| + limit_ - buffer_read_pos_; |
| + read_limit = std::min(read_limit, drainable_io_buffer_->BytesRemaining()); |
| + |
| + if (read_limit <= buffer_write_pos_ - (buffer_read_pos_ - backup_bytes_)) { |
| + state_ = READY; |
| + if (read_timeout_timer_.IsRunning()) |
| + read_timeout_timer_.Reset(); |
| + DVLOG(1) << "Already have enough data for read limit."; |
| + if (!callback.is_null()) callback.Run(); |
| + return; |
| + } |
| + |
| + DVLOG(1) << "Refreshing input stream, limit of " << read_limit << " bytes."; |
| + DCHECK_GT(read_limit, 0); |
| + int result = socket_->Read( |
| + drainable_io_buffer_, |
| + read_limit, |
| + base::Bind(&SocketInputStream::RefreshCompletionCallback, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + callback)); |
| + DVLOG(1) << "Read returned " << result; |
| + if (result != net::ERR_IO_PENDING) |
| + RefreshCompletionCallback(callback, result); |
| +} |
| + |
| +void SocketInputStream::Rebuild() { |
| + DCHECK_EQ(skipped_bytes_, 0); |
| + DVLOG(1) << "Resetting input stream, consumed " |
| + << buffer_read_pos_ - backup_bytes_ << " bytes."; |
| + DCHECK_NE(state_, READING); |
| + DCHECK_NE(state_, CLOSED); |
| + |
| + int last_read_pos = buffer_read_pos_ - backup_bytes_; |
| + char* unread_data_ptr = &(io_buffer_->data()[last_read_pos]); |
|
Ryan Sleevi
2013/09/17 19:43:30
Why not io_buffer_->data() + last_read_pos?
Nicolas Zea
2013/09/25 01:21:27
Done.
|
| + int unread_buffer_size = buffer_write_pos_ - last_read_pos; |
| + ResetInternal(); |
| + |
| + if (unread_buffer_size > 0) { |
| + buffer_write_pos_ = unread_buffer_size; |
| + drainable_io_buffer_->SetOffset(buffer_write_pos_); |
| + state_ = READY; |
| + |
| + if (last_read_pos != 0) { |
| + DVLOG(1) << "Have " << buffer_write_pos_ |
| + << " unread bytes remaining, shifting."; |
| + // Move any remaining unread data to the start of the buffer; |
| + std::memmove(io_buffer_->data(), unread_data_ptr, buffer_write_pos_); |
| + } else { |
| + DVLOG(1) << "Have " << buffer_write_pos_ << " unread bytes remaining."; |
| + } |
| + } |
| +} |
| + |
| +void SocketInputStream::GetNextMessage(int limit, |
| + const base::Closure& callback) { |
| + DCHECK_GE(limit, 0); |
| + DCHECK_LT(limit, io_buffer_->size()); |
| + DVLOG(1) << "Setting read limit to " << limit; |
| + DVLOG(1) << " Current pos: " << buffer_read_pos_; |
| + DVLOG(1) << " Current backup: " << backup_bytes_; |
| + DVLOG(1) << " Current size: " << buffer_write_pos_; |
| + limit_ = limit; |
| + |
| + // Set up the timeout timer. Note that if this does get triggered, the |
| + // IOBuffer used in the Read call will remain owned by the socket itself |
| + // until the read completes (which may never happen). A timeout must therefore |
| + // be a fatal error for the input stream. |
| + read_timeout_timer_.Start( |
| + FROM_HERE, |
| + read_timeout_, |
| + base::Bind(&SocketInputStream::RefreshCompletionCallback, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + callback, |
| + net::ERR_TIMED_OUT)); |
| + Refresh(callback); |
| +} |
| + |
| +int SocketInputStream::last_error() const { |
| + return last_error_; |
| +} |
| + |
| +SocketInputStream::State SocketInputStream::state() const { |
| + return state_; |
| +} |
| + |
| +void SocketInputStream::RefreshCompletionCallback( |
| + const base::Closure& callback, int result) { |
| + DCHECK_EQ(state_, READING); |
| + // TODO(zea): should the timeout be per read or per message? For now it's |
| + // per read, and reset on every read completion (successful or not). |
| + if (read_timeout_timer_.IsRunning()) |
| + read_timeout_timer_.Reset(); |
| + |
| + if (state_ == CLOSED) { |
| + // An error occured before the completion callback could complete. Ignore |
| + // the result. |
| + return; |
| + } |
| + |
| + if (result < net::OK) { |
| + DVLOG(1) << "Failed to refresh socket: " << result; |
| + CloseStream(result, callback); |
| + return; |
| + } |
| + DCHECK_GT(result, 0); |
| + |
| + state_ = READY; |
| + buffer_write_pos_ += result; |
| + int bytes_to_skip = std::min(skipped_bytes_, result); |
| + buffer_read_pos_ += bytes_to_skip; |
| + skipped_bytes_ = std::max(skipped_bytes_ - result, 0); |
| + drainable_io_buffer_->SetOffset(buffer_write_pos_); |
| + if (buffer_read_pos_ - backup_bytes_ == buffer_write_pos_) |
| + state_ = EMPTY; |
| + |
| + DVLOG(1) << "Refresh complete with " << result << " new bytes. " |
| + << "Current position " << buffer_read_pos_ - backup_bytes_ |
| + << " of " << buffer_write_pos_ << "."; |
| + |
| + // If a limit is set, kick off another refresh to get the rest of the message |
| + // data. |
| + if (limit_ != 0 && buffer_write_pos_ < limit_) { |
| + read_timeout_timer_.Start( |
| + FROM_HERE, |
| + read_timeout_, |
| + base::Bind(&SocketInputStream::RefreshCompletionCallback, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + callback, |
| + net::ERR_TIMED_OUT)); |
| + Refresh(callback); |
| + return; |
| + } |
| + |
| + if (!callback.is_null()) callback.Run(); |
| +} |
| + |
| +void SocketInputStream::ResetInternal() { |
| + weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks. |
| + buffer_write_pos_ = 0; |
| + buffer_read_pos_ = 0; |
| + backup_bytes_ = 0; |
| + skipped_bytes_ = 0; |
| + state_ = EMPTY; |
| + limit_ = 0; |
| + |
| + last_error_ = net::OK; |
| + |
| + // Reset the offset by creating a new one. Note that DrainableIOBuffers don't |
| + // actually allocate their own buffer memory like normal IOBuffers. This will |
| + // just reset the pointers to point to the beginning of io_buffer_'s data. |
| + drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size()); |
| +} |
| + |
| +void SocketInputStream::CloseStream(int result, const base::Closure& callback) { |
| + ResetInternal(); |
| + state_ = CLOSED; |
| + last_error_ = result; |
| + if (!callback.is_null()) callback.Run(); |
| +} |
| + |
| +SocketOutputStream::SocketOutputStream(net::StreamSocket* socket) |
| + : socket_(socket), |
| + io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)), |
| + drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size())), |
| + buffer_used_(0), |
| + state_(EMPTY), |
| + weak_ptr_factory_(this) { |
| + DCHECK(socket->IsConnected()); |
| +} |
| + |
| +SocketOutputStream::~SocketOutputStream() { |
| +} |
| + |
| +bool SocketOutputStream::Next(void** data, int* size) { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, FLUSHING); |
| + if (buffer_used_ == io_buffer_->size()) |
| + return false; |
| + |
| + *data = io_buffer_->data() + buffer_used_; |
| + *size = io_buffer_->size() - buffer_used_; |
| + buffer_used_ = io_buffer_->size(); |
| + state_ = READY; |
| + return true; |
| +} |
| + |
| +void SocketOutputStream::BackUp(int count) { |
| + DCHECK_GE(count, 0); |
| + if (count > buffer_used_) |
| + buffer_used_ = 0; |
| + buffer_used_ -= count; |
| + DVLOG(1) << "Backing up " << count << " bytes in output buffer. " |
| + << buffer_used_ << " bytes used."; |
| +} |
| + |
| +int64 SocketOutputStream::ByteCount() const { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, FLUSHING); |
| + return buffer_used_; |
| +} |
| + |
| +void SocketOutputStream::Flush(const base::Closure& callback) { |
| + DCHECK_EQ(state_, READY); |
| + state_ = FLUSHING; |
| + |
| + if (!socket_->IsConnected()) { |
| + LOG(ERROR) << "Socket was disconnected, closing output stream"; |
| + last_error_ = net::ERR_CONNECTION_CLOSED; |
| + state_ = CLOSED; |
| + if (!callback.is_null()) callback.Run(); |
| + return; |
| + } |
| + |
| + DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket."; |
| + int result = socket_->Write( |
| + drainable_io_buffer_, |
| + buffer_used_, |
| + base::Bind(&SocketOutputStream::FlushCompletionCallback, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + callback)); |
| + DVLOG(1) << "Write returned " << result; |
| + if (result != net::ERR_IO_PENDING) |
| + FlushCompletionCallback(callback, result); |
| +} |
| + |
| +SocketOutputStream::State SocketOutputStream::state() const{ |
| + return state_; |
| +} |
| + |
| +int SocketOutputStream::last_error() const { |
| + return last_error_; |
| +} |
| + |
| +void SocketOutputStream::FlushCompletionCallback( |
| + const base::Closure& callback, int result) { |
| + DCHECK_EQ(state_, FLUSHING); |
| + if (result < net::OK) { |
| + LOG(ERROR) << "Failed to flush socket."; |
| + last_error_ = result; |
| + state_ = CLOSED; |
| + if (!callback.is_null()) callback.Run(); |
| + return; |
| + } |
| + |
| + state_ = READY; |
| + if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) { |
| + DVLOG(1) << "Partial flush complete. Retrying."; |
| + // Only a partial write was completed. Flush again to finish the write. |
| + drainable_io_buffer_->SetOffset( |
| + drainable_io_buffer_->BytesConsumed() + result); |
| + Flush(callback); |
| + return; |
| + } |
| + |
| + DVLOG(1) << "Socket flush complete."; |
| + drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size()); |
| + state_ = EMPTY; |
| + buffer_used_ = 0; |
| + if (!callback.is_null()) callback.Run(); |
| +} |
| + |
| +} // namespace gcm |