Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: common/eventlog/internal/logservice/batch_logger.go

Issue 2557593002: Add batch logging support to eventlog. (Closed)
Patch Set: Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 // Package logservice provides loggers which can be used to to collect and send batches of logs to the eventlog service.
6 package logservice
7
8 import (
9 "context"
10 "log"
11 "sync"
12 "time"
13
14 logpb "github.com/luci/luci-go/common/eventlog/proto"
15 )
16
17 // numRetries is the number of retry attempts we will make to log a given event in the face of transient errors.
18 const numRetries = 3
19
20 // BatchLogger accumlates log events and sends them in batches. It is safe for c oncurrent use.
21 type BatchLogger struct {
22 logger syncLogger
23
24 // ctx is the context to use when uploading batch of entries.
25 ctx context.Context
26
27 wg sync.WaitGroup
28 tickc <-chan time.Time // receives from tickc trigger uploads.
29 stopc chan struct{} // closing stopc indicates that the BatchLogger s hould shut down.
30
31 // retries groups log events by the number of retries remaining after th e next upload attempt.
32 retries *ringBuffer
33
34 // pending is a list of log events that have not yet had an upload attem pt.
35 mu sync.Mutex
36 pending []*logpb.LogRequestLite_LogEventLite
37 }
38
39 type syncLogger interface {
40 LogSync(context.Context, ...*logpb.LogRequestLite_LogEventLite) error
41 }
42
43 // NewBatchLogger constructs a new BatchLogger.
44 // The supplied context will be used when logging entries supplied via the Log m ethod.
45 // Its Close method must be called when it is no longer needed.
46 func NewBatchLogger(ctx context.Context, logger *Logger, uploadTicker <-chan tim e.Time) *BatchLogger {
47 return newBatchLogger(ctx, logger, uploadTicker)
48 }
49
50 func newBatchLogger(ctx context.Context, logger syncLogger, uploadTicker <-chan time.Time) *BatchLogger {
51 bl := &BatchLogger{
52 logger: logger,
53 }
54
55 bl.wg.Add(1)
56 bl.stopc = make(chan struct{})
djd-OOO-Apr2017 2016/12/06 06:31:27 Move these up into the struct literal.
mcgreevy 2016/12/06 23:57:36 Done.
57 bl.tickc = uploadTicker
58 go func() {
59 defer bl.wg.Done()
60 for {
61 select {
62 case <-bl.tickc:
63 bl.upload()
64 case <-bl.stopc:
65 return
66 }
67 }
68 }()
69 return bl
70 }
71
72 // Close flushes any pending logs and releases any resources held by the logger.
73 // Once Close has been returned, no more retry attempts will be made.
74 // Close should be called when the logger is no longer needed.
75 func (bl *BatchLogger) Close() {
76 close(bl.stopc)
77 bl.wg.Wait()
78
79 // Final upload.
80 // TODO(mcgreevy): flush outstanding events with exponential backoff.
81 bl.upload()
82 }
83
84 // Log stages events to be logged to the eventlog service.
85 // The EventTime in each event must have been obtained from time.Now.
86 // Log returns immediately, and batches of events will be sent to the eventlog s erver periodically.
87 func (bl *BatchLogger) Log(events ...*logpb.LogRequestLite_LogEventLite) {
88 bl.mu.Lock()
89 defer bl.mu.Unlock()
90 bl.pending = append(bl.pending, events...)
91 }
92
93 func (bl *BatchLogger) upload() {
94 // TODO: split uploads into multiple requests if size threshold is excee ded.
95 bl.mu.Lock()
96 pending := bl.pending
97 bl.pending = nil
djd-OOO-Apr2017 2016/12/06 06:31:27 maybe bl.pending = bl.pending[:0] to reuse the sli
mcgreevy 2016/12/06 23:57:36 If I were to do that, I would have to lock the mut
98 bl.mu.Unlock()
99
100 if pending == nil && bl.retries == nil {
djd-OOO-Apr2017 2016/12/06 06:31:27 len(pending) == 0 seems less fragile to future cha
mcgreevy 2016/12/06 23:57:36 Done.
101 return
102 }
103
104 // Normal case: nothing to retry.
105 if bl.retries == nil {
106 if err := bl.logger.LogSync(bl.ctx, pending...); ShouldRetry(err ) {
107 bl.retries = &ringBuffer{}
108 bl.retries.Push(pending)
109 }
110 return
111 }
112
113 // get a slice of elements for which this is our final attempt.
114 toUpload := bl.retries.Push(pending)
115
116 // record how many entries there are in our final attempt (for failure r eporting) before modifying this slice.
117 dropLen := len(toUpload)
djd-OOO-Apr2017 2016/12/06 06:31:27 Where is toUpload modified?
mcgreevy 2016/12/06 23:57:36 It is passed to bl.retries.AppendAll as a destinat
djd-OOO-Apr2017 2016/12/07 00:31:25 It looks like the len of toUpload can't change tho
mcgreevy 2016/12/07 02:42:37 Done, but also un-inlined the call to AppendAll to
118
119 // Append to toUpload all of the entries that we want to attempt uploadi ng.
120 err := bl.logger.LogSync(bl.ctx, bl.retries.AppendAll(toUpload)...)
121 if err == nil {
122 bl.retries = nil
123 return
124 }
125
126 if dropLen > 0 {
127 log.Printf("eventlog: WARNING: retries exhausted. Dropping %d ev ents.", dropLen)
128 }
129
130 if !ShouldRetry(err) {
131 bl.retries = nil
132 }
133 }
134
135 // ring Buffer is a fixed capacity buffer. When at capacity, a Push into the buf fer causes an element to be displaced.
136 type ringBuffer struct {
137 buf [numRetries][]*logpb.LogRequestLite_LogEventLite
138 i int // the next place to insert.
139 }
140
141 // Push inserts a batch of entries into the ring buffer, and returns the batch w hich it displaces.
142 func (rb *ringBuffer) Push(entry []*logpb.LogRequestLite_LogEventLite) []*logpb. LogRequestLite_LogEventLite {
143 displaced := rb.buf[rb.i]
144 rb.buf[rb.i] = entry
145 rb.i++
146 if rb.i == len(rb.buf) {
147 rb.i = 0
148 }
149
150 return displaced
151 }
152
153 // AppendAll appends the contents of all of the batches in the ring buffer to ds t, returning the result.
154 func (rb *ringBuffer) AppendAll(dst []*logpb.LogRequestLite_LogEventLite) []*log pb.LogRequestLite_LogEventLite {
155 for _, entry := range rb.buf {
156 dst = append(dst, entry...)
157 }
158 return dst
159 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698