Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(267)

Unified Diff: remoting/base/buffered_socket_writer.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « remoting/base/buffered_socket_writer.h ('k') | remoting/base/buffered_socket_writer_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: remoting/base/buffered_socket_writer.cc
diff --git a/remoting/base/buffered_socket_writer.cc b/remoting/base/buffered_socket_writer.cc
index 8213f40a887ac70aabcd391798f6b2493e029958..c8a8302114a2e5f8e3fefe2cec7dc268edb032c0 100644
--- a/remoting/base/buffered_socket_writer.cc
+++ b/remoting/base/buffered_socket_writer.cc
@@ -5,218 +5,130 @@
#include "remoting/base/buffered_socket_writer.h"
#include "base/bind.h"
-#include "base/location.h"
-#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
-#include "base/thread_task_runner_handle.h"
+#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
namespace remoting {
-struct BufferedSocketWriterBase::PendingPacket {
- PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
+namespace {
+
+int WriteNetSocket(net::Socket* socket,
+ const scoped_refptr<net::IOBuffer>& buf,
+ int buf_len,
+ const net::CompletionCallback& callback) {
+ return socket->Write(buf.get(), buf_len, callback);
+}
+
+} // namespace
+
+struct BufferedSocketWriter::PendingPacket {
+ PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
const base::Closure& done_task)
: data(data),
done_task(done_task) {
}
- scoped_refptr<net::IOBufferWithSize> data;
+ scoped_refptr<net::DrainableIOBuffer> data;
base::Closure done_task;
};
-BufferedSocketWriterBase::BufferedSocketWriterBase()
- : socket_(nullptr),
- write_pending_(false),
- closed_(false),
- destroyed_flag_(nullptr) {
+// static
+scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
+ net::Socket* socket,
+ const WriteFailedCallback& write_failed_callback) {
+ scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
+ result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
+ return result.Pass();
}
-void BufferedSocketWriterBase::Init(net::Socket* socket,
- const WriteFailedCallback& callback) {
- DCHECK(CalledOnValidThread());
- DCHECK(socket);
- socket_ = socket;
- write_failed_callback_ = callback;
+BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
+}
+
+BufferedSocketWriter::~BufferedSocketWriter() {
+ STLDeleteElements(&queue_);
}
-bool BufferedSocketWriterBase::Write(
- scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
- DCHECK(CalledOnValidThread());
- DCHECK(socket_);
+void BufferedSocketWriter::Init(
+ const WriteCallback& write_callback,
+ const WriteFailedCallback& write_failed_callback) {
+ write_callback_ = write_callback;
+ write_failed_callback_ = write_failed_callback;
+}
+
+bool BufferedSocketWriter::Write(
+ const scoped_refptr<net::IOBufferWithSize>& data,
+ const base::Closure& done_task) {
+ DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(data.get());
- // Don't write after Close().
- if (closed_)
+ // Don't write after error.
+ if (is_closed())
return false;
- queue_.push_back(new PendingPacket(data, done_task));
+ queue_.push_back(new PendingPacket(
+ new net::DrainableIOBuffer(data.get(), data->size()), done_task));
DoWrite();
- // DoWrite() may trigger OnWriteError() to be called.
- return !closed_;
+ return !is_closed();
}
-void BufferedSocketWriterBase::DoWrite() {
- DCHECK(CalledOnValidThread());
- DCHECK(socket_);
-
- // Don't try to write if there is another write pending.
- if (write_pending_)
- return;
+bool BufferedSocketWriter::is_closed() {
+ return write_callback_.is_null();
+}
- // Don't write after Close().
- if (closed_)
- return;
+void BufferedSocketWriter::DoWrite() {
+ DCHECK(thread_checker_.CalledOnValidThread());
- while (true) {
- net::IOBuffer* current_packet;
- int current_packet_size;
- GetNextPacket(&current_packet, &current_packet_size);
-
- // Return if the queue is empty.
- if (!current_packet)
- return;
-
- int result = socket_->Write(
- current_packet, current_packet_size,
- base::Bind(&BufferedSocketWriterBase::OnWritten,
- base::Unretained(this)));
- bool write_again = false;
- HandleWriteResult(result, &write_again);
- if (!write_again)
- return;
+ base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
+ while (self && !write_pending_ && !is_closed() && !queue_.empty()) {
+ int result = write_callback_.Run(
+ queue_.front()->data.get(), queue_.front()->data->BytesRemaining(),
+ base::Bind(&BufferedSocketWriter::OnWritten,
+ weak_factory_.GetWeakPtr()));
+ HandleWriteResult(result);
}
}
-void BufferedSocketWriterBase::HandleWriteResult(int result,
- bool* write_again) {
- *write_again = false;
+void BufferedSocketWriter::HandleWriteResult(int result) {
if (result < 0) {
if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
} else {
- HandleError(result);
- if (!write_failed_callback_.is_null())
- write_failed_callback_.Run(result);
+ write_callback_.Reset();
+ if (!write_failed_callback_.is_null()) {
+ WriteFailedCallback callback = write_failed_callback_;
+ callback.Run(result);
+ }
}
return;
}
- base::Closure done_task = AdvanceBufferPosition(result);
- if (!done_task.is_null()) {
- bool destroyed = false;
- destroyed_flag_ = &destroyed;
- done_task.Run();
- if (destroyed) {
- // Stop doing anything if we've been destroyed by the callback.
- return;
- }
- destroyed_flag_ = nullptr;
- }
-
- *write_again = true;
-}
-
-void BufferedSocketWriterBase::OnWritten(int result) {
- DCHECK(CalledOnValidThread());
- DCHECK(write_pending_);
- write_pending_ = false;
-
- bool write_again;
- HandleWriteResult(result, &write_again);
- if (write_again)
- DoWrite();
-}
-
-void BufferedSocketWriterBase::HandleError(int result) {
- DCHECK(CalledOnValidThread());
-
- closed_ = true;
-
- STLDeleteElements(&queue_);
-
- // Notify subclass that an error is received.
- OnError(result);
-}
-
-void BufferedSocketWriterBase::Close() {
- DCHECK(CalledOnValidThread());
- closed_ = true;
-}
-
-BufferedSocketWriterBase::~BufferedSocketWriterBase() {
- if (destroyed_flag_)
- *destroyed_flag_ = true;
-
- STLDeleteElements(&queue_);
-}
-
-base::Closure BufferedSocketWriterBase::PopQueue() {
- base::Closure result = queue_.front()->done_task;
- delete queue_.front();
- queue_.pop_front();
- return result;
-}
-
-BufferedSocketWriter::BufferedSocketWriter() {
-}
-
-void BufferedSocketWriter::GetNextPacket(
- net::IOBuffer** buffer, int* size) {
- if (!current_buf_.get()) {
- if (queue_.empty()) {
- *buffer = nullptr;
- return; // Nothing to write.
- }
- current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
- queue_.front()->data->size());
- }
-
- *buffer = current_buf_.get();
- *size = current_buf_->BytesRemaining();
-}
-
-base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
- current_buf_->DidConsume(written);
-
- if (current_buf_->BytesRemaining() == 0) {
- current_buf_ = nullptr;
- return PopQueue();
- }
- return base::Closure();
-}
-
-void BufferedSocketWriter::OnError(int result) {
- current_buf_ = nullptr;
-}
+ DCHECK(!queue_.empty());
-BufferedSocketWriter::~BufferedSocketWriter() {
-}
+ queue_.front()->data->DidConsume(result);
-BufferedDatagramWriter::BufferedDatagramWriter() {
-}
+ if (queue_.front()->data->BytesRemaining() == 0) {
+ base::Closure done_task = queue_.front()->done_task;
+ delete queue_.front();
+ queue_.pop_front();
-void BufferedDatagramWriter::GetNextPacket(
- net::IOBuffer** buffer, int* size) {
- if (queue_.empty()) {
- *buffer = nullptr;
- return; // Nothing to write.
+ if (!done_task.is_null())
+ done_task.Run();
}
- *buffer = queue_.front()->data.get();
- *size = queue_.front()->data->size();
-}
-
-base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
- DCHECK_EQ(written, queue_.front()->data->size());
- return PopQueue();
}
-void BufferedDatagramWriter::OnError(int result) {
- // Nothing to do here.
-}
+void BufferedSocketWriter::OnWritten(int result) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(write_pending_);
+ write_pending_ = false;
-BufferedDatagramWriter::~BufferedDatagramWriter() {
+ base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
+ HandleWriteResult(result);
+ if (self)
+ DoWrite();
}
} // namespace remoting
« no previous file with comments | « remoting/base/buffered_socket_writer.h ('k') | remoting/base/buffered_socket_writer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698