Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1626)

Unified Diff: device/serial/data_source_sender.cc

Issue 646063003: Change data pipe wrappers used by SerialConnection to use message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: address comments Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: device/serial/data_source_sender.cc
diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc
index 4eaee758b987b337710244dc38ec0ac24fb46821..ac568bd318a83fd528dbdaa84f41f9a21458c0ac 100644
--- a/device/serial/data_source_sender.cc
+++ b/device/serial/data_source_sender.cc
@@ -4,11 +4,11 @@
#include "device/serial/data_source_sender.h"
+#include <algorithm>
#include <limits>
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
-#include "device/serial/async_waiter.h"
namespace device {
@@ -17,9 +17,9 @@ class DataSourceSender::PendingSend {
public:
PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
- // Asynchronously fills |data| with up to |num_bytes| of data. Following this,
- // one of Done() and DoneWithError() will be called with the result.
- void GetData(void* data, uint32_t num_bytes);
+ // Asynchronously fills |data_| with up to |num_bytes| of data. Following
+ // this, one of Done() and DoneWithError() will be called with the result.
+ void GetData(uint32_t num_bytes);
private:
class Buffer;
@@ -39,9 +39,12 @@ class DataSourceSender::PendingSend {
// Whether the buffer specified by GetData() has been passed to |callback_|,
// but has not yet called Done() or DoneWithError().
bool buffer_in_use_;
+
+ // The data obtained using |callback_| to be dispatched to the client.
+ std::vector<char> data_;
};
-// A Writable implementation that provides a view of a data pipe owned by a
+// A Writable implementation that provides a view of a buffer owned by a
// DataSourceSender.
class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
public:
@@ -58,7 +61,7 @@ class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
virtual void DoneWithError(uint32_t bytes_written, int32_t error) override;
private:
- // The DataSourceSender whose data pipe we are providing a view.
+ // The DataSourceSender of whose buffer we are providing a view.
scoped_refptr<DataSourceSender> sender_;
// The PendingSend to which this buffer has been created in response.
@@ -72,14 +75,15 @@ DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback,
const ErrorCallback& error_callback)
: ready_callback_(ready_callback),
error_callback_(error_callback),
- bytes_sent_(0),
- shut_down_(false) {
+ available_buffer_capacity_(0),
+ paused_(false),
+ shut_down_(false),
+ weak_factory_(this) {
DCHECK(!ready_callback.is_null() && !error_callback.is_null());
}
void DataSourceSender::ShutDown() {
shut_down_ = true;
- waiter_.reset();
ready_callback_.Reset();
error_callback_.Reset();
}
@@ -88,84 +92,70 @@ DataSourceSender::~DataSourceSender() {
DCHECK(shut_down_);
}
-void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) {
- // This should never occur. |handle_| is only valid and |pending_send_| is
- // only set after Init is called.
- if (pending_send_ || handle_.is_valid() || shut_down_) {
- DispatchFatalError();
- return;
- }
- handle_ = handle.Pass();
- pending_send_.reset(new PendingSend(this, ready_callback_));
- StartWaiting();
+void DataSourceSender::Init(uint32_t buffer_size) {
+ available_buffer_capacity_ = buffer_size;
+ GetMoreData();
}
void DataSourceSender::Resume() {
- if (pending_send_ || !handle_.is_valid()) {
+ if (pending_send_) {
DispatchFatalError();
return;
}
- pending_send_.reset(new PendingSend(this, ready_callback_));
- StartWaiting();
+ paused_ = false;
+ GetMoreData();
}
-void DataSourceSender::OnConnectionError() {
- DispatchFatalError();
+void DataSourceSender::ReportBytesSent(uint32_t bytes_sent) {
+ available_buffer_capacity_ += bytes_sent;
+ if (!pending_send_ && !paused_)
+ GetMoreData();
}
-void DataSourceSender::StartWaiting() {
- DCHECK(pending_send_ && !waiter_);
- waiter_.reset(
- new AsyncWaiter(handle_.get(),
- MOJO_HANDLE_SIGNAL_WRITABLE,
- base::Bind(&DataSourceSender::OnDoneWaiting, this)));
+void DataSourceSender::OnConnectionError() {
+ DispatchFatalError();
}
-void DataSourceSender::OnDoneWaiting(MojoResult result) {
- DCHECK(pending_send_ && !shut_down_ && waiter_);
- waiter_.reset();
- if (result != MOJO_RESULT_OK) {
- DispatchFatalError();
+void DataSourceSender::GetMoreData() {
+ if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_)
return;
- }
- void* data = NULL;
- uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
- result = mojo::BeginWriteDataRaw(
- handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
- if (result != MOJO_RESULT_OK) {
- DispatchFatalError();
- return;
- }
- pending_send_->GetData(static_cast<char*>(data), num_bytes);
+
+ pending_send_.reset(new PendingSend(this, ready_callback_));
+ pending_send_->GetData(available_buffer_capacity_);
}
-void DataSourceSender::Done(uint32_t bytes_written) {
- DoneInternal(bytes_written);
- if (!shut_down_)
- StartWaiting();
+void DataSourceSender::Done(const std::vector<char>& data) {
+ DoneInternal(data);
+ if (!shut_down_ && available_buffer_capacity_) {
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr()));
+ }
}
-void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) {
- DoneInternal(bytes_written);
- pending_send_.reset();
+void DataSourceSender::DoneWithError(const std::vector<char>& data,
+ int32_t error) {
+ DoneInternal(data);
if (!shut_down_)
- client()->OnError(bytes_sent_, error);
- // We don't call StartWaiting here so we don't send any additional data until
+ client()->OnError(error);
+ paused_ = true;
+ // We don't call GetMoreData here so we don't send any additional data until
// Resume() is called.
}
-void DataSourceSender::DoneInternal(uint32_t bytes_written) {
+void DataSourceSender::DoneInternal(const std::vector<char>& data) {
DCHECK(pending_send_);
if (shut_down_)
return;
- bytes_sent_ += bytes_written;
- MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written);
- if (result != MOJO_RESULT_OK) {
- DispatchFatalError();
- return;
+ available_buffer_capacity_ -= data.size();
+ if (!data.empty()) {
+ mojo::Array<uint8_t> data_to_send(data.size());
+ std::copy(data.begin(), data.end(), &data_to_send[0]);
+ client()->OnData(data_to_send.Pass());
}
+ pending_send_.reset();
}
void DataSourceSender::DispatchFatalError() {
@@ -181,24 +171,30 @@ DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
: sender_(sender), callback_(callback), buffer_in_use_(false) {
}
-void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) {
+void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) {
+ DCHECK(num_bytes);
DCHECK(!buffer_in_use_);
buffer_in_use_ = true;
+ data_.resize(num_bytes);
callback_.Run(scoped_ptr<WritableBuffer>(
- new Buffer(sender_, this, static_cast<char*>(data), num_bytes)));
+ new Buffer(sender_, this, &data_[0], num_bytes)));
}
void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
DCHECK(buffer_in_use_);
+ DCHECK_LE(bytes_written, data_.size());
buffer_in_use_ = false;
- sender_->Done(bytes_written);
+ data_.resize(bytes_written);
+ sender_->Done(data_);
}
void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
int32_t error) {
DCHECK(buffer_in_use_);
+ DCHECK_LE(bytes_written, data_.size());
buffer_in_use_ = false;
- sender_->DoneWithError(bytes_written, error);
+ data_.resize(bytes_written);
+ sender_->DoneWithError(data_, error);
}
DataSourceSender::PendingSend::Buffer::Buffer(
@@ -227,22 +223,20 @@ uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
DCHECK(sender_.get());
- pending_send_->Done(bytes_written);
- sender_ = NULL;
- pending_send_ = NULL;
- buffer_ = NULL;
- buffer_size_ = 0;
+ PendingSend* send = pending_send_;
+ pending_send_ = nullptr;
+ sender_ = nullptr;
+ send->Done(bytes_written);
}
void DataSourceSender::PendingSend::Buffer::DoneWithError(
uint32_t bytes_written,
int32_t error) {
DCHECK(sender_.get());
- pending_send_->DoneWithError(bytes_written, error);
- sender_ = NULL;
- pending_send_ = NULL;
- buffer_ = NULL;
- buffer_size_ = 0;
+ PendingSend* send = pending_send_;
+ pending_send_ = nullptr;
+ sender_ = nullptr;
+ send->DoneWithError(bytes_written, error);
}
} // namespace device

Powered by Google App Engine
This is Rietveld 408576698