Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: remoting/base/buffered_socket_writer.cc

Issue 1197853003: Add P2PDatagramSocket and P2PStreamSocket interfaces. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "remoting/base/buffered_socket_writer.h" 5 #include "remoting/base/buffered_socket_writer.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/stl_util.h" 8 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h" 9 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h" 10 #include "net/base/net_errors.h"
11 #include "net/socket/socket.h"
13 12
14 namespace remoting { 13 namespace remoting {
15 14
16 struct BufferedSocketWriterBase::PendingPacket { 15 namespace {
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, 16
17 int WriteNetSocket(net::Socket* socket,
18 const scoped_refptr<net::IOBuffer>& buf,
19 int buf_len,
20 const net::CompletionCallback& callback) {
21 return socket->Write(buf.get(), buf_len, callback);
22 }
23
24 } // namespace
25
26 struct BufferedSocketWriter::PendingPacket {
27 PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
18 const base::Closure& done_task) 28 const base::Closure& done_task)
19 : data(data), 29 : data(data),
20 done_task(done_task) { 30 done_task(done_task) {
21 } 31 }
22 32
23 scoped_refptr<net::IOBufferWithSize> data; 33 scoped_refptr<net::DrainableIOBuffer> data;
24 base::Closure done_task; 34 base::Closure done_task;
25 }; 35 };
26 36
27 BufferedSocketWriterBase::BufferedSocketWriterBase() 37 // static
28 : socket_(nullptr), 38 scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
29 write_pending_(false), 39 net::Socket* socket,
30 closed_(false), 40 const WriteFailedCallback& write_failed_callback) {
31 destroyed_flag_(nullptr) { 41 scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
42 result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
43 return result.Pass();
32 } 44 }
33 45
34 void BufferedSocketWriterBase::Init(net::Socket* socket, 46 BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
35 const WriteFailedCallback& callback) {
36 DCHECK(CalledOnValidThread());
37 DCHECK(socket);
38 socket_ = socket;
39 write_failed_callback_ = callback;
40 } 47 }
41 48
42 bool BufferedSocketWriterBase::Write( 49 BufferedSocketWriter::~BufferedSocketWriter() {
43 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { 50 STLDeleteElements(&queue_);
44 DCHECK(CalledOnValidThread()); 51 }
45 DCHECK(socket_); 52
53 void BufferedSocketWriter::Init(
54 const WriteCallback& write_callback,
55 const WriteFailedCallback& write_failed_callback) {
56 write_callback_ = write_callback;
57 write_failed_callback_ = write_failed_callback;
58 }
59
60 bool BufferedSocketWriter::Write(
61 const scoped_refptr<net::IOBufferWithSize>& data,
62 const base::Closure& done_task) {
63 DCHECK(thread_checker_.CalledOnValidThread());
46 DCHECK(data.get()); 64 DCHECK(data.get());
47 65
48 // Don't write after Close(). 66 // Don't write after error.
49 if (closed_) 67 if (is_closed())
50 return false; 68 return false;
51 69
52 queue_.push_back(new PendingPacket(data, done_task)); 70 queue_.push_back(new PendingPacket(
71 new net::DrainableIOBuffer(data.get(), data->size()), done_task));
53 72
54 DoWrite(); 73 DoWrite();
55 74
56 // DoWrite() may trigger OnWriteError() to be called. 75 return !is_closed();
57 return !closed_;
58 } 76 }
59 77
60 void BufferedSocketWriterBase::DoWrite() { 78 bool BufferedSocketWriter::is_closed() {
61 DCHECK(CalledOnValidThread()); 79 return write_callback_.is_null();
62 DCHECK(socket_); 80 }
63 81
64 // Don't try to write if there is another write pending. 82 void BufferedSocketWriter::DoWrite() {
65 if (write_pending_) 83 DCHECK(thread_checker_.CalledOnValidThread());
66 return;
67 84
68 // Don't write after Close(). 85 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
69 if (closed_) 86 while (self && !write_pending_ && !is_closed() && !queue_.empty()) {
70 return; 87 int result = write_callback_.Run(
71 88 queue_.front()->data.get(), queue_.front()->data->BytesRemaining(),
72 while (true) { 89 base::Bind(&BufferedSocketWriter::OnWritten,
73 net::IOBuffer* current_packet; 90 weak_factory_.GetWeakPtr()));
74 int current_packet_size; 91 HandleWriteResult(result);
75 GetNextPacket(&current_packet, &current_packet_size);
76
77 // Return if the queue is empty.
78 if (!current_packet)
79 return;
80
81 int result = socket_->Write(
82 current_packet, current_packet_size,
83 base::Bind(&BufferedSocketWriterBase::OnWritten,
84 base::Unretained(this)));
85 bool write_again = false;
86 HandleWriteResult(result, &write_again);
87 if (!write_again)
88 return;
89 } 92 }
90 } 93 }
91 94
92 void BufferedSocketWriterBase::HandleWriteResult(int result, 95 void BufferedSocketWriter::HandleWriteResult(int result) {
93 bool* write_again) {
94 *write_again = false;
95 if (result < 0) { 96 if (result < 0) {
96 if (result == net::ERR_IO_PENDING) { 97 if (result == net::ERR_IO_PENDING) {
97 write_pending_ = true; 98 write_pending_ = true;
98 } else { 99 } else {
99 HandleError(result); 100 write_callback_.Reset();
100 if (!write_failed_callback_.is_null()) 101 if (!write_failed_callback_.is_null()) {
101 write_failed_callback_.Run(result); 102 WriteFailedCallback callback = write_failed_callback_;
103 callback.Run(result);
104 }
102 } 105 }
103 return; 106 return;
104 } 107 }
105 108
106 base::Closure done_task = AdvanceBufferPosition(result); 109 DCHECK(!queue_.empty());
107 if (!done_task.is_null()) { 110
108 bool destroyed = false; 111 queue_.front()->data->DidConsume(result);
109 destroyed_flag_ = &destroyed; 112
110 done_task.Run(); 113 if (queue_.front()->data->BytesRemaining() == 0) {
111 if (destroyed) { 114 base::Closure done_task = queue_.front()->done_task;
112 // Stop doing anything if we've been destroyed by the callback. 115 delete queue_.front();
113 return; 116 queue_.pop_front();
114 } 117
115 destroyed_flag_ = nullptr; 118 if (!done_task.is_null())
119 done_task.Run();
116 } 120 }
117
118 *write_again = true;
119 } 121 }
120 122
121 void BufferedSocketWriterBase::OnWritten(int result) { 123 void BufferedSocketWriter::OnWritten(int result) {
122 DCHECK(CalledOnValidThread()); 124 DCHECK(thread_checker_.CalledOnValidThread());
123 DCHECK(write_pending_); 125 DCHECK(write_pending_);
124 write_pending_ = false; 126 write_pending_ = false;
125 127
126 bool write_again; 128 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
127 HandleWriteResult(result, &write_again); 129 HandleWriteResult(result);
128 if (write_again) 130 if (self)
129 DoWrite(); 131 DoWrite();
130 } 132 }
131 133
132 void BufferedSocketWriterBase::HandleError(int result) {
133 DCHECK(CalledOnValidThread());
134
135 closed_ = true;
136
137 STLDeleteElements(&queue_);
138
139 // Notify subclass that an error is received.
140 OnError(result);
141 }
142
143 void BufferedSocketWriterBase::Close() {
144 DCHECK(CalledOnValidThread());
145 closed_ = true;
146 }
147
148 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
149 if (destroyed_flag_)
150 *destroyed_flag_ = true;
151
152 STLDeleteElements(&queue_);
153 }
154
155 base::Closure BufferedSocketWriterBase::PopQueue() {
156 base::Closure result = queue_.front()->done_task;
157 delete queue_.front();
158 queue_.pop_front();
159 return result;
160 }
161
162 BufferedSocketWriter::BufferedSocketWriter() {
163 }
164
165 void BufferedSocketWriter::GetNextPacket(
166 net::IOBuffer** buffer, int* size) {
167 if (!current_buf_.get()) {
168 if (queue_.empty()) {
169 *buffer = nullptr;
170 return; // Nothing to write.
171 }
172 current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
173 queue_.front()->data->size());
174 }
175
176 *buffer = current_buf_.get();
177 *size = current_buf_->BytesRemaining();
178 }
179
180 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
181 current_buf_->DidConsume(written);
182
183 if (current_buf_->BytesRemaining() == 0) {
184 current_buf_ = nullptr;
185 return PopQueue();
186 }
187 return base::Closure();
188 }
189
190 void BufferedSocketWriter::OnError(int result) {
191 current_buf_ = nullptr;
192 }
193
194 BufferedSocketWriter::~BufferedSocketWriter() {
195 }
196
197 BufferedDatagramWriter::BufferedDatagramWriter() {
198 }
199
200 void BufferedDatagramWriter::GetNextPacket(
201 net::IOBuffer** buffer, int* size) {
202 if (queue_.empty()) {
203 *buffer = nullptr;
204 return; // Nothing to write.
205 }
206 *buffer = queue_.front()->data.get();
207 *size = queue_.front()->data->size();
208 }
209
210 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
211 DCHECK_EQ(written, queue_.front()->data->size());
212 return PopQueue();
213 }
214
215 void BufferedDatagramWriter::OnError(int result) {
216 // Nothing to do here.
217 }
218
219 BufferedDatagramWriter::~BufferedDatagramWriter() {
220 }
221
222 } // namespace remoting 134 } // namespace remoting
OLDNEW
« no previous file with comments | « remoting/base/buffered_socket_writer.h ('k') | remoting/base/buffered_socket_writer_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698