| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 58 // If keysOnly is true, then the callback will return nil row data. | 58 // If keysOnly is true, then the callback will return nil row data. |
| 59 getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb b
tGetCallback) error | 59 getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb b
tGetCallback) error |
| 60 | 60 |
| 61 // setMaxLogAge updates the maximum log age policy for the log family. | 61 // setMaxLogAge updates the maximum log age policy for the log family. |
| 62 setMaxLogAge(context.Context, time.Duration) error | 62 setMaxLogAge(context.Context, time.Duration) error |
| 63 } | 63 } |
| 64 | 64 |
| 65 // btTableProd is an implementation of the btTable interface that uses a real | 65 // btTableProd is an implementation of the btTable interface that uses a real |
| 66 // production BigTable connection. | 66 // production BigTable connection. |
| 67 type btTableProd struct { | 67 type btTableProd struct { |
| 68 » *btStorage | 68 » base *btStorage |
| 69 } | 69 } |
| 70 | 70 |
| 71 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err
or { | 71 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err
or { |
| 72 m := bigtable.NewMutation() | 72 m := bigtable.NewMutation() |
| 73 m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data) | 73 m.Set(logColumnFamily, logColumn, bigtable.ServerTime, data) |
| 74 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil,
m) | 74 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil,
m) |
| 75 | 75 |
| 76 rowExists := false | 76 rowExists := false |
| 77 » if err := t.logTable.Apply(c, rk.encode(), cm, bigtable.GetCondMutationR
esult(&rowExists)); err != nil { | 77 » if err := t.base.logTable.Apply(c, rk.encode(), cm, bigtable.GetCondMuta
tionResult(&rowExists)); err != nil { |
| 78 return grpcutil.WrapIfTransient(err) | 78 return grpcutil.WrapIfTransient(err) |
| 79 } | 79 } |
| 80 if rowExists { | 80 if rowExists { |
| 81 return storage.ErrExists | 81 return storage.ErrExists |
| 82 } | 82 } |
| 83 return nil | 83 return nil |
| 84 } | 84 } |
| 85 | 85 |
| 86 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
nly bool, cb btGetCallback) error { | 86 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
nly bool, cb btGetCallback) error { |
| 87 // Construct read options based on Get request. | 87 // Construct read options based on Get request. |
| 88 ropts := []bigtable.ReadOption{ | 88 ropts := []bigtable.ReadOption{ |
| 89 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), | 89 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), |
| 90 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)), | 90 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)), |
| 91 nil, | 91 nil, |
| 92 }[:2] | 92 }[:2] |
| 93 if keysOnly { | 93 if keysOnly { |
| 94 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt
er())) | 94 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt
er())) |
| 95 } | 95 } |
| 96 if limit > 0 { | 96 if limit > 0 { |
| 97 ropts = append(ropts, bigtable.LimitRows(int64(limit))) | 97 ropts = append(ropts, bigtable.LimitRows(int64(limit))) |
| 98 } | 98 } |
| 99 | 99 |
| 100 // This will limit the range to the immediate row key ("ASDF~INDEX") to | 100 // This will limit the range to the immediate row key ("ASDF~INDEX") to |
| 101 // immediately after the row key ("ASDF~~"). See rowKey for more informa
tion. | 101 // immediately after the row key ("ASDF~~"). See rowKey for more informa
tion. |
| 102 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) | 102 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) |
| 103 | 103 |
| 104 var innerErr error | 104 var innerErr error |
| 105 » err := t.logTable.ReadRows(c, rng, func(row bigtable.Row) bool { | 105 » err := t.base.logTable.ReadRows(c, rng, func(row bigtable.Row) bool { |
| 106 data, err := getLogRowData(row) | 106 data, err := getLogRowData(row) |
| 107 if err != nil { | 107 if err != nil { |
| 108 innerErr = storage.ErrBadData | 108 innerErr = storage.ErrBadData |
| 109 return false | 109 return false |
| 110 } | 110 } |
| 111 | 111 |
| 112 drk, err := decodeRowKey(row.Key()) | 112 drk, err := decodeRowKey(row.Key()) |
| 113 if err != nil { | 113 if err != nil { |
| 114 innerErr = err | 114 innerErr = err |
| 115 return false | 115 return false |
| (...skipping 12 matching lines...) Expand all Loading... |
| 128 return innerErr | 128 return innerErr |
| 129 } | 129 } |
| 130 return nil | 130 return nil |
| 131 } | 131 } |
| 132 | 132 |
| 133 func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error { | 133 func (t *btTableProd) setMaxLogAge(c context.Context, d time.Duration) error { |
| 134 var logGCPolicy bigtable.GCPolicy | 134 var logGCPolicy bigtable.GCPolicy |
| 135 if d > 0 { | 135 if d > 0 { |
| 136 logGCPolicy = bigtable.MaxAgePolicy(d) | 136 logGCPolicy = bigtable.MaxAgePolicy(d) |
| 137 } | 137 } |
| 138 » if err := t.adminClient.SetGCPolicy(c, t.LogTable, logColumnFamily, logG
CPolicy); err != nil { | 138 » if err := t.base.adminClient.SetGCPolicy(c, t.base.LogTable, logColumnFa
mily, logGCPolicy); err != nil { |
| 139 return grpcutil.WrapIfTransient(err) | 139 return grpcutil.WrapIfTransient(err) |
| 140 } | 140 } |
| 141 return nil | 141 return nil |
| 142 } | 142 } |
| 143 | 143 |
| 144 // getLogRowData loads the []byte contents of the supplied log row. | 144 // getLogRowData loads the []byte contents of the supplied log row. |
| 145 // | 145 // |
| 146 // If the row doesn't exist, storage.ErrDoesNotExist will be returned. | 146 // If the row doesn't exist, storage.ErrDoesNotExist will be returned. |
| 147 func getLogRowData(row bigtable.Row) (data []byte, err error) { | 147 func getLogRowData(row bigtable.Row) (data []byte, err error) { |
| 148 items, ok := row[logColumnFamily] | 148 items, ok := row[logColumnFamily] |
| (...skipping 25 matching lines...) Expand all Loading... |
| 174 | 174 |
| 175 // Get the specific ReadItem for our column | 175 // Get the specific ReadItem for our column |
| 176 colName := fmt.Sprintf("%s:%s", family, column) | 176 colName := fmt.Sprintf("%s:%s", family, column) |
| 177 for _, item := range items { | 177 for _, item := range items { |
| 178 if item.Column == colName { | 178 if item.Column == colName { |
| 179 return &item | 179 return &item |
| 180 } | 180 } |
| 181 } | 181 } |
| 182 return nil | 182 return nil |
| 183 } | 183 } |
| OLD | NEW |