OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "remoting/base/buffered_socket_writer.h" | 5 #include "remoting/base/buffered_socket_writer.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/location.h" | |
9 #include "base/single_thread_task_runner.h" | |
10 #include "base/stl_util.h" | 8 #include "base/stl_util.h" |
11 #include "base/thread_task_runner_handle.h" | 9 #include "net/base/io_buffer.h" |
12 #include "net/base/net_errors.h" | 10 #include "net/base/net_errors.h" |
| 11 #include "net/socket/socket.h" |
13 | 12 |
14 namespace remoting { | 13 namespace remoting { |
15 | 14 |
16 struct BufferedSocketWriterBase::PendingPacket { | 15 namespace { |
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data, | 16 |
| 17 int WriteNetSocket(net::Socket* socket, |
| 18 const scoped_refptr<net::IOBuffer>& buf, |
| 19 int buf_len, |
| 20 const net::CompletionCallback& callback) { |
| 21 return socket->Write(buf.get(), buf_len, callback); |
| 22 } |
| 23 |
| 24 } // namespace |
| 25 |
| 26 struct BufferedSocketWriter::PendingPacket { |
| 27 PendingPacket(scoped_refptr<net::DrainableIOBuffer> data, |
18 const base::Closure& done_task) | 28 const base::Closure& done_task) |
19 : data(data), | 29 : data(data), |
20 done_task(done_task) { | 30 done_task(done_task) { |
21 } | 31 } |
22 | 32 |
23 scoped_refptr<net::IOBufferWithSize> data; | 33 scoped_refptr<net::DrainableIOBuffer> data; |
24 base::Closure done_task; | 34 base::Closure done_task; |
25 }; | 35 }; |
26 | 36 |
27 BufferedSocketWriterBase::BufferedSocketWriterBase() | 37 // static |
28 : socket_(nullptr), | 38 scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket( |
29 write_pending_(false), | 39 net::Socket* socket, |
30 closed_(false), | 40 const WriteFailedCallback& write_failed_callback) { |
31 destroyed_flag_(nullptr) { | 41 scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter()); |
| 42 result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback); |
| 43 return result.Pass(); |
32 } | 44 } |
33 | 45 |
34 void BufferedSocketWriterBase::Init(net::Socket* socket, | 46 BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) { |
35 const WriteFailedCallback& callback) { | |
36 DCHECK(CalledOnValidThread()); | |
37 DCHECK(socket); | |
38 socket_ = socket; | |
39 write_failed_callback_ = callback; | |
40 } | 47 } |
41 | 48 |
42 bool BufferedSocketWriterBase::Write( | 49 BufferedSocketWriter::~BufferedSocketWriter() { |
43 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) { | 50 STLDeleteElements(&queue_); |
44 DCHECK(CalledOnValidThread()); | 51 } |
45 DCHECK(socket_); | 52 |
| 53 void BufferedSocketWriter::Init( |
| 54 const WriteCallback& write_callback, |
| 55 const WriteFailedCallback& write_failed_callback) { |
| 56 write_callback_ = write_callback; |
| 57 write_failed_callback_ = write_failed_callback; |
| 58 } |
| 59 |
| 60 bool BufferedSocketWriter::Write( |
| 61 const scoped_refptr<net::IOBufferWithSize>& data, |
| 62 const base::Closure& done_task) { |
| 63 DCHECK(thread_checker_.CalledOnValidThread()); |
46 DCHECK(data.get()); | 64 DCHECK(data.get()); |
47 | 65 |
48 // Don't write after Close(). | 66 // Don't write after error. |
49 if (closed_) | 67 if (is_closed()) |
50 return false; | 68 return false; |
51 | 69 |
52 queue_.push_back(new PendingPacket(data, done_task)); | 70 queue_.push_back(new PendingPacket( |
| 71 new net::DrainableIOBuffer(data.get(), data->size()), done_task)); |
53 | 72 |
54 DoWrite(); | 73 DoWrite(); |
55 | 74 |
56 // DoWrite() may trigger OnWriteError() to be called. | 75 return !is_closed(); |
57 return !closed_; | |
58 } | 76 } |
59 | 77 |
60 void BufferedSocketWriterBase::DoWrite() { | 78 bool BufferedSocketWriter::is_closed() { |
61 DCHECK(CalledOnValidThread()); | 79 return write_callback_.is_null(); |
62 DCHECK(socket_); | 80 } |
63 | 81 |
64 // Don't try to write if there is another write pending. | 82 void BufferedSocketWriter::DoWrite() { |
65 if (write_pending_) | 83 DCHECK(thread_checker_.CalledOnValidThread()); |
66 return; | |
67 | 84 |
68 // Don't write after Close(). | 85 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr(); |
69 if (closed_) | 86 while (self && !write_pending_ && !is_closed() && !queue_.empty()) { |
70 return; | 87 int result = write_callback_.Run( |
71 | 88 queue_.front()->data.get(), queue_.front()->data->BytesRemaining(), |
72 while (true) { | 89 base::Bind(&BufferedSocketWriter::OnWritten, |
73 net::IOBuffer* current_packet; | 90 weak_factory_.GetWeakPtr())); |
74 int current_packet_size; | 91 HandleWriteResult(result); |
75 GetNextPacket(¤t_packet, ¤t_packet_size); | |
76 | |
77 // Return if the queue is empty. | |
78 if (!current_packet) | |
79 return; | |
80 | |
81 int result = socket_->Write( | |
82 current_packet, current_packet_size, | |
83 base::Bind(&BufferedSocketWriterBase::OnWritten, | |
84 base::Unretained(this))); | |
85 bool write_again = false; | |
86 HandleWriteResult(result, &write_again); | |
87 if (!write_again) | |
88 return; | |
89 } | 92 } |
90 } | 93 } |
91 | 94 |
92 void BufferedSocketWriterBase::HandleWriteResult(int result, | 95 void BufferedSocketWriter::HandleWriteResult(int result) { |
93 bool* write_again) { | |
94 *write_again = false; | |
95 if (result < 0) { | 96 if (result < 0) { |
96 if (result == net::ERR_IO_PENDING) { | 97 if (result == net::ERR_IO_PENDING) { |
97 write_pending_ = true; | 98 write_pending_ = true; |
98 } else { | 99 } else { |
99 HandleError(result); | 100 write_callback_.Reset(); |
100 if (!write_failed_callback_.is_null()) | 101 if (!write_failed_callback_.is_null()) { |
101 write_failed_callback_.Run(result); | 102 WriteFailedCallback callback = write_failed_callback_; |
| 103 callback.Run(result); |
| 104 } |
102 } | 105 } |
103 return; | 106 return; |
104 } | 107 } |
105 | 108 |
106 base::Closure done_task = AdvanceBufferPosition(result); | 109 DCHECK(!queue_.empty()); |
107 if (!done_task.is_null()) { | 110 |
108 bool destroyed = false; | 111 queue_.front()->data->DidConsume(result); |
109 destroyed_flag_ = &destroyed; | 112 |
110 done_task.Run(); | 113 if (queue_.front()->data->BytesRemaining() == 0) { |
111 if (destroyed) { | 114 base::Closure done_task = queue_.front()->done_task; |
112 // Stop doing anything if we've been destroyed by the callback. | 115 delete queue_.front(); |
113 return; | 116 queue_.pop_front(); |
114 } | 117 |
115 destroyed_flag_ = nullptr; | 118 if (!done_task.is_null()) |
| 119 done_task.Run(); |
116 } | 120 } |
117 | |
118 *write_again = true; | |
119 } | 121 } |
120 | 122 |
121 void BufferedSocketWriterBase::OnWritten(int result) { | 123 void BufferedSocketWriter::OnWritten(int result) { |
122 DCHECK(CalledOnValidThread()); | 124 DCHECK(thread_checker_.CalledOnValidThread()); |
123 DCHECK(write_pending_); | 125 DCHECK(write_pending_); |
124 write_pending_ = false; | 126 write_pending_ = false; |
125 | 127 |
126 bool write_again; | 128 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr(); |
127 HandleWriteResult(result, &write_again); | 129 HandleWriteResult(result); |
128 if (write_again) | 130 if (self) |
129 DoWrite(); | 131 DoWrite(); |
130 } | 132 } |
131 | 133 |
132 void BufferedSocketWriterBase::HandleError(int result) { | |
133 DCHECK(CalledOnValidThread()); | |
134 | |
135 closed_ = true; | |
136 | |
137 STLDeleteElements(&queue_); | |
138 | |
139 // Notify subclass that an error is received. | |
140 OnError(result); | |
141 } | |
142 | |
143 void BufferedSocketWriterBase::Close() { | |
144 DCHECK(CalledOnValidThread()); | |
145 closed_ = true; | |
146 } | |
147 | |
148 BufferedSocketWriterBase::~BufferedSocketWriterBase() { | |
149 if (destroyed_flag_) | |
150 *destroyed_flag_ = true; | |
151 | |
152 STLDeleteElements(&queue_); | |
153 } | |
154 | |
155 base::Closure BufferedSocketWriterBase::PopQueue() { | |
156 base::Closure result = queue_.front()->done_task; | |
157 delete queue_.front(); | |
158 queue_.pop_front(); | |
159 return result; | |
160 } | |
161 | |
162 BufferedSocketWriter::BufferedSocketWriter() { | |
163 } | |
164 | |
165 void BufferedSocketWriter::GetNextPacket( | |
166 net::IOBuffer** buffer, int* size) { | |
167 if (!current_buf_.get()) { | |
168 if (queue_.empty()) { | |
169 *buffer = nullptr; | |
170 return; // Nothing to write. | |
171 } | |
172 current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(), | |
173 queue_.front()->data->size()); | |
174 } | |
175 | |
176 *buffer = current_buf_.get(); | |
177 *size = current_buf_->BytesRemaining(); | |
178 } | |
179 | |
180 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) { | |
181 current_buf_->DidConsume(written); | |
182 | |
183 if (current_buf_->BytesRemaining() == 0) { | |
184 current_buf_ = nullptr; | |
185 return PopQueue(); | |
186 } | |
187 return base::Closure(); | |
188 } | |
189 | |
190 void BufferedSocketWriter::OnError(int result) { | |
191 current_buf_ = nullptr; | |
192 } | |
193 | |
194 BufferedSocketWriter::~BufferedSocketWriter() { | |
195 } | |
196 | |
197 BufferedDatagramWriter::BufferedDatagramWriter() { | |
198 } | |
199 | |
200 void BufferedDatagramWriter::GetNextPacket( | |
201 net::IOBuffer** buffer, int* size) { | |
202 if (queue_.empty()) { | |
203 *buffer = nullptr; | |
204 return; // Nothing to write. | |
205 } | |
206 *buffer = queue_.front()->data.get(); | |
207 *size = queue_.front()->data->size(); | |
208 } | |
209 | |
210 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) { | |
211 DCHECK_EQ(written, queue_.front()->data->size()); | |
212 return PopQueue(); | |
213 } | |
214 | |
215 void BufferedDatagramWriter::OnError(int result) { | |
216 // Nothing to do here. | |
217 } | |
218 | |
219 BufferedDatagramWriter::~BufferedDatagramWriter() { | |
220 } | |
221 | |
222 } // namespace remoting | 134 } // namespace remoting |
OLD | NEW |