Index: device/serial/data_source_sender.cc |
diff --git a/device/serial/data_source_sender.cc b/device/serial/data_source_sender.cc |
index 4eaee758b987b337710244dc38ec0ac24fb46821..d9372808478bf01a9a7c469e164d794d8c3a3338 100644 |
--- a/device/serial/data_source_sender.cc |
+++ b/device/serial/data_source_sender.cc |
@@ -5,10 +5,10 @@ |
#include "device/serial/data_source_sender.h" |
#include <limits> |
+#include <vector> |
#include "base/bind.h" |
#include "base/message_loop/message_loop.h" |
-#include "device/serial/async_waiter.h" |
namespace device { |
@@ -19,7 +19,9 @@ class DataSourceSender::PendingSend { |
// Asynchronously fills |data| with up to |num_bytes| of data. Following this, |
raymes
2014/10/17 04:04:45
|data| is gone!
Sam McNally
2014/10/20 05:12:59
Done.
|
// one of Done() and DoneWithError() will be called with the result. |
- void GetData(void* data, uint32_t num_bytes); |
+ void GetData(uint32_t num_bytes); |
+ |
+ void DispatchData(serial::DataSourceClient* client); |
raymes
2014/10/17 04:04:45
Please add a comment here.
Sam McNally
2014/10/20 05:12:59
Done.
|
private: |
class Buffer; |
@@ -39,6 +41,8 @@ class DataSourceSender::PendingSend { |
// Whether the buffer specified by GetData() has been passed to |callback_|, |
// but has not yet called Done() or DoneWithError(). |
bool buffer_in_use_; |
+ |
+ std::vector<char> data_; |
}; |
// A Writable implementation that provides a view of a data pipe owned by a |
@@ -72,14 +76,14 @@ DataSourceSender::DataSourceSender(const ReadyCallback& ready_callback, |
const ErrorCallback& error_callback) |
: ready_callback_(ready_callback), |
error_callback_(error_callback), |
- bytes_sent_(0), |
- shut_down_(false) { |
+ available_buffer_capacity_(0), |
+ shut_down_(false), |
+ weak_factory_(this) { |
DCHECK(!ready_callback.is_null() && !error_callback.is_null()); |
} |
void DataSourceSender::ShutDown() { |
shut_down_ = true; |
- waiter_.reset(); |
ready_callback_.Reset(); |
error_callback_.Reset(); |
} |
@@ -88,70 +92,49 @@ DataSourceSender::~DataSourceSender() { |
DCHECK(shut_down_); |
} |
-void DataSourceSender::Init(mojo::ScopedDataPipeProducerHandle handle) { |
- // This should never occur. |handle_| is only valid and |pending_send_| is |
- // only set after Init is called. |
- if (pending_send_ || handle_.is_valid() || shut_down_) { |
- DispatchFatalError(); |
- return; |
- } |
- handle_ = handle.Pass(); |
- pending_send_.reset(new PendingSend(this, ready_callback_)); |
- StartWaiting(); |
+void DataSourceSender::Init(uint32_t buffer_size) { |
+ available_buffer_capacity_ = buffer_size; |
+ GetMoreData(); |
} |
void DataSourceSender::Resume() { |
- if (pending_send_ || !handle_.is_valid()) { |
+ if (pending_send_) { |
DispatchFatalError(); |
return; |
} |
- pending_send_.reset(new PendingSend(this, ready_callback_)); |
- StartWaiting(); |
+ GetMoreData(); |
} |
-void DataSourceSender::OnConnectionError() { |
- DispatchFatalError(); |
+void DataSourceSender::AckData(uint32_t bytes_dispatched) { |
+ available_buffer_capacity_ += bytes_dispatched; |
} |
-void DataSourceSender::StartWaiting() { |
- DCHECK(pending_send_ && !waiter_); |
- waiter_.reset( |
- new AsyncWaiter(handle_.get(), |
- MOJO_HANDLE_SIGNAL_WRITABLE, |
- base::Bind(&DataSourceSender::OnDoneWaiting, this))); |
+void DataSourceSender::OnConnectionError() { |
+ DispatchFatalError(); |
} |
-void DataSourceSender::OnDoneWaiting(MojoResult result) { |
- DCHECK(pending_send_ && !shut_down_ && waiter_); |
- waiter_.reset(); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return; |
- } |
- void* data = NULL; |
- uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); |
- result = mojo::BeginWriteDataRaw( |
- handle_.get(), &data, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return; |
- } |
- pending_send_->GetData(static_cast<char*>(data), num_bytes); |
+void DataSourceSender::GetMoreData() { |
+ DCHECK(!pending_send_ && !shut_down_); |
+ pending_send_.reset(new PendingSend(this, ready_callback_)); |
+ pending_send_->GetData(available_buffer_capacity_); |
raymes
2014/10/17 04:04:45
What if the available buffer capacity is 0? Don't
Sam McNally
2014/10/20 05:12:59
Done.
|
} |
void DataSourceSender::Done(uint32_t bytes_written) { |
DoneInternal(bytes_written); |
- if (!shut_down_) |
- StartWaiting(); |
+ if (!shut_down_) { |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr())); |
+ } |
} |
void DataSourceSender::DoneWithError(uint32_t bytes_written, int32_t error) { |
DoneInternal(bytes_written); |
pending_send_.reset(); |
if (!shut_down_) |
- client()->OnError(bytes_sent_, error); |
- // We don't call StartWaiting here so we don't send any additional data until |
+ client()->OnError(error); |
+ // We don't call GetMoreData here so we don't send any additional data until |
// Resume() is called. |
} |
@@ -160,12 +143,9 @@ void DataSourceSender::DoneInternal(uint32_t bytes_written) { |
if (shut_down_) |
return; |
- bytes_sent_ += bytes_written; |
- MojoResult result = mojo::EndWriteDataRaw(handle_.get(), bytes_written); |
- if (result != MOJO_RESULT_OK) { |
- DispatchFatalError(); |
- return; |
- } |
+ available_buffer_capacity_ -= bytes_written; |
+ pending_send_->DispatchData(client()); |
raymes
2014/10/17 04:04:45
Rather than passing the client to the pending send
Sam McNally
2014/10/20 05:12:59
Done.
|
+ pending_send_.reset(); |
} |
void DataSourceSender::DispatchFatalError() { |
@@ -181,22 +161,37 @@ DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender, |
: sender_(sender), callback_(callback), buffer_in_use_(false) { |
} |
-void DataSourceSender::PendingSend::GetData(void* data, uint32_t num_bytes) { |
+void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) { |
DCHECK(!buffer_in_use_); |
buffer_in_use_ = true; |
+ data_.resize(num_bytes); |
callback_.Run(scoped_ptr<WritableBuffer>( |
- new Buffer(sender_, this, static_cast<char*>(data), num_bytes))); |
+ new Buffer(sender_, this, &data_[0], num_bytes))); |
raymes
2014/10/17 04:04:45
We should be careful of a 0 buffer size here.
Sam McNally
2014/10/20 05:12:59
Done.
|
+} |
+ |
+void DataSourceSender::PendingSend::DispatchData( |
+ serial::DataSourceClient* client) { |
+ if (data_.empty()) |
+ return; |
+ |
+ mojo::Array<uint8_t> data(data_.size()); |
+ memcpy(&data[0], reinterpret_cast<uint8_t*>(&data_[0]), data.size()); |
+ client->OnData(data.Pass()); |
} |
void DataSourceSender::PendingSend::Done(uint32_t bytes_written) { |
DCHECK(buffer_in_use_); |
+ DCHECK_LE(bytes_written, data_.size()); |
buffer_in_use_ = false; |
+ data_.resize(bytes_written); |
sender_->Done(bytes_written); |
} |
void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written, |
int32_t error) { |
DCHECK(buffer_in_use_); |
+ DCHECK_LE(bytes_written, data_.size()); |
+ data_.resize(bytes_written); |
buffer_in_use_ = false; |
raymes
2014/10/17 04:04:45
nit: ordering of these 3 lines should be the same
Sam McNally
2014/10/20 05:12:59
Done.
|
sender_->DoneWithError(bytes_written, error); |
} |
@@ -227,9 +222,10 @@ uint32_t DataSourceSender::PendingSend::Buffer::GetSize() { |
void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) { |
DCHECK(sender_.get()); |
- pending_send_->Done(bytes_written); |
- sender_ = NULL; |
- pending_send_ = NULL; |
+ PendingSend* send = pending_send_; |
+ pending_send_ = nullptr; |
+ sender_ = nullptr; |
+ send->Done(bytes_written); |
buffer_ = NULL; |
buffer_size_ = 0; |
} |
@@ -238,9 +234,10 @@ void DataSourceSender::PendingSend::Buffer::DoneWithError( |
uint32_t bytes_written, |
int32_t error) { |
DCHECK(sender_.get()); |
- pending_send_->DoneWithError(bytes_written, error); |
- sender_ = NULL; |
- pending_send_ = NULL; |
+ PendingSend* send = pending_send_; |
+ pending_send_ = nullptr; |
+ sender_ = nullptr; |
+ send->DoneWithError(bytes_written, error); |
buffer_ = NULL; |
buffer_size_ = 0; |
} |