OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/quic_session.h" | 5 #include "net/quic/quic_session.h" |
6 | 6 |
7 #include <set> | 7 #include <set> |
8 | 8 |
9 #include "base/basictypes.h" | 9 #include "base/basictypes.h" |
10 #include "base/containers/hash_tables.h" | 10 #include "base/containers/hash_tables.h" |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
89 : QuicSpdyStream(id, session) {} | 89 : QuicSpdyStream(id, session) {} |
90 | 90 |
91 using ReliableQuicStream::CloseWriteSide; | 91 using ReliableQuicStream::CloseWriteSide; |
92 | 92 |
93 void OnDataAvailable() override {} | 93 void OnDataAvailable() override {} |
94 | 94 |
95 void SendBody(const string& data, bool fin) { | 95 void SendBody(const string& data, bool fin) { |
96 WriteOrBufferData(data, fin, nullptr); | 96 WriteOrBufferData(data, fin, nullptr); |
97 } | 97 } |
98 | 98 |
| 99 using QuicSpdyStream::set_priority; |
| 100 |
99 MOCK_METHOD0(OnCanWrite, void()); | 101 MOCK_METHOD0(OnCanWrite, void()); |
100 }; | 102 }; |
101 | 103 |
102 // Poor man's functor for use as callback in a mock. | 104 // Poor man's functor for use as callback in a mock. |
103 class StreamBlocker { | 105 class StreamBlocker { |
104 public: | 106 public: |
105 StreamBlocker(QuicSession* session, QuicStreamId stream_id) | 107 StreamBlocker(QuicSession* session, QuicStreamId stream_id) |
106 : session_(session), | 108 : session_(session), |
107 stream_id_(stream_id) { | 109 stream_id_(stream_id) { |
108 } | 110 } |
109 | 111 |
110 void MarkConnectionLevelWriteBlocked() { | 112 void MarkConnectionLevelWriteBlocked() { |
111 session_->MarkConnectionLevelWriteBlocked(stream_id_, kSomeMiddlePriority); | 113 session_->MarkConnectionLevelWriteBlocked(stream_id_, kSomeMiddlePriority); |
112 } | 114 } |
113 | 115 |
| 116 void MarkHighPriorityWriteBlocked() { |
| 117 session_->MarkConnectionLevelWriteBlocked(stream_id_, kHighestPriority); |
| 118 } |
| 119 |
114 private: | 120 private: |
115 QuicSession* const session_; | 121 QuicSession* const session_; |
116 const QuicStreamId stream_id_; | 122 const QuicStreamId stream_id_; |
117 }; | 123 }; |
118 | 124 |
119 class TestSession : public QuicSpdySession { | 125 class TestSession : public QuicSpdySession { |
120 public: | 126 public: |
121 explicit TestSession(QuicConnection* connection) | 127 explicit TestSession(QuicConnection* connection) |
122 : QuicSpdySession(connection, DefaultQuicConfig()), | 128 : QuicSpdySession(connection, DefaultQuicConfig()), |
123 crypto_stream_(this), | 129 crypto_stream_(this), |
(...skipping 27 matching lines...) Expand all Loading... |
151 return QuicSpdySession::GetOrCreateDynamicStream(stream_id); | 157 return QuicSpdySession::GetOrCreateDynamicStream(stream_id); |
152 } | 158 } |
153 | 159 |
154 QuicConsumedData WritevData( | 160 QuicConsumedData WritevData( |
155 QuicStreamId id, | 161 QuicStreamId id, |
156 QuicIOVector data, | 162 QuicIOVector data, |
157 QuicStreamOffset offset, | 163 QuicStreamOffset offset, |
158 bool fin, | 164 bool fin, |
159 FecProtection fec_protection, | 165 FecProtection fec_protection, |
160 QuicAckListenerInterface* ack_notifier_delegate) override { | 166 QuicAckListenerInterface* ack_notifier_delegate) override { |
161 // Always consumes everything. | 167 QuicConsumedData consumed(data.total_length, fin); |
162 if (writev_consumes_all_data_) { | 168 if (!writev_consumes_all_data_) { |
163 return QuicConsumedData(data.total_length, fin); | 169 consumed = QuicSession::WritevData(id, data, offset, fin, fec_protection, |
164 } else { | 170 ack_notifier_delegate); |
165 return QuicSession::WritevData(id, data, offset, fin, fec_protection, | |
166 ack_notifier_delegate); | |
167 } | 171 } |
| 172 QuicSessionPeer::GetWriteBlockedStreams(this)->UpdateBytesForStream( |
| 173 id, consumed.bytes_consumed); |
| 174 return consumed; |
168 } | 175 } |
169 | 176 |
170 void set_writev_consumes_all_data(bool val) { | 177 void set_writev_consumes_all_data(bool val) { |
171 writev_consumes_all_data_ = val; | 178 writev_consumes_all_data_ = val; |
172 } | 179 } |
173 | 180 |
174 QuicConsumedData SendStreamData(QuicStreamId id) { | 181 QuicConsumedData SendStreamData(QuicStreamId id) { |
175 struct iovec iov; | 182 struct iovec iov; |
176 return WritevData(id, MakeIOVector("not empty", &iov), 0, true, | 183 return WritevData(id, MakeIOVector("not empty", &iov), 0, true, |
177 MAY_FEC_PROTECT, nullptr); | 184 MAY_FEC_PROTECT, nullptr); |
178 } | 185 } |
179 | 186 |
| 187 QuicConsumedData SendLargeFakeData(QuicStreamId id, int bytes) { |
| 188 DCHECK(writev_consumes_all_data_); |
| 189 struct iovec iov; |
| 190 iov.iov_base = nullptr; // should not be read. |
| 191 iov.iov_len = static_cast<size_t>(bytes); |
| 192 return WritevData(id, QuicIOVector(&iov, 1, bytes), 0, true, |
| 193 MAY_FEC_PROTECT, nullptr); |
| 194 } |
| 195 |
180 using QuicSession::PostProcessAfterData; | 196 using QuicSession::PostProcessAfterData; |
181 | 197 |
182 private: | 198 private: |
183 StrictMock<TestCryptoStream> crypto_stream_; | 199 StrictMock<TestCryptoStream> crypto_stream_; |
184 | 200 |
185 bool writev_consumes_all_data_; | 201 bool writev_consumes_all_data_; |
186 }; | 202 }; |
187 | 203 |
188 class QuicSessionTestBase : public ::testing::TestWithParam<QuicVersion> { | 204 class QuicSessionTestBase : public ::testing::TestWithParam<QuicVersion> { |
189 protected: | 205 protected: |
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
383 TestStream* stream2 = session_.CreateOutgoingDynamicStream(); | 399 TestStream* stream2 = session_.CreateOutgoingDynamicStream(); |
384 TestStream* stream4 = session_.CreateOutgoingDynamicStream(); | 400 TestStream* stream4 = session_.CreateOutgoingDynamicStream(); |
385 TestStream* stream6 = session_.CreateOutgoingDynamicStream(); | 401 TestStream* stream6 = session_.CreateOutgoingDynamicStream(); |
386 | 402 |
387 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority); | 403 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority); |
388 session_.MarkConnectionLevelWriteBlocked(stream6->id(), kSomeMiddlePriority); | 404 session_.MarkConnectionLevelWriteBlocked(stream6->id(), kSomeMiddlePriority); |
389 session_.MarkConnectionLevelWriteBlocked(stream4->id(), kSomeMiddlePriority); | 405 session_.MarkConnectionLevelWriteBlocked(stream4->id(), kSomeMiddlePriority); |
390 | 406 |
391 InSequence s; | 407 InSequence s; |
392 StreamBlocker stream2_blocker(&session_, stream2->id()); | 408 StreamBlocker stream2_blocker(&session_, stream2->id()); |
393 // Reregister, to test the loop limit. | 409 |
394 EXPECT_CALL(*stream2, OnCanWrite()) | 410 if (FLAGS_quic_batch_writes) { |
395 .WillOnce(Invoke(&stream2_blocker, | 411 // Reregister, to test the loop limit. |
396 &StreamBlocker::MarkConnectionLevelWriteBlocked)); | 412 EXPECT_CALL(*stream2, OnCanWrite()) |
397 EXPECT_CALL(*stream6, OnCanWrite()); | 413 .WillOnce(Invoke(&stream2_blocker, |
398 EXPECT_CALL(*stream4, OnCanWrite()); | 414 &StreamBlocker::MarkConnectionLevelWriteBlocked)); |
| 415 // 2 will get called a second time as it didn't finish its block |
| 416 EXPECT_CALL(*stream2, OnCanWrite()); |
| 417 EXPECT_CALL(*stream6, OnCanWrite()); |
| 418 // 4 will not get called, as we exceeded the loop limit. |
| 419 } else { |
| 420 // Reregister, to test the loop limit. |
| 421 EXPECT_CALL(*stream2, OnCanWrite()) |
| 422 .WillOnce(Invoke(&stream2_blocker, |
| 423 &StreamBlocker::MarkConnectionLevelWriteBlocked)); |
| 424 EXPECT_CALL(*stream6, OnCanWrite()); |
| 425 EXPECT_CALL(*stream4, OnCanWrite()); |
| 426 } |
399 session_.OnCanWrite(); | 427 session_.OnCanWrite(); |
400 EXPECT_TRUE(session_.WillingAndAbleToWrite()); | 428 EXPECT_TRUE(session_.WillingAndAbleToWrite()); |
401 } | 429 } |
402 | 430 |
| 431 TEST_P(QuicSessionTestServer, TestBatchedWrites) { |
| 432 FLAGS_quic_batch_writes = true; |
| 433 TestStream* stream2 = session_.CreateOutgoingDynamicStream(); |
| 434 TestStream* stream4 = session_.CreateOutgoingDynamicStream(); |
| 435 TestStream* stream6 = session_.CreateOutgoingDynamicStream(); |
| 436 |
| 437 session_.set_writev_consumes_all_data(true); |
| 438 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority); |
| 439 session_.MarkConnectionLevelWriteBlocked(stream4->id(), kSomeMiddlePriority); |
| 440 |
| 441 StreamBlocker stream2_blocker(&session_, stream2->id()); |
| 442 StreamBlocker stream4_blocker(&session_, stream4->id()); |
| 443 StreamBlocker stream6_blocker(&session_, stream6->id()); |
| 444 // With two sessions blocked, we should get two write calls. They should both |
| 445 // go to the first stream as it will only write 6k and mark itself blocked |
| 446 // again. |
| 447 InSequence s; |
| 448 EXPECT_CALL(*stream2, OnCanWrite()) |
| 449 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor( |
| 450 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))), |
| 451 Invoke(&stream2_blocker, |
| 452 &StreamBlocker::MarkConnectionLevelWriteBlocked))); |
| 453 EXPECT_CALL(*stream2, OnCanWrite()) |
| 454 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor( |
| 455 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))), |
| 456 Invoke(&stream2_blocker, |
| 457 &StreamBlocker::MarkConnectionLevelWriteBlocked))); |
| 458 session_.OnCanWrite(); |
| 459 |
| 460 // We should get one more call for stream2, at which point it has used its |
| 461 // write quota and we move over to stream 4. |
| 462 EXPECT_CALL(*stream2, OnCanWrite()) |
| 463 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor( |
| 464 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))), |
| 465 Invoke(&stream2_blocker, |
| 466 &StreamBlocker::MarkConnectionLevelWriteBlocked))); |
| 467 EXPECT_CALL(*stream4, OnCanWrite()) |
| 468 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor( |
| 469 &session_, &TestSession::SendLargeFakeData, stream4->id(), 6000))), |
| 470 Invoke(&stream4_blocker, |
| 471 &StreamBlocker::MarkConnectionLevelWriteBlocked))); |
| 472 session_.OnCanWrite(); |
| 473 |
| 474 // Now let stream 4 do the 2nd of its 3 writes, but add a block for a high |
| 475 // priority stream 6. 4 should be preempted. 6 will write but *not* block so |
| 476 // will cede back to 4. |
| 477 stream6->set_priority(kHighestPriority); |
| 478 EXPECT_CALL(*stream4, OnCanWrite()) |
| 479 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor( |
| 480 &session_, &TestSession::SendLargeFakeData, stream4->id(), 6000))), |
| 481 Invoke(&stream4_blocker, |
| 482 &StreamBlocker::MarkConnectionLevelWriteBlocked), |
| 483 Invoke(&stream6_blocker, |
| 484 &StreamBlocker::MarkHighPriorityWriteBlocked))); |
| 485 EXPECT_CALL(*stream6, OnCanWrite()) |
| 486 .WillOnce(testing::IgnoreResult(Invoke(CreateFunctor( |
| 487 &session_, &TestSession::SendLargeFakeData, stream4->id(), 6000)))); |
| 488 session_.OnCanWrite(); |
| 489 |
| 490 // Stream4 alread did 6k worth of writes, so after doing another 12k it should |
| 491 // cede and 2 should resume. |
| 492 EXPECT_CALL(*stream4, OnCanWrite()) |
| 493 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor( |
| 494 &session_, &TestSession::SendLargeFakeData, stream4->id(), 12000))), |
| 495 Invoke(&stream4_blocker, |
| 496 &StreamBlocker::MarkConnectionLevelWriteBlocked))); |
| 497 EXPECT_CALL(*stream2, OnCanWrite()) |
| 498 .WillOnce(DoAll(testing::IgnoreResult(Invoke(CreateFunctor( |
| 499 &session_, &TestSession::SendLargeFakeData, stream2->id(), 6000))), |
| 500 Invoke(&stream2_blocker, |
| 501 &StreamBlocker::MarkConnectionLevelWriteBlocked))); |
| 502 session_.OnCanWrite(); |
| 503 } |
| 504 |
403 TEST_P(QuicSessionTestServer, OnCanWriteBundlesStreams) { | 505 TEST_P(QuicSessionTestServer, OnCanWriteBundlesStreams) { |
404 // Drive congestion control manually. | 506 // Drive congestion control manually. |
405 MockSendAlgorithm* send_algorithm = new StrictMock<MockSendAlgorithm>; | 507 MockSendAlgorithm* send_algorithm = new StrictMock<MockSendAlgorithm>; |
406 QuicConnectionPeer::SetSendAlgorithm(session_.connection(), send_algorithm); | 508 QuicConnectionPeer::SetSendAlgorithm(session_.connection(), send_algorithm); |
407 | 509 |
408 TestStream* stream2 = session_.CreateOutgoingDynamicStream(); | 510 TestStream* stream2 = session_.CreateOutgoingDynamicStream(); |
409 TestStream* stream4 = session_.CreateOutgoingDynamicStream(); | 511 TestStream* stream4 = session_.CreateOutgoingDynamicStream(); |
410 TestStream* stream6 = session_.CreateOutgoingDynamicStream(); | 512 TestStream* stream6 = session_.CreateOutgoingDynamicStream(); |
411 | 513 |
412 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority); | 514 session_.MarkConnectionLevelWriteBlocked(stream2->id(), kSomeMiddlePriority); |
(...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
509 InSequence s; | 611 InSequence s; |
510 // Force most streams to re-register, which is common scenario when we block | 612 // Force most streams to re-register, which is common scenario when we block |
511 // the Crypto stream, and only the crypto stream can "really" write. | 613 // the Crypto stream, and only the crypto stream can "really" write. |
512 | 614 |
513 // Due to prioritization, we *should* be asked to write the crypto stream | 615 // Due to prioritization, we *should* be asked to write the crypto stream |
514 // first. | 616 // first. |
515 // Don't re-register the crypto stream (which signals complete writing). | 617 // Don't re-register the crypto stream (which signals complete writing). |
516 TestCryptoStream* crypto_stream = session_.GetCryptoStream(); | 618 TestCryptoStream* crypto_stream = session_.GetCryptoStream(); |
517 EXPECT_CALL(*crypto_stream, OnCanWrite()); | 619 EXPECT_CALL(*crypto_stream, OnCanWrite()); |
518 | 620 |
519 // Re-register all other streams, to show they weren't able to proceed. | 621 EXPECT_CALL(*stream2, OnCanWrite()); |
520 EXPECT_CALL(*stream2, OnCanWrite()) | 622 EXPECT_CALL(*stream3, OnCanWrite()); |
521 .WillOnce(Invoke(&stream2_blocker, | |
522 &StreamBlocker::MarkConnectionLevelWriteBlocked)); | |
523 EXPECT_CALL(*stream3, OnCanWrite()) | |
524 .WillOnce(Invoke(&stream3_blocker, | |
525 &StreamBlocker::MarkConnectionLevelWriteBlocked)); | |
526 EXPECT_CALL(*stream4, OnCanWrite()) | 623 EXPECT_CALL(*stream4, OnCanWrite()) |
527 .WillOnce(Invoke(&stream4_blocker, | 624 .WillOnce(Invoke(&stream4_blocker, |
528 &StreamBlocker::MarkConnectionLevelWriteBlocked)); | 625 &StreamBlocker::MarkConnectionLevelWriteBlocked)); |
529 | 626 |
530 session_.OnCanWrite(); | 627 session_.OnCanWrite(); |
531 EXPECT_TRUE(session_.WillingAndAbleToWrite()); | 628 EXPECT_TRUE(session_.WillingAndAbleToWrite()); |
532 EXPECT_FALSE(session_.HasPendingHandshake()); // Crypto stream wrote. | 629 EXPECT_FALSE(session_.HasPendingHandshake()); // Crypto stream wrote. |
533 } | 630 } |
534 | 631 |
535 TEST_P(QuicSessionTestServer, OnCanWriteWithClosedStream) { | 632 TEST_P(QuicSessionTestServer, OnCanWriteWithClosedStream) { |
(...skipping 552 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1088 // Verify that there is no entry for the stream in | 1185 // Verify that there is no entry for the stream in |
1089 // locally_closed_streams_highest_offset_. | 1186 // locally_closed_streams_highest_offset_. |
1090 EXPECT_EQ( | 1187 EXPECT_EQ( |
1091 FLAGS_quic_fix_fin_accounting ? 0u : 1u, | 1188 FLAGS_quic_fix_fin_accounting ? 0u : 1u, |
1092 QuicSessionPeer::GetLocallyClosedStreamsHighestOffset(&session_).size()); | 1189 QuicSessionPeer::GetLocallyClosedStreamsHighestOffset(&session_).size()); |
1093 } | 1190 } |
1094 | 1191 |
1095 } // namespace | 1192 } // namespace |
1096 } // namespace test | 1193 } // namespace test |
1097 } // namespace net | 1194 } // namespace net |
OLD | NEW |