| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package bigtable | 5 package bigtable |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" |
| 9 "errors" |
| 8 "fmt" | 10 "fmt" |
| 9 | 11 |
| 10 "github.com/luci/luci-go/common/logdog/types" | 12 "github.com/luci/luci-go/common/logdog/types" |
| 11 log "github.com/luci/luci-go/common/logging" | 13 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/recordio" |
| 12 "github.com/luci/luci-go/server/logdog/storage" | 15 "github.com/luci/luci-go/server/logdog/storage" |
| 13 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 14 "google.golang.org/cloud" | 17 "google.golang.org/cloud" |
| 15 "google.golang.org/cloud/bigtable" | 18 "google.golang.org/cloud/bigtable" |
| 16 ) | 19 ) |
| 17 | 20 |
| 18 var ( | 21 var ( |
| 19 // StorageScopes is the set of OAuth scopes needed to use the storage | 22 // StorageScopes is the set of OAuth scopes needed to use the storage |
| 20 // functionality. | 23 // functionality. |
| 21 StorageScopes = []string{ | 24 StorageScopes = []string{ |
| (...skipping 15 matching lines...) Expand all Loading... |
| 37 // | 40 // |
| 38 // This is simply the maximum number of rows (limit). The actual number
of | 41 // This is simply the maximum number of rows (limit). The actual number
of |
| 39 // rows will be further constrained by tailRowMaxSize. | 42 // rows will be further constrained by tailRowMaxSize. |
| 40 tailRowCount = 128 | 43 tailRowCount = 128 |
| 41 | 44 |
| 42 // tailRowMaxSize is the maximum number of bytes of tail row data that w
ill be | 45 // tailRowMaxSize is the maximum number of bytes of tail row data that w
ill be |
| 43 // buffered during Tail row reading. | 46 // buffered during Tail row reading. |
| 44 tailRowMaxSize = 1024 * 1024 * 16 | 47 tailRowMaxSize = 1024 * 1024 * 16 |
| 45 ) | 48 ) |
| 46 | 49 |
| 50 // errStop is an internal sentinel error used to indicate "stop iteration" |
| 51 // to btTable.getLogData iterator. It will |
| 52 var errStop = errors.New("stop") |
| 53 |
| 47 // Options is a set of configuration options for BigTable storage. | 54 // Options is a set of configuration options for BigTable storage. |
| 48 type Options struct { | 55 type Options struct { |
| 49 // Project is the name of the project to connect to. | 56 // Project is the name of the project to connect to. |
| 50 Project string | 57 Project string |
| 51 // Zone is the name of the zone to connect to. | 58 // Zone is the name of the zone to connect to. |
| 52 Zone string | 59 Zone string |
| 53 // Cluster is the name of the cluster to connect to. | 60 // Cluster is the name of the cluster to connect to. |
| 54 Cluster string | 61 Cluster string |
| 55 // ClientOptions are additional client options to use when instantiating
the | 62 // ClientOptions are additional client options to use when instantiating
the |
| 56 // client instance. | 63 // client instance. |
| 57 ClientOptions []cloud.ClientOption | 64 ClientOptions []cloud.ClientOption |
| 58 | 65 |
| 59 // Table is the name of the BigTable table to use for logs. | 66 // Table is the name of the BigTable table to use for logs. |
| 60 LogTable string | 67 LogTable string |
| 61 } | 68 } |
| 62 | 69 |
| 70 func (o *Options) client(ctx context.Context) (*bigtable.Client, error) { |
| 71 return bigtable.NewClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOpt
ions...) |
| 72 } |
| 73 |
| 74 func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error
) { |
| 75 return bigtable.NewAdminClient(ctx, o.Project, o.Zone, o.Cluster, o.Clie
ntOptions...) |
| 76 } |
| 77 |
| 63 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable | 78 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable |
| 64 // as a backend. | 79 // as a backend. |
| 65 type btStorage struct { | 80 type btStorage struct { |
| 66 *Options | 81 *Options |
| 67 | 82 |
| 68 » ctx context.Context | 83 » // Context is the bound supplied with New. It is retained (rather than |
| 84 » // supplied on a per-call basis) because a special Storage Context devoi
d of |
| 85 » // gRPC metadata is needed for Storage calls. |
| 86 » context.Context |
| 69 | 87 |
| 70 client *bigtable.Client | 88 client *bigtable.Client |
| 71 logTable *bigtable.Table | 89 logTable *bigtable.Table |
| 72 adminClient *bigtable.AdminClient | 90 adminClient *bigtable.AdminClient |
| 73 | 91 |
| 74 » table btTable | 92 » // raw is the underlying btTable instance to use for raw operations. |
| 93 » raw btTable |
| 94 » // maxRowSize is the maxmium number of bytes that can be stored in a sin
gle |
| 95 » // BigTable row. This is a function of BigTable, and constant in product
ion |
| 96 » // (bigTableRowMaxBytes), but variable here to allow for testing to cont
rol. |
| 97 » maxRowSize int |
| 75 } | 98 } |
| 76 | 99 |
| 77 // New instantiates a new Storage instance connected to a BigTable cluster. | 100 // New instantiates a new Storage instance connected to a BigTable cluster. |
| 78 // | 101 // |
| 79 // The returned Storage instance will close the Client when its Close() method | 102 // The returned Storage instance will close the Client when its Close() method |
| 80 // is called. | 103 // is called. |
| 81 func New(ctx context.Context, o Options) storage.Storage { | 104 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 105 » client, err := o.client(ctx) |
| 106 » if err != nil { |
| 107 » » return nil, err |
| 108 » } |
| 109 |
| 110 » admin, err := o.adminClient(ctx) |
| 111 » if err != nil { |
| 112 » » return nil, err |
| 113 » } |
| 114 |
| 115 » return newBTStorage(ctx, o, client, admin), nil |
| 116 } |
| 117 |
| 118 func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, admin
Client *bigtable.AdminClient) *btStorage { |
| 82 st := &btStorage{ | 119 st := &btStorage{ |
| 83 Options: &o, | 120 Options: &o, |
| 84 » » ctx: ctx, | 121 » » Context: ctx, |
| 122 |
| 123 » » client: client, |
| 124 » » logTable: client.Open(o.LogTable), |
| 125 » » adminClient: adminClient, |
| 126 » » maxRowSize: bigTableRowMaxBytes, |
| 85 } | 127 } |
| 86 » st.table = &btTableProd{st} | 128 » st.raw = &btTableProd{st} |
| 87 return st | 129 return st |
| 88 } | 130 } |
| 89 | 131 |
| 90 func (s *btStorage) Close() { | 132 func (s *btStorage) Close() { |
| 91 if s.client != nil { | 133 if s.client != nil { |
| 92 s.client.Close() | 134 s.client.Close() |
| 93 s.client = nil | 135 s.client = nil |
| 94 } | 136 } |
| 95 | 137 |
| 96 if s.adminClient != nil { | 138 if s.adminClient != nil { |
| 97 s.adminClient.Close() | 139 s.adminClient.Close() |
| 98 s.adminClient = nil | 140 s.adminClient = nil |
| 99 } | 141 } |
| 100 } | 142 } |
| 101 | 143 |
| 102 func (s *btStorage) Config(cfg storage.Config) error { | 144 func (s *btStorage) Config(cfg storage.Config) error { |
| 103 » if err := s.table.setMaxLogAge(s.ctx, cfg.MaxLogAge); err != nil { | 145 » if err := s.raw.setMaxLogAge(s, cfg.MaxLogAge); err != nil { |
| 104 » » log.WithError(err).Errorf(s.ctx, "Failed to set 'log' GC policy.
") | 146 » » log.WithError(err).Errorf(s, "Failed to set 'log' GC policy.") |
| 105 return err | 147 return err |
| 106 } | 148 } |
| 107 log.Fields{ | 149 log.Fields{ |
| 108 "maxLogAge": cfg.MaxLogAge, | 150 "maxLogAge": cfg.MaxLogAge, |
| 109 » }.Infof(s.ctx, "Set maximum log age.") | 151 » }.Infof(s, "Set maximum log age.") |
| 110 return nil | 152 return nil |
| 111 } | 153 } |
| 112 | 154 |
| 113 func (s *btStorage) Put(r *storage.PutRequest) error { | 155 func (s *btStorage) Put(r storage.PutRequest) error { |
| 114 » rk := newRowKey(string(r.Path), int64(r.Index)) | 156 » rw := rowWriter{ |
| 115 » ctx := log.SetFields(s.ctx, log.Fields{ | 157 » » threshold: s.maxRowSize, |
| 116 » » "rowKey": rk, | 158 » } |
| 117 » » "path": r.Path, | |
| 118 » » "index": r.Index, | |
| 119 » » "size": len(r.Value), | |
| 120 » }) | |
| 121 » log.Debugf(ctx, "Adding entry to BigTable.") | |
| 122 | 159 |
| 123 » return s.table.putLogData(ctx, rk, r.Value) | 160 » for len(r.Values) > 0 { |
| 161 » » // Add the next entry to the writer. |
| 162 » » if appended := rw.append(r.Values[0]); !appended { |
| 163 » » » // We have failed to append our maximum BigTable row siz
e. Flush any |
| 164 » » » // currently-buffered row data and try again with an emp
ty buffer. |
| 165 » » » count, err := rw.flush(s, s.raw, r.Index, r.Path) |
| 166 » » » if err != nil { |
| 167 » » » » return err |
| 168 » » » } |
| 169 |
| 170 » » » if count == 0 { |
| 171 » » » » // Nothing was buffered, but we still couldn't a
ppend an entry. The |
| 172 » » » » // current entry is too large by itself, so we m
ust fail. |
| 173 » » » » return fmt.Errorf("single row entry exceeds maxi
mum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes) |
| 174 » » » } |
| 175 |
| 176 » » » r.Index += types.MessageIndex(count) |
| 177 » » » continue |
| 178 » » } |
| 179 |
| 180 » » // We successfully appended this entry, so advance. |
| 181 » » r.Values = r.Values[1:] |
| 182 » } |
| 183 |
| 184 » // Flush any buffered rows. |
| 185 » if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil { |
| 186 » » return err |
| 187 » } |
| 188 » return nil |
| 124 } | 189 } |
| 125 | 190 |
| 126 func (s *btStorage) Get(r *storage.GetRequest, cb storage.GetCallback) error { | 191 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error { |
| 127 startKey := newRowKey(string(r.Path), int64(r.Index)) | 192 startKey := newRowKey(string(r.Path), int64(r.Index)) |
| 128 » c := log.SetFields(s.ctx, log.Fields{ | 193 » ctx := log.SetFields(s, log.Fields{ |
| 129 "path": r.Path, | 194 "path": r.Path, |
| 130 "index": r.Index, | 195 "index": r.Index, |
| 131 "startRowKey": startKey, | 196 "startRowKey": startKey, |
| 132 }) | 197 }) |
| 133 | 198 |
| 134 » err := s.table.getLogData(c, startKey, r.Limit, false, func(rk *rowKey,
data []byte) error { | 199 » limit := r.Limit |
| 200 » err := s.raw.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey,
data []byte) error { |
| 135 // Does this key match our requested log stream? If not, we've m
oved past | 201 // Does this key match our requested log stream? If not, we've m
oved past |
| 136 // this stream's records and must stop iteration. | 202 // this stream's records and must stop iteration. |
| 137 if !rk.sharesPathWith(startKey) { | 203 if !rk.sharesPathWith(startKey) { |
| 138 return errStop | 204 return errStop |
| 139 } | 205 } |
| 140 | 206 |
| 141 » » // We have a row. Invoke our callback. | 207 » » // We have a row. Split it into individual records. |
| 142 » » if !cb(types.MessageIndex(rk.index), data) { | 208 » » records, err := recordio.Split(data) |
| 143 » » » return errStop | 209 » » if err != nil { |
| 210 » » » return storage.ErrBadData |
| 211 » » } |
| 212 |
| 213 » » // Issue our callback for each row. Since we index the row on th
e LAST entry |
| 214 » » // in the row, count backwards to get the index of the first ent
ry. |
| 215 » » firstIndex := types.MessageIndex(rk.index - int64(len(records))
+ 1) |
| 216 » » if firstIndex < 0 { |
| 217 » » » return storage.ErrBadData |
| 218 » » } |
| 219 » » for i, row := range records { |
| 220 » » » index := firstIndex + types.MessageIndex(i) |
| 221 » » » if index < r.Index { |
| 222 » » » » // An offset was specified, and this row is befo
re it, so skip. |
| 223 » » » » continue |
| 224 » » » } |
| 225 |
| 226 » » » if !cb(index, row) { |
| 227 » » » » return errStop |
| 228 » » » } |
| 229 |
| 230 » » » // Artificially apply limit within our row records. |
| 231 » » » if limit > 0 { |
| 232 » » » » limit-- |
| 233 » » » » if limit == 0 { |
| 234 » » » » » return errStop |
| 235 » » » » } |
| 236 » » » } |
| 144 } | 237 } |
| 145 return nil | 238 return nil |
| 146 }) | 239 }) |
| 147 » if err != nil { | 240 |
| 148 » » log.Fields{ | 241 » switch err { |
| 149 » » » log.ErrorKey: err, | 242 » case nil, errStop: |
| 150 » » » "project": s.Project, | 243 » » return nil |
| 151 » » » "zone": s.Zone, | 244 |
| 152 » » » "cluster": s.Cluster, | 245 » default: |
| 153 » » » "table": s.LogTable, | 246 » » log.WithError(err).Errorf(ctx, "Failed to retrieve row range.") |
| 154 » » }.Errorf(c, "Failed to retrieve row range.") | |
| 155 return err | 247 return err |
| 156 } | 248 } |
| 157 return nil | |
| 158 } | 249 } |
| 159 | 250 |
| 160 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
{ | 251 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error)
{ |
| 161 » c := log.SetFields(s.ctx, log.Fields{ | 252 » ctx := log.SetFields(s, log.Fields{ |
| 162 "path": p, | 253 "path": p, |
| 163 }) | 254 }) |
| 164 | 255 |
| 165 // Iterate through all log keys in the stream. Record the latest one. | 256 // Iterate through all log keys in the stream. Record the latest one. |
| 166 rk := newRowKey(string(p), 0) | 257 rk := newRowKey(string(p), 0) |
| 167 var latest *rowKey | 258 var latest *rowKey |
| 168 » err := s.table.getLogData(c, rk, 0, true, func(rk *rowKey, data []byte)
error { | 259 » err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte)
error { |
| 169 latest = rk | 260 latest = rk |
| 170 return nil | 261 return nil |
| 171 }) | 262 }) |
| 172 if err != nil { | 263 if err != nil { |
| 173 log.Fields{ | 264 log.Fields{ |
| 174 log.ErrorKey: err, | 265 log.ErrorKey: err, |
| 175 "project": s.Project, | 266 "project": s.Project, |
| 176 "zone": s.Zone, | 267 "zone": s.Zone, |
| 177 "cluster": s.Cluster, | 268 "cluster": s.Cluster, |
| 178 "table": s.LogTable, | 269 "table": s.LogTable, |
| 179 » » }.Errorf(c, "Failed to scan for tail.") | 270 » » }.Errorf(ctx, "Failed to scan for tail.") |
| 180 } | 271 } |
| 181 | 272 |
| 182 if latest == nil { | 273 if latest == nil { |
| 183 // No rows for the specified stream. | 274 // No rows for the specified stream. |
| 184 return nil, 0, storage.ErrDoesNotExist | 275 return nil, 0, storage.ErrDoesNotExist |
| 185 } | 276 } |
| 186 | 277 |
| 187 // Fetch the latest row's data. | 278 // Fetch the latest row's data. |
| 188 var d []byte | 279 var d []byte |
| 189 » err = s.table.getLogData(c, latest, 1, false, func(rk *rowKey, data []by
te) error { | 280 » err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by
te) error { |
| 190 » » d = data | 281 » » records, err := recordio.Split(data) |
| 282 » » if err != nil || len(records) == 0 { |
| 283 » » » return storage.ErrBadData |
| 284 » » } |
| 285 » » d = records[len(records)-1] |
| 191 return errStop | 286 return errStop |
| 192 }) | 287 }) |
| 193 » if err != nil { | 288 » if err != nil && err != errStop { |
| 194 log.Fields{ | 289 log.Fields{ |
| 195 log.ErrorKey: err, | 290 log.ErrorKey: err, |
| 196 "project": s.Project, | 291 "project": s.Project, |
| 197 "zone": s.Zone, | 292 "zone": s.Zone, |
| 198 "cluster": s.Cluster, | 293 "cluster": s.Cluster, |
| 199 "table": s.LogTable, | 294 "table": s.LogTable, |
| 200 » » }.Errorf(c, "Failed to retrieve tail row.") | 295 » » }.Errorf(ctx, "Failed to retrieve tail row.") |
| 201 } | 296 } |
| 202 | 297 |
| 203 return d, types.MessageIndex(latest.index), nil | 298 return d, types.MessageIndex(latest.index), nil |
| 204 } | 299 } |
| 205 | 300 |
| 206 func (s *btStorage) getClient() (*bigtable.Client, error) { | 301 // rowWriter facilitates writing several consecutive data values to a single |
| 207 » if s.client == nil { | 302 // BigTable row. |
| 208 » » var err error | 303 type rowWriter struct { |
| 209 » » if s.client, err = bigtable.NewClient(s.ctx, s.Project, s.Zone,
s.Cluster, s.ClientOptions...); err != nil { | 304 » // buf is the current set of buffered data. |
| 210 » » » return nil, fmt.Errorf("failed to create client: %s", er
r) | 305 » buf bytes.Buffer |
| 211 » » } | 306 |
| 212 » } | 307 » // count is the number of rows in the writer. |
| 213 » return s.client, nil | 308 » count int |
| 309 |
| 310 » // threshold is the maximum number of bytes that we can write. |
| 311 » threshold int |
| 214 } | 312 } |
| 215 | 313 |
| 216 func (s *btStorage) getAdminClient() (*bigtable.AdminClient, error) { | 314 func (w *rowWriter) append(d []byte) (appended bool) { |
| 217 » if s.adminClient == nil { | 315 » origSize := w.buf.Len() |
| 218 » » var err error | 316 » defer func() { |
| 219 » » if s.adminClient, err = bigtable.NewAdminClient(s.ctx, s.Project
, s.Zone, s.Cluster, s.ClientOptions...); err != nil { | 317 » » // Restore our previous buffer state if we are reporting the wri
te as |
| 220 » » » return nil, fmt.Errorf("failed to create client: %s", er
r) | 318 » » // failed. |
| 319 » » if !appended { |
| 320 » » » w.buf.Truncate(origSize) |
| 221 } | 321 } |
| 322 }() |
| 323 |
| 324 // Serialize the next entry as a recordio blob. |
| 325 if _, err := recordio.WriteFrame(&w.buf, d); err != nil { |
| 326 return |
| 222 } | 327 } |
| 223 » return s.adminClient, nil | 328 |
| 329 » // If we have exceeded our threshold, report a failure. |
| 330 » appended = (w.buf.Len() <= w.threshold) |
| 331 » if appended { |
| 332 » » w.count++ |
| 333 » } |
| 334 » return |
| 224 } | 335 } |
| 225 | 336 |
| 226 // getLogTable returns a btTable instance. If one is not already configured, a | 337 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageI
ndex, path types.StreamPath) (int, error) { |
| 227 // production instance will be generated and cached. | 338 » flushCount := w.count |
| 228 func (s *btStorage) getLogTable() (*bigtable.Table, error) { | 339 » if flushCount == 0 { |
| 229 » if s.logTable == nil { | 340 » » return 0, nil |
| 230 » » client, err := s.getClient() | |
| 231 » » if err != nil { | |
| 232 » » » return nil, err | |
| 233 » » } | |
| 234 » » s.logTable = client.Open(s.LogTable) | |
| 235 } | 341 } |
| 236 » return s.logTable, nil | 342 |
| 343 » // Write the current set of buffered rows to the table. Index on the LAS
T |
| 344 » // row index. |
| 345 » lastIndex := int64(index) + int64(flushCount) - 1 |
| 346 » rk := newRowKey(string(path), lastIndex) |
| 347 |
| 348 » log.Fields{ |
| 349 » » "rowKey": rk, |
| 350 » » "path": path, |
| 351 » » "index": index, |
| 352 » » "lastIndex": lastIndex, |
| 353 » » "count": w.count, |
| 354 » » "size": w.buf.Len(), |
| 355 » }.Debugf(ctx, "Adding entries to BigTable.") |
| 356 » if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil { |
| 357 » » return 0, err |
| 358 » } |
| 359 |
| 360 » // Reset our buffer state. |
| 361 » w.buf.Reset() |
| 362 » w.count = 0 |
| 363 » return flushCount, nil |
| 237 } | 364 } |
| OLD | NEW |