Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(252)

Side by Side Diff: server/internal/logdog/collector/coordinator/cache.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Reorganized, cleaned up, comments, and updated for pRPC. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinator
6
7 import (
8 "sync/atomic"
9 "time"
10
11 "github.com/luci/luci-go/common/clock"
12 "github.com/luci/luci-go/common/logdog/types"
13 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/lru"
15 "github.com/luci/luci-go/common/promise"
16 "github.com/luci/luci-go/common/proto/logdog/logpb"
17 "golang.org/x/net/context"
18 )
19
20 const (
21 // DefaultSize is the default (maximum) size of the LRU cache.
22 DefaultSize = 1024 * 1024
23
24 // DefaultExpiration is the default expiration value.
25 DefaultExpiration = 10 * time.Minute
26 )
27
28 // cache is a Coordinator interface implementation for the Collector service
29 // that caches remote results locally.
30 type cache struct {
31 Coordinator
32
33 // Size is the number of stream states to hold in the cache. If zero,
34 // DefaultCacheSize will be used.
35 size int
36
37 // expiration is the maximum lifespan of a cache entry. If an entry is o lder
38 // than this, it will be discarded. If zero, DefaultExpiration will be u sed.
39 expiration time.Duration
40
41 // cache is the LRU state cache.
42 lru *lru.Cache
43 }
44
45 // NewCache creates a new Cache instance that wraps a Coordinator instance.
iannucci 2016/02/05 23:41:01 wraps a Coordinator with a cache?
dnj (Google) 2016/02/06 04:10:36 Done.
46 func NewCache(c Coordinator, size int, expiration time.Duration) Coordinator {
47 if size <= 0 {
48 size = DefaultSize
49 }
50 if expiration <= 0 {
51 expiration = DefaultExpiration
52 }
53
54 return &cache{
55 Coordinator: c,
56 expiration: expiration,
57 lru: lru.New(size),
58 }
59 }
60
61 // getOrRegister is a goroutine-safe blocking call that synchronizes log stream
iannucci 2016/02/05 23:41:01 comment desync
dnj (Google) 2016/02/06 04:10:36 Done.
62 // state with the Coordinator.
63 //
64 // If successful, the supplied state will be pushed directly to the
65 // Coordinator service, and the pointer value will be overwritten with the
66 // state returned by the Coordinator service.
67 //
68 // If an error occurs and it is transient, an errors.Transient error will be
69 // returned.
70 func (c *cache) RegisterStream(ctx context.Context, st *LogStreamState, d *logpb .LogStreamDescriptor) (
71 *LogStreamState, error) {
72 now := clock.Now(ctx)
73
74 // Get the cacheEntry from our cache. If it is expired, doesn't exist, o r
75 // we're forcing, ignore any existing entry and replace with a Promise p ending
76 // Coordinator sync.
77 entry := c.lru.Mutate(st.Path, func(current interface{}) interface{} {
78 // Don't replace an existing entry, unless it has an error or ha s expired.
79 if current != nil {
80 curEntry := current.(*cacheEntry)
81 if !curEntry.hasError() && now.Before(curEntry.expiresAt ) {
82 return current
83 }
84 }
85
86 p := promise.New(func() (interface{}, error) {
87 st, err := c.Coordinator.RegisterStream(ctx, st, d)
88 if err != nil {
89 return nil, err
90 }
91
92 return &LogStreamState{
93 Path: st.Path,
94 ProtoVersion: st.ProtoVersion,
95 Secret: st.Secret,
96 TerminalIndex: types.MessageIndex(st.TerminalInd ex),
97 Archived: st.Archived,
98 Purged: st.Purged,
99 }, nil
100 })
101
102 return &cacheEntry{
103 terminalIndex: -1,
104 p: p,
105 path: st.Path,
106 expiresAt: now.Add(c.expiration),
107 }
108 }).(*cacheEntry)
109
110 // If there was an error, purge the erroneous entry from the cache so th at
111 // the next "update" will re-fetch it.
112 st, err := entry.get(ctx)
113 if err != nil {
114 log.Fields{
115 log.ErrorKey: err,
116 }.Errorf(ctx, "Error retrieving stream state.")
117 return nil, err
118 }
119 return st, nil
120 }
121
122 func (c *cache) TerminateStream(ctx context.Context, st *LogStreamState) error {
123 // Immediately update our state cache to record the terminal index, if
124 // we have a state cache.
125 c.lru.Mutate(st.Path, func(current interface{}) (r interface{}) {
126 // Always return the current entry. We're just atomically examin ing it to
127 // load it with a terminal index.
128 r = current
129 if r != nil {
130 r.(*cacheEntry).loadTerminalIndex(st.TerminalIndex)
131 }
132 return
133 })
134
135 return c.Coordinator.TerminateStream(ctx, st)
136 }
137
138 // cacheEntry is the value stored in the cache. It contains a Promise
139 // representing the value and an optional invalidation singleton to ensure that
140 // if the state failed to fetch, it will be invalidated at most once.
141 //
142 // In addition to remote caching via Promise, the state can be updated locally
143 // by calling the cache's "put" method. In this case, the Promise will be nil,
144 // and the state value will be populated.
145 type cacheEntry struct {
146 // terminalIndex is the loaded terminal index set via loadTerminalIndex. It
147 // will be applied to returned LogStreamState objects so that once a ter minal
148 // index has been set, we become aware of it in the Collector.
149 //
150 // This MUST be the first field in the struct in order to comply with at omic's
151 // 64-bit alignment requirements.
152 terminalIndex int64
153
154 // p is a Promise that is blocking pending a Coordiantor stream state
155 // response. Upon successful resolution, it will contain a *LogStreamSta te.
156 p promise.Promise
157 path types.StreamPath
158 expiresAt time.Time
159 }
160
161 // get returns the cached state that this entry owns, blocking until resolution
162 // if necessary.
163 //
164 // The returned state is a shallow copy of the cached state, and may be
165 // modified by the caller.
166 func (e *cacheEntry) get(ctx context.Context) (*LogStreamState, error) {
167 promisedSt, err := e.p.Get(ctx)
168 if err != nil {
169 return nil, err
170 }
171
172 // Create a clone of our cached value (not deep, so secret is not cloned , but
173 // the Collector will not modify that). If we have a local terminal inde x
174 // cached, apply that to the response.
175 //
176 // We need to lock around our terminalIndex.
177 rp := *(promisedSt.(*LogStreamState))
178 if rp.TerminalIndex < 0 {
179 rp.TerminalIndex = types.MessageIndex(atomic.LoadInt64(&e.termin alIndex))
180 }
181
182 return &rp, nil
183 }
184
185 // hasError tests if this entry has completed evaluation with an error state.
186 // This is non-blocking, so if the evaluation hasn't completed, it will return
187 // false.
188 func (e *cacheEntry) hasError() bool {
189 if _, err := e.p.Peek(); err != nil && err != promise.ErrNoData {
190 return true
191 }
192 return false
193 }
194
195 // loadTerminalIndex loads a local cache of the stream's terminal index. This
196 // will be applied to all future get requests.
197 func (e *cacheEntry) loadTerminalIndex(tidx types.MessageIndex) {
198 atomic.StoreInt64(&e.terminalIndex, int64(tidx))
199 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698