OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package bigtable | 5 package bigtable |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 | 9 |
10 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
(...skipping 28 matching lines...) Expand all Loading... |
39 // getLogData retrieves rows belonging to the supplied stream record, st
arting | 39 // getLogData retrieves rows belonging to the supplied stream record, st
arting |
40 // with the first index owned by that record. The supplied callback is i
nvoked | 40 // with the first index owned by that record. The supplied callback is i
nvoked |
41 // once per retrieved row. | 41 // once per retrieved row. |
42 // | 42 // |
43 // rk is the starting row key. | 43 // rk is the starting row key. |
44 // | 44 // |
45 // If the supplied limit is nonzero, no more than limit rows will be | 45 // If the supplied limit is nonzero, no more than limit rows will be |
46 // retrieved. | 46 // retrieved. |
47 // | 47 // |
48 // If keysOnly is true, then the callback will return nil row data. | 48 // If keysOnly is true, then the callback will return nil row data. |
49 » getLogData(c context.Context, rk *rowKey, limit int, keysOnly bool, cb b
tGetCallback) error | 49 » getLogData(c context.Context, rk *rowKey, limit int64, keysOnly bool, cb
btGetCallback) error |
50 | 50 |
51 // deleteRow deletes the log data associated with a given row. | 51 // deleteRow deletes the log data associated with a given row. |
52 deleteRow(context.Context, *rowKey) error | 52 deleteRow(context.Context, *rowKey) error |
53 } | 53 } |
54 | 54 |
55 // btTableProd is an implementation of the btTable interface that uses a real | 55 // btTableProd is an implementation of the btTable interface that uses a real |
56 // production BigTable connection. | 56 // production BigTable connection. |
57 type btTableProd struct { | 57 type btTableProd struct { |
58 *bigtable.Table | 58 *bigtable.Table |
59 } | 59 } |
60 | 60 |
61 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err
or { | 61 func (t *btTableProd) putLogData(c context.Context, rk *rowKey, data []byte) err
or { |
62 m := bigtable.NewMutation() | 62 m := bigtable.NewMutation() |
63 m.Set("log", "data", bigtable.ServerTime, data) | 63 m.Set("log", "data", bigtable.ServerTime, data) |
64 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil,
m) | 64 cm := bigtable.NewCondMutation(bigtable.RowKeyFilter(rk.encode()), nil,
m) |
65 | 65 |
66 rowExists := false | 66 rowExists := false |
67 err := t.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult(&rowEx
ists)) | 67 err := t.Apply(c, rk.encode(), cm, bigtable.GetCondMutationResult(&rowEx
ists)) |
68 if err != nil { | 68 if err != nil { |
69 return wrapTransient(err) | 69 return wrapTransient(err) |
70 } | 70 } |
71 if rowExists { | 71 if rowExists { |
72 return storage.ErrExists | 72 return storage.ErrExists |
73 } | 73 } |
74 return nil | 74 return nil |
75 } | 75 } |
76 | 76 |
77 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
nly bool, cb btGetCallback) error { | 77 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int64, key
sOnly bool, cb btGetCallback) error { |
78 // Construct read options based on Get request. | 78 // Construct read options based on Get request. |
79 ropts := []bigtable.ReadOption{ | 79 ropts := []bigtable.ReadOption{ |
80 bigtable.RowFilter(bigtable.FamilyFilter("log")), | 80 bigtable.RowFilter(bigtable.FamilyFilter("log")), |
81 bigtable.RowFilter(bigtable.ColumnFilter("data")), | 81 bigtable.RowFilter(bigtable.ColumnFilter("data")), |
82 nil, | 82 nil, |
83 nil, | 83 nil, |
84 }[:2] | 84 }[:2] |
85 if keysOnly { | 85 if keysOnly { |
86 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt
er())) | 86 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt
er())) |
87 } | 87 } |
88 if limit > 0 { | 88 if limit > 0 { |
89 » » ropts = append(ropts, bigtable.LimitRows(int64(limit))) | 89 » » ropts = append(ropts, bigtable.LimitRows(limit)) |
90 } | 90 } |
91 | 91 |
92 // This will limit the range to the immediate row key ("ASDF~INDEX") to | 92 // This will limit the range to the immediate row key ("ASDF~INDEX") to |
93 // immediately after the row key ("ASDF~~"). See rowKey for more informa
tion. | 93 // immediately after the row key ("ASDF~~"). See rowKey for more informa
tion. |
94 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) | 94 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) |
95 | 95 |
96 innerErr := error(nil) | 96 innerErr := error(nil) |
97 err := t.ReadRows(c, rng, func(row bigtable.Row) bool { | 97 err := t.ReadRows(c, rng, func(row bigtable.Row) bool { |
98 data := []byte(nil) | 98 data := []byte(nil) |
99 if !keysOnly { | 99 if !keysOnly { |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
174 | 174 |
175 // Get the specific ReadItem for our column | 175 // Get the specific ReadItem for our column |
176 colName := fmt.Sprintf("%s:%s", family, column) | 176 colName := fmt.Sprintf("%s:%s", family, column) |
177 for _, item := range items { | 177 for _, item := range items { |
178 if item.Column == colName { | 178 if item.Column == colName { |
179 return &item | 179 return &item |
180 } | 180 } |
181 } | 181 } |
182 return nil | 182 return nil |
183 } | 183 } |
OLD | NEW |