Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(275)

Side by Side Diff: server/logdog/storage/bigtable/bigtable.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "time" 9 "time"
10 10
11 "github.com/luci/luci-go/common/errors" 11 "github.com/luci/luci-go/common/errors"
12 "github.com/luci/luci-go/common/grpcutil" 12 "github.com/luci/luci-go/common/grpcutil"
13 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/server/logdog/storage" 13 "github.com/luci/luci-go/server/logdog/storage"
15 "golang.org/x/net/context" 14 "golang.org/x/net/context"
16 "google.golang.org/cloud/bigtable" 15 "google.golang.org/cloud/bigtable"
17 ) 16 )
18 17
19 // errStop is an internal sentinel error used to communicate "stop iteration"
20 // to btTable.getLogData.
21 var errStop = errors.New("stop")
22
23 const ( 18 const (
24 logColumnFamily = "log" 19 logColumnFamily = "log"
25 logColumn = "data" 20 logColumn = "data"
26 ) 21 )
27 22
23 // Limits taken from here:
24 // https://cloud.google.com/bigtable/docs/schema-design
25 const (
26 // bigTableRowMaxBytes is the maximum number of bytes that a single BigT able
27 // row may hold.
28 bigTableRowMaxBytes = 1024 * 1024 * 10 // 10MB
29 )
30
28 // btGetCallback is a callback that is invoked for each log data row returned 31 // btGetCallback is a callback that is invoked for each log data row returned
29 // by getLogData. 32 // by getLogData.
30 // 33 //
31 // If an error is encountered, no more log data will be fetched. The error will 34 // If an error is encountered, no more log data will be fetched. The error will
32 // be propagated to the getLogData call unless the returned error is errStop, in 35 // be propagated to the getLogData call.
33 // which case iteration will stop and getLogData will return nil.
34 type btGetCallback func(*rowKey, []byte) error 36 type btGetCallback func(*rowKey, []byte) error
nodir 2016/03/30 18:29:42 [general feedback] you don't have to prefix types
dnj 2016/03/30 18:37:13 Yep, but in this case, this is a BigTable-specific
35 37
36 // btTable is a general interface for BigTable operations intended to enable 38 // btTable is a general interface for BigTable operations intended to enable
37 // unit tests to stub out BigTable without adding runtime inefficiency. 39 // unit tests to stub out BigTable without adding runtime inefficiency.
38 type btTable interface { 40 type btTable interface {
39 // putLogData adds new log data to BigTable. 41 // putLogData adds new log data to BigTable.
40 // 42 //
41 // If data already exists for the named row, it will return storage.ErrE xists 43 // If data already exists for the named row, it will return storage.ErrE xists
42 // and not add the data. 44 // and not add the data.
43 putLogData(context.Context, *rowKey, []byte) error 45 putLogData(context.Context, *rowKey, []byte) error
44 46
(...skipping 13 matching lines...) Expand all
58 setMaxLogAge(context.Context, time.Duration) error 60 setMaxLogAge(context.Context, time.Duration) error
59 } 61 }
60 62
61 // btTableProd is an implementation of the btTable interface that uses a real 63 // btTableProd is an implementation of the btTable interface that uses a real
62 // production BigTable connection. 64 // production BigTable connection.
63 type btTableProd struct { 65 type btTableProd struct {
64 *btStorage 66 *btStorage
65 } 67 }
66 68
67 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err or { 69 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err or {
68 table, err := t.getLogTable()
69 if err != nil {
70 return err
71 }
72
73 m := bigtable.NewMutation() 70 m := bigtable.NewMutation()
74 m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data) 71 m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data)
75 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m) 72 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m)
76 73
77 rowExists := false 74 rowExists := false
78 » if err = table.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult( &rowExists)); err != nil { 75 » if err := t.logTable.Apply(c, rk.encode(), cm, bigtable.GetCondMutationR esult(&rowExists)); err != nil {
79 return wrapTransient(err) 76 return wrapTransient(err)
80 } 77 }
81 if rowExists { 78 if rowExists {
82 return storage.ErrExists 79 return storage.ErrExists
83 } 80 }
84 return nil 81 return nil
85 } 82 }
86 83
87 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO nly bool, cb btGetCallback) error { 84 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO nly bool, cb btGetCallback) error {
88 table, err := t.getLogTable()
89 if err != nil {
90 return err
91 }
92 // Construct read options based on Get request. 85 // Construct read options based on Get request.
93 ropts := []bigtable.ReadOption{ 86 ropts := []bigtable.ReadOption{
94 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), 87 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)),
95 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)), 88 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)),
96 nil, 89 nil,
97 nil, 90 nil,
98 }[:2] 91 }[:2]
99 if keysOnly { 92 if keysOnly {
100 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt er())) 93 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt er()))
101 } 94 }
102 if limit > 0 { 95 if limit > 0 {
103 ropts = append(ropts, bigtable.LimitRows(int64(limit))) 96 ropts = append(ropts, bigtable.LimitRows(int64(limit)))
104 } 97 }
105 98
106 // This will limit the range to the immediate row key ("ASDF~INDEX") to 99 // This will limit the range to the immediate row key ("ASDF~INDEX") to
107 // immediately after the row key ("ASDF~~"). See rowKey for more informa tion. 100 // immediately after the row key ("ASDF~~"). See rowKey for more informa tion.
108 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) 101 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound())
109 102
110 innerErr := error(nil) 103 innerErr := error(nil)
111 » err = table.ReadRows(c, rng, func(row bigtable.Row) bool { 104 » err := t.logTable.ReadRows(c, rng, func(row bigtable.Row) bool {
112 data := []byte(nil) 105 data := []byte(nil)
113 if !keysOnly { 106 if !keysOnly {
114 err := error(nil) 107 err := error(nil)
115 data, err = getLogData(row) 108 data, err = getLogData(row)
116 if err != nil { 109 if err != nil {
117 innerErr = storage.ErrBadData 110 innerErr = storage.ErrBadData
118 return false 111 return false
119 } 112 }
120 } 113 }
121 114
122 drk, err := decodeRowKey(row.Key()) 115 drk, err := decodeRowKey(row.Key())
123 if err != nil { 116 if err != nil {
124 » » » log.Fields{ 117 » » » innerErr = err
125 » » » » log.ErrorKey: err,
126 » » » » "value": row.Key(),
127 » » » }.Warningf(c, "Failed to parse row key.")
128 » » » innerErr = storage.ErrBadData
129 return false 118 return false
130 } 119 }
131 120
132 if err := cb(drk, data); err != nil { 121 if err := cb(drk, data); err != nil {
133 » » » if err != errStop { 122 » » » innerErr = err
134 » » » » innerErr = err
135 » » » }
136 return false 123 return false
137 } 124 }
138 return true 125 return true
139 }, ropts...) 126 }, ropts...)
140 if err == nil { 127 if err == nil {
141 err = innerErr 128 err = innerErr
142 } 129 }
143 return wrapTransient(err) 130 return wrapTransient(err)
144 } 131 }
145 132
146 func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error { 133 func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error {
147 ac, err := t.getAdminClient()
148 if err != nil {
149 return err
150 }
151
152 var logGCPolicy bigtable.GCPolicy 134 var logGCPolicy bigtable.GCPolicy
153 if d > 0 { 135 if d > 0 {
154 logGCPolicy = bigtable.MaxAgePolicy(d) 136 logGCPolicy = bigtable.MaxAgePolicy(d)
155 } 137 }
156 » if err := ac.SetGCPolicy(c, t.LogTable, logColumnFamily, logGCPolicy); e rr != nil { 138 » if err := t.adminClient.SetGCPolicy(c, t.LogTable, logColumnFamily, logG CPolicy); err != nil {
157 return wrapTransient(err) 139 return wrapTransient(err)
158 } 140 }
159 return nil 141 return nil
160 } 142 }
161 143
162 // wrapTransient wraps the supplied error in an errors.TransientError if it is 144 // wrapTransient wraps the supplied error in an errors.TransientError if it is
163 // transient. 145 // transient.
164 func wrapTransient(err error) error { 146 func wrapTransient(err error) error {
165 if isTransient(err) { 147 if isTransient(err) {
166 err = errors.WrapTransient(err) 148 err = errors.WrapTransient(err)
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
198 180
199 // Get the specific ReadItem for our column 181 // Get the specific ReadItem for our column
200 colName := fmt.Sprintf("%s:%s", family, column) 182 colName := fmt.Sprintf("%s:%s", family, column)
201 for _, item := range items { 183 for _, item := range items {
202 if item.Column == colName { 184 if item.Column == colName {
203 return &item 185 return &item
204 } 186 }
205 } 187 }
206 return nil 188 return nil
207 } 189 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698