Index: src/circular-queue.cc |
diff --git a/src/circular-queue.cc b/src/circular-queue.cc |
index 5f7a33eb3a86d0dc71416617300eec7c945b7715..a7c25323e827842a41e00f4469ca917de72f6c0d 100644 |
--- a/src/circular-queue.cc |
+++ b/src/circular-queue.cc |
@@ -52,52 +52,44 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes, |
buffer_[i] = kClear; |
} |
buffer_[buffer_size_] = kEnd; |
+ |
+ // Layout producer and consumer position pointers each on their own |
+ // cache lines to avoid cache lines thrashing due to simultaneous |
+ // updates of positions by different processor cores. |
+ const int positions_size = |
+ RoundUp(1, kProcessorCacheLineSize) + |
+ RoundUp(sizeof(ProducerPosition), kProcessorCacheLineSize) + |
+ RoundUp(sizeof(ConsumerPosition), kProcessorCacheLineSize); |
+ positions_ = NewArray<byte>(positions_size); |
+ |
+ producer_pos_ = reinterpret_cast<ProducerPosition*>( |
+ RoundUp(positions_, kProcessorCacheLineSize)); |
+ producer_pos_->enqueue_pos = buffer_; |
+ |
+ consumer_pos_ = reinterpret_cast<ConsumerPosition*>( |
+ reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize); |
+ ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <= |
+ positions_ + positions_size); |
+ consumer_pos_->dequeue_chunk_pos = buffer_; |
+ consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_; |
+ consumer_pos_->dequeue_pos = NULL; |
} |
SamplingCircularQueue::~SamplingCircularQueue() { |
+ DeleteArray(positions_); |
DeleteArray(buffer_); |
} |
-void SamplingCircularQueue::SetUpProducer() { |
- producer_key_ = Thread::CreateThreadLocalKey(); |
- Thread::SetThreadLocal(producer_key_, buffer_); |
-} |
- |
- |
-void SamplingCircularQueue::TearDownProducer() { |
- Thread::DeleteThreadLocalKey(producer_key_); |
-} |
- |
- |
-void SamplingCircularQueue::SetUpConsumer() { |
- consumer_key_ = Thread::CreateThreadLocalKey(); |
- ConsumerPosition* cp = new ConsumerPosition; |
- cp->dequeue_chunk_pos = buffer_; |
- cp->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_; |
- cp->dequeue_pos = NULL; |
- Thread::SetThreadLocal(consumer_key_, cp); |
-} |
- |
- |
-void SamplingCircularQueue::TearDownConsumer() { |
- delete reinterpret_cast<ConsumerPosition*>( |
- Thread::GetThreadLocal(consumer_key_)); |
- Thread::DeleteThreadLocalKey(consumer_key_); |
-} |
- |
- |
void* SamplingCircularQueue::StartDequeue() { |
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>( |
- Thread::GetThreadLocal(consumer_key_)); |
- if (cp->dequeue_pos != NULL) { |
- return cp->dequeue_pos; |
+ if (consumer_pos_->dequeue_pos != NULL) { |
+ return consumer_pos_->dequeue_pos; |
} else { |
- if (*cp->dequeue_chunk_poll_pos != kClear) { |
- cp->dequeue_pos = cp->dequeue_chunk_pos; |
- cp->dequeue_end_pos = cp->dequeue_pos + chunk_size_; |
- return cp->dequeue_pos; |
+ if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) { |
+ consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos; |
+ consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_; |
+ return consumer_pos_->dequeue_pos; |
} else { |
return NULL; |
} |
@@ -106,25 +98,21 @@ void* SamplingCircularQueue::StartDequeue() { |
void SamplingCircularQueue::FinishDequeue() { |
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>( |
- Thread::GetThreadLocal(consumer_key_)); |
- cp->dequeue_pos += record_size_; |
- if (cp->dequeue_pos < cp->dequeue_end_pos) return; |
+ consumer_pos_->dequeue_pos += record_size_; |
+ if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return; |
// Move to next chunk. |
- cp->dequeue_pos = NULL; |
- *cp->dequeue_chunk_pos = kClear; |
- cp->dequeue_chunk_pos += chunk_size_; |
- WrapPositionIfNeeded(&cp->dequeue_chunk_pos); |
- cp->dequeue_chunk_poll_pos += chunk_size_; |
- WrapPositionIfNeeded(&cp->dequeue_chunk_poll_pos); |
+ consumer_pos_->dequeue_pos = NULL; |
+ *consumer_pos_->dequeue_chunk_pos = kClear; |
+ consumer_pos_->dequeue_chunk_pos += chunk_size_; |
+ WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos); |
+ consumer_pos_->dequeue_chunk_poll_pos += chunk_size_; |
+ WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos); |
} |
void SamplingCircularQueue::FlushResidualRecords() { |
- ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>( |
- Thread::GetThreadLocal(consumer_key_)); |
// Eliminate producer / consumer distance. |
- cp->dequeue_chunk_poll_pos = cp->dequeue_chunk_pos; |
+ consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos; |
} |