Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/errors" | 11 "github.com/luci/luci-go/common/errors" |
| 12 "github.com/luci/luci-go/common/grpcutil" | 12 "github.com/luci/luci-go/common/grpcutil" |
| 13 log "github.com/luci/luci-go/common/logging" | |
| 14 "github.com/luci/luci-go/server/logdog/storage" | 13 "github.com/luci/luci-go/server/logdog/storage" |
| 15 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 16 "google.golang.org/cloud/bigtable" | 15 "google.golang.org/cloud/bigtable" |
| 17 ) | 16 ) |
| 18 | 17 |
| 19 // errStop is an internal sentinel error used to communicate "stop iteration" | |
| 20 // to btTable.getLogData. | |
| 21 var errStop = errors.New("stop") | |
| 22 | |
| 23 const ( | 18 const ( |
| 24 logColumnFamily = "log" | 19 logColumnFamily = "log" |
| 25 logColumn = "data" | 20 logColumn = "data" |
| 26 ) | 21 ) |
| 27 | 22 |
| 23 // Limits taken from here: | |
| 24 // https://cloud.google.com/bigtable/docs/schema-design | |
| 25 const ( | |
| 26 // bigTableRowMaxBytes is the maximum number of bytes that a single BigT able | |
| 27 // row may hold. | |
| 28 bigTableRowMaxBytes = 1024 * 1024 * 10 // 10MB | |
| 29 ) | |
| 30 | |
| 28 // btGetCallback is a callback that is invoked for each log data row returned | 31 // btGetCallback is a callback that is invoked for each log data row returned |
| 29 // by getLogData. | 32 // by getLogData. |
| 30 // | 33 // |
| 31 // If an error is encountered, no more log data will be fetched. The error will | 34 // If an error is encountered, no more log data will be fetched. The error will |
| 32 // be propagated to the getLogData call unless the returned error is errStop, in | 35 // be propagated to the getLogData call. |
| 33 // which case iteration will stop and getLogData will return nil. | |
| 34 type btGetCallback func(*rowKey, []byte) error | 36 type btGetCallback func(*rowKey, []byte) error |
|
nodir
2016/03/30 18:29:42
[general feedback] you don't have to prefix types
dnj
2016/03/30 18:37:13
Yep, but in this case, this is a BigTable-specific
| |
| 35 | 37 |
| 36 // btTable is a general interface for BigTable operations intended to enable | 38 // btTable is a general interface for BigTable operations intended to enable |
| 37 // unit tests to stub out BigTable without adding runtime inefficiency. | 39 // unit tests to stub out BigTable without adding runtime inefficiency. |
| 38 type btTable interface { | 40 type btTable interface { |
| 39 // putLogData adds new log data to BigTable. | 41 // putLogData adds new log data to BigTable. |
| 40 // | 42 // |
| 41 // If data already exists for the named row, it will return storage.ErrE xists | 43 // If data already exists for the named row, it will return storage.ErrE xists |
| 42 // and not add the data. | 44 // and not add the data. |
| 43 putLogData(context.Context, *rowKey, []byte) error | 45 putLogData(context.Context, *rowKey, []byte) error |
| 44 | 46 |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 58 setMaxLogAge(context.Context, time.Duration) error | 60 setMaxLogAge(context.Context, time.Duration) error |
| 59 } | 61 } |
| 60 | 62 |
| 61 // btTableProd is an implementation of the btTable interface that uses a real | 63 // btTableProd is an implementation of the btTable interface that uses a real |
| 62 // production BigTable connection. | 64 // production BigTable connection. |
| 63 type btTableProd struct { | 65 type btTableProd struct { |
| 64 *btStorage | 66 *btStorage |
| 65 } | 67 } |
| 66 | 68 |
| 67 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err or { | 69 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err or { |
| 68 table, err := t.getLogTable() | |
| 69 if err != nil { | |
| 70 return err | |
| 71 } | |
| 72 | |
| 73 m := bigtable.NewMutation() | 70 m := bigtable.NewMutation() |
| 74 m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data) | 71 m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data) |
| 75 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m) | 72 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil, m) |
| 76 | 73 |
| 77 rowExists := false | 74 rowExists := false |
| 78 » if err = table.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult( &rowExists)); err != nil { | 75 » if err := t.logTable.Apply(c, rk.encode(), cm, bigtable.GetCondMutationR esult(&rowExists)); err != nil { |
| 79 return wrapTransient(err) | 76 return wrapTransient(err) |
| 80 } | 77 } |
| 81 if rowExists { | 78 if rowExists { |
| 82 return storage.ErrExists | 79 return storage.ErrExists |
| 83 } | 80 } |
| 84 return nil | 81 return nil |
| 85 } | 82 } |
| 86 | 83 |
| 87 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO nly bool, cb btGetCallback) error { | 84 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO nly bool, cb btGetCallback) error { |
| 88 table, err := t.getLogTable() | |
| 89 if err != nil { | |
| 90 return err | |
| 91 } | |
| 92 // Construct read options based on Get request. | 85 // Construct read options based on Get request. |
| 93 ropts := []bigtable.ReadOption{ | 86 ropts := []bigtable.ReadOption{ |
| 94 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), | 87 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), |
| 95 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)), | 88 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)), |
| 96 nil, | 89 nil, |
| 97 nil, | 90 nil, |
| 98 }[:2] | 91 }[:2] |
| 99 if keysOnly { | 92 if keysOnly { |
| 100 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt er())) | 93 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt er())) |
| 101 } | 94 } |
| 102 if limit > 0 { | 95 if limit > 0 { |
| 103 ropts = append(ropts, bigtable.LimitRows(int64(limit))) | 96 ropts = append(ropts, bigtable.LimitRows(int64(limit))) |
| 104 } | 97 } |
| 105 | 98 |
| 106 // This will limit the range to the immediate row key ("ASDF~INDEX") to | 99 // This will limit the range to the immediate row key ("ASDF~INDEX") to |
| 107 // immediately after the row key ("ASDF~~"). See rowKey for more informa tion. | 100 // immediately after the row key ("ASDF~~"). See rowKey for more informa tion. |
| 108 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) | 101 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) |
| 109 | 102 |
| 110 innerErr := error(nil) | 103 innerErr := error(nil) |
| 111 » err = table.ReadRows(c, rng, func(row bigtable.Row) bool { | 104 » err := t.logTable.ReadRows(c, rng, func(row bigtable.Row) bool { |
| 112 data := []byte(nil) | 105 data := []byte(nil) |
| 113 if !keysOnly { | 106 if !keysOnly { |
| 114 err := error(nil) | 107 err := error(nil) |
| 115 data, err = getLogData(row) | 108 data, err = getLogData(row) |
| 116 if err != nil { | 109 if err != nil { |
| 117 innerErr = storage.ErrBadData | 110 innerErr = storage.ErrBadData |
| 118 return false | 111 return false |
| 119 } | 112 } |
| 120 } | 113 } |
| 121 | 114 |
| 122 drk, err := decodeRowKey(row.Key()) | 115 drk, err := decodeRowKey(row.Key()) |
| 123 if err != nil { | 116 if err != nil { |
| 124 » » » log.Fields{ | 117 » » » innerErr = err |
| 125 » » » » log.ErrorKey: err, | |
| 126 » » » » "value": row.Key(), | |
| 127 » » » }.Warningf(c, "Failed to parse row key.") | |
| 128 » » » innerErr = storage.ErrBadData | |
| 129 return false | 118 return false |
| 130 } | 119 } |
| 131 | 120 |
| 132 if err := cb(drk, data); err != nil { | 121 if err := cb(drk, data); err != nil { |
| 133 » » » if err != errStop { | 122 » » » innerErr = err |
| 134 » » » » innerErr = err | |
| 135 » » » } | |
| 136 return false | 123 return false |
| 137 } | 124 } |
| 138 return true | 125 return true |
| 139 }, ropts...) | 126 }, ropts...) |
| 140 if err == nil { | 127 if err == nil { |
| 141 err = innerErr | 128 err = innerErr |
| 142 } | 129 } |
| 143 return wrapTransient(err) | 130 return wrapTransient(err) |
| 144 } | 131 } |
| 145 | 132 |
| 146 func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error { | 133 func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error { |
| 147 ac, err := t.getAdminClient() | |
| 148 if err != nil { | |
| 149 return err | |
| 150 } | |
| 151 | |
| 152 var logGCPolicy bigtable.GCPolicy | 134 var logGCPolicy bigtable.GCPolicy |
| 153 if d > 0 { | 135 if d > 0 { |
| 154 logGCPolicy = bigtable.MaxAgePolicy(d) | 136 logGCPolicy = bigtable.MaxAgePolicy(d) |
| 155 } | 137 } |
| 156 » if err := ac.SetGCPolicy(c, t.LogTable, logColumnFamily, logGCPolicy); e rr != nil { | 138 » if err := t.adminClient.SetGCPolicy(c, t.LogTable, logColumnFamily, logG CPolicy); err != nil { |
| 157 return wrapTransient(err) | 139 return wrapTransient(err) |
| 158 } | 140 } |
| 159 return nil | 141 return nil |
| 160 } | 142 } |
| 161 | 143 |
| 162 // wrapTransient wraps the supplied error in an errors.TransientError if it is | 144 // wrapTransient wraps the supplied error in an errors.TransientError if it is |
| 163 // transient. | 145 // transient. |
| 164 func wrapTransient(err error) error { | 146 func wrapTransient(err error) error { |
| 165 if isTransient(err) { | 147 if isTransient(err) { |
| 166 err = errors.WrapTransient(err) | 148 err = errors.WrapTransient(err) |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 198 | 180 |
| 199 // Get the specific ReadItem for our column | 181 // Get the specific ReadItem for our column |
| 200 colName := fmt.Sprintf("%s:%s", family, column) | 182 colName := fmt.Sprintf("%s:%s", family, column) |
| 201 for _, item := range items { | 183 for _, item := range items { |
| 202 if item.Column == colName { | 184 if item.Column == colName { |
| 203 return &item | 185 return &item |
| 204 } | 186 } |
| 205 } | 187 } |
| 206 return nil | 188 return nil |
| 207 } | 189 } |
| OLD | NEW |