Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(390)

Side by Side Diff: remoting/base/buffered_socket_writer.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/base/buffered_socket_writer.h" 5 #include "remoting/base/buffered_socket_writer.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/stl_util.h" 8 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h" 9 #include "base/thread_task_runner_handle.h"
10 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h" 11 #include "net/base/net_errors.h"
12 #include "net/socket/socket.h"
13 13
14 namespace remoting { 14 namespace remoting {
15 15
16 struct BufferedSocketWriterBase::PendingPacket { 16 namespace {
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, 17
18 int WriteNetSocket(net::Socket* socket,
19 const scoped_refptr<net::IOBuffer>& buf,
20 int buf_len,
21 const net::CompletionCallback& callback) {
22 return socket->Write(buf.get(), buf_len, callback);
23 }
24
25 } // namespace
26
27 struct BufferedSocketWriter::PendingPacket {
28 PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
18 const base::Closure& done_task) 29 const base::Closure& done_task)
19 : data(data), 30 : data(data),
20 done_task(done_task) { 31 done_task(done_task) {
21 } 32 }
22 33
23 scoped_refptr<net::IOBufferWithSize> data; 34 scoped_refptr<net::DrainableIOBuffer> data;
24 base::Closure done_task; 35 base::Closure done_task;
25 }; 36 };
26 37
27 BufferedSocketWriterBase::BufferedSocketWriterBase() 38 // static
28 : socket_(nullptr), 39 scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
29 write_pending_(false), 40 net::Socket* socket, const WriteFailedCallback& write_failed_callback) {
30 closed_(false), 41 scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
31 destroyed_flag_(nullptr) { 42 result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
43 return result.Pass();
32 } 44 }
33 45
34 void BufferedSocketWriterBase::Init(net::Socket* socket, 46 BufferedSocketWriter::BufferedSocketWriter() {}
35 const WriteFailedCallback& callback) { 47
36 DCHECK(CalledOnValidThread()); 48 BufferedSocketWriter::~BufferedSocketWriter() {
37 DCHECK(socket); 49 if (destroyed_flag_)
38 socket_ = socket; 50 *destroyed_flag_ = true;
39 write_failed_callback_ = callback; 51
52 STLDeleteElements(&queue_);
40 } 53 }
41 54
42 bool BufferedSocketWriterBase::Write( 55 void BufferedSocketWriter::Init(
43 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { 56 const WriteCallback& write_callback,
44 DCHECK(CalledOnValidThread()); 57 const WriteFailedCallback& write_failed_callback) {
45 DCHECK(socket_); 58 write_callback_ = write_callback;
59 write_failed_callback_ = write_failed_callback;
60 }
61
62 bool BufferedSocketWriter::Write(
63 const scoped_refptr<net::IOBufferWithSize>& data,
64 const base::Closure& done_task) {
65 DCHECK(thread_checker_.CalledOnValidThread());
66 DCHECK(!write_callback_.is_null());
46 DCHECK(data.get()); 67 DCHECK(data.get());
47 68
48 // Don't write after Close(). 69 // Don't write after error.
49 if (closed_) 70 if (closed_)
50 return false; 71 return false;
51 72
52 queue_.push_back(new PendingPacket(data, done_task)); 73 queue_.push_back(new PendingPacket(
74 new net::DrainableIOBuffer(data.get(), data->size()), done_task));
53 75
54 DoWrite(); 76 DoWrite();
55 77
56 // DoWrite() may trigger OnWriteError() to be called.
57 return !closed_; 78 return !closed_;
58 } 79 }
59 80
60 void BufferedSocketWriterBase::DoWrite() { 81 void BufferedSocketWriter::DoWrite() {
61 DCHECK(CalledOnValidThread()); 82 DCHECK(thread_checker_.CalledOnValidThread());
62 DCHECK(socket_); 83 DCHECK(!write_callback_.is_null());
63 84
64 // Don't try to write if there is another write pending. 85 // Don't try to write if there is another write pending.
65 if (write_pending_) 86 if (write_pending_)
66 return; 87 return;
67 88
68 // Don't write after Close(). 89 // Don't write after error.
69 if (closed_) 90 if (closed_)
70 return; 91 return;
71 92
72 while (true) { 93 while (true) {
73 net::IOBuffer* current_packet; 94 if (queue_.empty())
74 int current_packet_size; 95 break;
75 GetNextPacket(&current_packet, &current_packet_size);
76 96
77 // Return if the queue is empty. 97 int result = write_callback_.Run(
78 if (!current_packet) 98 queue_.front()->data.get(), queue_.front()->data->size(),
79 return; 99 base::Bind(&BufferedSocketWriter::OnWritten,
80
81 int result = socket_->Write(
82 current_packet, current_packet_size,
83 base::Bind(&BufferedSocketWriterBase::OnWritten,
84 base::Unretained(this))); 100 base::Unretained(this)));
Wez 2015/07/14 18:28:09 You're using base::Unretained() here, but this cla
Sergey Ulanov 2015/07/14 21:07:54 Good point. Fixed by using weak pointer here.
85 bool write_again = false; 101 bool write_again = false;
86 HandleWriteResult(result, &write_again); 102 HandleWriteResult(result, &write_again);
87 if (!write_again) 103 if (!write_again)
88 return; 104 return;
89 } 105 }
90 } 106 }
91 107
92 void BufferedSocketWriterBase::HandleWriteResult(int result, 108 void BufferedSocketWriter::HandleWriteResult(int result, bool* write_again) {
93 bool* write_again) {
94 *write_again = false; 109 *write_again = false;
95 if (result < 0) { 110 if (result < 0) {
96 if (result == net::ERR_IO_PENDING) { 111 if (result == net::ERR_IO_PENDING) {
97 write_pending_ = true; 112 write_pending_ = true;
98 } else { 113 } else {
99 HandleError(result); 114 closed_ = true;
Wez 2015/07/14 18:28:09 nit: Could you instead implement a private getter
Sergey Ulanov 2015/07/14 21:07:54 Done.
115 STLDeleteElements(&queue_);
Wez 2015/07/14 18:28:09 Why do we need to delete the elements here? Surely
Sergey Ulanov 2015/07/14 21:07:54 Done.
100 if (!write_failed_callback_.is_null()) 116 if (!write_failed_callback_.is_null())
101 write_failed_callback_.Run(result); 117 write_failed_callback_.Run(result);
102 } 118 }
103 return; 119 return;
104 } 120 }
105 121
106 base::Closure done_task = AdvanceBufferPosition(result); 122 DCHECK(!queue_.empty());
107 if (!done_task.is_null()) { 123
124 queue_.front()->data->DidConsume(result);
125
126 if (queue_.front()->data->BytesRemaining() == 0) {
127 base::Closure done_task = queue_.front()->done_task;
128 delete queue_.front();
129 queue_.pop_front();
108 bool destroyed = false; 130 bool destroyed = false;
109 destroyed_flag_ = &destroyed; 131 destroyed_flag_ = &destroyed;
Wez 2015/07/14 18:28:09 I realise that this was in the original implementa
Sergey Ulanov 2015/07/14 21:07:54 I'm not sure we need it right now, but I think we
110 done_task.Run(); 132 if (!done_task.is_null())
133 done_task.Run();
111 if (destroyed) { 134 if (destroyed) {
112 // Stop doing anything if we've been destroyed by the callback. 135 // Stop doing anything if we've been destroyed by the callback.
113 return; 136 return;
114 } 137 }
115 destroyed_flag_ = nullptr; 138 destroyed_flag_ = nullptr;
116 } 139 }
117 140
118 *write_again = true; 141 *write_again = true;
119 } 142 }
120 143
121 void BufferedSocketWriterBase::OnWritten(int result) { 144 void BufferedSocketWriter::OnWritten(int result) {
122 DCHECK(CalledOnValidThread()); 145 DCHECK(thread_checker_.CalledOnValidThread());
123 DCHECK(write_pending_); 146 DCHECK(write_pending_);
124 write_pending_ = false; 147 write_pending_ = false;
125 148
126 bool write_again; 149 bool write_again;
127 HandleWriteResult(result, &write_again); 150 HandleWriteResult(result, &write_again);
128 if (write_again) 151 if (write_again)
129 DoWrite(); 152 DoWrite();
130 } 153 }
131 154
132 void BufferedSocketWriterBase::HandleError(int result) {
133 DCHECK(CalledOnValidThread());
134
135 closed_ = true;
136
137 STLDeleteElements(&queue_);
138
139 // Notify subclass that an error is received.
140 OnError(result);
141 }
142
143 void BufferedSocketWriterBase::Close() {
144 DCHECK(CalledOnValidThread());
145 closed_ = true;
146 }
147
148 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
149 if (destroyed_flag_)
150 *destroyed_flag_ = true;
151
152 STLDeleteElements(&queue_);
153 }
154
155 base::Closure BufferedSocketWriterBase::PopQueue() {
156 base::Closure result = queue_.front()->done_task;
157 delete queue_.front();
158 queue_.pop_front();
159 return result;
160 }
161
162 BufferedSocketWriter::BufferedSocketWriter() {
163 }
164
165 void BufferedSocketWriter::GetNextPacket(
166 net::IOBuffer** buffer, int* size) {
167 if (!current_buf_.get()) {
168 if (queue_.empty()) {
169 *buffer = nullptr;
170 return; // Nothing to write.
171 }
172 current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
173 queue_.front()->data->size());
174 }
175
176 *buffer = current_buf_.get();
177 *size = current_buf_->BytesRemaining();
178 }
179
180 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
181 current_buf_->DidConsume(written);
182
183 if (current_buf_->BytesRemaining() == 0) {
184 current_buf_ = nullptr;
185 return PopQueue();
186 }
187 return base::Closure();
188 }
189
190 void BufferedSocketWriter::OnError(int result) {
191 current_buf_ = nullptr;
192 }
193
194 BufferedSocketWriter::~BufferedSocketWriter() {
195 }
196
197 BufferedDatagramWriter::BufferedDatagramWriter() {
198 }
199
200 void BufferedDatagramWriter::GetNextPacket(
201 net::IOBuffer** buffer, int* size) {
202 if (queue_.empty()) {
203 *buffer = nullptr;
204 return; // Nothing to write.
205 }
206 *buffer = queue_.front()->data.get();
207 *size = queue_.front()->data->size();
208 }
209
210 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
211 DCHECK_EQ(written, queue_.front()->data->size());
212 return PopQueue();
213 }
214
215 void BufferedDatagramWriter::OnError(int result) {
216 // Nothing to do here.
217 }
218
219 BufferedDatagramWriter::~BufferedDatagramWriter() {
220 }
221
222 } // namespace remoting 155 } // namespace remoting
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698