Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(72)

Side by Side Diff: remoting/protocol/buffered_socket_writer.cc

Issue 8116021: Switch remoting/protocol to new callbacks (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: - Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « remoting/protocol/buffered_socket_writer.h ('k') | remoting/protocol/client_control_sender.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/protocol/buffered_socket_writer.h" 5 #include "remoting/protocol/buffered_socket_writer.h"
6 6
7 #include "base/bind.h"
7 #include "base/location.h" 8 #include "base/location.h"
8 #include "base/message_loop_proxy.h" 9 #include "base/message_loop_proxy.h"
9 #include "base/stl_util.h" 10 #include "base/stl_util.h"
10 #include "net/base/net_errors.h" 11 #include "net/base/net_errors.h"
11 12
12 namespace remoting { 13 namespace remoting {
13 namespace protocol { 14 namespace protocol {
14 15
15 class BufferedSocketWriterBase::PendingPacket { 16 class BufferedSocketWriterBase::PendingPacket {
16 public: 17 public:
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, Task* done_task) 18 PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
19 const base::Closure& done_task)
18 : data_(data), 20 : data_(data),
19 done_task_(done_task) { 21 done_task_(done_task) {
20 } 22 }
21 ~PendingPacket() { 23 ~PendingPacket() {
22 if (done_task_.get()) 24 if (!done_task_.is_null())
23 done_task_->Run(); 25 done_task_.Run();
24 } 26 }
25 27
26 net::IOBufferWithSize* data() { 28 net::IOBufferWithSize* data() {
27 return data_; 29 return data_;
28 } 30 }
29 31
30 private: 32 private:
31 scoped_refptr<net::IOBufferWithSize> data_; 33 scoped_refptr<net::IOBufferWithSize> data_;
32 scoped_ptr<Task> done_task_; 34 base::Closure done_task_;
33 35
34 DISALLOW_COPY_AND_ASSIGN(PendingPacket); 36 DISALLOW_COPY_AND_ASSIGN(PendingPacket);
35 }; 37 };
36 38
37 BufferedSocketWriterBase::BufferedSocketWriterBase( 39 BufferedSocketWriterBase::BufferedSocketWriterBase(
38 base::MessageLoopProxy* message_loop) 40 base::MessageLoopProxy* message_loop)
39 : buffer_size_(0), 41 : buffer_size_(0),
40 socket_(NULL), 42 socket_(NULL),
41 message_loop_(message_loop), 43 message_loop_(message_loop),
42 write_pending_(false), 44 write_pending_(false),
43 ALLOW_THIS_IN_INITIALIZER_LIST( 45 ALLOW_THIS_IN_INITIALIZER_LIST(
44 written_callback_(this, &BufferedSocketWriterBase::OnWritten)), 46 written_callback_(this, &BufferedSocketWriterBase::OnWritten)),
45 closed_(false) { 47 closed_(false) {
46 } 48 }
47 49
48 BufferedSocketWriterBase::~BufferedSocketWriterBase() { } 50 BufferedSocketWriterBase::~BufferedSocketWriterBase() { }
49 51
50 void BufferedSocketWriterBase::Init(net::Socket* socket, 52 void BufferedSocketWriterBase::Init(net::Socket* socket,
51 WriteFailedCallback* callback) { 53 const WriteFailedCallback& callback) {
52 // TODO(garykac) Save copy of WriteFailedCallback. 54 DCHECK(message_loop_->BelongsToCurrentThread());
53 base::AutoLock auto_lock(lock_); 55 DCHECK(socket);
54 socket_ = socket; 56 socket_ = socket;
55 DCHECK(socket_); 57 write_failed_callback_ = callback;
56 } 58 }
57 59
58 bool BufferedSocketWriterBase::Write( 60 bool BufferedSocketWriterBase::Write(
59 scoped_refptr<net::IOBufferWithSize> data, Task* done_task) { 61 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
60 { 62 {
61 base::AutoLock auto_lock(lock_); 63 base::AutoLock auto_lock(lock_);
62 queue_.push_back(new PendingPacket(data, done_task)); 64 queue_.push_back(new PendingPacket(data, done_task));
63 buffer_size_ += data->size(); 65 buffer_size_ += data->size();
64 } 66 }
65 message_loop_->PostTask( 67 message_loop_->PostTask(
66 FROM_HERE, NewRunnableMethod(this, &BufferedSocketWriterBase::DoWrite)); 68 FROM_HERE, base::Bind(&BufferedSocketWriterBase::DoWrite, this));
67 return true; 69 return true;
68 } 70 }
69 71
70 void BufferedSocketWriterBase::DoWrite() { 72 void BufferedSocketWriterBase::DoWrite() {
71 DCHECK(message_loop_->BelongsToCurrentThread()); 73 DCHECK(message_loop_->BelongsToCurrentThread());
72 DCHECK(socket_); 74 DCHECK(socket_);
73 75
74 // Don't try to write if there is another write pending. 76 // Don't try to write if there is another write pending.
75 if (write_pending_) 77 if (write_pending_)
76 return; 78 return;
(...skipping 17 matching lines...) Expand all
94 int result = socket_->Write(current_packet, current_packet_size, 96 int result = socket_->Write(current_packet, current_packet_size,
95 &written_callback_); 97 &written_callback_);
96 if (result >= 0) { 98 if (result >= 0) {
97 base::AutoLock auto_lock(lock_); 99 base::AutoLock auto_lock(lock_);
98 AdvanceBufferPosition_Locked(result); 100 AdvanceBufferPosition_Locked(result);
99 } else { 101 } else {
100 if (result == net::ERR_IO_PENDING) { 102 if (result == net::ERR_IO_PENDING) {
101 write_pending_ = true; 103 write_pending_ = true;
102 } else { 104 } else {
103 HandleError(result); 105 HandleError(result);
104 if (write_failed_callback_.get()) 106 if (!write_failed_callback_.is_null())
105 write_failed_callback_->Run(result); 107 write_failed_callback_.Run(result);
106 } 108 }
107 return; 109 return;
108 } 110 }
109 } 111 }
110 } 112 }
111 113
112 void BufferedSocketWriterBase::OnWritten(int result) { 114 void BufferedSocketWriterBase::OnWritten(int result) {
113 DCHECK(message_loop_->BelongsToCurrentThread()); 115 DCHECK(message_loop_->BelongsToCurrentThread());
114 write_pending_ = false; 116 write_pending_ = false;
115 117
116 if (result < 0) { 118 if (result < 0) {
117 HandleError(result); 119 HandleError(result);
118 if (write_failed_callback_.get()) 120 if (!write_failed_callback_.is_null())
119 write_failed_callback_->Run(result); 121 write_failed_callback_.Run(result);
120 return; 122 return;
121 } 123 }
122 124
123 { 125 {
124 base::AutoLock auto_lock(lock_); 126 base::AutoLock auto_lock(lock_);
125 AdvanceBufferPosition_Locked(result); 127 AdvanceBufferPosition_Locked(result);
126 } 128 }
127 129
128 // Schedule next write. 130 // Schedule next write.
129 message_loop_->PostTask( 131 message_loop_->PostTask(
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
222 buffer_size_ -= queue_.front()->data()->size(); 224 buffer_size_ -= queue_.front()->data()->size();
223 PopQueue(); 225 PopQueue();
224 } 226 }
225 227
226 void BufferedDatagramWriter::OnError_Locked(int result) { 228 void BufferedDatagramWriter::OnError_Locked(int result) {
227 // Nothing to do here. 229 // Nothing to do here.
228 } 230 }
229 231
230 } // namespace protocol 232 } // namespace protocol
231 } // namespace remoting 233 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/protocol/buffered_socket_writer.h ('k') | remoting/protocol/client_control_sender.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698