Index: src/circular-queue.h |
diff --git a/src/circular-queue.h b/src/circular-queue.h |
index 4ad4f4b5505356c784e63dd12dd939da526e935e..3ecba1fea1dec986136d43ab7913ec0e28cf9de0 100644 |
--- a/src/circular-queue.h |
+++ b/src/circular-queue.h |
@@ -28,6 +28,8 @@ |
#ifndef V8_CIRCULAR_QUEUE_H_ |
#define V8_CIRCULAR_QUEUE_H_ |
+#include "v8globals.h" |
+ |
namespace v8 { |
namespace internal { |
@@ -35,67 +37,50 @@ namespace internal { |
// Lock-free cache-friendly sampling circular queue for large |
// records. Intended for fast transfer of large records between a |
// single producer and a single consumer. If the queue is full, |
-// previous unread records are overwritten. The queue is designed with |
+// StartEnqueue will return NULL. The queue is designed with |
// a goal in mind to evade cache lines thrashing by preventing |
// simultaneous reads and writes to adjanced memory locations. |
-// |
-// IMPORTANT: as a producer never checks for chunks cleanness, it is |
-// possible that it can catch up and overwrite a chunk that a consumer |
-// is currently reading, resulting in a corrupt record being read. |
+template<typename T, unsigned Length> |
class SamplingCircularQueue { |
public: |
// Executed on the application thread. |
- SamplingCircularQueue(size_t record_size_in_bytes, |
- size_t desired_chunk_size_in_bytes, |
- unsigned buffer_size_in_chunks); |
+ SamplingCircularQueue(); |
~SamplingCircularQueue(); |
- // Enqueue returns a pointer to a memory location for storing the next |
- // record. |
- INLINE(void* Enqueue()); |
+ // StartEnqueue returns a pointer to a memory location for storing the next |
+ // record or NULL if all entries are full at the moment. |
+ T* StartEnqueue(); |
+ // Notifies the queue that the producer has complete writing data into the |
+ // memory returned by StartEnqueue and it can be passed to the consumer. |
+ void FinishEnqueue(); |
// Executed on the consumer (analyzer) thread. |
// StartDequeue returns a pointer to a memory location for retrieving |
// the next record. After the record had been read by a consumer, |
// FinishDequeue must be called. Until that moment, subsequent calls |
// to StartDequeue will return the same pointer. |
- void* StartDequeue(); |
+ T* StartDequeue(); |
void FinishDequeue(); |
- // Due to a presence of slipping between the producer and the consumer, |
- // the queue must be notified whether producing has been finished in order |
- // to process remaining records from the buffer. |
- void FlushResidualRecords(); |
- |
- typedef AtomicWord Cell; |
private: |
- // Reserved values for the chunk marker (first Cell in each chunk). |
+ // Reserved values for the entry marker. |
enum { |
- kClear, // Marks clean (processed) chunks. |
- kEnqueueStarted // Marks chunks where enqueue started. |
+ kEmpty, // Marks clean (processed) entries. |
+ kFull // Marks entries already filled by the producer but not yet |
+ // completely processed by the consumer. |
}; |
- struct ProducerPosition { |
- Cell* next_chunk_pos; |
- Cell* enqueue_pos; |
- }; |
- struct ConsumerPosition { |
- Cell* dequeue_chunk_pos; |
- Cell* dequeue_chunk_poll_pos; |
- Cell* dequeue_pos; |
- Cell* dequeue_end_pos; |
+ struct V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE) Entry { |
+ Entry() : marker(kEmpty) {} |
+ T record; |
+ Atomic32 marker; |
}; |
- INLINE(void WrapPositionIfNeeded(Cell** pos)); |
+ Entry* Next(Entry* entry); |
- const size_t record_size_; |
- const size_t chunk_size_in_bytes_; |
- const size_t chunk_size_; |
- const size_t buffer_size_; |
- Cell* buffer_; |
- byte* positions_; |
- ProducerPosition* producer_pos_; |
- ConsumerPosition* consumer_pos_; |
+ Entry buffer_[Length]; |
+ Entry* enqueue_pos_ V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE); |
+ Entry* dequeue_pos_ V8_ALIGNAS(PROCESSOR_CACHE_LINE_SIZE); |
DISALLOW_COPY_AND_ASSIGN(SamplingCircularQueue); |
}; |