Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Side by Side Diff: common/eventlog/internal/logservice/batch_logger.go

Issue 2557593002: Add batch logging support to eventlog. (Closed)
Patch Set: Fix race in test. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/eventlog/eventlog.go ('k') | common/eventlog/internal/logservice/batch_logger_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 // Package logservice provides loggers which can be used to to collect and send batches of logs to the eventlog service.
6 package logservice
7
8 import (
9 "context"
10 "log"
11 "sync"
12 "time"
13
14 logpb "github.com/luci/luci-go/common/eventlog/proto"
15 )
16
17 // numRetries is the number of retry attempts we will make to log a given event in the face of transient errors.
18 const numRetries = 3
19
20 // BatchLogger accumlates log events and sends them in batches. It is safe for c oncurrent use.
21 type BatchLogger struct {
22 logger syncLogger
23
24 // ctx is the context to use when uploading batch of entries.
25 ctx context.Context
26
27 wg sync.WaitGroup
28 tickc <-chan time.Time // receives from tickc trigger uploads.
29 stopc chan struct{} // closing stopc indicates that the BatchLogger s hould shut down.
30
31 // retries groups log events by the number of retries remaining after th e next upload attempt.
32 retries *ringBuffer
33
34 // pending is a list of log events that have not yet had an upload attem pt.
35 mu sync.Mutex
36 pending []*logpb.LogRequestLite_LogEventLite
37 }
38
39 type syncLogger interface {
40 LogSync(context.Context, ...*logpb.LogRequestLite_LogEventLite) error
41 }
42
43 // NewBatchLogger constructs a new BatchLogger.
44 // The supplied context will be used when logging entries supplied via the Log m ethod.
45 // Its Close method must be called when it is no longer needed.
46 func NewBatchLogger(ctx context.Context, logger *Logger, uploadTicker <-chan tim e.Time) *BatchLogger {
47 return newBatchLogger(ctx, logger, uploadTicker)
48 }
49
50 func newBatchLogger(ctx context.Context, logger syncLogger, uploadTicker <-chan time.Time) *BatchLogger {
51 bl := &BatchLogger{
52 logger: logger,
53 stopc: make(chan struct{}),
54 tickc: uploadTicker,
55 }
56
57 bl.wg.Add(1)
58 go func() {
59 defer bl.wg.Done()
60 for {
61 select {
62 case <-bl.tickc:
63 bl.upload()
64 case <-bl.stopc:
65 return
66 }
67 }
68 }()
69 return bl
70 }
71
72 // Close flushes any pending logs and releases any resources held by the logger.
73 // Once Close has been returned, no more retry attempts will be made.
74 // Close should be called when the logger is no longer needed.
75 func (bl *BatchLogger) Close() {
76 close(bl.stopc)
77 bl.wg.Wait()
78
79 // Final upload.
80 // TODO(mcgreevy): flush outstanding events with exponential backoff.
81 bl.upload()
82 }
83
84 // Log stages events to be logged to the eventlog service.
85 // The EventTime in each event must have been obtained from time.Now.
86 // Log returns immediately, and batches of events will be sent to the eventlog s erver periodically.
87 func (bl *BatchLogger) Log(events ...*logpb.LogRequestLite_LogEventLite) {
88 bl.mu.Lock()
89 defer bl.mu.Unlock()
90 bl.pending = append(bl.pending, events...)
91 }
92
93 func (bl *BatchLogger) upload() {
94 // TODO: split uploads into multiple requests if size threshold is excee ded.
95 bl.mu.Lock()
96 pending := bl.pending
97 bl.pending = nil
98 bl.mu.Unlock()
99
100 if len(pending) == 0 && bl.retries == nil {
101 return
102 }
103
104 // Normal case: nothing to retry.
105 if bl.retries == nil {
106 if err := bl.logger.LogSync(bl.ctx, pending...); ShouldRetry(err ) {
107 bl.retries = &ringBuffer{}
108 bl.retries.Push(pending)
109 }
110 return
111 }
112
113 // get a slice of elements for which this is our final attempt.
114 lastChance := bl.retries.Push(pending)
115 toUpload := bl.retries.AppendAll(lastChance)
116
117 err := bl.logger.LogSync(bl.ctx, toUpload...)
118 if err == nil {
119 bl.retries = nil
120 return
121 }
122
123 if dropped := len(lastChance); dropped > 0 {
124 log.Printf("eventlog: WARNING: retries exhausted. Dropping %d ev ents.", dropped)
125 }
126
127 if !ShouldRetry(err) {
128 bl.retries = nil
129 }
130 }
131
132 // ring Buffer is a fixed capacity buffer. When at capacity, a Push into the buf fer causes an element to be displaced.
133 type ringBuffer struct {
134 buf [numRetries][]*logpb.LogRequestLite_LogEventLite
135 i int // the next place to insert.
136 }
137
138 // Push inserts a batch of entries into the ring buffer, and returns the batch w hich it displaces.
139 func (rb *ringBuffer) Push(entry []*logpb.LogRequestLite_LogEventLite) []*logpb. LogRequestLite_LogEventLite {
140 displaced := rb.buf[rb.i]
141 rb.buf[rb.i] = entry
142 rb.i++
143 if rb.i == len(rb.buf) {
144 rb.i = 0
145 }
146
147 return displaced
148 }
149
150 // AppendAll appends the contents of all of the batches in the ring buffer to ds t, returning the result.
151 func (rb *ringBuffer) AppendAll(dst []*logpb.LogRequestLite_LogEventLite) []*log pb.LogRequestLite_LogEventLite {
152 for _, entry := range rb.buf {
153 dst = append(dst, entry...)
154 }
155 return dst
156 }
OLDNEW
« no previous file with comments | « common/eventlog/eventlog.go ('k') | common/eventlog/internal/logservice/batch_logger_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698