Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(152)

Side by Side Diff: net/quic/quic_session_test.cc

Issue 1472563002: Let QUIC streams write 16k before ceding. Behind FLAG_quic_batch_writes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@107581674
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/quic/quic_session.cc ('k') | net/quic/quic_write_blocked_list.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/quic_session.h" 5 #include "net/quic/quic_session.h"
6 6
7 #include <set> 7 #include <set>
8 8
9 #include "base/basictypes.h" 9 #include "base/basictypes.h"
10 #include "base/containers/hash_tables.h" 10 #include "base/containers/hash_tables.h"
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
89 : QuicSpdyStream(id, session) {} 89 : QuicSpdyStream(id, session) {}
90 90
91 using ReliableQuicStream::CloseWriteSide; 91 using ReliableQuicStream::CloseWriteSide;
92 92
93 void OnDataAvailable() override {} 93 void OnDataAvailable() override {}
94 94
95 void SendBody(const string& data, bool fin) { 95 void SendBody(const string& data, bool fin) {
96 WriteOrBufferData(data, fin, nullptr); 96 WriteOrBufferData(data, fin, nullptr);
97 } 97 }
98 98
99 using QuicSpdyStream::set_priority;
100
99 MOCK_METHOD0(OnCanWrite, void()); 101 MOCK_METHOD0(OnCanWrite, void());
100 }; 102 };
101 103
102 // Poor man's functor for use as callback in a mock. 104 // Poor man's functor for use as callback in a mock.
103 class StreamBlocker { 105 class StreamBlocker {
104 public: 106 public:
105 StreamBlocker(QuicSession* session, QuicStreamId stream_id) 107 StreamBlocker(QuicSession* session, QuicStreamId stream_id)
106 : session_(session), 108 : session_(session),
107 stream_id_(stream_id) { 109 stream_id_(stream_id) {
108 } 110 }
109 111
110 void MarkConnectionLevelWriteBlocked() { 112 void MarkConnectionLevelWriteBlocked() {
111 session_->MarkConnectionLevelWriteBlocked(stream_id_, kSomeMiddlePriority); 113 session_->MarkConnectionLevelWriteBlocked(stream_id_, kSomeMiddlePriority);
112 } 114 }
113 115
116 void MarkHighPriorityWriteBlocked() {
117 session_->MarkConnectionLevelWriteBlocked(stream_id_, kHighestPriority);
118 }
119
114 private: 120 private:
115 QuicSession* const session_; 121 QuicSession* const session_;
116 const QuicStreamId stream_id_; 122 const QuicStreamId stream_id_;
117 }; 123 };
118 124
119 class TestSession : public QuicSpdySession { 125 class TestSession : public QuicSpdySession {
120 public: 126 public:
121 explicit TestSession(QuicConnection* connection) 127 explicit TestSession(QuicConnection* connection)
122 : QuicSpdySession(connection, DefaultQuicConfig()), 128 : QuicSpdySession(connection, DefaultQuicConfig()),
123 crypto_stream_(this), 129 crypto_stream_(this),
(...skipping 27 matching lines...) Expand all
151 return QuicSpdySession::GetOrCreateDynamicStream(stream_id); 157 return QuicSpdySession::GetOrCreateDynamicStream(stream_id);
152 } 158 }
153 159
154 QuicConsumedData WritevData( 160 QuicConsumedData WritevData(
155 QuicStreamId id, 161 QuicStreamId id,
156 QuicIOVector data, 162 QuicIOVector data,
157 QuicStreamOffset offset, 163 QuicStreamOffset offset,
158 bool fin, 164 bool fin,
159 FecProtection fec_protection, 165 FecProtection fec_protection,
160 QuicAckListenerInterface* ack_notifier_delegate) override { 166 QuicAckListenerInterface* ack_notifier_delegate) override {
161 // Always consumes everything. 167 QuicConsumedData consumed(data.total_length, fin);
162 if (writev_consumes_all_data_) { 168 if (!writev_consumes_all_data_) {
163 return QuicConsumedData(data.total_length, fin); 169 consumed = QuicSession::WritevData(id, data, offset, fin, fec_protection,
164 } else { 170 ack_notifier_delegate);
165 return QuicSession::WritevData(id, data, offset, fin, fec_protection,
166 ack_notifier_delegate);
167 } 171 }
172 QuicSessionPeer::GetWriteBlockedStreams(this)->UpdateBytesForStream(
173 id, consumed.bytes_consumed);
174 return consumed;
168 } 175 }
169 176
170 void set_writev_consumes_all_data(bool val) { 177 void set_writev_consumes_all_data(bool val) {
171 writev_consumes_all_data_ = val; 178 writev_consumes_all_data_ = val;
172 } 179 }
173 180
174 QuicConsumedData SendStreamData(QuicStreamId id) { 181 QuicConsumedData SendStreamData(QuicStreamId id) {
175 struct iovec iov; 182 struct iovec iov;
176 return WritevData(id, MakeIOVector("not empty", &iov), 0, true, 183 return WritevData(id, MakeIOVector("not empty", &iov), 0, true,
177 MAY_FEC_PROTECT, nullptr); 184 MAY_FEC_PROTECT, nullptr);
178 } 185 }
179 186
187 QuicConsumedData SendLargeFakeData(QuicStreamId id, int bytes) {
188 DCHECK(writev_consumes_all_data_);
189 struct iovec iov;
190 iov.iov_base = nullptr; // should not be read.
191 iov.iov_len = static_cast<size_t>(bytes);
192 return WritevData(id, QuicIOVector(&iov, 1, bytes), 0, true,
193 MAY_FEC_PROTECT, nullptr);
194 }
195
180 using QuicSession::PostProcessAfterData; 196 using QuicSession::PostProcessAfterData;
181 197
182 private: 198 private:
183 StrictMock<TestCryptoStream> crypto_stream_; 199 StrictMock<TestCryptoStream> crypto_stream_;
184 200
185 bool writev_consumes_all_data_; 201 bool writev_consumes_all_data_;
186 }; 202 };
187 203
188 class QuicSessionTestBase : public ::testing::TestWithParam<QuicVersion> { 204 class QuicSessionTestBase : public ::testing::TestWithParam<QuicVersion> {
189 protected: 205 protected:
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after
383 TestStream* stream2 = session_.CreateOutgoingDynamicStream(); 399 TestStream* stream2 = session_.CreateOutgoingDynamicStream();
384 TestStream* stream4 = session_.CreateOutgoingDynamicStream(); 400 TestStream* stream4 = session_.CreateOutgoingDynamicStream();
385 TestStream* stream6 = session_.CreateOutgoingDynamicStream(); 401 TestStream* stream6 = session_.CreateOutgoingDynamicStream();
386 402
387 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority); 403 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority);
388 session_.MarkConnectionLevelWriteBlocked(stream6->id(), kSomeMiddlePriority); 404 session_.MarkConnectionLevelWriteBlocked(stream6->id(), kSomeMiddlePriority);
389 session_.MarkConnectionLevelWriteBlocked(stream4->id(), kSomeMiddlePriority); 405 session_.MarkConnectionLevelWriteBlocked(stream4->id(), kSomeMiddlePriority);
390 406
391 InSequence s; 407 InSequence s;
392 StreamBlocker stream2_blocker(&session_, stream2->id()); 408 StreamBlocker stream2_blocker(&session_, stream2->id());
393 // Reregister, to test the loop limit. 409
394 EXPECT_CALL(*stream2, OnCanWrite()) 410 if (FLAGS_quic_batch_writes) {
395 .WillOnce(Invoke(&stream2_blocker, 411 // Reregister, to test the loop limit.
396 &StreamBlocker::MarkConnectionLevelWriteBlocked)); 412 EXPECT_CALL(*stream2, OnCanWrite())
397 EXPECT_CALL(*stream6, OnCanWrite()); 413 .WillOnce(Invoke(&stream2_blocker,
398 EXPECT_CALL(*stream4, OnCanWrite()); 414 &StreamBlocker::MarkConnectionLevelWriteBlocked));
415 // 2 will get called a second time as it didn't finish its block
416 EXPECT_CALL(*stream2, OnCanWrite());
417 EXPECT_CALL(*stream6, OnCanWrite());
418 // 4 will not get called, as we exceeded the loop limit.
419 } else {
420 // Reregister, to test the loop limit.
421 EXPECT_CALL(*stream2, OnCanWrite())
422 .WillOnce(Invoke(&stream2_blocker,
423 &StreamBlocker::MarkConnectionLevelWriteBlocked));
424 EXPECT_CALL(*stream6, OnCanWrite());
425 EXPECT_CALL(*stream4, OnCanWrite());
426 }
399 session_.OnCanWrite(); 427 session_.OnCanWrite();
400 EXPECT_TRUE(session_.WillingAndAbleToWrite()); 428 EXPECT_TRUE(session_.WillingAndAbleToWrite());
401 } 429 }
402 430
431 TEST_P(QuicSessionTestServer, TestBatchedWrites) {
432 FLAGS_quic_batch_writes = true;
433 TestStream* stream2 = session_.CreateOutgoingDynamicStream();
434 TestStream* stream4 = session_.CreateOutgoingDynamicStream();
435 TestStream* stream6 = session_.CreateOutgoingDynamicStream();
436
437 session_.set_writev_consumes_all_data(true);
438 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority);
439 session_.MarkConnectionLevelWriteBlocked(stream4->id(), kSomeMiddlePriority);
440
441 StreamBlocker stream2_blocker(&session_, stream2->id());
442 StreamBlocker stream4_blocker(&session_, stream4->id());
443 StreamBlocker stream6_blocker(&session_, stream6->id());
444 // With two sessions blocked, we should get two write calls. They should both
445 // go to the first stream as it will only write 6k and mark itself blocked
446 // again.
447 InSequence s;
448 EXPECT_CALL(*stream2, OnCanWrite())
449 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor(
450 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))),
451 Invoke(&stream2_blocker,
452 &StreamBlocker::MarkConnectionLevelWriteBlocked)));
453 EXPECT_CALL(*stream2, OnCanWrite())
454 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor(
455 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))),
456 Invoke(&stream2_blocker,
457 &StreamBlocker::MarkConnectionLevelWriteBlocked)));
458 session_.OnCanWrite();
459
460 // We should get one more call for stream2, at which point it has used its
461 // write quota and we move over to stream 4.
462 EXPECT_CALL(*stream2, OnCanWrite())
463 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor(
464 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))),
465 Invoke(&stream2_blocker,
466 &StreamBlocker::MarkConnectionLevelWriteBlocked)));
467 EXPECT_CALL(*stream4, OnCanWrite())
468 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor(
469 &session_, &TestSession::SendLargeFakeData, stream4->id(), 6000))),
470 Invoke(&stream4_blocker,
471 &StreamBlocker::MarkConnectionLevelWriteBlocked)));
472 session_.OnCanWrite();
473
474 // Now let stream 4 do the 2nd of its 3 writes, but add a block for a high
475 // priority stream 6. 4 should be preempted. 6 will write but *not* block so
476 // will cede back to 4.
477 stream6->set_priority(kHighestPriority);
478 EXPECT_CALL(*stream4, OnCanWrite())
479 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor(
480 &session_, &TestSession::SendLargeFakeData, stream4->id(), 6000))),
481 Invoke(&stream4_blocker,
482 &StreamBlocker::MarkConnectionLevelWriteBlocked),
483 Invoke(&stream6_blocker,
484 &StreamBlocker::MarkHighPriorityWriteBlocked)));
485 EXPECT_CALL(*stream6, OnCanWrite())
486 .WillOnce(testing::IgnoreResult(Invoke(CreateFunctor(
487 &session_, &TestSession::SendLargeFakeData, stream4->id(), 6000))));
488 session_.OnCanWrite();
489
490 // Stream4 alread did 6k worth of writes, so after doing another 12k it should
491 // cede and 2 should resume.
492 EXPECT_CALL(*stream4, OnCanWrite())
493 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor(
494 &session_, &TestSession::SendLargeFakeData, stream4->id(), 12000))),
495 Invoke(&stream4_blocker,
496 &StreamBlocker::MarkConnectionLevelWriteBlocked)));
497 EXPECT_CALL(*stream2, OnCanWrite())
498 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor(
499 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))),
500 Invoke(&stream2_blocker,
501 &StreamBlocker::MarkConnectionLevelWriteBlocked)));
502 session_.OnCanWrite();
503 }
504
403 TEST_P(QuicSessionTestServer, OnCanWriteBundlesStreams) { 505 TEST_P(QuicSessionTestServer, OnCanWriteBundlesStreams) {
404 // Drive congestion control manually. 506 // Drive congestion control manually.
405 MockSendAlgorithm* send_algorithm = new StrictMock<MockSendAlgorithm>; 507 MockSendAlgorithm* send_algorithm = new StrictMock<MockSendAlgorithm>;
406 QuicConnectionPeer::SetSendAlgorithm(session_.connection(), send_algorithm); 508 QuicConnectionPeer::SetSendAlgorithm(session_.connection(), send_algorithm);
407 509
408 TestStream* stream2 = session_.CreateOutgoingDynamicStream(); 510 TestStream* stream2 = session_.CreateOutgoingDynamicStream();
409 TestStream* stream4 = session_.CreateOutgoingDynamicStream(); 511 TestStream* stream4 = session_.CreateOutgoingDynamicStream();
410 TestStream* stream6 = session_.CreateOutgoingDynamicStream(); 512 TestStream* stream6 = session_.CreateOutgoingDynamicStream();
411 513
412 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority); 514 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority);
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after
509 InSequence s; 611 InSequence s;
510 // Force most streams to re-register, which is common scenario when we block 612 // Force most streams to re-register, which is common scenario when we block
511 // the Crypto stream, and only the crypto stream can "really" write. 613 // the Crypto stream, and only the crypto stream can "really" write.
512 614
513 // Due to prioritization, we *should* be asked to write the crypto stream 615 // Due to prioritization, we *should* be asked to write the crypto stream
514 // first. 616 // first.
515 // Don't re-register the crypto stream (which signals complete writing). 617 // Don't re-register the crypto stream (which signals complete writing).
516 TestCryptoStream* crypto_stream = session_.GetCryptoStream(); 618 TestCryptoStream* crypto_stream = session_.GetCryptoStream();
517 EXPECT_CALL(*crypto_stream, OnCanWrite()); 619 EXPECT_CALL(*crypto_stream, OnCanWrite());
518 620
519 // Re-register all other streams, to show they weren't able to proceed. 621 EXPECT_CALL(*stream2, OnCanWrite());
520 EXPECT_CALL(*stream2, OnCanWrite()) 622 EXPECT_CALL(*stream3, OnCanWrite());
521 .WillOnce(Invoke(&stream2_blocker,
522 &StreamBlocker::MarkConnectionLevelWriteBlocked));
523 EXPECT_CALL(*stream3, OnCanWrite())
524 .WillOnce(Invoke(&stream3_blocker,
525 &StreamBlocker::MarkConnectionLevelWriteBlocked));
526 EXPECT_CALL(*stream4, OnCanWrite()) 623 EXPECT_CALL(*stream4, OnCanWrite())
527 .WillOnce(Invoke(&stream4_blocker, 624 .WillOnce(Invoke(&stream4_blocker,
528 &StreamBlocker::MarkConnectionLevelWriteBlocked)); 625 &StreamBlocker::MarkConnectionLevelWriteBlocked));
529 626
530 session_.OnCanWrite(); 627 session_.OnCanWrite();
531 EXPECT_TRUE(session_.WillingAndAbleToWrite()); 628 EXPECT_TRUE(session_.WillingAndAbleToWrite());
532 EXPECT_FALSE(session_.HasPendingHandshake()); // Crypto stream wrote. 629 EXPECT_FALSE(session_.HasPendingHandshake()); // Crypto stream wrote.
533 } 630 }
534 631
535 TEST_P(QuicSessionTestServer, OnCanWriteWithClosedStream) { 632 TEST_P(QuicSessionTestServer, OnCanWriteWithClosedStream) {
(...skipping 552 matching lines...) Expand 10 before | Expand all | Expand 10 after
1088 // Verify that there is no entry for the stream in 1185 // Verify that there is no entry for the stream in
1089 // locally_closed_streams_highest_offset_. 1186 // locally_closed_streams_highest_offset_.
1090 EXPECT_EQ( 1187 EXPECT_EQ(
1091 FLAGS_quic_fix_fin_accounting ? 0u : 1u, 1188 FLAGS_quic_fix_fin_accounting ? 0u : 1u,
1092 QuicSessionPeer::GetLocallyClosedStreamsHighestOffset(&session_).size()); 1189 QuicSessionPeer::GetLocallyClosedStreamsHighestOffset(&session_).size());
1093 } 1190 }
1094 1191
1095 } // namespace 1192 } // namespace
1096 } // namespace test 1193 } // namespace test
1097 } // namespace net 1194 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/quic_session.cc ('k') | net/quic/quic_write_blocked_list.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698