Index: remoting/base/buffered_socket_writer.cc |
diff --git a/remoting/base/buffered_socket_writer.cc b/remoting/base/buffered_socket_writer.cc |
index 8213f40a887ac70aabcd391798f6b2493e029958..c8a8302114a2e5f8e3fefe2cec7dc268edb032c0 100644 |
--- a/remoting/base/buffered_socket_writer.cc |
+++ b/remoting/base/buffered_socket_writer.cc |
@@ -5,218 +5,130 @@ |
#include "remoting/base/buffered_socket_writer.h" |
#include "base/bind.h" |
-#include "base/location.h" |
-#include "base/single_thread_task_runner.h" |
#include "base/stl_util.h" |
-#include "base/thread_task_runner_handle.h" |
+#include "net/base/io_buffer.h" |
#include "net/base/net_errors.h" |
+#include "net/socket/socket.h" |
namespace remoting { |
-struct BufferedSocketWriterBase::PendingPacket { |
- PendingPacket(scoped_refptr<net::IOBufferWithSize> data, |
+namespace { |
+ |
+int WriteNetSocket(net::Socket* socket, |
+ const scoped_refptr<net::IOBuffer>& buf, |
+ int buf_len, |
+ const net::CompletionCallback& callback) { |
+ return socket->Write(buf.get(), buf_len, callback); |
+} |
+ |
+} // namespace |
+ |
+struct BufferedSocketWriter::PendingPacket { |
+ PendingPacket(scoped_refptr<net::DrainableIOBuffer> data, |
const base::Closure& done_task) |
: data(data), |
done_task(done_task) { |
} |
- scoped_refptr<net::IOBufferWithSize> data; |
+ scoped_refptr<net::DrainableIOBuffer> data; |
base::Closure done_task; |
}; |
-BufferedSocketWriterBase::BufferedSocketWriterBase() |
- : socket_(nullptr), |
- write_pending_(false), |
- closed_(false), |
- destroyed_flag_(nullptr) { |
+// static |
+scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket( |
+ net::Socket* socket, |
+ const WriteFailedCallback& write_failed_callback) { |
+ scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter()); |
+ result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback); |
+ return result.Pass(); |
} |
-void BufferedSocketWriterBase::Init(net::Socket* socket, |
- const WriteFailedCallback& callback) { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(socket); |
- socket_ = socket; |
- write_failed_callback_ = callback; |
+BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) { |
+} |
+ |
+BufferedSocketWriter::~BufferedSocketWriter() { |
+ STLDeleteElements(&queue_); |
} |
-bool BufferedSocketWriterBase::Write( |
- scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(socket_); |
+void BufferedSocketWriter::Init( |
+ const WriteCallback& write_callback, |
+ const WriteFailedCallback& write_failed_callback) { |
+ write_callback_ = write_callback; |
+ write_failed_callback_ = write_failed_callback; |
+} |
+ |
+bool BufferedSocketWriter::Write( |
+ const scoped_refptr<net::IOBufferWithSize>& data, |
+ const base::Closure& done_task) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
DCHECK(data.get()); |
- // Don't write after Close(). |
- if (closed_) |
+ // Don't write after error. |
+ if (is_closed()) |
return false; |
- queue_.push_back(new PendingPacket(data, done_task)); |
+ queue_.push_back(new PendingPacket( |
+ new net::DrainableIOBuffer(data.get(), data->size()), done_task)); |
DoWrite(); |
- // DoWrite() may trigger OnWriteError() to be called. |
- return !closed_; |
+ return !is_closed(); |
} |
-void BufferedSocketWriterBase::DoWrite() { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(socket_); |
- |
- // Don't try to write if there is another write pending. |
- if (write_pending_) |
- return; |
+bool BufferedSocketWriter::is_closed() { |
+ return write_callback_.is_null(); |
+} |
- // Don't write after Close(). |
- if (closed_) |
- return; |
+void BufferedSocketWriter::DoWrite() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
- while (true) { |
- net::IOBuffer* current_packet; |
- int current_packet_size; |
- GetNextPacket(¤t_packet, ¤t_packet_size); |
- |
- // Return if the queue is empty. |
- if (!current_packet) |
- return; |
- |
- int result = socket_->Write( |
- current_packet, current_packet_size, |
- base::Bind(&BufferedSocketWriterBase::OnWritten, |
- base::Unretained(this))); |
- bool write_again = false; |
- HandleWriteResult(result, &write_again); |
- if (!write_again) |
- return; |
+ base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr(); |
+ while (self && !write_pending_ && !is_closed() && !queue_.empty()) { |
+ int result = write_callback_.Run( |
+ queue_.front()->data.get(), queue_.front()->data->BytesRemaining(), |
+ base::Bind(&BufferedSocketWriter::OnWritten, |
+ weak_factory_.GetWeakPtr())); |
+ HandleWriteResult(result); |
} |
} |
-void BufferedSocketWriterBase::HandleWriteResult(int result, |
- bool* write_again) { |
- *write_again = false; |
+void BufferedSocketWriter::HandleWriteResult(int result) { |
if (result < 0) { |
if (result == net::ERR_IO_PENDING) { |
write_pending_ = true; |
} else { |
- HandleError(result); |
- if (!write_failed_callback_.is_null()) |
- write_failed_callback_.Run(result); |
+ write_callback_.Reset(); |
+ if (!write_failed_callback_.is_null()) { |
+ WriteFailedCallback callback = write_failed_callback_; |
+ callback.Run(result); |
+ } |
} |
return; |
} |
- base::Closure done_task = AdvanceBufferPosition(result); |
- if (!done_task.is_null()) { |
- bool destroyed = false; |
- destroyed_flag_ = &destroyed; |
- done_task.Run(); |
- if (destroyed) { |
- // Stop doing anything if we've been destroyed by the callback. |
- return; |
- } |
- destroyed_flag_ = nullptr; |
- } |
- |
- *write_again = true; |
-} |
- |
-void BufferedSocketWriterBase::OnWritten(int result) { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(write_pending_); |
- write_pending_ = false; |
- |
- bool write_again; |
- HandleWriteResult(result, &write_again); |
- if (write_again) |
- DoWrite(); |
-} |
- |
-void BufferedSocketWriterBase::HandleError(int result) { |
- DCHECK(CalledOnValidThread()); |
- |
- closed_ = true; |
- |
- STLDeleteElements(&queue_); |
- |
- // Notify subclass that an error is received. |
- OnError(result); |
-} |
- |
-void BufferedSocketWriterBase::Close() { |
- DCHECK(CalledOnValidThread()); |
- closed_ = true; |
-} |
- |
-BufferedSocketWriterBase::~BufferedSocketWriterBase() { |
- if (destroyed_flag_) |
- *destroyed_flag_ = true; |
- |
- STLDeleteElements(&queue_); |
-} |
- |
-base::Closure BufferedSocketWriterBase::PopQueue() { |
- base::Closure result = queue_.front()->done_task; |
- delete queue_.front(); |
- queue_.pop_front(); |
- return result; |
-} |
- |
-BufferedSocketWriter::BufferedSocketWriter() { |
-} |
- |
-void BufferedSocketWriter::GetNextPacket( |
- net::IOBuffer** buffer, int* size) { |
- if (!current_buf_.get()) { |
- if (queue_.empty()) { |
- *buffer = nullptr; |
- return; // Nothing to write. |
- } |
- current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(), |
- queue_.front()->data->size()); |
- } |
- |
- *buffer = current_buf_.get(); |
- *size = current_buf_->BytesRemaining(); |
-} |
- |
-base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) { |
- current_buf_->DidConsume(written); |
- |
- if (current_buf_->BytesRemaining() == 0) { |
- current_buf_ = nullptr; |
- return PopQueue(); |
- } |
- return base::Closure(); |
-} |
- |
-void BufferedSocketWriter::OnError(int result) { |
- current_buf_ = nullptr; |
-} |
+ DCHECK(!queue_.empty()); |
-BufferedSocketWriter::~BufferedSocketWriter() { |
-} |
+ queue_.front()->data->DidConsume(result); |
-BufferedDatagramWriter::BufferedDatagramWriter() { |
-} |
+ if (queue_.front()->data->BytesRemaining() == 0) { |
+ base::Closure done_task = queue_.front()->done_task; |
+ delete queue_.front(); |
+ queue_.pop_front(); |
-void BufferedDatagramWriter::GetNextPacket( |
- net::IOBuffer** buffer, int* size) { |
- if (queue_.empty()) { |
- *buffer = nullptr; |
- return; // Nothing to write. |
+ if (!done_task.is_null()) |
+ done_task.Run(); |
} |
- *buffer = queue_.front()->data.get(); |
- *size = queue_.front()->data->size(); |
-} |
- |
-base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) { |
- DCHECK_EQ(written, queue_.front()->data->size()); |
- return PopQueue(); |
} |
-void BufferedDatagramWriter::OnError(int result) { |
- // Nothing to do here. |
-} |
+void BufferedSocketWriter::OnWritten(int result) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(write_pending_); |
+ write_pending_ = false; |
-BufferedDatagramWriter::~BufferedDatagramWriter() { |
+ base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr(); |
+ HandleWriteResult(result); |
+ if (self) |
+ DoWrite(); |
} |
} // namespace remoting |