Index: device/serial/data_source_sender.cc |
diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc |
index 4eaee758b987b337710244dc38ec0ac24fb46821..ac568bd318a83fd528dbdaa84f41f9a21458c0ac 100644 |
--- a/device/serial/data_source_sender.cc |
+++ b/device/serial/data_source_sender.cc |
@@ -4,11 +4,11 @@ |
#include "device/serial/data_source_sender.h" |
+#include <algorithm> |
#include <limits> |
#include "base/bind.h" |
#include "base/message_loop/message_loop.h" |
-#include "device/serial/async_waiter.h" |
namespace device { |
@@ -17,9 +17,9 @@ class DataSourceSender::PendingSend { |
public: |
PendingSend(DataSourceSender* sender, const ReadyCallback& callback); |
- // Asynchronously fills |data| with up to |num_bytes| of data. Following this, |
- // one of Done() and DoneWithError() will be called with the result. |
- void GetData(void* data, uint32_t num_bytes); |
+ // Asynchronously fills |data_| with up to |num_bytes| of data. Following |
+ // this, one of Done() and DoneWithError() will be called with the result. |
+ void GetData(uint32_t num_bytes); |
private: |
class Buffer; |
@@ -39,9 +39,12 @@ class DataSourceSender::PendingSend { |
// Whether the buffer specified by GetData() has been passed to |callback_|, |
// but has not yet called Done() or DoneWithError(). |
bool buffer_in_use_; |
+ |
+ // The data obtained using |callback_| to be dispatched to the client. |
+ std::vector<char> data_; |
}; |
-// A Writable implementation that provides a view of a data pipe owned by a |
+// A Writable implementation that provides a view of a buffer owned by a |
// DataSourceSender. |
class DataSourceSender::PendingSend::Buffer : public WritableBuffer { |
public: |
@@ -58,7 +61,7 @@ class DataSourceSender::PendingSend::Buffer : public WritableBuffer { |
virtual void DoneWithError(uint32_t bytes_written, int32_t error) override; |
private: |
- // The DataSourceSender whose data pipe we are providing a view. |
+ // The DataSourceSender of whose buffer we are providing a view. |
scoped_refptr<DataSourceSender> sender_; |
// The PendingSend to which this buffer has been created in response. |
@@ -72,14 +75,15 @@ DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, |
const ErrorCallback& error_callback) |
: ready_callback_(ready_callback), |
error_callback_(error_callback), |
- bytes_sent_(0), |
- shut_down_(false) { |
+ available_buffer_capacity_(0), |
+ paused_(false), |
+ shut_down_(false), |
+ weak_factory_(this) { |
DCHECK(!ready_callback.is_null() && !error_callback.is_null()); |
} |
void DataSourceSender::ShutDown() { |
shut_down_ = true; |
- waiter_.reset(); |
ready_callback_.Reset(); |
error_callback_.Reset(); |
} |
@@ -88,84 +92,70 @@ DataSourceSender::~DataSourceSender() { |
DCHECK(shut_down_); |
} |
-void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { |
- // This should never occur. |handle_| is only valid and |pending_send_| is |
- // only set after Init is called. |
- if (pending_send_ || handle_.is_valid() || shut_down_) { |
- DispatchFatalError(); |
- return; |
- } |
- handle_ = handle.Pass(); |
- pending_send_.reset(new PendingSend(this, ready_callback_)); |
- StartWaiting(); |
+void DataSourceSender::Init(uint32_t buffer_size) { |
+ available_buffer_capacity_ = buffer_size; |
+ GetMoreData(); |
} |
void DataSourceSender::Resume() { |
- if (pending_send_ || !handle_.is_valid()) { |
+ if (pending_send_) { |
DispatchFatalError(); |
return; |
} |
- pending_send_.reset(new PendingSend(this, ready_callback_)); |
- StartWaiting(); |
+ paused_ = false; |
+ GetMoreData(); |
} |
-void DataSourceSender::OnConnectionError() { |
- DispatchFatalError(); |
+void DataSourceSender::ReportBytesSent(uint32_t bytes_sent) { |
+ available_buffer_capacity_ += bytes_sent; |
+ if (!pending_send_ && !paused_) |
+ GetMoreData(); |
} |
-void DataSourceSender::StartWaiting() { |
- DCHECK(pending_send_ && !waiter_); |
- waiter_.reset( |
- new AsyncWaiter(handle_.get(), |
- MOJO_HANDLE_SIGNAL_WRITABLE, |
- base::Bind(&DataSourceSender::OnDoneWaiting, this))); |
+void DataSourceSender::OnConnectionError() { |
+ DispatchFatalError(); |
} |
-void DataSourceSender::OnDoneWaiting(MojoResult result) { |
- DCHECK(pending_send_ && !shut_down_ && waiter_); |
- waiter_.reset(); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
+void DataSourceSender::GetMoreData() { |
+ if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_) |
return; |
- } |
- void* data = NULL; |
- uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
- result = mojo::BeginWriteDataRaw( |
- handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return; |
- } |
- pending_send_->GetData(static_cast<char*>(data), num_bytes); |
+ |
+ pending_send_.reset(new PendingSend(this, ready_callback_)); |
+ pending_send_->GetData(available_buffer_capacity_); |
} |
-void DataSourceSender::Done(uint32_t bytes_written) { |
- DoneInternal(bytes_written); |
- if (!shut_down_) |
- StartWaiting(); |
+void DataSourceSender::Done(const std::vector<char>& data) { |
+ DoneInternal(data); |
+ if (!shut_down_ && available_buffer_capacity_) { |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr())); |
+ } |
} |
-void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { |
- DoneInternal(bytes_written); |
- pending_send_.reset(); |
+void DataSourceSender::DoneWithError(const std::vector<char>& data, |
+ int32_t error) { |
+ DoneInternal(data); |
if (!shut_down_) |
- client()->OnError(bytes_sent_, error); |
- // We don't call StartWaiting here so we don't send any additional data until |
+ client()->OnError(error); |
+ paused_ = true; |
+ // We don't call GetMoreData here so we don't send any additional data until |
// Resume() is called. |
} |
-void DataSourceSender::DoneInternal(uint32_t bytes_written) { |
+void DataSourceSender::DoneInternal(const std::vector<char>& data) { |
DCHECK(pending_send_); |
if (shut_down_) |
return; |
- bytes_sent_ += bytes_written; |
- MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return; |
+ available_buffer_capacity_ -= data.size(); |
+ if (!data.empty()) { |
+ mojo::Array<uint8_t> data_to_send(data.size()); |
+ std::copy(data.begin(), data.end(), &data_to_send[0]); |
+ client()->OnData(data_to_send.Pass()); |
} |
+ pending_send_.reset(); |
} |
void DataSourceSender::DispatchFatalError() { |
@@ -181,24 +171,30 @@ DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, |
: sender_(sender), callback_(callback), buffer_in_use_(false) { |
} |
-void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { |
+void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) { |
+ DCHECK(num_bytes); |
DCHECK(!buffer_in_use_); |
buffer_in_use_ = true; |
+ data_.resize(num_bytes); |
callback_.Run(scoped_ptr<WritableBuffer>( |
- new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); |
+ new Buffer(sender_, this, &data_[0], num_bytes))); |
} |
void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { |
DCHECK(buffer_in_use_); |
+ DCHECK_LE(bytes_written, data_.size()); |
buffer_in_use_ = false; |
- sender_->Done(bytes_written); |
+ data_.resize(bytes_written); |
+ sender_->Done(data_); |
} |
void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, |
int32_t error) { |
DCHECK(buffer_in_use_); |
+ DCHECK_LE(bytes_written, data_.size()); |
buffer_in_use_ = false; |
- sender_->DoneWithError(bytes_written, error); |
+ data_.resize(bytes_written); |
+ sender_->DoneWithError(data_, error); |
} |
DataSourceSender::PendingSend::Buffer::Buffer( |
@@ -227,22 +223,20 @@ uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { |
void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { |
DCHECK(sender_.get()); |
- pending_send_->Done(bytes_written); |
- sender_ = NULL; |
- pending_send_ = NULL; |
- buffer_ = NULL; |
- buffer_size_ = 0; |
+ PendingSend* send = pending_send_; |
+ pending_send_ = nullptr; |
+ sender_ = nullptr; |
+ send->Done(bytes_written); |
} |
void DataSourceSender::PendingSend::Buffer::DoneWithError( |
uint32_t bytes_written, |
int32_t error) { |
DCHECK(sender_.get()); |
- pending_send_->DoneWithError(bytes_written, error); |
- sender_ = NULL; |
- pending_send_ = NULL; |
- buffer_ = NULL; |
- buffer_size_ = 0; |
+ PendingSend* send = pending_send_; |
+ pending_send_ = nullptr; |
+ sender_ = nullptr; |
+ send->DoneWithError(bytes_written, error); |
} |
} // namespace device |