Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Unified Diff: src/circular-queue.cc

Issue 1138004: Add multithreading test for SamplingCircularQueue, fix implementation. (Closed)
Patch Set: Created 10 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « src/circular-queue.h ('k') | src/circular-queue-inl.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: src/circular-queue.cc
diff --git a/src/circular-queue.cc b/src/circular-queue.cc
index 5f7a33eb3a86d0dc71416617300eec7c945b7715..a7c25323e827842a41e00f4469ca917de72f6c0d 100644
--- a/src/circular-queue.cc
+++ b/src/circular-queue.cc
@@ -52,52 +52,44 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
buffer_[i] = kClear;
}
buffer_[buffer_size_] = kEnd;
+
+ // Layout producer and consumer position pointers each on their own
+ // cache lines to avoid cache lines thrashing due to simultaneous
+ // updates of positions by different processor cores.
+ const int positions_size =
+ RoundUp(1, kProcessorCacheLineSize) +
+ RoundUp(sizeof(ProducerPosition), kProcessorCacheLineSize) +
+ RoundUp(sizeof(ConsumerPosition), kProcessorCacheLineSize);
+ positions_ = NewArray<byte>(positions_size);
+
+ producer_pos_ = reinterpret_cast<ProducerPosition*>(
+ RoundUp(positions_, kProcessorCacheLineSize));
+ producer_pos_->enqueue_pos = buffer_;
+
+ consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
+ reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
+ ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
+ positions_ + positions_size);
+ consumer_pos_->dequeue_chunk_pos = buffer_;
+ consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
+ consumer_pos_->dequeue_pos = NULL;
}
SamplingCircularQueue::~SamplingCircularQueue() {
+ DeleteArray(positions_);
DeleteArray(buffer_);
}
-void SamplingCircularQueue::SetUpProducer() {
- producer_key_ = Thread::CreateThreadLocalKey();
- Thread::SetThreadLocal(producer_key_, buffer_);
-}
-
-
-void SamplingCircularQueue::TearDownProducer() {
- Thread::DeleteThreadLocalKey(producer_key_);
-}
-
-
-void SamplingCircularQueue::SetUpConsumer() {
- consumer_key_ = Thread::CreateThreadLocalKey();
- ConsumerPosition* cp = new ConsumerPosition;
- cp->dequeue_chunk_pos = buffer_;
- cp->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
- cp->dequeue_pos = NULL;
- Thread::SetThreadLocal(consumer_key_, cp);
-}
-
-
-void SamplingCircularQueue::TearDownConsumer() {
- delete reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
- Thread::DeleteThreadLocalKey(consumer_key_);
-}
-
-
void* SamplingCircularQueue::StartDequeue() {
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
- if (cp->dequeue_pos != NULL) {
- return cp->dequeue_pos;
+ if (consumer_pos_->dequeue_pos != NULL) {
+ return consumer_pos_->dequeue_pos;
} else {
- if (*cp->dequeue_chunk_poll_pos != kClear) {
- cp->dequeue_pos = cp->dequeue_chunk_pos;
- cp->dequeue_end_pos = cp->dequeue_pos + chunk_size_;
- return cp->dequeue_pos;
+ if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
+ consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
+ consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
+ return consumer_pos_->dequeue_pos;
} else {
return NULL;
}
@@ -106,25 +98,21 @@ void* SamplingCircularQueue::StartDequeue() {
void SamplingCircularQueue::FinishDequeue() {
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
- cp->dequeue_pos += record_size_;
- if (cp->dequeue_pos < cp->dequeue_end_pos) return;
+ consumer_pos_->dequeue_pos += record_size_;
+ if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return;
// Move to next chunk.
- cp->dequeue_pos = NULL;
- *cp->dequeue_chunk_pos = kClear;
- cp->dequeue_chunk_pos += chunk_size_;
- WrapPositionIfNeeded(&cp->dequeue_chunk_pos);
- cp->dequeue_chunk_poll_pos += chunk_size_;
- WrapPositionIfNeeded(&cp->dequeue_chunk_poll_pos);
+ consumer_pos_->dequeue_pos = NULL;
+ *consumer_pos_->dequeue_chunk_pos = kClear;
+ consumer_pos_->dequeue_chunk_pos += chunk_size_;
+ WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos);
+ consumer_pos_->dequeue_chunk_poll_pos += chunk_size_;
+ WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos);
}
void SamplingCircularQueue::FlushResidualRecords() {
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
- Thread::GetThreadLocal(consumer_key_));
// Eliminate producer / consumer distance.
- cp->dequeue_chunk_poll_pos = cp->dequeue_chunk_pos;
+ consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos;
}
« no previous file with comments | « src/circular-queue.h ('k') | src/circular-queue-inl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698