| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/errors" | 11 "github.com/luci/luci-go/common/errors" |
| 12 "github.com/luci/luci-go/common/grpcutil" | 12 "github.com/luci/luci-go/common/grpcutil" |
| 13 "github.com/luci/luci-go/server/logdog/storage" | 13 "github.com/luci/luci-go/server/logdog/storage" |
| 14 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 15 "google.golang.org/cloud/bigtable" | 15 "google.golang.org/cloud/bigtable" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 const ( | 18 const ( |
| 19 logColumnFamily = "log" | 19 logColumnFamily = "log" |
| 20 » logColumn = "data" | 20 |
| 21 » // The data column stores raw low row data (RecordIO blob). |
| 22 » logColumn = "data" |
| 23 » logColName = logColumnFamily + ":" + logColumn |
| 21 ) | 24 ) |
| 22 | 25 |
| 23 // Limits taken from here: | 26 // Limits taken from here: |
| 24 // https://cloud.google.com/bigtable/docs/schema-design | 27 // https://cloud.google.com/bigtable/docs/schema-design |
| 25 const ( | 28 const ( |
| 26 // bigTableRowMaxBytes is the maximum number of bytes that a single BigT
able | 29 // bigTableRowMaxBytes is the maximum number of bytes that a single BigT
able |
| 27 // row may hold. | 30 // row may hold. |
| 28 bigTableRowMaxBytes = 1024 * 1024 * 10 // 10MB | 31 bigTableRowMaxBytes = 1024 * 1024 * 10 // 10MB |
| 29 ) | 32 ) |
| 30 | 33 |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 80 } | 83 } |
| 81 return nil | 84 return nil |
| 82 } | 85 } |
| 83 | 86 |
| 84 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
nly bool, cb btGetCallback) error { | 87 func (t *btTableProd) getLogData(c context.Context, rk *rowKey, limit int, keysO
nly bool, cb btGetCallback) error { |
| 85 // Construct read options based on Get request. | 88 // Construct read options based on Get request. |
| 86 ropts := []bigtable.ReadOption{ | 89 ropts := []bigtable.ReadOption{ |
| 87 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), | 90 bigtable.RowFilter(bigtable.FamilyFilter(logColumnFamily)), |
| 88 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)), | 91 bigtable.RowFilter(bigtable.ColumnFilter(logColumn)), |
| 89 nil, | 92 nil, |
| 90 nil, | |
| 91 }[:2] | 93 }[:2] |
| 92 if keysOnly { | 94 if keysOnly { |
| 93 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt
er())) | 95 ropts = append(ropts, bigtable.RowFilter(bigtable.StripValueFilt
er())) |
| 94 } | 96 } |
| 95 if limit > 0 { | 97 if limit > 0 { |
| 96 ropts = append(ropts, bigtable.LimitRows(int64(limit))) | 98 ropts = append(ropts, bigtable.LimitRows(int64(limit))) |
| 97 } | 99 } |
| 98 | 100 |
| 99 // This will limit the range to the immediate row key ("ASDF~INDEX") to | 101 // This will limit the range to the immediate row key ("ASDF~INDEX") to |
| 100 // immediately after the row key ("ASDF~~"). See rowKey for more informa
tion. | 102 // immediately after the row key ("ASDF~~"). See rowKey for more informa
tion. |
| 101 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) | 103 rng := bigtable.NewRange(rk.encode(), rk.pathPrefixUpperBound()) |
| 102 | 104 |
| 103 » innerErr := error(nil) | 105 » var innerErr error |
| 104 err := t.logTable.ReadRows(c, rng, func(row bigtable.Row) bool { | 106 err := t.logTable.ReadRows(c, rng, func(row bigtable.Row) bool { |
| 105 » » data := []byte(nil) | 107 » » data, err := getLogRowData(row) |
| 106 » » if !keysOnly { | 108 » » if err != nil { |
| 107 » » » err := error(nil) | 109 » » » innerErr = storage.ErrBadData |
| 108 » » » data, err = getLogData(row) | 110 » » » return false |
| 109 » » » if err != nil { | |
| 110 » » » » innerErr = storage.ErrBadData | |
| 111 » » » » return false | |
| 112 » » » } | |
| 113 } | 111 } |
| 114 | 112 |
| 115 drk, err := decodeRowKey(row.Key()) | 113 drk, err := decodeRowKey(row.Key()) |
| 116 if err != nil { | 114 if err != nil { |
| 117 innerErr = err | 115 innerErr = err |
| 118 return false | 116 return false |
| 119 } | 117 } |
| 120 | 118 |
| 121 if err := cb(drk, data); err != nil { | 119 if err := cb(drk, data); err != nil { |
| 122 innerErr = err | 120 innerErr = err |
| (...skipping 28 matching lines...) Expand all Loading... |
| 151 } | 149 } |
| 152 | 150 |
| 153 // isTransient tests if a BigTable SDK error is transient. | 151 // isTransient tests if a BigTable SDK error is transient. |
| 154 // | 152 // |
| 155 // Since the BigTable API doesn't give us this information, we will identify | 153 // Since the BigTable API doesn't give us this information, we will identify |
| 156 // transient errors by parsing their error string :( | 154 // transient errors by parsing their error string :( |
| 157 func isTransient(err error) bool { | 155 func isTransient(err error) bool { |
| 158 return (err != errStop) && grpcutil.IsTransient(err) | 156 return (err != errStop) && grpcutil.IsTransient(err) |
| 159 } | 157 } |
| 160 | 158 |
| 161 // getLogData loads the logColumn column from the logColumnFamily column family | 159 // getLogRowData loads the []byte contents of the supplied log row. |
| 162 // and returns its []byte contents. | |
| 163 // | 160 // |
| 164 // If the row doesn't exist, storage.ErrDoesNotExist will be returned. | 161 // If the row doesn't exist, storage.ErrDoesNotExist will be returned. |
| 165 func getLogData(row bigtable.Row) ([]byte, error) { | 162 func getLogRowData(row bigtable.Row) (data []byte, err error) { |
| 166 » ri := getReadItem(row, logColumnFamily, logColumn) | 163 » items, ok := row[logColumnFamily] |
| 167 » if ri == nil { | 164 » if !ok { |
| 168 » » return nil, storage.ErrDoesNotExist | 165 » » err = storage.ErrDoesNotExist |
| 166 » » return |
| 169 } | 167 } |
| 170 » return ri.Value, nil | 168 |
| 169 » for _, item := range items { |
| 170 » » switch item.Column { |
| 171 » » case logColName: |
| 172 » » » data = item.Value |
| 173 » » » return |
| 174 » » } |
| 175 » } |
| 176 |
| 177 » // If no fields could be extracted, the rows does not exist. |
| 178 » err = storage.ErrDoesNotExist |
| 179 » return |
| 171 } | 180 } |
| 172 | 181 |
| 173 // getReadItem retrieves a specific RowItem from the supplied Row. | 182 // getReadItem retrieves a specific RowItem from the supplied Row. |
| 174 func getReadItem(row bigtable.Row, family, column string) *bigtable.ReadItem { | 183 func getReadItem(row bigtable.Row, family, column string) *bigtable.ReadItem { |
| 175 // Get the row for our family. | 184 // Get the row for our family. |
| 176 » items, ok := row[family] | 185 » items, ok := row[logColumnFamily] |
| 177 if !ok { | 186 if !ok { |
| 178 return nil | 187 return nil |
| 179 } | 188 } |
| 180 | 189 |
| 181 // Get the specific ReadItem for our column | 190 // Get the specific ReadItem for our column |
| 182 colName := fmt.Sprintf("%s:%s", family, column) | 191 colName := fmt.Sprintf("%s:%s", family, column) |
| 183 for _, item := range items { | 192 for _, item := range items { |
| 184 if item.Column == colName { | 193 if item.Column == colName { |
| 185 return &item | 194 return &item |
| 186 } | 195 } |
| 187 } | 196 } |
| 188 return nil | 197 return nil |
| 189 } | 198 } |
| OLD | NEW |