Chromium Code Reviews| Index: google_apis/gcm/base/socket_stream.cc |
| diff --git a/google_apis/gcm/base/socket_stream.cc b/google_apis/gcm/base/socket_stream.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..cf80d806f3747fd97645820e4bcd81389b55c43e |
| --- /dev/null |
| +++ b/google_apis/gcm/base/socket_stream.cc |
| @@ -0,0 +1,335 @@ |
| +// Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "google_apis/gcm/base/socket_stream.h" |
| + |
| +#include "base/callback.h" |
| +#include "net/base/io_buffer.h" |
| +#include "net/base/net_errors.h" |
| +#include "net/socket/stream_socket.h" |
| + |
| +namespace gcm { |
| + |
| +// TODO(zea): consider having dynamically sized buffers if this becomes too |
|
akalin
2013/10/04 18:37:04
dynamically sized -> dynamically-sized
Nicolas Zea
2013/10/04 20:55:28
Done.
|
| +// expensive. |
| +const uint32 kDefaultBufferSize = 8*1024; |
|
akalin
2013/10/04 18:37:04
put this in anon namespace?
Nicolas Zea
2013/10/04 20:55:28
Done.
|
| + |
| +SocketInputStream::SocketInputStream(net::StreamSocket* socket) |
| + : socket_(socket), |
| + io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)), |
| + drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size())), |
| + buffer_read_pos_(0), |
| + buffer_write_pos_(0), |
| + backup_bytes_(0), |
| + skipped_bytes_(0), |
| + last_error_(net::OK), |
| + state_(EMPTY), |
| + weak_ptr_factory_(this) { |
| + DCHECK(socket->IsConnected()); |
| +} |
| + |
| +SocketInputStream::~SocketInputStream() { |
| +} |
| + |
| +bool SocketInputStream::Next(const void** data, int* size) { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, READING); |
| + |
| + if (state_ == EMPTY) { |
| + DVLOG(1) << "No unread data remaining, ending read."; |
| + return false; |
| + } |
| + |
| + if (backup_bytes_ > 0) { |
|
akalin
2013/10/04 18:37:04
DCHECK backup_bytes_ isn't too big?
Nicolas Zea
2013/10/04 20:55:28
n/a now
|
| + *size = backup_bytes_; |
|
akalin
2013/10/04 18:37:04
put *size assignment after *data assignment
Nicolas Zea
2013/10/04 20:55:28
n/a now
|
| + *data = io_buffer_->data() + buffer_read_pos_ - backup_bytes_; |
| + backup_bytes_ = 0; |
| + DCHECK_GT(*size, 0); |
|
akalin
2013/10/04 18:37:04
i'm all for copious DCHECKs but this seems pretty
Nicolas Zea
2013/10/04 20:55:28
n/a now
|
| + DVLOG(1) << "Consuming " << *size << " bytes in input buffer."; |
| + if (backup_bytes_ == 0 && buffer_read_pos_ == buffer_write_pos_) |
| + state_ = EMPTY; |
| + return true; |
| + } |
| + |
| + DCHECK_EQ(state_, READY) |
| + << " Input stream must have pending data before reading."; |
| + DCHECK_NE(buffer_write_pos_, buffer_read_pos_); |
| + *data = io_buffer_->data() + buffer_read_pos_; |
| + *size = buffer_write_pos_ - buffer_read_pos_; |
| + buffer_read_pos_ = buffer_write_pos_; |
| + state_ = EMPTY; |
| + DVLOG(1) << "Consuming " << *size << " bytes in input buffer."; |
| + return true; |
| +} |
| + |
| +void SocketInputStream::BackUp(int count) { |
| + DCHECK(state_ == READY || state_ == EMPTY); |
| + DCHECK_GE(count, 0); |
| + DCHECK_GE(backup_bytes_, 0); |
| + DCHECK_LE(backup_bytes_, buffer_read_pos_); |
| + DCHECK_EQ(skipped_bytes_, 0); |
| + |
| + backup_bytes_ += count; |
| + state_ = READY; |
| + DVLOG(1) << "Backing up " << count << " bytes in input buffer. " |
| + << "Current position now at " << buffer_read_pos_ - backup_bytes_ |
| + << " of " << buffer_write_pos_; |
| +} |
| + |
| +bool SocketInputStream::Skip(int count) { |
| + DCHECK_EQ(state_, READY); |
| + DCHECK_GT(count, 0); |
| + DVLOG(1) << "Skipping " << count << " bytes in stream."; |
| + |
| + if (backup_bytes_ >= count) { |
| + // We have more data left over than we're trying to skip. Just chop it. |
| + backup_bytes_ -= count; |
| + return true; |
| + } |
| + |
| + count -= backup_bytes_; |
| + backup_bytes_ = 0; |
| + skipped_bytes_ += count; |
| + state_ = EMPTY; |
| + |
| + return true; |
| +} |
| + |
| +int64 SocketInputStream::ByteCount() const { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, READING); |
| + return buffer_write_pos_ - buffer_read_pos_ + backup_bytes_; |
| +} |
| + |
| +void SocketInputStream::Refresh(const base::Closure& callback, |
| + int byte_limit) { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, READING); |
| + DCHECK_GT(byte_limit, 0); |
| + DCHECK_LE(byte_limit, drainable_io_buffer_->BytesRemaining()); |
| + |
| + if (buffer_write_pos_ + byte_limit > io_buffer_->size()) { |
| + LOG(ERROR) << "Out of buffer space, closing input stream."; |
| + CloseStream(net::ERR_UNEXPECTED, callback); |
| + return; |
| + } |
| + |
| + if (!socket_->IsConnected()) { |
| + LOG(ERROR) << "Socket was disconnected, closing input stream"; |
| + CloseStream(net::ERR_CONNECTION_CLOSED, callback); |
| + return; |
| + } |
| + |
| + state_ = READING; |
| + |
| + DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes."; |
| + int result = socket_->Read( |
| + drainable_io_buffer_, |
| + byte_limit, |
| + base::Bind(&SocketInputStream::RefreshCompletionCallback, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + callback)); |
| + DVLOG(1) << "Read returned " << result; |
| + if (result != net::ERR_IO_PENDING) |
| + RefreshCompletionCallback(callback, result); |
| +} |
| + |
| +void SocketInputStream::RebuildBuffer() { |
| + DCHECK_EQ(skipped_bytes_, 0); |
| + DVLOG(1) << "Resetting input stream, consumed " |
| + << buffer_read_pos_ - backup_bytes_ << " bytes."; |
| + DCHECK_NE(state_, READING); |
| + DCHECK_NE(state_, CLOSED); |
| + |
| + int last_read_pos = buffer_read_pos_ - backup_bytes_; |
| + char* unread_data_ptr = io_buffer_->data() + last_read_pos; |
| + int unread_buffer_size = buffer_write_pos_ - last_read_pos; |
| + ResetInternal(); |
| + |
| + if (unread_buffer_size > 0) { |
| + buffer_write_pos_ = unread_buffer_size; |
| + drainable_io_buffer_->SetOffset(buffer_write_pos_); |
| + state_ = READY; |
| + |
| + if (last_read_pos != 0) { |
| + DVLOG(1) << "Have " << buffer_write_pos_ |
| + << " unread bytes remaining, shifting."; |
| + // Move any remaining unread data to the start of the buffer; |
| + std::memmove(io_buffer_->data(), unread_data_ptr, buffer_write_pos_); |
| + } else { |
| + DVLOG(1) << "Have " << buffer_write_pos_ << " unread bytes remaining."; |
| + } |
| + } |
| +} |
| + |
| +int SocketInputStream::last_error() const { |
| + return last_error_; |
| +} |
| + |
| +SocketInputStream::State SocketInputStream::state() const { |
| + return state_; |
| +} |
| + |
| +void SocketInputStream::RefreshCompletionCallback( |
| + const base::Closure& callback, int result) { |
| + DCHECK_EQ(state_, READING); |
| + if (state_ == CLOSED) { |
| + // An error occured before the completion callback could complete. Ignore |
| + // the result. |
| + return; |
| + } |
| + |
| + if (result < net::OK) { |
| + DVLOG(1) << "Failed to refresh socket: " << result; |
| + CloseStream(result, callback); |
| + return; |
| + } |
| + DCHECK_GT(result, 0); |
| + |
| + state_ = READY; |
| + buffer_write_pos_ += result; |
| + int bytes_to_skip = std::min(skipped_bytes_, result); |
| + buffer_read_pos_ += bytes_to_skip; |
| + skipped_bytes_ = std::max(skipped_bytes_ - result, 0); |
| + drainable_io_buffer_->SetOffset(buffer_write_pos_); |
| + if (buffer_read_pos_ - backup_bytes_ == buffer_write_pos_) |
| + state_ = EMPTY; |
| + |
| + DVLOG(1) << "Refresh complete with " << result << " new bytes. " |
| + << "Current position " << buffer_read_pos_ - backup_bytes_ |
| + << " of " << buffer_write_pos_ << "."; |
| + |
| + if (!callback.is_null()) callback.Run(); |
|
akalin
2013/10/04 18:37:04
newline before callback.Run()
Nicolas Zea
2013/10/04 20:55:28
Done.
|
| +} |
| + |
| +void SocketInputStream::ResetInternal() { |
| + weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks. |
| + buffer_write_pos_ = 0; |
| + buffer_read_pos_ = 0; |
| + backup_bytes_ = 0; |
| + skipped_bytes_ = 0; |
| + state_ = EMPTY; |
| + |
| + last_error_ = net::OK; |
| + |
| + // Reset the offset by creating a new one. Note that DrainableIOBuffers don't |
| + // actually allocate their own buffer memory like normal IOBuffers. This will |
| + // just reset the pointers to point to the beginning of io_buffer_'s data. |
| + drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size()); |
| +} |
| + |
| +void SocketInputStream::CloseStream(int error, const base::Closure& callback) { |
| + ResetInternal(); |
| + state_ = CLOSED; |
| + last_error_ = error; |
| + LOG(ERROR) << "Closing stream with result " << error; |
| + if (!callback.is_null()) callback.Run(); |
| +} |
| + |
| +SocketOutputStream::SocketOutputStream(net::StreamSocket* socket) |
|
akalin
2013/10/04 18:37:04
(blanket comment) apply above comments on SocketIn
Nicolas Zea
2013/10/04 20:55:28
Done.
|
| + : socket_(socket), |
| + io_buffer_(new net::IOBufferWithSize(kDefaultBufferSize)), |
| + drainable_io_buffer_(new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size())), |
| + buffer_used_(0), |
| + state_(EMPTY), |
| + weak_ptr_factory_(this) { |
| + DCHECK(socket->IsConnected()); |
| +} |
| + |
| +SocketOutputStream::~SocketOutputStream() { |
| +} |
| + |
| +bool SocketOutputStream::Next(void** data, int* size) { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, FLUSHING); |
| + if (buffer_used_ == io_buffer_->size()) |
| + return false; |
| + |
| + *data = io_buffer_->data() + buffer_used_; |
| + *size = io_buffer_->size() - buffer_used_; |
| + buffer_used_ = io_buffer_->size(); |
| + state_ = READY; |
| + return true; |
| +} |
| + |
| +void SocketOutputStream::BackUp(int count) { |
| + DCHECK_GE(count, 0); |
| + if (count > buffer_used_) |
| + buffer_used_ = 0; |
| + buffer_used_ -= count; |
| + DVLOG(1) << "Backing up " << count << " bytes in output buffer. " |
| + << buffer_used_ << " bytes used."; |
| +} |
| + |
| +int64 SocketOutputStream::ByteCount() const { |
| + DCHECK_NE(state_, CLOSED); |
| + DCHECK_NE(state_, FLUSHING); |
| + return buffer_used_; |
| +} |
| + |
| +void SocketOutputStream::Flush(const base::Closure& callback) { |
| + DCHECK_EQ(state_, READY); |
| + state_ = FLUSHING; |
| + |
| + if (!socket_->IsConnected()) { |
| + LOG(ERROR) << "Socket was disconnected, closing output stream"; |
| + last_error_ = net::ERR_CONNECTION_CLOSED; |
| + state_ = CLOSED; |
| + if (!callback.is_null()) callback.Run(); |
| + return; |
| + } |
| + |
| + DVLOG(1) << "Flushing " << buffer_used_ << " bytes into socket."; |
| + int result = socket_->Write( |
| + drainable_io_buffer_, |
| + buffer_used_, |
| + base::Bind(&SocketOutputStream::FlushCompletionCallback, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + callback)); |
| + DVLOG(1) << "Write returned " << result; |
| + if (result != net::ERR_IO_PENDING) |
| + FlushCompletionCallback(callback, result); |
| +} |
| + |
| +SocketOutputStream::State SocketOutputStream::state() const{ |
| + return state_; |
| +} |
| + |
| +int SocketOutputStream::last_error() const { |
| + return last_error_; |
| +} |
| + |
| +void SocketOutputStream::FlushCompletionCallback( |
| + const base::Closure& callback, int result) { |
| + DCHECK_EQ(state_, FLUSHING); |
| + if (result < net::OK) { |
| + LOG(ERROR) << "Failed to flush socket."; |
| + last_error_ = result; |
| + state_ = CLOSED; |
| + if (!callback.is_null()) callback.Run(); |
| + return; |
| + } |
| + |
| + state_ = READY; |
| + if (drainable_io_buffer_->BytesConsumed() + result < buffer_used_) { |
| + DVLOG(1) << "Partial flush complete. Retrying."; |
| + // Only a partial write was completed. Flush again to finish the write. |
| + drainable_io_buffer_->SetOffset( |
| + drainable_io_buffer_->BytesConsumed() + result); |
| + Flush(callback); |
| + return; |
| + } |
| + |
| + DVLOG(1) << "Socket flush complete."; |
| + drainable_io_buffer_ = new net::DrainableIOBuffer(io_buffer_.get(), |
| + io_buffer_->size()); |
| + state_ = EMPTY; |
| + buffer_used_ = 0; |
| + if (!callback.is_null()) callback.Run(); |
| +} |
| + |
| +} // namespace gcm |