| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "strings" | |
| 10 | 9 |
| 11 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
| 11 "github.com/luci/luci-go/common/grpcutil" |
| 12 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 13 "github.com/luci/luci-go/server/logdog/storage" | 13 "github.com/luci/luci-go/server/logdog/storage" |
| 14 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 15 "google.golang.org/cloud/bigtable" | 15 "google.golang.org/cloud/bigtable" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 // errStop is an internal sentinel error used to communicate "stop iteration" | 18 // errStop is an internal sentinel error used to communicate "stop iteration" |
| 19 // to btTable.getLogData. | 19 // to btTable.getLogData. |
| 20 var errStop = errors.New("stop") | 20 var errStop = errors.New("stop") |
| 21 | 21 |
| 22 // btGetCallback is a callback that is invoked for each log data row returned | 22 // btGetCallback is a callback that is invoked for each log data row returned |
| 23 // by getLogData. | 23 // by getLogData. |
| 24 // | 24 // |
| 25 // If an error is encountered, no more log data will be fetched. The error will | 25 // If an error is encountered, no more log data will be fetched. The error will |
| 26 // be propagated to the getLogData call unless the returned error is errStop, in | 26 // be propagated to the getLogData call unless the returned error is errStop, in |
| 27 // which case iteration will stop and getLogData will return nil. | 27 // which case iteration will stop and getLogData will return nil. |
| 28 type btGetCallback func(*rowKey, []byte) error | 28 type btGetCallback func(*rowKey, []byte) error |
| 29 | 29 |
| 30 // btTable is a general interface for BigTable operations intended to enable | 30 // btTable is a general interface for BigTable operations intended to enable |
| 31 // unit tests to stub out BigTable without adding runtime inefficiency. | 31 // unit tests to stub out BigTable without adding runtime inefficiency. |
| 32 // | |
| 33 // If any of these methods fails with a transient error, it will be wrapped | |
| 34 // as an errors.Transient error. | |
| 35 type btTable interface { | 32 type btTable interface { |
| 36 // putLogData adds new log data to BigTable. | 33 // putLogData adds new log data to BigTable. |
| 37 // | 34 // |
| 38 // If data already exists for the named row, it will return storage.ErrE
xists | 35 // If data already exists for the named row, it will return storage.ErrE
xists |
| 39 // and not add the data. | 36 // and not add the data. |
| 40 putLogData(context.Context, *rowKey, []byte) error | 37 putLogData(context.Context, *rowKey, []byte) error |
| 41 | 38 |
| 42 // getLogData retrieves rows belonging to the supplied stream record, st
arting | 39 // getLogData retrieves rows belonging to the supplied stream record, st
arting |
| 43 // with the first index owned by that record. The supplied callback is i
nvoked | 40 // with the first index owned by that record. The supplied callback is i
nvoked |
| 44 // once per retrieved row. | 41 // once per retrieved row. |
| 45 // | 42 // |
| 46 // rk is the starting row key. | 43 // rk is the starting row key. |
| 47 // | 44 // |
| 48 // If the supplied limit is nonzero, no more than limit rows will be | 45 // If the supplied limit is nonzero, no more than limit rows will be |
| 49 // retrieved. | 46 // retrieved. |
| 50 // | 47 // |
| 51 // If keysOnly is true, then the callback will return nil row data. | 48 // If keysOnly is true, then the callback will return nil row data. |
| 52 getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb b
tGetCallback) error | 49 getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb b
tGetCallback) error |
| 53 } | 50 } |
| 54 | 51 |
| 55 // btTransientSubstrings is the set of known error substrings returned by | |
| 56 // BigTable that indicate failures that aren't related to the specific data | |
| 57 // content. | |
| 58 var btTransientSubstrings = []string{ | |
| 59 "Internal error encountered", | |
| 60 "interactive login is required", | |
| 61 } | |
| 62 | |
| 63 // btTableProd is an implementation of the btTable interface that uses a real | 52 // btTableProd is an implementation of the btTable interface that uses a real |
| 64 // production BigTable connection. | 53 // production BigTable connection. |
| 65 type btTableProd struct { | 54 type btTableProd struct { |
| 66 *bigtable.Table | 55 *bigtable.Table |
| 67 } | 56 } |
| 68 | 57 |
| 69 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err
or { | 58 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err
or { |
| 70 m := bigtable.NewMutation() | 59 m := bigtable.NewMutation() |
| 71 m.Set("log", "data", bigtable.ServerTime, data) | 60 m.Set("log", "data", bigtable.ServerTime, data) |
| 72 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil,
m) | 61 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil,
m) |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 if isTransient(err) { | 132 if isTransient(err) { |
| 144 err = errors.WrapTransient(err) | 133 err = errors.WrapTransient(err) |
| 145 } | 134 } |
| 146 return err | 135 return err |
| 147 } | 136 } |
| 148 | 137 |
| 149 // isTransient tests if a BigTable SDK error is transient. | 138 // isTransient tests if a BigTable SDK error is transient. |
| 150 // | 139 // |
| 151 // Since the BigTable API doesn't give us this information, we will identify | 140 // Since the BigTable API doesn't give us this information, we will identify |
| 152 // transient errors by parsing their error string :( | 141 // transient errors by parsing their error string :( |
| 153 // | |
| 154 // TODO(dnj): File issue to add error qualifier functions to BigTable API. | |
| 155 func isTransient(err error) bool { | 142 func isTransient(err error) bool { |
| 156 » if err == nil { | 143 » return grpcutil.IsTransient(err) |
| 157 » » return false | |
| 158 » } | |
| 159 | |
| 160 » msg := err.Error() | |
| 161 » for _, s := range btTransientSubstrings { | |
| 162 » » if strings.Contains(msg, s) { | |
| 163 » » » return true | |
| 164 » » } | |
| 165 » } | |
| 166 » return false | |
| 167 } | 144 } |
| 168 | 145 |
| 169 // getLogData loads the "data" column from the "log" column family and returns | 146 // getLogData loads the "data" column from the "log" column family and returns |
| 170 // its []byte contents. | 147 // its []byte contents. |
| 171 // | 148 // |
| 172 // If the row doesn't exist, storage.ErrDoesNotExist will be returned. | 149 // If the row doesn't exist, storage.ErrDoesNotExist will be returned. |
| 173 func getLogData(row bigtable.Row) ([]byte, error) { | 150 func getLogData(row bigtable.Row) ([]byte, error) { |
| 174 ri := getReadItem(row, "log", "data") | 151 ri := getReadItem(row, "log", "data") |
| 175 if ri == nil { | 152 if ri == nil { |
| 176 return nil, storage.ErrDoesNotExist | 153 return nil, storage.ErrDoesNotExist |
| (...skipping 11 matching lines...) Expand all Loading... |
| 188 | 165 |
| 189 // Get the specific ReadItem for our column | 166 // Get the specific ReadItem for our column |
| 190 colName := fmt.Sprintf("%s:%s", family, column) | 167 colName := fmt.Sprintf("%s:%s", family, column) |
| 191 for _, item := range items { | 168 for _, item := range items { |
| 192 if item.Column == colName { | 169 if item.Column == colName { |
| 193 return &item | 170 return &item |
| 194 } | 171 } |
| 195 } | 172 } |
| 196 return nil | 173 return nil |
| 197 } | 174 } |
| OLD | NEW |