Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(816)

Side by Side Diff: logdog/appengine/coordinator/storage_cache.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package coordinator
6
7 import (
8 "bytes"
9 "compress/zlib"
10 "fmt"
11 "io/ioutil"
12 "strings"
13 "time"
14
15 "github.com/luci/luci-go/common/errors"
16 log "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/logdog/common/storage/caching"
18
19 "github.com/luci/gae/service/memcache"
20
21 "golang.org/x/net/context"
22 )
23
24 const (
25 schemaVersion = "v1"
26
27 // defaultCompressionThreshold is the threshold where entries will becom e
28 // compressed. If the size of data exceeds this threshold, it will be
29 // compressed with zlib in the cache.
30 defaultCompressionThreshold = 64 * 1024 // 64KiB
31 )
32
33 // StorageCache implements a generic caching.Cache for Storage instances.
34 type StorageCache struct {
35 compressionThreshold int
36 }
37
38 // Get implements caching.Cache.
39 func (sc *StorageCache) Get(c context.Context, items ...*caching.Item) {
40 mcItems := make([]memcache.Item, len(items))
41 for i, itm := range items {
42 mcItems[i] = memcache.NewItem(c, sc.mkCacheKey(itm))
43 }
44
45 err := memcache.Get(c, mcItems...)
46 sc.memcacheErrCB(err, len(mcItems), func(err error, i int) {
47 // By default, no data.
48 items[i].Data = nil
49
50 switch err {
51 case nil:
52 itemData := mcItems[i].Value()
53 if len(itemData) == 0 {
54 log.Warningf(c, "Cached storage missing compress ion byte.")
55 return
56 }
57 isCompressed, itemData := itemData[0], itemData[1:]
58
59 if isCompressed != 0x00 {
60 // This entry is compressed.
61 zr, err := zlib.NewReader(bytes.NewReader(itemDa ta))
62 if err != nil {
63 log.Fields{
64 log.ErrorKey: err,
65 "key": mcItems[i].Key(),
66 }.Warningf(c, "Failed to create ZLIB rea der.")
67 return
68 }
69 defer zr.Close()
70
71 if itemData, err = ioutil.ReadAll(zr); err != ni l {
72 log.Fields{
73 log.ErrorKey: err,
74 "key": mcItems[i].Key(),
75 }.Warningf(c, "Failed to decompress cach ed item.")
76 return
77 }
78 }
79 items[i].Data = itemData
80
81 case memcache.ErrCacheMiss:
82 break
83
84 default:
85 log.Fields{
86 log.ErrorKey: err,
87 "key": mcItems[i].Key(),
88 }.Warningf(c, "Error retrieving cached entry.")
89 }
90 })
91 }
92
93 // Put implements caching.Cache.
94 func (sc *StorageCache) Put(c context.Context, exp time.Duration, items ...*cach ing.Item) {
95 threshold := sc.compressionThreshold
96 if threshold == 0 {
97 threshold = defaultCompressionThreshold
98 }
99
100 var (
101 buf bytes.Buffer
102 zw zlib.Writer
103 usedZW bool
104 )
105 defer func() {
106 if usedZW {
107 zw.Close()
108 }
109 }()
110
111 mcItems := make([]memcache.Item, 0, len(items))
112 for _, itm := range items {
113 if itm.Data == nil {
114 continue
115 }
116
117 // Compress the data in the cache item.
118 writeItemData := func(d []byte) bool {
119 buf.Reset()
120 buf.Grow(len(d) + 1)
121
122 if len(d) < threshold {
123 // Do not compress the item. Write a "0x00" to i ndicate that it is
124 // not compressed.
125 if err := buf.WriteByte(0x00); err != nil {
126 log.WithError(err).Warningf(c, "Failed t o write compression byte.")
127 return false
128 }
129 if _, err := buf.Write(d); err != nil {
130 log.WithError(err).Warningf(c, "Failed t o write storage cache data.")
131 return false
132 }
133 return true
134 }
135
136 // Compress the item. Write a "0x01" to indicate that it is compressed.
137 zw := zlib.NewWriter(&buf)
138 if err := buf.WriteByte(0x01); err != nil {
139 log.WithError(err).Warningf(c, "Failed to write compression byte.")
140 return false
141 }
142 defer zw.Close()
143
144 if _, err := zw.Write(d); err != nil {
145 log.WithError(err).Warningf(c, "Failed to compre ss storage cache data.")
146 return false
147 }
148 if err := zw.Flush(); err != nil {
149 log.WithError(err).Warningf(c, "Failed to flush compressed storage cache data.")
150 return false
151 }
152 return true
153 }
154
155 if !writeItemData(itm.Data) {
156 continue
157 }
158
159 mcItem := memcache.NewItem(c, sc.mkCacheKey(itm))
160 mcItem.SetValue(append([]byte(nil), buf.Bytes()...))
161 if exp > 0 {
162 mcItem.SetExpiration(exp)
163 }
164 mcItems = append(mcItems, mcItem)
165 }
166
167 err := memcache.Set(c, mcItems...)
168 sc.memcacheErrCB(err, len(mcItems), func(err error, i int) {
169 switch err {
170 case nil, memcache.ErrNotStored:
171 break
172
173 default:
174 log.Fields{
175 log.ErrorKey: err,
176 "key": mcItems[i].Key(),
177 }.Warningf(c, "Error storing cached entry.")
178 }
179 })
180 }
181
182 func (*StorageCache) memcacheErrCB(err error, count int, cb func(error, int)) {
183 merr, _ := err.(errors.MultiError)
184 if merr != nil && len(merr) != count {
185 panic(fmt.Errorf("MultiError count mismatch (%d != %d)", len(mer r), count))
186 }
187
188 for i := 0; i < count; i++ {
189 switch {
190 case err == nil:
191 cb(nil, i)
192
193 case merr == nil:
194 cb(err, i)
195
196 default:
197 cb(merr[i], i)
198 }
199 }
200 }
201
202 func (*StorageCache) mkCacheKey(itm *caching.Item) string {
203 return strings.Join([]string{
204 "storage_cache",
205 schemaVersion,
206 itm.Schema,
207 itm.Type,
208 itm.Key,
209 }, "_")
210 }
OLDNEW
« no previous file with comments | « logdog/appengine/coordinator/service.go ('k') | logdog/appengine/coordinator/storage_cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698