Index: src/circular-queue.h |
diff --git a/src/circular-queue.h b/src/circular-queue.h |
index 4ad4f4b5505356c784e63dd12dd939da526e935e..45c5be521e11e84e2ee85a12afe3078cc1e6cfbb 100644 |
--- a/src/circular-queue.h |
+++ b/src/circular-queue.h |
@@ -35,24 +35,22 @@ namespace internal { |
// Lock-free cache-friendly sampling circular queue for large |
// records. Intended for fast transfer of large records between a |
// single producer and a single consumer. If the queue is full, |
-// previous unread records are overwritten. The queue is designed with |
+// StartEnqueue will return NULL. The queue is designed with |
// a goal in mind to evade cache lines thrashing by preventing |
// simultaneous reads and writes to adjanced memory locations. |
-// |
-// IMPORTANT: as a producer never checks for chunks cleanness, it is |
-// possible that it can catch up and overwrite a chunk that a consumer |
-// is currently reading, resulting in a corrupt record being read. |
class SamplingCircularQueue { |
Benedikt Meurer
2013/08/13 09:31:32
While we're already cleaning up this class, let's
yurys
2013/08/13 14:10:29
Done.
|
public: |
// Executed on the application thread. |
SamplingCircularQueue(size_t record_size_in_bytes, |
- size_t desired_chunk_size_in_bytes, |
- unsigned buffer_size_in_chunks); |
+ unsigned buffer_size_in_records); |
~SamplingCircularQueue(); |
- // Enqueue returns a pointer to a memory location for storing the next |
- // record. |
- INLINE(void* Enqueue()); |
+ // StartEnqueue returns a pointer to a memory location for storing the next |
+ // record or NULL if all entries are full at the moment. |
+ INLINE(void* StartEnqueue()); |
+ // Notifies the queue that the producer has complete writing data into the |
+ // memory returned by StartEnqueue and it can be passed to the consumer. |
+ INLINE(void FinishEnqueue()); |
// Executed on the consumer (analyzer) thread. |
// StartDequeue returns a pointer to a memory location for retrieving |
@@ -61,37 +59,29 @@ class SamplingCircularQueue { |
// to StartDequeue will return the same pointer. |
void* StartDequeue(); |
void FinishDequeue(); |
- // Due to a presence of slipping between the producer and the consumer, |
- // the queue must be notified whether producing has been finished in order |
- // to process remaining records from the buffer. |
- void FlushResidualRecords(); |
typedef AtomicWord Cell; |
Benedikt Meurer
2013/08/13 09:31:32
Get rid of this typedef, it's just confusing. Also
yurys
2013/08/13 14:10:29
Done.
|
private: |
- // Reserved values for the chunk marker (first Cell in each chunk). |
+ // Reserved values for the entry marker (first Cell in each entry). |
enum { |
- kClear, // Marks clean (processed) chunks. |
- kEnqueueStarted // Marks chunks where enqueue started. |
+ kEmpty, // Marks clean (processed) entries. |
+ kFull // Marks entries already filled by the producer but not yet |
+ // completely processed by the consumer. |
}; |
struct ProducerPosition { |
- Cell* next_chunk_pos; |
Cell* enqueue_pos; |
}; |
struct ConsumerPosition { |
- Cell* dequeue_chunk_pos; |
- Cell* dequeue_chunk_poll_pos; |
Cell* dequeue_pos; |
- Cell* dequeue_end_pos; |
}; |
- INLINE(void WrapPositionIfNeeded(Cell** pos)); |
+ inline void WrapPositionIfNeeded(Cell** pos); |
- const size_t record_size_; |
- const size_t chunk_size_in_bytes_; |
- const size_t chunk_size_; |
+ const size_t entry_size_; |
const size_t buffer_size_; |
+ Cell* not_aligned_buffer_; |
Cell* buffer_; |
byte* positions_; |
ProducerPosition* producer_pos_; |