| OLD | NEW |
| 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "remoting/protocol/buffered_socket_writer.h" | 5 #include "remoting/protocol/buffered_socket_writer.h" |
| 6 | 6 |
| 7 #include "base/bind.h" |
| 7 #include "base/location.h" | 8 #include "base/location.h" |
| 8 #include "base/message_loop_proxy.h" | 9 #include "base/message_loop_proxy.h" |
| 9 #include "base/stl_util.h" | 10 #include "base/stl_util.h" |
| 10 #include "net/base/net_errors.h" | 11 #include "net/base/net_errors.h" |
| 11 | 12 |
| 12 namespace remoting { | 13 namespace remoting { |
| 13 namespace protocol { | 14 namespace protocol { |
| 14 | 15 |
| 15 class BufferedSocketWriterBase::PendingPacket { | 16 class BufferedSocketWriterBase::PendingPacket { |
| 16 public: | 17 public: |
| 17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, Task* done_task) | 18 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, |
| 19 const base::Closure& done_task) |
| 18 : data_(data), | 20 : data_(data), |
| 19 done_task_(done_task) { | 21 done_task_(done_task) { |
| 20 } | 22 } |
| 21 ~PendingPacket() { | 23 ~PendingPacket() { |
| 22 if (done_task_.get()) | 24 if (!done_task_.is_null()) |
| 23 done_task_->Run(); | 25 done_task_.Run(); |
| 24 } | 26 } |
| 25 | 27 |
| 26 net::IOBufferWithSize* data() { | 28 net::IOBufferWithSize* data() { |
| 27 return data_; | 29 return data_; |
| 28 } | 30 } |
| 29 | 31 |
| 30 private: | 32 private: |
| 31 scoped_refptr<net::IOBufferWithSize> data_; | 33 scoped_refptr<net::IOBufferWithSize> data_; |
| 32 scoped_ptr<Task> done_task_; | 34 base::Closure done_task_; |
| 33 | 35 |
| 34 DISALLOW_COPY_AND_ASSIGN(PendingPacket); | 36 DISALLOW_COPY_AND_ASSIGN(PendingPacket); |
| 35 }; | 37 }; |
| 36 | 38 |
| 37 BufferedSocketWriterBase::BufferedSocketWriterBase( | 39 BufferedSocketWriterBase::BufferedSocketWriterBase( |
| 38 base::MessageLoopProxy* message_loop) | 40 base::MessageLoopProxy* message_loop) |
| 39 : buffer_size_(0), | 41 : buffer_size_(0), |
| 40 socket_(NULL), | 42 socket_(NULL), |
| 41 message_loop_(message_loop), | 43 message_loop_(message_loop), |
| 42 write_pending_(false), | 44 write_pending_(false), |
| 43 ALLOW_THIS_IN_INITIALIZER_LIST( | 45 ALLOW_THIS_IN_INITIALIZER_LIST( |
| 44 written_callback_(this, &BufferedSocketWriterBase::OnWritten)), | 46 written_callback_(this, &BufferedSocketWriterBase::OnWritten)), |
| 45 closed_(false) { | 47 closed_(false) { |
| 46 } | 48 } |
| 47 | 49 |
| 48 BufferedSocketWriterBase::~BufferedSocketWriterBase() { } | 50 BufferedSocketWriterBase::~BufferedSocketWriterBase() { } |
| 49 | 51 |
| 50 void BufferedSocketWriterBase::Init(net::Socket* socket, | 52 void BufferedSocketWriterBase::Init(net::Socket* socket, |
| 51 WriteFailedCallback* callback) { | 53 const WriteFailedCallback& callback) { |
| 52 // TODO(garykac) Save copy of WriteFailedCallback. | 54 DCHECK(message_loop_->BelongsToCurrentThread()); |
| 53 base::AutoLock auto_lock(lock_); | 55 DCHECK(socket); |
| 54 socket_ = socket; | 56 socket_ = socket; |
| 55 DCHECK(socket_); | 57 write_failed_callback_ = callback; |
| 56 } | 58 } |
| 57 | 59 |
| 58 bool BufferedSocketWriterBase::Write( | 60 bool BufferedSocketWriterBase::Write( |
| 59 scoped_refptr<net::IOBufferWithSize> data, Task* done_task) { | 61 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { |
| 60 { | 62 { |
| 61 base::AutoLock auto_lock(lock_); | 63 base::AutoLock auto_lock(lock_); |
| 62 queue_.push_back(new PendingPacket(data, done_task)); | 64 queue_.push_back(new PendingPacket(data, done_task)); |
| 63 buffer_size_ += data->size(); | 65 buffer_size_ += data->size(); |
| 64 } | 66 } |
| 65 message_loop_->PostTask( | 67 message_loop_->PostTask( |
| 66 FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); | 68 FROM_HERE, base::Bind(&BufferedSocketWriterBase::DoWrite, this)); |
| 67 return true; | 69 return true; |
| 68 } | 70 } |
| 69 | 71 |
| 70 void BufferedSocketWriterBase::DoWrite() { | 72 void BufferedSocketWriterBase::DoWrite() { |
| 71 DCHECK(message_loop_->BelongsToCurrentThread()); | 73 DCHECK(message_loop_->BelongsToCurrentThread()); |
| 72 DCHECK(socket_); | 74 DCHECK(socket_); |
| 73 | 75 |
| 74 // Don't try to write if there is another write pending. | 76 // Don't try to write if there is another write pending. |
| 75 if (write_pending_) | 77 if (write_pending_) |
| 76 return; | 78 return; |
| (...skipping 17 matching lines...) Expand all Loading... |
| 94 int result = socket_->Write(current_packet, current_packet_size, | 96 int result = socket_->Write(current_packet, current_packet_size, |
| 95 &written_callback_); | 97 &written_callback_); |
| 96 if (result >= 0) { | 98 if (result >= 0) { |
| 97 base::AutoLock auto_lock(lock_); | 99 base::AutoLock auto_lock(lock_); |
| 98 AdvanceBufferPosition_Locked(result); | 100 AdvanceBufferPosition_Locked(result); |
| 99 } else { | 101 } else { |
| 100 if (result == net::ERR_IO_PENDING) { | 102 if (result == net::ERR_IO_PENDING) { |
| 101 write_pending_ = true; | 103 write_pending_ = true; |
| 102 } else { | 104 } else { |
| 103 HandleError(result); | 105 HandleError(result); |
| 104 if (write_failed_callback_.get()) | 106 if (!write_failed_callback_.is_null()) |
| 105 write_failed_callback_->Run(result); | 107 write_failed_callback_.Run(result); |
| 106 } | 108 } |
| 107 return; | 109 return; |
| 108 } | 110 } |
| 109 } | 111 } |
| 110 } | 112 } |
| 111 | 113 |
| 112 void BufferedSocketWriterBase::OnWritten(int result) { | 114 void BufferedSocketWriterBase::OnWritten(int result) { |
| 113 DCHECK(message_loop_->BelongsToCurrentThread()); | 115 DCHECK(message_loop_->BelongsToCurrentThread()); |
| 114 write_pending_ = false; | 116 write_pending_ = false; |
| 115 | 117 |
| 116 if (result < 0) { | 118 if (result < 0) { |
| 117 HandleError(result); | 119 HandleError(result); |
| 118 if (write_failed_callback_.get()) | 120 if (!write_failed_callback_.is_null()) |
| 119 write_failed_callback_->Run(result); | 121 write_failed_callback_.Run(result); |
| 120 return; | 122 return; |
| 121 } | 123 } |
| 122 | 124 |
| 123 { | 125 { |
| 124 base::AutoLock auto_lock(lock_); | 126 base::AutoLock auto_lock(lock_); |
| 125 AdvanceBufferPosition_Locked(result); | 127 AdvanceBufferPosition_Locked(result); |
| 126 } | 128 } |
| 127 | 129 |
| 128 // Schedule next write. | 130 // Schedule next write. |
| 129 message_loop_->PostTask( | 131 message_loop_->PostTask( |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 222 buffer_size_ -= queue_.front()->data()->size(); | 224 buffer_size_ -= queue_.front()->data()->size(); |
| 223 PopQueue(); | 225 PopQueue(); |
| 224 } | 226 } |
| 225 | 227 |
| 226 void BufferedDatagramWriter::OnError_Locked(int result) { | 228 void BufferedDatagramWriter::OnError_Locked(int result) { |
| 227 // Nothing to do here. | 229 // Nothing to do here. |
| 228 } | 230 } |
| 229 | 231 |
| 230 } // namespace protocol | 232 } // namespace protocol |
| 231 } // namespace remoting | 233 } // namespace remoting |
| OLD | NEW |