Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(633)

Unified Diff: remoting/base/buffered_socket_writer.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: remoting/base/buffered_socket_writer.cc
diff --git a/remoting/base/buffered_socket_writer.cc b/remoting/base/buffered_socket_writer.cc
index 8213f40a887ac70aabcd391798f6b2493e029958..c1b6b8ebd931ee8fee73bccd0ac6ad790c8780e2 100644
--- a/remoting/base/buffered_socket_writer.cc
+++ b/remoting/base/buffered_socket_writer.cc
@@ -9,79 +9,98 @@
#include "base/single_thread_task_runner.h"
#include "base/stl_util.h"
#include "base/thread_task_runner_handle.h"
+#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
+#include "net/socket/socket.h"
namespace remoting {
-struct BufferedSocketWriterBase::PendingPacket {
- PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
+namespace {
+
+int WriteNetSocket(net::Socket* socket,
+ const scoped_refptr<net::IOBuffer>& buf,
+ int buf_len,
+ const net::CompletionCallback& callback) {
+ return socket->Write(buf.get(), buf_len, callback);
+}
+
+} // namespace
+
+struct BufferedSocketWriter::PendingPacket {
+ PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
const base::Closure& done_task)
: data(data),
done_task(done_task) {
}
- scoped_refptr<net::IOBufferWithSize> data;
+ scoped_refptr<net::DrainableIOBuffer> data;
base::Closure done_task;
};
-BufferedSocketWriterBase::BufferedSocketWriterBase()
- : socket_(nullptr),
- write_pending_(false),
- closed_(false),
- destroyed_flag_(nullptr) {
+// static
+scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
+ net::Socket* socket, const WriteFailedCallback& write_failed_callback) {
+ scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
+ result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
+ return result.Pass();
}
-void BufferedSocketWriterBase::Init(net::Socket* socket,
- const WriteFailedCallback& callback) {
- DCHECK(CalledOnValidThread());
- DCHECK(socket);
- socket_ = socket;
- write_failed_callback_ = callback;
+BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
+}
+
+BufferedSocketWriter::~BufferedSocketWriter() {
+ STLDeleteElements(&queue_);
}
-bool BufferedSocketWriterBase::Write(
- scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
- DCHECK(CalledOnValidThread());
- DCHECK(socket_);
+void BufferedSocketWriter::Init(
+ const WriteCallback& write_callback,
+ const WriteFailedCallback& write_failed_callback) {
+ write_callback_ = write_callback;
+ write_failed_callback_ = write_failed_callback;
+}
+
+bool BufferedSocketWriter::Write(
+ const scoped_refptr<net::IOBufferWithSize>& data,
+ const base::Closure& done_task) {
+ DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(data.get());
- // Don't write after Close().
- if (closed_)
+ // Don't write after error.
+ if (is_closed())
return false;
- queue_.push_back(new PendingPacket(data, done_task));
+ queue_.push_back(new PendingPacket(
+ new net::DrainableIOBuffer(data.get(), data->size()), done_task));
DoWrite();
- // DoWrite() may trigger OnWriteError() to be called.
- return !closed_;
+ return !is_closed();
+}
+
+bool BufferedSocketWriter::is_closed() {
+ return write_callback_.is_null();
}
-void BufferedSocketWriterBase::DoWrite() {
- DCHECK(CalledOnValidThread());
- DCHECK(socket_);
+void BufferedSocketWriter::DoWrite() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(!write_callback_.is_null());
// Don't try to write if there is another write pending.
if (write_pending_)
return;
- // Don't write after Close().
- if (closed_)
+ // Don't write after error.
+ if (is_closed())
return;
while (true) {
- net::IOBuffer* current_packet;
- int current_packet_size;
- GetNextPacket(&current_packet, &current_packet_size);
+ if (queue_.empty())
+ break;
- // Return if the queue is empty.
- if (!current_packet)
- return;
-
- int result = socket_->Write(
- current_packet, current_packet_size,
- base::Bind(&BufferedSocketWriterBase::OnWritten,
- base::Unretained(this)));
+ int result = write_callback_.Run(
+ queue_.front()->data.get(), queue_.front()->data->size(),
+ base::Bind(&BufferedSocketWriter::OnWritten,
+ weak_factory_.GetWeakPtr()));
bool write_again = false;
HandleWriteResult(result, &write_again);
if (!write_again)
@@ -89,37 +108,45 @@ void BufferedSocketWriterBase::DoWrite() {
}
}
-void BufferedSocketWriterBase::HandleWriteResult(int result,
- bool* write_again) {
+void BufferedSocketWriter::HandleWriteResult(int result, bool* write_again) {
*write_again = false;
if (result < 0) {
if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
} else {
- HandleError(result);
+ write_callback_.Reset();
if (!write_failed_callback_.is_null())
write_failed_callback_.Run(result);
}
return;
}
- base::Closure done_task = AdvanceBufferPosition(result);
- if (!done_task.is_null()) {
- bool destroyed = false;
- destroyed_flag_ = &destroyed;
- done_task.Run();
- if (destroyed) {
- // Stop doing anything if we've been destroyed by the callback.
+ DCHECK(!queue_.empty());
+
+ queue_.front()->data->DidConsume(result);
+
+ if (queue_.front()->data->BytesRemaining() == 0) {
+ base::Closure done_task = queue_.front()->done_task;
+ delete queue_.front();
+ queue_.pop_front();
+
+ if (!done_task.is_null()) {
+ // |done_task| is allowed to delete the writer, so here we post a task to
+ // continue writing after calling |done_task| and return with
+ // |write_again| set to false.
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::Bind(&BufferedSocketWriter::DoWrite,
+ weak_factory_.GetWeakPtr()));
Wez 2015/07/14 21:49:20 Now that you're using WeakPtr you don't even need
Sergey Ulanov 2015/07/14 22:53:20 We don't want to call DoWrite() directly here beca
Wez 2015/07/16 21:55:36 If it's the last thing that HandleWriteResult() ca
Sergey Ulanov 2015/07/16 22:11:03 AFAIK some C++ compilers implement tail call optim
Sergey Ulanov 2015/07/16 22:25:57 Also there are cases when tail call optimization i
+ done_task.Run();
return;
}
- destroyed_flag_ = nullptr;
}
*write_again = true;
Wez 2015/07/14 21:49:20 Why is |write_again| an out parameter rather than
Sergey Ulanov 2015/07/14 22:53:20 Just for readability, to make the purpose of the r
Wez 2015/07/16 21:55:36 You don't need the PostTask, though - you can just
Sergey Ulanov 2015/07/16 22:11:03 So if I understand correctly you are suggesting th
}
-void BufferedSocketWriterBase::OnWritten(int result) {
- DCHECK(CalledOnValidThread());
+void BufferedSocketWriter::OnWritten(int result) {
+ DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(write_pending_);
write_pending_ = false;
@@ -129,94 +156,4 @@ void BufferedSocketWriterBase::OnWritten(int result) {
DoWrite();
Wez 2015/07/14 21:49:20 As noted above, if you have HandleWriteResult() ca
Sergey Ulanov 2015/07/14 22:53:21 Done.
}
-void BufferedSocketWriterBase::HandleError(int result) {
- DCHECK(CalledOnValidThread());
-
- closed_ = true;
-
- STLDeleteElements(&queue_);
-
- // Notify subclass that an error is received.
- OnError(result);
-}
-
-void BufferedSocketWriterBase::Close() {
- DCHECK(CalledOnValidThread());
- closed_ = true;
-}
-
-BufferedSocketWriterBase::~BufferedSocketWriterBase() {
- if (destroyed_flag_)
- *destroyed_flag_ = true;
-
- STLDeleteElements(&queue_);
-}
-
-base::Closure BufferedSocketWriterBase::PopQueue() {
- base::Closure result = queue_.front()->done_task;
- delete queue_.front();
- queue_.pop_front();
- return result;
-}
-
-BufferedSocketWriter::BufferedSocketWriter() {
-}
-
-void BufferedSocketWriter::GetNextPacket(
- net::IOBuffer** buffer, int* size) {
- if (!current_buf_.get()) {
- if (queue_.empty()) {
- *buffer = nullptr;
- return; // Nothing to write.
- }
- current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
- queue_.front()->data->size());
- }
-
- *buffer = current_buf_.get();
- *size = current_buf_->BytesRemaining();
-}
-
-base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
- current_buf_->DidConsume(written);
-
- if (current_buf_->BytesRemaining() == 0) {
- current_buf_ = nullptr;
- return PopQueue();
- }
- return base::Closure();
-}
-
-void BufferedSocketWriter::OnError(int result) {
- current_buf_ = nullptr;
-}
-
-BufferedSocketWriter::~BufferedSocketWriter() {
-}
-
-BufferedDatagramWriter::BufferedDatagramWriter() {
-}
-
-void BufferedDatagramWriter::GetNextPacket(
- net::IOBuffer** buffer, int* size) {
- if (queue_.empty()) {
- *buffer = nullptr;
- return; // Nothing to write.
- }
- *buffer = queue_.front()->data.get();
- *size = queue_.front()->data->size();
-}
-
-base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
- DCHECK_EQ(written, queue_.front()->data->size());
- return PopQueue();
-}
-
-void BufferedDatagramWriter::OnError(int result) {
- // Nothing to do here.
-}
-
-BufferedDatagramWriter::~BufferedDatagramWriter() {
-}
-
} // namespace remoting

Powered by Google App Engine
This is Rietveld 408576698