| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package coordinator |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "compress/zlib" |
| 10 "fmt" |
| 11 "io/ioutil" |
| 12 "strings" |
| 13 "time" |
| 14 |
| 15 "github.com/luci/luci-go/common/errors" |
| 16 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 18 |
| 19 "github.com/luci/gae/service/memcache" |
| 20 |
| 21 "golang.org/x/net/context" |
| 22 ) |
| 23 |
| 24 const ( |
| 25 schemaVersion = "v1" |
| 26 |
| 27 // defaultCompressionThreshold is the threshold where entries will becom
e |
| 28 // compressed. If the size of data exceeds this threshold, it will be |
| 29 // compressed with zlib in the cache. |
| 30 defaultCompressionThreshold = 64 * 1024 // 64KiB |
| 31 ) |
| 32 |
| 33 // StorageCache implements a generic caching.Cache for Storage instances. |
| 34 type StorageCache struct { |
| 35 compressionThreshold int |
| 36 } |
| 37 |
| 38 // Get implements caching.Cache. |
| 39 func (sc *StorageCache) Get(c context.Context, items ...*caching.Item) { |
| 40 mcItems := make([]memcache.Item, len(items)) |
| 41 for i, itm := range items { |
| 42 mcItems[i] = memcache.NewItem(c, sc.mkCacheKey(itm)) |
| 43 } |
| 44 |
| 45 err := memcache.Get(c, mcItems...) |
| 46 sc.memcacheErrCB(err, len(mcItems), func(err error, i int) { |
| 47 // By default, no data. |
| 48 items[i].Data = nil |
| 49 |
| 50 switch err { |
| 51 case nil: |
| 52 itemData := mcItems[i].Value() |
| 53 if len(itemData) == 0 { |
| 54 log.Warningf(c, "Cached storage missing compress
ion byte.") |
| 55 return |
| 56 } |
| 57 isCompressed, itemData := itemData[0], itemData[1:] |
| 58 |
| 59 if isCompressed != 0x00 { |
| 60 // This entry is compressed. |
| 61 zr, err := zlib.NewReader(bytes.NewReader(itemDa
ta)) |
| 62 if err != nil { |
| 63 log.Fields{ |
| 64 log.ErrorKey: err, |
| 65 "key": mcItems[i].Key(), |
| 66 }.Warningf(c, "Failed to create ZLIB rea
der.") |
| 67 return |
| 68 } |
| 69 defer zr.Close() |
| 70 |
| 71 if itemData, err = ioutil.ReadAll(zr); err != ni
l { |
| 72 log.Fields{ |
| 73 log.ErrorKey: err, |
| 74 "key": mcItems[i].Key(), |
| 75 }.Warningf(c, "Failed to decompress cach
ed item.") |
| 76 return |
| 77 } |
| 78 } |
| 79 items[i].Data = itemData |
| 80 |
| 81 case memcache.ErrCacheMiss: |
| 82 break |
| 83 |
| 84 default: |
| 85 log.Fields{ |
| 86 log.ErrorKey: err, |
| 87 "key": mcItems[i].Key(), |
| 88 }.Warningf(c, "Error retrieving cached entry.") |
| 89 } |
| 90 }) |
| 91 } |
| 92 |
| 93 // Put implements caching.Cache. |
| 94 func (sc *StorageCache) Put(c context.Context, exp time.Duration, items ...*cach
ing.Item) { |
| 95 threshold := sc.compressionThreshold |
| 96 if threshold == 0 { |
| 97 threshold = defaultCompressionThreshold |
| 98 } |
| 99 |
| 100 var ( |
| 101 buf bytes.Buffer |
| 102 zw zlib.Writer |
| 103 usedZW bool |
| 104 ) |
| 105 defer func() { |
| 106 if usedZW { |
| 107 zw.Close() |
| 108 } |
| 109 }() |
| 110 |
| 111 mcItems := make([]memcache.Item, 0, len(items)) |
| 112 for _, itm := range items { |
| 113 if itm.Data == nil { |
| 114 continue |
| 115 } |
| 116 |
| 117 // Compress the data in the cache item. |
| 118 writeItemData := func(d []byte) bool { |
| 119 buf.Reset() |
| 120 buf.Grow(len(d) + 1) |
| 121 |
| 122 if len(d) < threshold { |
| 123 // Do not compress the item. Write a "0x00" to i
ndicate that it is |
| 124 // not compressed. |
| 125 if err := buf.WriteByte(0x00); err != nil { |
| 126 log.WithError(err).Warningf(c, "Failed t
o write compression byte.") |
| 127 return false |
| 128 } |
| 129 if _, err := buf.Write(d); err != nil { |
| 130 log.WithError(err).Warningf(c, "Failed t
o write storage cache data.") |
| 131 return false |
| 132 } |
| 133 return true |
| 134 } |
| 135 |
| 136 // Compress the item. Write a "0x01" to indicate that it
is compressed. |
| 137 zw := zlib.NewWriter(&buf) |
| 138 if err := buf.WriteByte(0x01); err != nil { |
| 139 log.WithError(err).Warningf(c, "Failed to write
compression byte.") |
| 140 return false |
| 141 } |
| 142 defer zw.Close() |
| 143 |
| 144 if _, err := zw.Write(d); err != nil { |
| 145 log.WithError(err).Warningf(c, "Failed to compre
ss storage cache data.") |
| 146 return false |
| 147 } |
| 148 if err := zw.Flush(); err != nil { |
| 149 log.WithError(err).Warningf(c, "Failed to flush
compressed storage cache data.") |
| 150 return false |
| 151 } |
| 152 return true |
| 153 } |
| 154 |
| 155 if !writeItemData(itm.Data) { |
| 156 continue |
| 157 } |
| 158 |
| 159 mcItem := memcache.NewItem(c, sc.mkCacheKey(itm)) |
| 160 mcItem.SetValue(append([]byte(nil), buf.Bytes()...)) |
| 161 if exp > 0 { |
| 162 mcItem.SetExpiration(exp) |
| 163 } |
| 164 mcItems = append(mcItems, mcItem) |
| 165 } |
| 166 |
| 167 err := memcache.Set(c, mcItems...) |
| 168 sc.memcacheErrCB(err, len(mcItems), func(err error, i int) { |
| 169 switch err { |
| 170 case nil, memcache.ErrNotStored: |
| 171 break |
| 172 |
| 173 default: |
| 174 log.Fields{ |
| 175 log.ErrorKey: err, |
| 176 "key": mcItems[i].Key(), |
| 177 }.Warningf(c, "Error storing cached entry.") |
| 178 } |
| 179 }) |
| 180 } |
| 181 |
| 182 func (*StorageCache) memcacheErrCB(err error, count int, cb func(error, int)) { |
| 183 merr, _ := err.(errors.MultiError) |
| 184 if merr != nil && len(merr) != count { |
| 185 panic(fmt.Errorf("MultiError count mismatch (%d != %d)", len(mer
r), count)) |
| 186 } |
| 187 |
| 188 for i := 0; i < count; i++ { |
| 189 switch { |
| 190 case err == nil: |
| 191 cb(nil, i) |
| 192 |
| 193 case merr == nil: |
| 194 cb(err, i) |
| 195 |
| 196 default: |
| 197 cb(merr[i], i) |
| 198 } |
| 199 } |
| 200 } |
| 201 |
| 202 func (*StorageCache) mkCacheKey(itm *caching.Item) string { |
| 203 return strings.Join([]string{ |
| 204 "storage_cache", |
| 205 schemaVersion, |
| 206 itm.Schema, |
| 207 itm.Type, |
| 208 itm.Key, |
| 209 }, "_") |
| 210 } |
| OLD | NEW |