Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | |
| 9 "errors" | |
| 8 "fmt" | 10 "fmt" |
| 9 | 11 |
| 10 "github.com/luci/luci-go/common/logdog/types" | 12 "github.com/luci/luci-go/common/logdog/types" |
| 11 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/recordio" | |
| 12 "github.com/luci/luci-go/server/logdog/storage" | 15 "github.com/luci/luci-go/server/logdog/storage" |
| 13 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 14 "google.golang.org/cloud" | 17 "google.golang.org/cloud" |
| 15 "google.golang.org/cloud/bigtable" | 18 "google.golang.org/cloud/bigtable" |
| 16 ) | 19 ) |
| 17 | 20 |
| 18 var ( | 21 var ( |
| 19 // StorageScopes is the set of OAuth scopes needed to use the storage | 22 // StorageScopes is the set of OAuth scopes needed to use the storage |
| 20 // functionality. | 23 // functionality. |
| 21 StorageScopes = []string{ | 24 StorageScopes = []string{ |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 37 // | 40 // |
| 38 // This is simply the maximum number of rows (limit). The actual number of | 41 // This is simply the maximum number of rows (limit). The actual number of |
| 39 // rows will be further constrained by tailRowMaxSize. | 42 // rows will be further constrained by tailRowMaxSize. |
| 40 tailRowCount = 128 | 43 tailRowCount = 128 |
| 41 | 44 |
| 42 // tailRowMaxSize is the maximum number of bytes of tail row data that w ill be | 45 // tailRowMaxSize is the maximum number of bytes of tail row data that w ill be |
| 43 // buffered during Tail row reading. | 46 // buffered during Tail row reading. |
| 44 tailRowMaxSize = 1024 * 1024 * 16 | 47 tailRowMaxSize = 1024 * 1024 * 16 |
| 45 ) | 48 ) |
| 46 | 49 |
// errStop is an internal sentinel error used to indicate "stop iteration" to
// the btTable.getLogData iterator callback. It is filtered out by its users
// (Get, Tail) and is never propagated to their callers.
var errStop = errors.New("stop")
| 53 | |
// Options is a set of configuration options for BigTable storage.
type Options struct {
	// Project is the name of the project to connect to.
	Project string
	// Zone is the name of the zone to connect to.
	Zone string
	// Cluster is the name of the cluster to connect to.
	Cluster string
	// ClientOptions are additional client options to use when instantiating the
	// client instance.
	ClientOptions []cloud.ClientOption

	// LogTable is the name of the BigTable table to use for logs.
	LogTable string
}
| 62 | 69 |
// client creates a new BigTable data client from the configured project,
// zone, cluster, and client options.
func (o *Options) client(ctx context.Context) (*bigtable.Client, error) {
	return bigtable.NewClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOptions...)
}
| 73 | |
// adminClient creates a new BigTable admin client from the configured
// project, zone, cluster, and client options.
func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error) {
	return bigtable.NewAdminClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOptions...)
}
| 77 | |
// btStorage is a storage.Storage implementation that uses Google Cloud BigTable
// as a backend.
type btStorage struct {
	*Options

	// Context is the Context that was supplied to New. It is retained (rather
	// than supplied on a per-call basis) because a special Storage Context is
	// needed for Storage calls. See the GAE implementation for more
	// information.
	context.Context

	// client is the BigTable data client; it is closed (and nilled) by Close.
	client *bigtable.Client
	// logTable is the opened BigTable table named by Options.LogTable.
	logTable *bigtable.Table
	// adminClient is the BigTable admin client; it is closed (and nilled) by
	// Close.
	adminClient *bigtable.AdminClient

	// table is the btTable interface used for log row I/O. newBTStorage binds
	// it to the production implementation (btTableProd).
	table btTable
	// maxRowSize is the maximum number of bytes of entry data that Put will
	// buffer into a single BigTable row. newBTStorage initializes it to
	// bigTableRowMaxBytes.
	maxRowSize int
}
| 76 | 95 |
| 77 // New instantiates a new Storage instance connected to a BigTable cluster. | 96 // New instantiates a new Storage instance connected to a BigTable cluster. |
| 78 // | 97 // |
| 79 // The returned Storage instance will close the Client when its Close() method | 98 // The returned Storage instance will close the Client when its Close() method |
| 80 // is called. | 99 // is called. |
| 81 func New(ctx context.Context, o Options) storage.Storage { | 100 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 101 » client, err := o.client(ctx) | |
| 102 » if err != nil { | |
| 103 » » return nil, err | |
| 104 » } | |
| 105 | |
| 106 » admin, err := o.adminClient(ctx) | |
| 107 » if err != nil { | |
| 108 » » return nil, err | |
| 109 » } | |
| 110 | |
| 111 » return newBTStorage(ctx, o, client, admin), nil | |
| 112 } | |
| 113 | |
// newBTStorage assembles a btStorage from pre-constructed BigTable clients,
// opening the configured log table and binding the production btTable
// implementation. It is separated from New so callers may supply their own
// clients directly.
func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, adminClient *bigtable.AdminClient) *btStorage {
	st := &btStorage{
		Options: &o,
		Context: ctx,

		client:      client,
		logTable:    client.Open(o.LogTable),
		adminClient: adminClient,
		maxRowSize:  bigTableRowMaxBytes,
	}
	st.table = &btTableProd{st}
	return st
}
| 89 | 127 |
| 90 func (s *btStorage) Close() { | 128 func (s *btStorage) Close() { |
| 91 if s.client != nil { | 129 if s.client != nil { |
| 92 s.client.Close() | 130 s.client.Close() |
| 93 s.client = nil | 131 s.client = nil |
| 94 } | 132 } |
| 95 | 133 |
| 96 if s.adminClient != nil { | 134 if s.adminClient != nil { |
| 97 s.adminClient.Close() | 135 s.adminClient.Close() |
| 98 s.adminClient = nil | 136 s.adminClient = nil |
| 99 } | 137 } |
| 100 } | 138 } |
| 101 | 139 |
// Config applies the supplied storage configuration. Currently this sets the
// log table's maximum log age (its BigTable GC policy).
//
// s itself is passed as the Context (btStorage embeds context.Context).
func (s *btStorage) Config(cfg storage.Config) error {
	if err := s.table.setMaxLogAge(s, cfg.MaxLogAge); err != nil {
		log.WithError(err).Errorf(s, "Failed to set 'log' GC policy.")
		return err
	}
	log.Fields{
		"maxLogAge": cfg.MaxLogAge,
	}.Infof(s, "Set maximum log age.")
	return nil
}
| 112 | 150 |
| 113 func (s *btStorage) Put(r *storage.PutRequest) error { | 151 func (s *btStorage) Put(r storage.PutRequest) error { |
| 114 » rk := newRowKey(string(r.Path), int64(r.Index)) | 152 » rw := rowWriter{ |
| 115 » ctx := log.SetFields(s.ctx, log.Fields{ | 153 » » threshold: s.maxRowSize, |
| 116 » » "rowKey": rk, | 154 » } |
| 117 » » "path": r.Path, | |
| 118 » » "index": r.Index, | |
| 119 » » "size": len(r.Value), | |
| 120 » }) | |
| 121 » log.Debugf(ctx, "Adding entry to BigTable.") | |
| 122 | 155 |
| 123 » return s.table.putLogData(ctx, rk, r.Value) | 156 » for len(r.Values) > 0 { |
| 157 » » // Add the next entry to the writer. | |
| 158 » » if appended := rw.append(r.Values[0]); !appended { | |
| 159 » » » // We have failed to append our maximum BigTable row siz e. Flush any | |
| 160 » » » // currently-buffered row data and try again with an emp ty buffer. | |
| 161 » » » count, err := rw.flush(s, s.table, r.Index, r.Path) | |
| 162 » » » if err != nil { | |
| 163 » » » » return err | |
| 164 » » » } | |
| 165 | |
| 166 » » » if count == 0 { | |
| 167 » » » » // Nothing was buffered, but we still couldn't a ppend an entry. The | |
| 168 » » » » // current entry is too large by itself, so we m ust fail. | |
| 169 » » » » return fmt.Errorf("single row entry exceeds maxi mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) | |
| 170 » » » } | |
| 171 | |
| 172 » » » r.Index += types.MessageIndex(count) | |
| 173 » » » continue | |
| 174 » » } | |
| 175 | |
| 176 » » // We successfully appended this entry, so advance. | |
| 177 » » r.Values = r.Values[1:] | |
| 178 » } | |
| 179 | |
| 180 » // Flush any buffered rows. | |
| 181 » if _, err := rw.flush(s, s.table, r.Index, r.Path); err != nil { | |
| 182 » » return err | |
| 183 » } | |
| 184 » return nil | |
| 124 } | 185 } |
| 125 | 186 |
// Get retrieves log records for the stream in r, starting at r.Index, and
// invokes cb once per record in ascending index order. Iteration stops when
// cb returns false, when r.Limit records have been delivered (if r.Limit >
// 0), or when the row scan leaves the requested stream's key space.
//
// Each BigTable row holds several recordio-framed records and is keyed on the
// index of its LAST record; undecodable rows or impossible indices yield
// storage.ErrBadData.
func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
	startKey := newRowKey(string(r.Path), int64(r.Index))
	ctx := log.SetFields(s, log.Fields{
		"path":        r.Path,
		"index":       r.Index,
		"startRowKey": startKey,
	})

	// limit counts down the records remaining to deliver; it is tracked here
	// (not per-row) because one BigTable row may contain several records.
	limit := r.Limit
	err := s.table.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey, data []byte) error {
		// Does this key match our requested log stream? If not, we've moved past
		// this stream's records and must stop iteration.
		if !rk.sharesPathWith(startKey) {
			return errStop
		}

		// We have a row. Split it into individual records.
		records, err := recordio.Split(data)
		if err != nil {
			return storage.ErrBadData
		}

		// Issue our callback for each row. Since we index the row on the LAST entry
		// in the row, count backwards to get the index of the first entry.
		firstIndex := types.MessageIndex(rk.index - int64(len(records)) + 1)
		if firstIndex < 0 {
			// More records than the row's index allows: the row is corrupt.
			return storage.ErrBadData
		}
		for i, row := range records {
			index := firstIndex + types.MessageIndex(i)
			if index < r.Index {
				// An offset was specified, and this row is before it, so skip.
				continue
			}

			if !cb(index, row) {
				return errStop
			}

			// Artificially apply limit within our row records.
			if limit > 0 {
				limit--
				if limit == 0 {
					return errStop
				}
			}
		}
		return nil
	})

	switch err {
	case nil, errStop:
		// errStop is our internal "done" sentinel, not a failure.
		return nil

	default:
		log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
		return err
	}
}
| 159 | 246 |
| 160 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { | 247 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { |
| 161 » c := log.SetFields(s.ctx, log.Fields{ | 248 » ctx := log.SetFields(s, log.Fields{ |
| 162 "path": p, | 249 "path": p, |
| 163 }) | 250 }) |
| 164 | 251 |
| 165 // Iterate through all log keys in the stream. Record the latest one. | 252 // Iterate through all log keys in the stream. Record the latest one. |
| 166 rk := newRowKey(string(p), 0) | 253 rk := newRowKey(string(p), 0) |
| 167 var latest *rowKey | 254 var latest *rowKey |
| 168 » err := s.table.getLogData(c, rk, 0, true, func(rk *rowKey, data []byte) error { | 255 » err := s.table.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte ) error { |
| 169 latest = rk | 256 latest = rk |
| 170 return nil | 257 return nil |
| 171 }) | 258 }) |
| 172 if err != nil { | 259 if err != nil { |
| 173 log.Fields{ | 260 log.Fields{ |
| 174 log.ErrorKey: err, | 261 log.ErrorKey: err, |
| 175 "project": s.Project, | 262 "project": s.Project, |
| 176 "zone": s.Zone, | 263 "zone": s.Zone, |
| 177 "cluster": s.Cluster, | 264 "cluster": s.Cluster, |
| 178 "table": s.LogTable, | 265 "table": s.LogTable, |
| 179 » » }.Errorf(c, "Failed to scan for tail.") | 266 » » }.Errorf(ctx, "Failed to scan for tail.") |
| 180 } | 267 } |
| 181 | 268 |
| 182 if latest == nil { | 269 if latest == nil { |
| 183 // No rows for the specified stream. | 270 // No rows for the specified stream. |
| 184 return nil, 0, storage.ErrDoesNotExist | 271 return nil, 0, storage.ErrDoesNotExist |
| 185 } | 272 } |
| 186 | 273 |
| 187 // Fetch the latest row's data. | 274 // Fetch the latest row's data. |
| 188 var d []byte | 275 var d []byte |
| 189 » err = s.table.getLogData(c, latest, 1, false, func(rk *rowKey, data []by te) error { | 276 » err = s.table.getLogData(ctx, latest, 1, false, func(rk *rowKey, data [] byte) error { |
| 190 » » d = data | 277 » » records, err := recordio.Split(data) |
| 278 » » if err != nil || len(records) == 0 { | |
| 279 » » » return storage.ErrBadData | |
| 280 » » } | |
| 281 » » d = records[len(records)-1] | |
| 191 return errStop | 282 return errStop |
| 192 }) | 283 }) |
| 193 » if err != nil { | 284 » if err != nil && err != errStop { |
| 194 log.Fields{ | 285 log.Fields{ |
| 195 log.ErrorKey: err, | 286 log.ErrorKey: err, |
| 196 "project": s.Project, | 287 "project": s.Project, |
| 197 "zone": s.Zone, | 288 "zone": s.Zone, |
| 198 "cluster": s.Cluster, | 289 "cluster": s.Cluster, |
| 199 "table": s.LogTable, | 290 "table": s.LogTable, |
| 200 » » }.Errorf(c, "Failed to retrieve tail row.") | 291 » » }.Errorf(ctx, "Failed to retrieve tail row.") |
| 201 } | 292 } |
| 202 | 293 |
| 203 return d, types.MessageIndex(latest.index), nil | 294 return d, types.MessageIndex(latest.index), nil |
| 204 } | 295 } |
| 205 | 296 |
// rowWriter facilitates writing several consecutive data values to a single
// BigTable row.
type rowWriter struct {
	// buf accumulates the recordio-framed entry data for the pending row.
	buf bytes.Buffer

	// count is the number of entries currently buffered in buf. (The writer
	// emits a single row per flush, so this counts entries, not rows.)
	count int

	// threshold is the maximum number of bytes that buf may hold; append
	// rejects an entry that would push buf past this size.
	threshold int
}
| 215 | 309 |
| 216 func (s *btStorage) getAdminClient() (*bigtable.AdminClient, error) { | 310 func (w *rowWriter) append(d []byte) (appended bool) { |
| 217 » if s.adminClient == nil { | 311 » origSize := w.buf.Len() |
| 218 » » var err error | 312 » defer func() { |
| 219 » » if s.adminClient, err = bigtable.NewAdminClient(s.ctx, s.Project , s.Zone, s.Cluster, s.ClientOptions...); err != nil { | 313 » » // Restore our previous buffer state if we are reporting the wri te as |
| 220 » » » return nil, fmt.Errorf("failed to create client: %s", er r) | 314 » » // failed. |
| 315 » » if !appended { | |
| 316 » » » w.buf.Truncate(origSize) | |
| 221 } | 317 } |
| 318 }() | |
| 319 | |
| 320 // Serialize the next entry as a recordio blob. | |
| 321 if _, err := recordio.WriteFrame(&w.buf, d); err != nil { | |
| 322 return | |
| 222 } | 323 } |
| 223 » return s.adminClient, nil | 324 |
| 325 » // If we have exceeded our threshold, report a failure. | |
| 326 » appended = (w.buf.Len() <= w.threshold) | |
| 327 » if appended { | |
| 328 » » w.count++ | |
| 329 » } | |
| 330 » return | |
| 224 } | 331 } |
| 225 | 332 |
// flush writes all buffered entries to table as a single BigTable row and
// resets the writer, returning the number of entries flushed.
//
// index is the stream index of the FIRST buffered entry; the row itself is
// keyed on the index of the LAST buffered entry, which is what the read path
// (Get/Tail) expects. Flushing an empty writer is a no-op returning (0, nil).
func (w *rowWriter) flush(ctx context.Context, table btTable, index types.MessageIndex, path types.StreamPath) (int, error) {
	flushCount := w.count
	if flushCount == 0 {
		return 0, nil
	}

	// Write the current set of buffered entries to the table. Key the row on
	// the LAST entry's stream index.
	lastIndex := int64(index) + int64(flushCount) - 1
	rk := newRowKey(string(path), lastIndex)

	log.Fields{
		"rowKey":    rk,
		"path":      path,
		"index":     index,
		"lastIndex": lastIndex,
		"count":     w.count,
		"size":      w.buf.Len(),
	}.Debugf(ctx, "Adding entries to BigTable.")
	if err := table.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
		return 0, err
	}

	// Reset our buffer state so the writer can be reused for the next row.
	w.buf.Reset()
	w.count = 0
	return flushCount, nil
}
| OLD | NEW |