| Index: src/circular-queue.cc
|
| diff --git a/src/circular-queue.cc b/src/circular-queue.cc
|
| index 5f7a33eb3a86d0dc71416617300eec7c945b7715..a7c25323e827842a41e00f4469ca917de72f6c0d 100644
|
| --- a/src/circular-queue.cc
|
| +++ b/src/circular-queue.cc
|
| @@ -52,52 +52,44 @@ SamplingCircularQueue::SamplingCircularQueue(int record_size_in_bytes,
|
| buffer_[i] = kClear;
|
| }
|
| buffer_[buffer_size_] = kEnd;
|
| +
|
| + // Layout producer and consumer position pointers each on their own
|
| + // cache lines to avoid cache lines thrashing due to simultaneous
|
| + // updates of positions by different processor cores.
|
| + const int positions_size =
|
| + RoundUp(1, kProcessorCacheLineSize) +
|
| + RoundUp(sizeof(ProducerPosition), kProcessorCacheLineSize) +
|
| + RoundUp(sizeof(ConsumerPosition), kProcessorCacheLineSize);
|
| + positions_ = NewArray<byte>(positions_size);
|
| +
|
| + producer_pos_ = reinterpret_cast<ProducerPosition*>(
|
| + RoundUp(positions_, kProcessorCacheLineSize));
|
| + producer_pos_->enqueue_pos = buffer_;
|
| +
|
| + consumer_pos_ = reinterpret_cast<ConsumerPosition*>(
|
| + reinterpret_cast<byte*>(producer_pos_) + kProcessorCacheLineSize);
|
| + ASSERT(reinterpret_cast<byte*>(consumer_pos_ + 1) <=
|
| + positions_ + positions_size);
|
| + consumer_pos_->dequeue_chunk_pos = buffer_;
|
| + consumer_pos_->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
|
| + consumer_pos_->dequeue_pos = NULL;
|
| }
|
|
|
|
|
| SamplingCircularQueue::~SamplingCircularQueue() {
|
| + DeleteArray(positions_);
|
| DeleteArray(buffer_);
|
| }
|
|
|
|
|
| -void SamplingCircularQueue::SetUpProducer() {
|
| - producer_key_ = Thread::CreateThreadLocalKey();
|
| - Thread::SetThreadLocal(producer_key_, buffer_);
|
| -}
|
| -
|
| -
|
| -void SamplingCircularQueue::TearDownProducer() {
|
| - Thread::DeleteThreadLocalKey(producer_key_);
|
| -}
|
| -
|
| -
|
| -void SamplingCircularQueue::SetUpConsumer() {
|
| - consumer_key_ = Thread::CreateThreadLocalKey();
|
| - ConsumerPosition* cp = new ConsumerPosition;
|
| - cp->dequeue_chunk_pos = buffer_;
|
| - cp->dequeue_chunk_poll_pos = buffer_ + producer_consumer_distance_;
|
| - cp->dequeue_pos = NULL;
|
| - Thread::SetThreadLocal(consumer_key_, cp);
|
| -}
|
| -
|
| -
|
| -void SamplingCircularQueue::TearDownConsumer() {
|
| - delete reinterpret_cast<ConsumerPosition*>(
|
| - Thread::GetThreadLocal(consumer_key_));
|
| - Thread::DeleteThreadLocalKey(consumer_key_);
|
| -}
|
| -
|
| -
|
| void* SamplingCircularQueue::StartDequeue() {
|
| - ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
|
| - Thread::GetThreadLocal(consumer_key_));
|
| - if (cp->dequeue_pos != NULL) {
|
| - return cp->dequeue_pos;
|
| + if (consumer_pos_->dequeue_pos != NULL) {
|
| + return consumer_pos_->dequeue_pos;
|
| } else {
|
| - if (*cp->dequeue_chunk_poll_pos != kClear) {
|
| - cp->dequeue_pos = cp->dequeue_chunk_pos;
|
| - cp->dequeue_end_pos = cp->dequeue_pos + chunk_size_;
|
| - return cp->dequeue_pos;
|
| + if (*consumer_pos_->dequeue_chunk_poll_pos != kClear) {
|
| + consumer_pos_->dequeue_pos = consumer_pos_->dequeue_chunk_pos;
|
| + consumer_pos_->dequeue_end_pos = consumer_pos_->dequeue_pos + chunk_size_;
|
| + return consumer_pos_->dequeue_pos;
|
| } else {
|
| return NULL;
|
| }
|
| @@ -106,25 +98,21 @@ void* SamplingCircularQueue::StartDequeue() {
|
|
|
|
|
| void SamplingCircularQueue::FinishDequeue() {
|
| - ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
|
| - Thread::GetThreadLocal(consumer_key_));
|
| - cp->dequeue_pos += record_size_;
|
| - if (cp->dequeue_pos < cp->dequeue_end_pos) return;
|
| + consumer_pos_->dequeue_pos += record_size_;
|
| + if (consumer_pos_->dequeue_pos < consumer_pos_->dequeue_end_pos) return;
|
| // Move to next chunk.
|
| - cp->dequeue_pos = NULL;
|
| - *cp->dequeue_chunk_pos = kClear;
|
| - cp->dequeue_chunk_pos += chunk_size_;
|
| - WrapPositionIfNeeded(&cp->dequeue_chunk_pos);
|
| - cp->dequeue_chunk_poll_pos += chunk_size_;
|
| - WrapPositionIfNeeded(&cp->dequeue_chunk_poll_pos);
|
| + consumer_pos_->dequeue_pos = NULL;
|
| + *consumer_pos_->dequeue_chunk_pos = kClear;
|
| + consumer_pos_->dequeue_chunk_pos += chunk_size_;
|
| + WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_pos);
|
| + consumer_pos_->dequeue_chunk_poll_pos += chunk_size_;
|
| + WrapPositionIfNeeded(&consumer_pos_->dequeue_chunk_poll_pos);
|
| }
|
|
|
|
|
| void SamplingCircularQueue::FlushResidualRecords() {
|
| - ConsumerPosition* cp = reinterpret_cast<ConsumerPosition*>(
|
| - Thread::GetThreadLocal(consumer_key_));
|
| // Eliminate producer / consumer distance.
|
| - cp->dequeue_chunk_poll_pos = cp->dequeue_chunk_pos;
|
| + consumer_pos_->dequeue_chunk_poll_pos = consumer_pos_->dequeue_chunk_pos;
|
| }
|
|
|
|
|
|
|