Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(241)

Side by Side Diff: net/curvecp/messenger.cc

Issue 8824006: Migrate net/socket/socket.h, net/socket/stream_socket.h to base::Bind(). (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 9 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « net/curvecp/messenger.h ('k') | net/curvecp/packetizer.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/curvecp/messenger.h" 5 #include "net/curvecp/messenger.h"
6 6
7 #include "base/bind.h"
7 #include "base/logging.h" 8 #include "base/logging.h"
8 #include "base/message_loop.h" 9 #include "base/message_loop.h"
9 #include "net/base/io_buffer.h" 10 #include "net/base/io_buffer.h"
10 #include "net/base/net_errors.h" 11 #include "net/base/net_errors.h"
11 #include "net/curvecp/protocol.h" 12 #include "net/curvecp/protocol.h"
12 13
13 // Basic protocol design: 14 // Basic protocol design:
14 // 15 //
15 // OnTimeout: Called when the timeout timer pops. 16 // OnTimeout: Called when the timeout timer pops.
16 // - call SendMessage() 17 // - call SendMessage()
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
51 static const size_t kMaxWriteQueueMessages = 128; 52 static const size_t kMaxWriteQueueMessages = 128;
52 53
53 // Size of the send buffer. 54 // Size of the send buffer.
54 static const size_t kSendBufferSize = (128 * 1024); 55 static const size_t kSendBufferSize = (128 * 1024);
55 // Size of the receive buffer. 56 // Size of the receive buffer.
56 static const size_t kReceiveBufferSize = (128 * 1024); 57 static const size_t kReceiveBufferSize = (128 * 1024);
57 58
58 Messenger::Messenger(Packetizer* packetizer) 59 Messenger::Messenger(Packetizer* packetizer)
59 : packetizer_(packetizer), 60 : packetizer_(packetizer),
60 send_buffer_(kSendBufferSize), 61 send_buffer_(kSendBufferSize),
61 send_complete_callback_(NULL),
62 old_receive_complete_callback_(NULL),
63 pending_receive_length_(0), 62 pending_receive_length_(0),
64 send_message_in_progress_(false), 63 send_message_in_progress_(false) {
65 ALLOW_THIS_IN_INITIALIZER_LIST(
66 send_message_callback_(this, &Messenger::OnSendMessageComplete)),
67 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
68 } 64 }
69 65
70 Messenger::~Messenger() { 66 Messenger::~Messenger() {
71 } 67 }
72 68
73 int Messenger::Read(IOBuffer* buf, int buf_len, OldCompletionCallback* callback) {
74 DCHECK(CalledOnValidThread());
75 DCHECK(!old_receive_complete_callback_ &&
76 receive_complete_callback_.is_null());
77
78 if (!received_list_.bytes_available()) {
79 old_receive_complete_callback_ = callback;
80 pending_receive_ = buf;
81 pending_receive_length_ = buf_len;
82 return ERR_IO_PENDING;
83 }
84
85 int bytes_read = InternalRead(buf, buf_len);
86 DCHECK_LT(0, bytes_read);
87 return bytes_read;
88 }
89 int Messenger::Read(IOBuffer* buf, int buf_len, 69 int Messenger::Read(IOBuffer* buf, int buf_len,
90 const CompletionCallback& callback) { 70 const CompletionCallback& callback) {
91 DCHECK(CalledOnValidThread()); 71 DCHECK(CalledOnValidThread());
92 DCHECK(!old_receive_complete_callback_ && 72 DCHECK(receive_complete_callback_.is_null());
93 receive_complete_callback_.is_null());
94 73
95 if (!received_list_.bytes_available()) { 74 if (!received_list_.bytes_available()) {
96 receive_complete_callback_ = callback; 75 receive_complete_callback_ = callback;
97 pending_receive_ = buf; 76 pending_receive_ = buf;
98 pending_receive_length_ = buf_len; 77 pending_receive_length_ = buf_len;
99 return ERR_IO_PENDING; 78 return ERR_IO_PENDING;
100 } 79 }
101 80
102 int bytes_read = InternalRead(buf, buf_len); 81 int bytes_read = InternalRead(buf, buf_len);
103 DCHECK_LT(0, bytes_read); 82 DCHECK_LT(0, bytes_read);
104 return bytes_read; 83 return bytes_read;
105 } 84 }
106 85
107 int Messenger::Write(IOBuffer* buf, int buf_len, OldCompletionCallback* callback ) { 86 int Messenger::Write(IOBuffer* buf, int buf_len,
87 const CompletionCallback& callback) {
108 DCHECK(CalledOnValidThread()); 88 DCHECK(CalledOnValidThread());
109 DCHECK(!pending_send_.get()); // Already a write pending! 89 DCHECK(!pending_send_.get()); // Already a write pending!
110 DCHECK(!send_complete_callback_); 90 DCHECK(send_complete_callback_.is_null());
111 DCHECK_LT(0, buf_len); 91 DCHECK_LT(0, buf_len);
112 92
113 int len = send_buffer_.write(buf->data(), buf_len); 93 int len = send_buffer_.write(buf->data(), buf_len);
114 if (!send_timer_.IsRunning()) 94 if (!send_timer_.IsRunning())
115 send_timer_.Start(FROM_HERE, base::TimeDelta(), 95 send_timer_.Start(FROM_HERE, base::TimeDelta(),
116 this, &Messenger::OnSendTimer); 96 this, &Messenger::OnSendTimer);
117 if (len) 97 if (len)
118 return len; 98 return len;
119 99
120 // We couldn't add data to the send buffer, so block the application. 100 // We couldn't add data to the send buffer, so block the application.
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
161 IOBufferWithSize* Messenger::CreateBufferFromSendQueue() { 141 IOBufferWithSize* Messenger::CreateBufferFromSendQueue() {
162 DCHECK_LT(0, send_buffer_.length()); 142 DCHECK_LT(0, send_buffer_.length());
163 143
164 int length = std::min(packetizer_->max_message_payload(), 144 int length = std::min(packetizer_->max_message_payload(),
165 send_buffer_.length()); 145 send_buffer_.length());
166 IOBufferWithSize* rv = new IOBufferWithSize(length); 146 IOBufferWithSize* rv = new IOBufferWithSize(length);
167 int bytes = send_buffer_.read(rv->data(), length); 147 int bytes = send_buffer_.read(rv->data(), length);
168 DCHECK_EQ(bytes, length); 148 DCHECK_EQ(bytes, length);
169 149
170 // We consumed data, check to see if someone is waiting to write more data. 150 // We consumed data, check to see if someone is waiting to write more data.
171 if (send_complete_callback_) { 151 if (!send_complete_callback_.is_null()) {
172 DCHECK(pending_send_.get()); 152 DCHECK(pending_send_.get());
173 153
174 int len = send_buffer_.write(pending_send_->data(), pending_send_length_); 154 int len = send_buffer_.write(pending_send_->data(), pending_send_length_);
175 if (len) { 155 if (len) {
176 pending_send_ = NULL; 156 pending_send_ = NULL;
177 OldCompletionCallback* callback = send_complete_callback_; 157 CompletionCallback callback = send_complete_callback_;
178 send_complete_callback_ = NULL; 158 send_complete_callback_.Reset();
179 callback->Run(len); 159 callback.Run(len);
180 } 160 }
181 } 161 }
182 162
183 return rv; 163 return rv;
184 } 164 }
185 165
186 void Messenger::OnSendMessageComplete(int result) { 166 void Messenger::OnSendMessageComplete(int result) {
187 DCHECK_NE(ERR_IO_PENDING, result); 167 DCHECK_NE(ERR_IO_PENDING, result);
188 168
189 send_message_in_progress_ = false; 169 send_message_in_progress_ = false;
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
267 uint16_pack(header->size.val, data->size()); 247 uint16_pack(header->size.val, data->size());
268 uint64_pack(header->position, position); 248 uint64_pack(header->position, position);
269 // TODO(mbelshe): Fill in rest of the header fields. 249 // TODO(mbelshe): Fill in rest of the header fields.
270 // needs to have the block-position. He tags each chunk with an 250 // needs to have the block-position. He tags each chunk with an
271 // absolute offset into the data stream. 251 // absolute offset into the data stream.
272 // Copy the contents of the message into the Message frame. 252 // Copy the contents of the message into the Message frame.
273 memcpy(message->data() + sizeof(Message), data->data(), data->size()); 253 memcpy(message->data() + sizeof(Message), data->data(), data->size());
274 254
275 sent_list_.MarkBlockSent(position, id); 255 sent_list_.MarkBlockSent(position, id);
276 256
277 int rv = packetizer_->SendMessage(key_, 257 int rv = packetizer_->SendMessage(
278 message->data(), 258 key_, message->data(), padded_size,
279 padded_size, 259 base::Bind(&Messenger::OnSendMessageComplete, base::Unretained(this)));
280 &send_message_callback_);
281 if (rv == ERR_IO_PENDING) { 260 if (rv == ERR_IO_PENDING) {
282 send_message_in_progress_ = true; 261 send_message_in_progress_ = true;
283 return; 262 return;
284 } 263 }
285 264
286 // UDP must write all or none. 265 // UDP must write all or none.
287 DCHECK_EQ(padded_size, static_cast<size_t>(rv)); 266 DCHECK_EQ(padded_size, static_cast<size_t>(rv));
288 OnSendMessageComplete(rv); 267 OnSendMessageComplete(rv);
289 } 268 }
290 269
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
350 uint64 position = uint64_unpack(header->position); 329 uint64 position = uint64_unpack(header->position);
351 scoped_refptr<IOBuffer> buffer(new IOBuffer(body_length)); 330 scoped_refptr<IOBuffer> buffer(new IOBuffer(body_length));
352 memcpy(buffer->data(), message->data() + sizeof(Message), body_length); 331 memcpy(buffer->data(), message->data() + sizeof(Message), body_length);
353 received_list_.AddBlock(position, buffer, body_length); 332 received_list_.AddBlock(position, buffer, body_length);
354 333
355 SendAck(message_id); 334 SendAck(message_id);
356 } 335 }
357 336
358 // If we have data available, and a read is pending, notify the callback. 337 // If we have data available, and a read is pending, notify the callback.
359 if (received_list_.bytes_available() && 338 if (received_list_.bytes_available() &&
360 (old_receive_complete_callback_ || 339 !receive_complete_callback_.is_null()) {
361 !receive_complete_callback_.is_null())) {
362 // Pass the data up to the caller. 340 // Pass the data up to the caller.
363 int bytes_read = InternalRead(pending_receive_, pending_receive_length_); 341 int bytes_read = InternalRead(pending_receive_, pending_receive_length_);
364 if (old_receive_complete_callback_) { 342 CompletionCallback callback = receive_complete_callback_;
365 OldCompletionCallback* callback = old_receive_complete_callback_; 343 receive_complete_callback_.Reset();
366 old_receive_complete_callback_ = NULL; 344 callback.Run(bytes_read);
367 callback->Run(bytes_read);
368 } else {
369 CompletionCallback callback = receive_complete_callback_;
370 receive_complete_callback_.Reset();
371 callback.Run(bytes_read);
372 }
373 } 345 }
374 } 346 }
375 347
376 void Messenger::SendAck(uint32 last_message_received) { 348 void Messenger::SendAck(uint32 last_message_received) {
377 scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(Message))); 349 scoped_refptr<IOBuffer> buffer(new IOBuffer(sizeof(Message)));
378 memset(buffer->data(), 0, sizeof(Message)); 350 memset(buffer->data(), 0, sizeof(Message));
379 Message* message = reinterpret_cast<Message*>(buffer->data()); 351 Message* message = reinterpret_cast<Message*>(buffer->data());
380 uint32_pack(message->last_message_received, last_message_received); 352 uint32_pack(message->last_message_received, last_message_received);
381 uint64_pack(message->acknowledge_1, received_list_.bytes_received()); 353 uint64_pack(message->acknowledge_1, received_list_.bytes_received());
382 LOG(ERROR) << "SendAck " << received_list_.bytes_received(); 354 LOG(ERROR) << "SendAck " << received_list_.bytes_received();
383 // TODO(mbelshe): fill in remainder of selective acknowledgements 355 // TODO(mbelshe): fill in remainder of selective acknowledgements
384 356
385 // TODO(mbelshe): Fix this - it is totally possible to have a send message 357 // TODO(mbelshe): Fix this - it is totally possible to have a send message
386 // in progress here... 358 // in progress here...
387 DCHECK(!send_message_in_progress_); 359 DCHECK(!send_message_in_progress_);
388 360
389 int rv = packetizer_->SendMessage(key_, 361 int rv = packetizer_->SendMessage(
390 buffer->data(), 362 key_, buffer->data(), sizeof(Message),
391 sizeof(Message), 363 base::Bind(&Messenger::OnSendMessageComplete, base::Unretained(this)));
392 &send_message_callback_);
393 // TODO(mbelshe): Fix me! Deal with the error cases 364 // TODO(mbelshe): Fix me! Deal with the error cases
394 DCHECK(rv == sizeof(Message)); 365 DCHECK(rv == sizeof(Message));
395 } 366 }
396 367
397 } // namespace net 368 } // namespace net
OLDNEW
« no previous file with comments | « net/curvecp/messenger.h ('k') | net/curvecp/packetizer.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698