Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(55)

Side by Side Diff: remoting/base/buffered_socket_writer.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/base/buffered_socket_writer.h" 5 #include "remoting/base/buffered_socket_writer.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h" 8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h" 9 #include "base/single_thread_task_runner.h"
10 #include "base/stl_util.h" 10 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h" 11 #include "base/thread_task_runner_handle.h"
12 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h" 13 #include "net/base/net_errors.h"
14 #include "net/socket/socket.h"
13 15
14 namespace remoting { 16 namespace remoting {
15 17
16 struct BufferedSocketWriterBase::PendingPacket { 18 namespace {
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, 19
20 int WriteNetSocket(net::Socket* socket,
21 const scoped_refptr<net::IOBuffer>& buf,
22 int buf_len,
23 const net::CompletionCallback& callback) {
24 return socket->Write(buf.get(), buf_len, callback);
25 }
26
27 } // namespace
28
29 struct BufferedSocketWriter::PendingPacket {
30 PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
18 const base::Closure& done_task) 31 const base::Closure& done_task)
19 : data(data), 32 : data(data),
20 done_task(done_task) { 33 done_task(done_task) {
21 } 34 }
22 35
23 scoped_refptr<net::IOBufferWithSize> data; 36 scoped_refptr<net::DrainableIOBuffer> data;
24 base::Closure done_task; 37 base::Closure done_task;
25 }; 38 };
26 39
27 BufferedSocketWriterBase::BufferedSocketWriterBase() 40 // static
28 : socket_(nullptr), 41 scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
29 write_pending_(false), 42 net::Socket* socket, const WriteFailedCallback& write_failed_callback) {
30 closed_(false), 43 scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
31 destroyed_flag_(nullptr) { 44 result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
45 return result.Pass();
32 } 46 }
33 47
34 void BufferedSocketWriterBase::Init(net::Socket* socket, 48 BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
35 const WriteFailedCallback& callback) {
36 DCHECK(CalledOnValidThread());
37 DCHECK(socket);
38 socket_ = socket;
39 write_failed_callback_ = callback;
40 } 49 }
41 50
42 bool BufferedSocketWriterBase::Write( 51 BufferedSocketWriter::~BufferedSocketWriter() {
43 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { 52 STLDeleteElements(&queue_);
44 DCHECK(CalledOnValidThread()); 53 }
45 DCHECK(socket_); 54
55 void BufferedSocketWriter::Init(
56 const WriteCallback& write_callback,
57 const WriteFailedCallback& write_failed_callback) {
58 write_callback_ = write_callback;
59 write_failed_callback_ = write_failed_callback;
60 }
61
62 bool BufferedSocketWriter::Write(
63 const scoped_refptr<net::IOBufferWithSize>& data,
64 const base::Closure& done_task) {
65 DCHECK(thread_checker_.CalledOnValidThread());
46 DCHECK(data.get()); 66 DCHECK(data.get());
47 67
48 // Don't write after Close(). 68 // Don't write after error.
49 if (closed_) 69 if (is_closed())
50 return false; 70 return false;
51 71
52 queue_.push_back(new PendingPacket(data, done_task)); 72 queue_.push_back(new PendingPacket(
73 new net::DrainableIOBuffer(data.get(), data->size()), done_task));
53 74
54 DoWrite(); 75 DoWrite();
55 76
56 // DoWrite() may trigger OnWriteError() to be called. 77 return !is_closed();
57 return !closed_;
58 } 78 }
59 79
60 void BufferedSocketWriterBase::DoWrite() { 80 bool BufferedSocketWriter::is_closed() {
61 DCHECK(CalledOnValidThread()); 81 return write_callback_.is_null();
62 DCHECK(socket_); 82 }
83
84 void BufferedSocketWriter::DoWrite() {
85 DCHECK(thread_checker_.CalledOnValidThread());
86 DCHECK(!write_callback_.is_null());
63 87
64 // Don't try to write if there is another write pending. 88 // Don't try to write if there is another write pending.
65 if (write_pending_) 89 if (write_pending_)
66 return; 90 return;
67 91
68 // Don't write after Close(). 92 // Don't write after error.
69 if (closed_) 93 if (is_closed())
70 return; 94 return;
71 95
72 while (true) { 96 while (true) {
73 net::IOBuffer* current_packet; 97 if (queue_.empty())
74 int current_packet_size; 98 break;
75 GetNextPacket(&current_packet, &current_packet_size);
76 99
77 // Return if the queue is empty. 100 int result = write_callback_.Run(
78 if (!current_packet) 101 queue_.front()->data.get(), queue_.front()->data->size(),
79 return; 102 base::Bind(&BufferedSocketWriter::OnWritten,
80 103 weak_factory_.GetWeakPtr()));
81 int result = socket_->Write(
82 current_packet, current_packet_size,
83 base::Bind(&BufferedSocketWriterBase::OnWritten,
84 base::Unretained(this)));
85 bool write_again = false; 104 bool write_again = false;
86 HandleWriteResult(result, &write_again); 105 HandleWriteResult(result, &write_again);
87 if (!write_again) 106 if (!write_again)
88 return; 107 return;
89 } 108 }
90 } 109 }
91 110
92 void BufferedSocketWriterBase::HandleWriteResult(int result, 111 void BufferedSocketWriter::HandleWriteResult(int result, bool* write_again) {
93 bool* write_again) {
94 *write_again = false; 112 *write_again = false;
95 if (result < 0) { 113 if (result < 0) {
96 if (result == net::ERR_IO_PENDING) { 114 if (result == net::ERR_IO_PENDING) {
97 write_pending_ = true; 115 write_pending_ = true;
98 } else { 116 } else {
99 HandleError(result); 117 write_callback_.Reset();
100 if (!write_failed_callback_.is_null()) 118 if (!write_failed_callback_.is_null())
101 write_failed_callback_.Run(result); 119 write_failed_callback_.Run(result);
102 } 120 }
103 return; 121 return;
104 } 122 }
105 123
106 base::Closure done_task = AdvanceBufferPosition(result); 124 DCHECK(!queue_.empty());
107 if (!done_task.is_null()) { 125
108 bool destroyed = false; 126 queue_.front()->data->DidConsume(result);
109 destroyed_flag_ = &destroyed; 127
110 done_task.Run(); 128 if (queue_.front()->data->BytesRemaining() == 0) {
111 if (destroyed) { 129 base::Closure done_task = queue_.front()->done_task;
112 // Stop doing anything if we've been destroyed by the callback. 130 delete queue_.front();
131 queue_.pop_front();
132
133 if (!done_task.is_null()) {
134 // |done_task| is allowed to delete the writer, so here we post a task to
135 // continue writing after calling |done_task| and return with
136 // |write_again| set to false.
137 base::ThreadTaskRunnerHandle::Get()->PostTask(
138 FROM_HERE, base::Bind(&BufferedSocketWriter::DoWrite,
139 weak_factory_.GetWeakPtr()));
Wez 2015/07/14 21:49:20 Now that you're using WeakPtr you don't even need
Sergey Ulanov 2015/07/14 22:53:20 We don't want to call DoWrite() directly here beca
Wez 2015/07/16 21:55:36 If it's the last thing that HandleWriteResult() ca
Sergey Ulanov 2015/07/16 22:11:03 AFAIK some C++ compilers implement tail call optim
Sergey Ulanov 2015/07/16 22:25:57 Also there are cases when tail call optimization i
140 done_task.Run();
113 return; 141 return;
114 } 142 }
115 destroyed_flag_ = nullptr;
116 } 143 }
117 144
118 *write_again = true; 145 *write_again = true;
Wez 2015/07/14 21:49:20 Why is |write_again| an out parameter rather than
Sergey Ulanov 2015/07/14 22:53:20 Just for readability, to make the purpose of the r
Wez 2015/07/16 21:55:36 You don't need the PostTask, though - you can just
Sergey Ulanov 2015/07/16 22:11:03 So if I understand correctly you are suggesting th
119 } 146 }
120 147
121 void BufferedSocketWriterBase::OnWritten(int result) { 148 void BufferedSocketWriter::OnWritten(int result) {
122 DCHECK(CalledOnValidThread()); 149 DCHECK(thread_checker_.CalledOnValidThread());
123 DCHECK(write_pending_); 150 DCHECK(write_pending_);
124 write_pending_ = false; 151 write_pending_ = false;
125 152
126 bool write_again; 153 bool write_again;
127 HandleWriteResult(result, &write_again); 154 HandleWriteResult(result, &write_again);
128 if (write_again) 155 if (write_again)
129 DoWrite(); 156 DoWrite();
Wez 2015/07/14 21:49:20 As noted above, if you have HandleWriteResult() ca
Sergey Ulanov 2015/07/14 22:53:21 Done.
130 } 157 }
131 158
132 void BufferedSocketWriterBase::HandleError(int result) {
133 DCHECK(CalledOnValidThread());
134
135 closed_ = true;
136
137 STLDeleteElements(&queue_);
138
139 // Notify subclass that an error is received.
140 OnError(result);
141 }
142
143 void BufferedSocketWriterBase::Close() {
144 DCHECK(CalledOnValidThread());
145 closed_ = true;
146 }
147
148 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
149 if (destroyed_flag_)
150 *destroyed_flag_ = true;
151
152 STLDeleteElements(&queue_);
153 }
154
155 base::Closure BufferedSocketWriterBase::PopQueue() {
156 base::Closure result = queue_.front()->done_task;
157 delete queue_.front();
158 queue_.pop_front();
159 return result;
160 }
161
162 BufferedSocketWriter::BufferedSocketWriter() {
163 }
164
165 void BufferedSocketWriter::GetNextPacket(
166 net::IOBuffer** buffer, int* size) {
167 if (!current_buf_.get()) {
168 if (queue_.empty()) {
169 *buffer = nullptr;
170 return; // Nothing to write.
171 }
172 current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
173 queue_.front()->data->size());
174 }
175
176 *buffer = current_buf_.get();
177 *size = current_buf_->BytesRemaining();
178 }
179
180 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
181 current_buf_->DidConsume(written);
182
183 if (current_buf_->BytesRemaining() == 0) {
184 current_buf_ = nullptr;
185 return PopQueue();
186 }
187 return base::Closure();
188 }
189
190 void BufferedSocketWriter::OnError(int result) {
191 current_buf_ = nullptr;
192 }
193
194 BufferedSocketWriter::~BufferedSocketWriter() {
195 }
196
197 BufferedDatagramWriter::BufferedDatagramWriter() {
198 }
199
200 void BufferedDatagramWriter::GetNextPacket(
201 net::IOBuffer** buffer, int* size) {
202 if (queue_.empty()) {
203 *buffer = nullptr;
204 return; // Nothing to write.
205 }
206 *buffer = queue_.front()->data.get();
207 *size = queue_.front()->data->size();
208 }
209
210 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
211 DCHECK_EQ(written, queue_.front()->data->size());
212 return PopQueue();
213 }
214
215 void BufferedDatagramWriter::OnError(int result) {
216 // Nothing to do here.
217 }
218
219 BufferedDatagramWriter::~BufferedDatagramWriter() {
220 }
221
222 } // namespace remoting 159 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698