| Index: remoting/base/buffered_socket_writer.cc
|
| diff --git a/remoting/base/buffered_socket_writer.cc b/remoting/base/buffered_socket_writer.cc
|
| index 8213f40a887ac70aabcd391798f6b2493e029958..c8a8302114a2e5f8e3fefe2cec7dc268edb032c0 100644
|
| --- a/remoting/base/buffered_socket_writer.cc
|
| +++ b/remoting/base/buffered_socket_writer.cc
|
| @@ -5,218 +5,130 @@
|
| #include "remoting/base/buffered_socket_writer.h"
|
|
|
| #include "base/bind.h"
|
| -#include "base/location.h"
|
| -#include "base/single_thread_task_runner.h"
|
| #include "base/stl_util.h"
|
| -#include "base/thread_task_runner_handle.h"
|
| +#include "net/base/io_buffer.h"
|
| #include "net/base/net_errors.h"
|
| +#include "net/socket/socket.h"
|
|
|
| namespace remoting {
|
|
|
| -struct BufferedSocketWriterBase::PendingPacket {
|
| - PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
|
| +namespace {
|
| +
|
| +int WriteNetSocket(net::Socket* socket,
|
| + const scoped_refptr<net::IOBuffer>& buf,
|
| + int buf_len,
|
| + const net::CompletionCallback& callback) {
|
| + return socket->Write(buf.get(), buf_len, callback);
|
| +}
|
| +
|
| +} // namespace
|
| +
|
| +struct BufferedSocketWriter::PendingPacket {
|
| + PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
|
| const base::Closure& done_task)
|
| : data(data),
|
| done_task(done_task) {
|
| }
|
|
|
| - scoped_refptr<net::IOBufferWithSize> data;
|
| + scoped_refptr<net::DrainableIOBuffer> data;
|
| base::Closure done_task;
|
| };
|
|
|
| -BufferedSocketWriterBase::BufferedSocketWriterBase()
|
| - : socket_(nullptr),
|
| - write_pending_(false),
|
| - closed_(false),
|
| - destroyed_flag_(nullptr) {
|
| +// static
|
| +scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
|
| + net::Socket* socket,
|
| + const WriteFailedCallback& write_failed_callback) {
|
| + scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
|
| + result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
|
| + return result.Pass();
|
| }
|
|
|
| -void BufferedSocketWriterBase::Init(net::Socket* socket,
|
| - const WriteFailedCallback& callback) {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(socket);
|
| - socket_ = socket;
|
| - write_failed_callback_ = callback;
|
| +BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
|
| +}
|
| +
|
| +BufferedSocketWriter::~BufferedSocketWriter() {
|
| + STLDeleteElements(&queue_);
|
| }
|
|
|
| -bool BufferedSocketWriterBase::Write(
|
| - scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(socket_);
|
| +void BufferedSocketWriter::Init(
|
| + const WriteCallback& write_callback,
|
| + const WriteFailedCallback& write_failed_callback) {
|
| + write_callback_ = write_callback;
|
| + write_failed_callback_ = write_failed_callback;
|
| +}
|
| +
|
| +bool BufferedSocketWriter::Write(
|
| + const scoped_refptr<net::IOBufferWithSize>& data,
|
| + const base::Closure& done_task) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| DCHECK(data.get());
|
|
|
| - // Don't write after Close().
|
| - if (closed_)
|
| + // Don't write after error.
|
| + if (is_closed())
|
| return false;
|
|
|
| - queue_.push_back(new PendingPacket(data, done_task));
|
| + queue_.push_back(new PendingPacket(
|
| + new net::DrainableIOBuffer(data.get(), data->size()), done_task));
|
|
|
| DoWrite();
|
|
|
| - // DoWrite() may trigger OnWriteError() to be called.
|
| - return !closed_;
|
| + return !is_closed();
|
| }
|
|
|
| -void BufferedSocketWriterBase::DoWrite() {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(socket_);
|
| -
|
| - // Don't try to write if there is another write pending.
|
| - if (write_pending_)
|
| - return;
|
| +bool BufferedSocketWriter::is_closed() {
|
| + return write_callback_.is_null();
|
| +}
|
|
|
| - // Don't write after Close().
|
| - if (closed_)
|
| - return;
|
| +void BufferedSocketWriter::DoWrite() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
|
|
| - while (true) {
|
| - net::IOBuffer* current_packet;
|
| - int current_packet_size;
|
| - GetNextPacket(¤t_packet, ¤t_packet_size);
|
| -
|
| - // Return if the queue is empty.
|
| - if (!current_packet)
|
| - return;
|
| -
|
| - int result = socket_->Write(
|
| - current_packet, current_packet_size,
|
| - base::Bind(&BufferedSocketWriterBase::OnWritten,
|
| - base::Unretained(this)));
|
| - bool write_again = false;
|
| - HandleWriteResult(result, &write_again);
|
| - if (!write_again)
|
| - return;
|
| + base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
|
| + while (self && !write_pending_ && !is_closed() && !queue_.empty()) {
|
| + int result = write_callback_.Run(
|
| + queue_.front()->data.get(), queue_.front()->data->BytesRemaining(),
|
| + base::Bind(&BufferedSocketWriter::OnWritten,
|
| + weak_factory_.GetWeakPtr()));
|
| + HandleWriteResult(result);
|
| }
|
| }
|
|
|
| -void BufferedSocketWriterBase::HandleWriteResult(int result,
|
| - bool* write_again) {
|
| - *write_again = false;
|
| +void BufferedSocketWriter::HandleWriteResult(int result) {
|
| if (result < 0) {
|
| if (result == net::ERR_IO_PENDING) {
|
| write_pending_ = true;
|
| } else {
|
| - HandleError(result);
|
| - if (!write_failed_callback_.is_null())
|
| - write_failed_callback_.Run(result);
|
| + write_callback_.Reset();
|
| + if (!write_failed_callback_.is_null()) {
|
| + WriteFailedCallback callback = write_failed_callback_;
|
| + callback.Run(result);
|
| + }
|
| }
|
| return;
|
| }
|
|
|
| - base::Closure done_task = AdvanceBufferPosition(result);
|
| - if (!done_task.is_null()) {
|
| - bool destroyed = false;
|
| - destroyed_flag_ = &destroyed;
|
| - done_task.Run();
|
| - if (destroyed) {
|
| - // Stop doing anything if we've been destroyed by the callback.
|
| - return;
|
| - }
|
| - destroyed_flag_ = nullptr;
|
| - }
|
| -
|
| - *write_again = true;
|
| -}
|
| -
|
| -void BufferedSocketWriterBase::OnWritten(int result) {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(write_pending_);
|
| - write_pending_ = false;
|
| -
|
| - bool write_again;
|
| - HandleWriteResult(result, &write_again);
|
| - if (write_again)
|
| - DoWrite();
|
| -}
|
| -
|
| -void BufferedSocketWriterBase::HandleError(int result) {
|
| - DCHECK(CalledOnValidThread());
|
| -
|
| - closed_ = true;
|
| -
|
| - STLDeleteElements(&queue_);
|
| -
|
| - // Notify subclass that an error is received.
|
| - OnError(result);
|
| -}
|
| -
|
| -void BufferedSocketWriterBase::Close() {
|
| - DCHECK(CalledOnValidThread());
|
| - closed_ = true;
|
| -}
|
| -
|
| -BufferedSocketWriterBase::~BufferedSocketWriterBase() {
|
| - if (destroyed_flag_)
|
| - *destroyed_flag_ = true;
|
| -
|
| - STLDeleteElements(&queue_);
|
| -}
|
| -
|
| -base::Closure BufferedSocketWriterBase::PopQueue() {
|
| - base::Closure result = queue_.front()->done_task;
|
| - delete queue_.front();
|
| - queue_.pop_front();
|
| - return result;
|
| -}
|
| -
|
| -BufferedSocketWriter::BufferedSocketWriter() {
|
| -}
|
| -
|
| -void BufferedSocketWriter::GetNextPacket(
|
| - net::IOBuffer** buffer, int* size) {
|
| - if (!current_buf_.get()) {
|
| - if (queue_.empty()) {
|
| - *buffer = nullptr;
|
| - return; // Nothing to write.
|
| - }
|
| - current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
|
| - queue_.front()->data->size());
|
| - }
|
| -
|
| - *buffer = current_buf_.get();
|
| - *size = current_buf_->BytesRemaining();
|
| -}
|
| -
|
| -base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
|
| - current_buf_->DidConsume(written);
|
| -
|
| - if (current_buf_->BytesRemaining() == 0) {
|
| - current_buf_ = nullptr;
|
| - return PopQueue();
|
| - }
|
| - return base::Closure();
|
| -}
|
| -
|
| -void BufferedSocketWriter::OnError(int result) {
|
| - current_buf_ = nullptr;
|
| -}
|
| + DCHECK(!queue_.empty());
|
|
|
| -BufferedSocketWriter::~BufferedSocketWriter() {
|
| -}
|
| + queue_.front()->data->DidConsume(result);
|
|
|
| -BufferedDatagramWriter::BufferedDatagramWriter() {
|
| -}
|
| + if (queue_.front()->data->BytesRemaining() == 0) {
|
| + base::Closure done_task = queue_.front()->done_task;
|
| + delete queue_.front();
|
| + queue_.pop_front();
|
|
|
| -void BufferedDatagramWriter::GetNextPacket(
|
| - net::IOBuffer** buffer, int* size) {
|
| - if (queue_.empty()) {
|
| - *buffer = nullptr;
|
| - return; // Nothing to write.
|
| + if (!done_task.is_null())
|
| + done_task.Run();
|
| }
|
| - *buffer = queue_.front()->data.get();
|
| - *size = queue_.front()->data->size();
|
| -}
|
| -
|
| -base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
|
| - DCHECK_EQ(written, queue_.front()->data->size());
|
| - return PopQueue();
|
| }
|
|
|
| -void BufferedDatagramWriter::OnError(int result) {
|
| - // Nothing to do here.
|
| -}
|
| +void BufferedSocketWriter::OnWritten(int result) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DCHECK(write_pending_);
|
| + write_pending_ = false;
|
|
|
| -BufferedDatagramWriter::~BufferedDatagramWriter() {
|
| + base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
|
| + HandleWriteResult(result);
|
| + if (self)
|
| + DoWrite();
|
| }
|
|
|
| } // namespace remoting
|
|
|