Chromium Code Reviews| Index: remoting/base/buffered_socket_writer.cc |
| diff --git a/remoting/base/buffered_socket_writer.cc b/remoting/base/buffered_socket_writer.cc |
| index 8213f40a887ac70aabcd391798f6b2493e029958..5d1f889593d4d3f91c8b40a7abc0c6e128cdea1f 100644 |
| --- a/remoting/base/buffered_socket_writer.cc |
| +++ b/remoting/base/buffered_socket_writer.cc |
| @@ -5,82 +5,98 @@ |
| #include "remoting/base/buffered_socket_writer.h" |
| #include "base/bind.h" |
| -#include "base/location.h" |
| -#include "base/single_thread_task_runner.h" |
| #include "base/stl_util.h" |
| #include "base/thread_task_runner_handle.h" |
| +#include "net/base/io_buffer.h" |
| #include "net/base/net_errors.h" |
| +#include "net/socket/socket.h" |
| namespace remoting { |
| -struct BufferedSocketWriterBase::PendingPacket { |
| - PendingPacket(scoped_refptr<net::IOBufferWithSize> data, |
| +namespace { |
| + |
| +int WriteNetSocket(net::Socket* socket, |
| + const scoped_refptr<net::IOBuffer>& buf, |
| + int buf_len, |
| + const net::CompletionCallback& callback) { |
| + return socket->Write(buf.get(), buf_len, callback); |
| +} |
| + |
| +} // namespace |
| + |
| +struct BufferedSocketWriter::PendingPacket { |
| + PendingPacket(scoped_refptr<net::DrainableIOBuffer> data, |
| const base::Closure& done_task) |
| : data(data), |
| done_task(done_task) { |
| } |
| - scoped_refptr<net::IOBufferWithSize> data; |
| + scoped_refptr<net::DrainableIOBuffer> data; |
| base::Closure done_task; |
| }; |
| -BufferedSocketWriterBase::BufferedSocketWriterBase() |
| - : socket_(nullptr), |
| - write_pending_(false), |
| - closed_(false), |
| - destroyed_flag_(nullptr) { |
| +// static |
| +scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket( |
| + net::Socket* socket, const WriteFailedCallback& write_failed_callback) { |
| + scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter()); |
| + result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback); |
| + return result.Pass(); |
| } |
| -void BufferedSocketWriterBase::Init(net::Socket* socket, |
| - const WriteFailedCallback& callback) { |
| - DCHECK(CalledOnValidThread()); |
| - DCHECK(socket); |
| - socket_ = socket; |
| - write_failed_callback_ = callback; |
| +BufferedSocketWriter::BufferedSocketWriter() {} |
| + |
| +BufferedSocketWriter::~BufferedSocketWriter() { |
| + if (destroyed_flag_) |
| + *destroyed_flag_ = true; |
| + |
| + STLDeleteElements(&queue_); |
| +} |
| + |
| +void BufferedSocketWriter::Init( |
| + const WriteCallback& write_callback, |
| + const WriteFailedCallback& write_failed_callback) { |
| + write_callback_ = write_callback; |
| + write_failed_callback_ = write_failed_callback; |
| } |
| -bool BufferedSocketWriterBase::Write( |
| - scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { |
| - DCHECK(CalledOnValidThread()); |
| - DCHECK(socket_); |
| +bool BufferedSocketWriter::Write( |
| + const scoped_refptr<net::IOBufferWithSize>& data, |
| + const base::Closure& done_task) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + DCHECK(!write_callback_.is_null()); |
| DCHECK(data.get()); |
| - // Don't write after Close(). |
| + // Don't write after error. |
| if (closed_) |
| return false; |
| - queue_.push_back(new PendingPacket(data, done_task)); |
| + queue_.push_back(new PendingPacket( |
| + new net::DrainableIOBuffer(data.get(), data->size()), done_task)); |
| DoWrite(); |
| - // DoWrite() may trigger OnWriteError() to be called. |
| return !closed_; |
| } |
| -void BufferedSocketWriterBase::DoWrite() { |
| - DCHECK(CalledOnValidThread()); |
| - DCHECK(socket_); |
| +void BufferedSocketWriter::DoWrite() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + DCHECK(!write_callback_.is_null()); |
| // Don't try to write if there is another write pending. |
| if (write_pending_) |
| return; |
| - // Don't write after Close(). |
| + // Don't write after error. |
| if (closed_) |
| return; |
| while (true) { |
| - net::IOBuffer* current_packet; |
| - int current_packet_size; |
| - GetNextPacket(¤t_packet, ¤t_packet_size); |
| + if (queue_.empty()) |
| + break; |
| - // Return if the queue is empty. |
| - if (!current_packet) |
| - return; |
| - |
| - int result = socket_->Write( |
| - current_packet, current_packet_size, |
| - base::Bind(&BufferedSocketWriterBase::OnWritten, |
| + int result = write_callback_.Run( |
| + queue_.front()->data.get(), queue_.front()->data->size(), |
| + base::Bind(&BufferedSocketWriter::OnWritten, |
| base::Unretained(this))); |
|
Wez
2015/07/14 18:28:09
You're using base::Unretained() here, but this cla
Sergey Ulanov
2015/07/14 21:07:54
Good point. Fixed by using weak pointer here.
|
| bool write_again = false; |
| HandleWriteResult(result, &write_again); |
| @@ -89,25 +105,32 @@ void BufferedSocketWriterBase::DoWrite() { |
| } |
| } |
| -void BufferedSocketWriterBase::HandleWriteResult(int result, |
| - bool* write_again) { |
| +void BufferedSocketWriter::HandleWriteResult(int result, bool* write_again) { |
| *write_again = false; |
| if (result < 0) { |
| if (result == net::ERR_IO_PENDING) { |
| write_pending_ = true; |
| } else { |
| - HandleError(result); |
| + closed_ = true; |
|
Wez
2015/07/14 18:28:09
nit: Could you instead implement a private getter
Sergey Ulanov
2015/07/14 21:07:54
Done.
|
| + STLDeleteElements(&queue_); |
|
Wez
2015/07/14 18:28:09
Why do we need to delete the elements here? Surely
Sergey Ulanov
2015/07/14 21:07:54
Done.
|
| if (!write_failed_callback_.is_null()) |
| write_failed_callback_.Run(result); |
| } |
| return; |
| } |
| - base::Closure done_task = AdvanceBufferPosition(result); |
| - if (!done_task.is_null()) { |
| + DCHECK(!queue_.empty()); |
| + |
| + queue_.front()->data->DidConsume(result); |
| + |
| + if (queue_.front()->data->BytesRemaining() == 0) { |
| + base::Closure done_task = queue_.front()->done_task; |
| + delete queue_.front(); |
| + queue_.pop_front(); |
| bool destroyed = false; |
| destroyed_flag_ = &destroyed; |
|
Wez
2015/07/14 18:28:09
I realise that this was in the original implementa
Sergey Ulanov
2015/07/14 21:07:54
I'm not sure we need it right now, but I think we
|
| - done_task.Run(); |
| + if (!done_task.is_null()) |
| + done_task.Run(); |
| if (destroyed) { |
| // Stop doing anything if we've been destroyed by the callback. |
| return; |
| @@ -118,8 +141,8 @@ void BufferedSocketWriterBase::HandleWriteResult(int result, |
| *write_again = true; |
| } |
| -void BufferedSocketWriterBase::OnWritten(int result) { |
| - DCHECK(CalledOnValidThread()); |
| +void BufferedSocketWriter::OnWritten(int result) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| DCHECK(write_pending_); |
| write_pending_ = false; |
| @@ -129,94 +152,4 @@ void BufferedSocketWriterBase::OnWritten(int result) { |
| DoWrite(); |
| } |
| -void BufferedSocketWriterBase::HandleError(int result) { |
| - DCHECK(CalledOnValidThread()); |
| - |
| - closed_ = true; |
| - |
| - STLDeleteElements(&queue_); |
| - |
| - // Notify subclass that an error is received. |
| - OnError(result); |
| -} |
| - |
| -void BufferedSocketWriterBase::Close() { |
| - DCHECK(CalledOnValidThread()); |
| - closed_ = true; |
| -} |
| - |
| -BufferedSocketWriterBase::~BufferedSocketWriterBase() { |
| - if (destroyed_flag_) |
| - *destroyed_flag_ = true; |
| - |
| - STLDeleteElements(&queue_); |
| -} |
| - |
| -base::Closure BufferedSocketWriterBase::PopQueue() { |
| - base::Closure result = queue_.front()->done_task; |
| - delete queue_.front(); |
| - queue_.pop_front(); |
| - return result; |
| -} |
| - |
| -BufferedSocketWriter::BufferedSocketWriter() { |
| -} |
| - |
| -void BufferedSocketWriter::GetNextPacket( |
| - net::IOBuffer** buffer, int* size) { |
| - if (!current_buf_.get()) { |
| - if (queue_.empty()) { |
| - *buffer = nullptr; |
| - return; // Nothing to write. |
| - } |
| - current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(), |
| - queue_.front()->data->size()); |
| - } |
| - |
| - *buffer = current_buf_.get(); |
| - *size = current_buf_->BytesRemaining(); |
| -} |
| - |
| -base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) { |
| - current_buf_->DidConsume(written); |
| - |
| - if (current_buf_->BytesRemaining() == 0) { |
| - current_buf_ = nullptr; |
| - return PopQueue(); |
| - } |
| - return base::Closure(); |
| -} |
| - |
| -void BufferedSocketWriter::OnError(int result) { |
| - current_buf_ = nullptr; |
| -} |
| - |
| -BufferedSocketWriter::~BufferedSocketWriter() { |
| -} |
| - |
| -BufferedDatagramWriter::BufferedDatagramWriter() { |
| -} |
| - |
| -void BufferedDatagramWriter::GetNextPacket( |
| - net::IOBuffer** buffer, int* size) { |
| - if (queue_.empty()) { |
| - *buffer = nullptr; |
| - return; // Nothing to write. |
| - } |
| - *buffer = queue_.front()->data.get(); |
| - *size = queue_.front()->data->size(); |
| -} |
| - |
| -base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) { |
| - DCHECK_EQ(written, queue_.front()->data->size()); |
| - return PopQueue(); |
| -} |
| - |
| -void BufferedDatagramWriter::OnError(int result) { |
| - // Nothing to do here. |
| -} |
| - |
| -BufferedDatagramWriter::~BufferedDatagramWriter() { |
| -} |
| - |
| } // namespace remoting |