Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(329)

Unified Diff: logdog/appengine/coordinator/storage_cache.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/appengine/coordinator/service.go ('k') | logdog/appengine/coordinator/storage_cache_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/appengine/coordinator/storage_cache.go
diff --git a/logdog/appengine/coordinator/storage_cache.go b/logdog/appengine/coordinator/storage_cache.go
new file mode 100644
index 0000000000000000000000000000000000000000..9a5f45690ef6c1e762dab3f59a01d3b96309cc6d
--- /dev/null
+++ b/logdog/appengine/coordinator/storage_cache.go
@@ -0,0 +1,210 @@
+// Copyright 2015 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package coordinator
+
+import (
+ "bytes"
+ "compress/zlib"
+ "fmt"
+ "io/ioutil"
+ "strings"
+ "time"
+
+ "github.com/luci/luci-go/common/errors"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/logdog/common/storage/caching"
+
+ "github.com/luci/gae/service/memcache"
+
+ "golang.org/x/net/context"
+)
+
+const (
+ schemaVersion = "v1"
+
+ // defaultCompressionThreshold is the threshold where entries will become
+ // compressed. If the size of data exceeds this threshold, it will be
+ // compressed with zlib in the cache.
+ defaultCompressionThreshold = 64 * 1024 // 64KiB
+)
+
+// StorageCache implements a generic caching.Cache for Storage instances.
+type StorageCache struct {
+ compressionThreshold int
+}
+
+// Get implements caching.Cache.
+func (sc *StorageCache) Get(c context.Context, items ...*caching.Item) {
+ mcItems := make([]memcache.Item, len(items))
+ for i, itm := range items {
+ mcItems[i] = memcache.NewItem(c, sc.mkCacheKey(itm))
+ }
+
+ err := memcache.Get(c, mcItems...)
+ sc.memcacheErrCB(err, len(mcItems), func(err error, i int) {
+ // By default, no data.
+ items[i].Data = nil
+
+ switch err {
+ case nil:
+ itemData := mcItems[i].Value()
+ if len(itemData) == 0 {
+ log.Warningf(c, "Cached storage missing compression byte.")
+ return
+ }
+ isCompressed, itemData := itemData[0], itemData[1:]
+
+ if isCompressed != 0x00 {
+ // This entry is compressed.
+ zr, err := zlib.NewReader(bytes.NewReader(itemData))
+ if err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "key": mcItems[i].Key(),
+ }.Warningf(c, "Failed to create ZLIB reader.")
+ return
+ }
+ defer zr.Close()
+
+ if itemData, err = ioutil.ReadAll(zr); err != nil {
+ log.Fields{
+ log.ErrorKey: err,
+ "key": mcItems[i].Key(),
+ }.Warningf(c, "Failed to decompress cached item.")
+ return
+ }
+ }
+ items[i].Data = itemData
+
+ case memcache.ErrCacheMiss:
+ break
+
+ default:
+ log.Fields{
+ log.ErrorKey: err,
+ "key": mcItems[i].Key(),
+ }.Warningf(c, "Error retrieving cached entry.")
+ }
+ })
+}
+
+// Put implements caching.Cache.
+func (sc *StorageCache) Put(c context.Context, exp time.Duration, items ...*caching.Item) {
+ threshold := sc.compressionThreshold
+ if threshold == 0 {
+ threshold = defaultCompressionThreshold
+ }
+
+ var (
+ buf bytes.Buffer
+ zw zlib.Writer
+ usedZW bool
+ )
+ defer func() {
+ if usedZW {
+ zw.Close()
+ }
+ }()
+
+ mcItems := make([]memcache.Item, 0, len(items))
+ for _, itm := range items {
+ if itm.Data == nil {
+ continue
+ }
+
+ // Compress the data in the cache item.
+ writeItemData := func(d []byte) bool {
+ buf.Reset()
+ buf.Grow(len(d) + 1)
+
+ if len(d) < threshold {
+ // Do not compress the item. Write a "0x00" to indicate that it is
+ // not compressed.
+ if err := buf.WriteByte(0x00); err != nil {
+ log.WithError(err).Warningf(c, "Failed to write compression byte.")
+ return false
+ }
+ if _, err := buf.Write(d); err != nil {
+ log.WithError(err).Warningf(c, "Failed to write storage cache data.")
+ return false
+ }
+ return true
+ }
+
+ // Compress the item. Write a "0x01" to indicate that it is compressed.
+ zw := zlib.NewWriter(&buf)
+ if err := buf.WriteByte(0x01); err != nil {
+ log.WithError(err).Warningf(c, "Failed to write compression byte.")
+ return false
+ }
+ defer zw.Close()
+
+ if _, err := zw.Write(d); err != nil {
+ log.WithError(err).Warningf(c, "Failed to compress storage cache data.")
+ return false
+ }
+ if err := zw.Flush(); err != nil {
+ log.WithError(err).Warningf(c, "Failed to flush compressed storage cache data.")
+ return false
+ }
+ return true
+ }
+
+ if !writeItemData(itm.Data) {
+ continue
+ }
+
+ mcItem := memcache.NewItem(c, sc.mkCacheKey(itm))
+ mcItem.SetValue(append([]byte(nil), buf.Bytes()...))
+ if exp > 0 {
+ mcItem.SetExpiration(exp)
+ }
+ mcItems = append(mcItems, mcItem)
+ }
+
+ err := memcache.Set(c, mcItems...)
+ sc.memcacheErrCB(err, len(mcItems), func(err error, i int) {
+ switch err {
+ case nil, memcache.ErrNotStored:
+ break
+
+ default:
+ log.Fields{
+ log.ErrorKey: err,
+ "key": mcItems[i].Key(),
+ }.Warningf(c, "Error storing cached entry.")
+ }
+ })
+}
+
+func (*StorageCache) memcacheErrCB(err error, count int, cb func(error, int)) {
+ merr, _ := err.(errors.MultiError)
+ if merr != nil && len(merr) != count {
+ panic(fmt.Errorf("MultiError count mismatch (%d != %d)", len(merr), count))
+ }
+
+ for i := 0; i < count; i++ {
+ switch {
+ case err == nil:
+ cb(nil, i)
+
+ case merr == nil:
+ cb(err, i)
+
+ default:
+ cb(merr[i], i)
+ }
+ }
+}
+
+func (*StorageCache) mkCacheKey(itm *caching.Item) string {
+ return strings.Join([]string{
+ "storage_cache",
+ schemaVersion,
+ itm.Schema,
+ itm.Type,
+ itm.Key,
+ }, "_")
+}
« no previous file with comments | « logdog/appengine/coordinator/service.go ('k') | logdog/appengine/coordinator/storage_cache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698