| Index: net/curvecp/messenger.cc
|
| diff --git a/net/curvecp/messenger.cc b/net/curvecp/messenger.cc
|
| index a99abda614d931246bb244582082badd035530a6..355873b7554715d1a08f7cccfb575c6ea0edae61 100644
|
| --- a/net/curvecp/messenger.cc
|
| +++ b/net/curvecp/messenger.cc
|
| @@ -4,6 +4,7 @@
|
|
|
| #include "net/curvecp/messenger.h"
|
|
|
| +#include "base/bind.h"
|
| #include "base/logging.h"
|
| #include "base/message_loop.h"
|
| #include "net/base/io_buffer.h"
|
| @@ -58,39 +59,17 @@ static const size_t kReceiveBufferSize = (128 * 1024);
|
| Messenger::Messenger(Packetizer* packetizer)
|
| : packetizer_(packetizer),
|
| send_buffer_(kSendBufferSize),
|
| - send_complete_callback_(NULL),
|
| - old_receive_complete_callback_(NULL),
|
| pending_receive_length_(0),
|
| - send_message_in_progress_(false),
|
| - ALLOW_THIS_IN_INITIALIZER_LIST(
|
| - send_message_callback_(this, &Messenger::OnSendMessageComplete)),
|
| - ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
|
| + send_message_in_progress_(false) {
|
| }
|
|
|
| Messenger::~Messenger() {
|
| }
|
|
|
| -int Messenger::Read(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) {
|
| - DCHECK(CalledOnValidThread());
|
| - DCHECK(!old_receive_complete_callback_ &&
|
| - receive_complete_callback_.is_null());
|
| -
|
| - if (!received_list_.bytes_available()) {
|
| - old_receive_complete_callback_ = callback;
|
| - pending_receive_ = buf;
|
| - pending_receive_length_ = buf_len;
|
| - return ERR_IO_PENDING;
|
| - }
|
| -
|
| - int bytes_read = InternalRead(buf, buf_len);
|
| - DCHECK_LT(0, bytes_read);
|
| - return bytes_read;
|
| -}
|
| int Messenger::Read(IOBuffer* buf, int buf_len,
|
| const CompletionCallback& callback) {
|
| DCHECK(CalledOnValidThread());
|
| - DCHECK(!old_receive_complete_callback_ &&
|
| - receive_complete_callback_.is_null());
|
| + DCHECK(receive_complete_callback_.is_null());
|
|
|
| if (!received_list_.bytes_available()) {
|
| receive_complete_callback_ = callback;
|
| @@ -104,10 +83,11 @@ int Messenger::Read(IOBuffer* buf, int buf_len,
|
| return bytes_read;
|
| }
|
|
|
| -int Messenger::Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) {
|
| +int Messenger::Write(IOBuffer* buf, int buf_len,
|
| + const CompletionCallback& callback) {
|
| DCHECK(CalledOnValidThread());
|
| DCHECK(!pending_send_.get()); // Already a write pending!
|
| - DCHECK(!send_complete_callback_);
|
| + DCHECK(send_complete_callback_.is_null());
|
| DCHECK_LT(0, buf_len);
|
|
|
| int len = send_buffer_.write(buf->data(), buf_len);
|
| @@ -168,15 +148,15 @@ IOBufferWithSize* Messenger::CreateBufferFromSendQueue() {
|
| DCHECK_EQ(bytes, length);
|
|
|
| // We consumed data, check to see if someone is waiting to write more data.
|
| - if (send_complete_callback_) {
|
| + if (!send_complete_callback_.is_null()) {
|
| DCHECK(pending_send_.get());
|
|
|
| int len = send_buffer_.write(pending_send_->data(), pending_send_length_);
|
| if (len) {
|
| pending_send_ = NULL;
|
| - OldCompletionCallback* callback = send_complete_callback_;
|
| - send_complete_callback_ = NULL;
|
| - callback->Run(len);
|
| + CompletionCallback callback = send_complete_callback_;
|
| + send_complete_callback_.Reset();
|
| + callback.Run(len);
|
| }
|
| }
|
|
|
| @@ -274,10 +254,9 @@ void Messenger::SendMessage(int64 position) {
|
|
|
| sent_list_.MarkBlockSent(position, id);
|
|
|
| - int rv = packetizer_->SendMessage(key_,
|
| - message->data(),
|
| - padded_size,
|
| - &send_message_callback_);
|
| + int rv = packetizer_->SendMessage(
|
| + key_, message->data(), padded_size,
|
| + base::Bind(&Messenger::OnSendMessageComplete, base::Unretained(this)));
|
| if (rv == ERR_IO_PENDING) {
|
| send_message_in_progress_ = true;
|
| return;
|
| @@ -357,19 +336,12 @@ void Messenger::RecvMessage() {
|
|
|
| // If we have data available, and a read is pending, notify the callback.
|
| if (received_list_.bytes_available() &&
|
| - (old_receive_complete_callback_ ||
|
| - !receive_complete_callback_.is_null())) {
|
| + !receive_complete_callback_.is_null()) {
|
| // Pass the data up to the caller.
|
| int bytes_read = InternalRead(pending_receive_, pending_receive_length_);
|
| - if (old_receive_complete_callback_) {
|
| - OldCompletionCallback* callback = old_receive_complete_callback_;
|
| - old_receive_complete_callback_ = NULL;
|
| - callback->Run(bytes_read);
|
| - } else {
|
| - CompletionCallback callback = receive_complete_callback_;
|
| - receive_complete_callback_.Reset();
|
| - callback.Run(bytes_read);
|
| - }
|
| + CompletionCallback callback = receive_complete_callback_;
|
| + receive_complete_callback_.Reset();
|
| + callback.Run(bytes_read);
|
| }
|
| }
|
|
|
| @@ -386,10 +358,9 @@ void Messenger::SendAck(uint32 last_message_received) {
|
| // in progress here...
|
| DCHECK(!send_message_in_progress_);
|
|
|
| - int rv = packetizer_->SendMessage(key_,
|
| - buffer->data(),
|
| - sizeof(Message),
|
| - &send_message_callback_);
|
| + int rv = packetizer_->SendMessage(
|
| + key_, buffer->data(), sizeof(Message),
|
| + base::Bind(&Messenger::OnSendMessageComplete, base::Unretained(this)));
|
| // TODO(mbelshe): Fix me! Deal with the error cases
|
| DCHECK(rv == sizeof(Message));
|
| }
|
|
|