Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: server/logdog/storage/bigtable/storage.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package bigtable 5 package bigtable
6 6
7 import ( 7 import (
8 "bytes"
9 "errors"
8 "fmt" 10 "fmt"
9 11
10 "github.com/luci/luci-go/common/logdog/types" 12 "github.com/luci/luci-go/common/logdog/types"
11 log "github.com/luci/luci-go/common/logging" 13 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/recordio"
12 "github.com/luci/luci-go/server/logdog/storage" 15 "github.com/luci/luci-go/server/logdog/storage"
13 "golang.org/x/net/context" 16 "golang.org/x/net/context"
14 "google.golang.org/cloud" 17 "google.golang.org/cloud"
15 "google.golang.org/cloud/bigtable" 18 "google.golang.org/cloud/bigtable"
16 ) 19 )
17 20
18 var ( 21 var (
19 // StorageScopes is the set of OAuth scopes needed to use the storage 22 // StorageScopes is the set of OAuth scopes needed to use the storage
20 // functionality. 23 // functionality.
21 StorageScopes = []string{ 24 StorageScopes = []string{
(...skipping 15 matching lines...) Expand all
37 // 40 //
38 // This is simply the maximum number of rows (limit). The actual number of 41 // This is simply the maximum number of rows (limit). The actual number of
39 // rows will be further constrained by tailRowMaxSize. 42 // rows will be further constrained by tailRowMaxSize.
40 tailRowCount = 128 43 tailRowCount = 128
41 44
 42 // tailRowMaxSize is the maximum number of bytes of tail row data that will be 45 // tailRowMaxSize is the maximum number of bytes of tail row data that will be
43 // buffered during Tail row reading. 46 // buffered during Tail row reading.
44 tailRowMaxSize = 1024 * 1024 * 16 47 tailRowMaxSize = 1024 * 1024 * 16
45 ) 48 )
46 49
50 // errStop is an internal sentinel error used to indicate "stop iteration"
 51 // to btTable.getLogData iterator. It will not be propagated to callers.
52 var errStop = errors.New("stop")
53
47 // Options is a set of configuration options for BigTable storage. 54 // Options is a set of configuration options for BigTable storage.
48 type Options struct { 55 type Options struct {
49 // Project is the name of the project to connect to. 56 // Project is the name of the project to connect to.
50 Project string 57 Project string
51 // Zone is the name of the zone to connect to. 58 // Zone is the name of the zone to connect to.
52 Zone string 59 Zone string
53 // Cluster is the name of the cluster to connect to. 60 // Cluster is the name of the cluster to connect to.
54 Cluster string 61 Cluster string
55 // ClientOptions are additional client options to use when instantiating the 62 // ClientOptions are additional client options to use when instantiating the
56 // client instance. 63 // client instance.
57 ClientOptions []cloud.ClientOption 64 ClientOptions []cloud.ClientOption
58 65
59 // Table is the name of the BigTable table to use for logs. 66 // Table is the name of the BigTable table to use for logs.
60 LogTable string 67 LogTable string
61 } 68 }
62 69
70 func (o *Options) client(ctx context.Context) (*bigtable.Client, error) {
71 return bigtable.NewClient(ctx, o.Project, o.Zone, o.Cluster, o.ClientOpt ions...)
72 }
73
74 func (o *Options) adminClient(ctx context.Context) (*bigtable.AdminClient, error ) {
75 return bigtable.NewAdminClient(ctx, o.Project, o.Zone, o.Cluster, o.Clie ntOptions...)
76 }
77
63 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable 78 // btStorage is a storage.Storage implementation that uses Google Cloud BigTable
64 // as a backend. 79 // as a backend.
65 type btStorage struct { 80 type btStorage struct {
66 *Options 81 *Options
67 82
68 » ctx context.Context 83 » // Context is the bound supplied with New. It is retained (rather than
 84 » // supplied on a per-call basis) because a special Storage Context devoid of
85 » // gRPC metadata is needed for Storage calls.
86 » context.Context
69 87
70 client *bigtable.Client 88 client *bigtable.Client
71 logTable *bigtable.Table 89 logTable *bigtable.Table
72 adminClient *bigtable.AdminClient 90 adminClient *bigtable.AdminClient
73 91
74 » table btTable 92 » // raw is the underlying btTable instance to use for raw operations.
93 » raw btTable
94 » // maxRowSize is the maxmium number of bytes that can be stored in a sin gle
95 » // BigTable row. This is a function of BigTable, and constant in product ion
96 » // (bigTableRowMaxBytes), but variable here to allow for testing to cont rol.
97 » maxRowSize int
75 } 98 }
76 99
77 // New instantiates a new Storage instance connected to a BigTable cluster. 100 // New instantiates a new Storage instance connected to a BigTable cluster.
78 // 101 //
79 // The returned Storage instance will close the Client when its Close() method 102 // The returned Storage instance will close the Client when its Close() method
80 // is called. 103 // is called.
81 func New(ctx context.Context, o Options) storage.Storage { 104 func New(ctx context.Context, o Options) (storage.Storage, error) {
105 » client, err := o.client(ctx)
106 » if err != nil {
107 » » return nil, err
108 » }
109
110 » admin, err := o.adminClient(ctx)
111 » if err != nil {
112 » » return nil, err
113 » }
114
115 » return newBTStorage(ctx, o, client, admin), nil
116 }
117
118 func newBTStorage(ctx context.Context, o Options, client *bigtable.Client, admin Client *bigtable.AdminClient) *btStorage {
82 st := &btStorage{ 119 st := &btStorage{
83 Options: &o, 120 Options: &o,
84 » » ctx: ctx, 121 » » Context: ctx,
122
123 » » client: client,
124 » » logTable: client.Open(o.LogTable),
125 » » adminClient: adminClient,
126 » » maxRowSize: bigTableRowMaxBytes,
85 } 127 }
86 » st.table = &btTableProd{st} 128 » st.raw = &btTableProd{st}
87 return st 129 return st
88 } 130 }
89 131
90 func (s *btStorage) Close() { 132 func (s *btStorage) Close() {
91 if s.client != nil { 133 if s.client != nil {
92 s.client.Close() 134 s.client.Close()
93 s.client = nil 135 s.client = nil
94 } 136 }
95 137
96 if s.adminClient != nil { 138 if s.adminClient != nil {
97 s.adminClient.Close() 139 s.adminClient.Close()
98 s.adminClient = nil 140 s.adminClient = nil
99 } 141 }
100 } 142 }
101 143
102 func (s *btStorage) Config(cfg storage.Config) error { 144 func (s *btStorage) Config(cfg storage.Config) error {
103 » if err := s.table.setMaxLogAge(s.ctx, cfg.MaxLogAge); err != nil { 145 » if err := s.raw.setMaxLogAge(s, cfg.MaxLogAge); err != nil {
104 » » log.WithError(err).Errorf(s.ctx, "Failed to set 'log' GC policy. ") 146 » » log.WithError(err).Errorf(s, "Failed to set 'log' GC policy.")
105 return err 147 return err
106 } 148 }
107 log.Fields{ 149 log.Fields{
108 "maxLogAge": cfg.MaxLogAge, 150 "maxLogAge": cfg.MaxLogAge,
109 » }.Infof(s.ctx, "Set maximum log age.") 151 » }.Infof(s, "Set maximum log age.")
110 return nil 152 return nil
111 } 153 }
112 154
113 func (s *btStorage) Put(r *storage.PutRequest) error { 155 func (s *btStorage) Put(r storage.PutRequest) error {
114 » rk := newRowKey(string(r.Path), int64(r.Index)) 156 » rw := rowWriter{
115 » ctx := log.SetFields(s.ctx, log.Fields{ 157 » » threshold: s.maxRowSize,
116 » » "rowKey": rk, 158 » }
117 » » "path": r.Path,
118 » » "index": r.Index,
119 » » "size": len(r.Value),
120 » })
121 » log.Debugf(ctx, "Adding entry to BigTable.")
122 159
123 » return s.table.putLogData(ctx, rk, r.Value) 160 » for len(r.Values) > 0 {
161 » » // Add the next entry to the writer.
162 » » if appended := rw.append(r.Values[0]); !appended {
 163 » » » // We have failed to append our maximum BigTable row size. Flush any
 164 » » » // currently-buffered row data and try again with an empty buffer.
165 » » » count, err := rw.flush(s, s.raw, r.Index, r.Path)
166 » » » if err != nil {
167 » » » » return err
168 » » » }
169
170 » » » if count == 0 {
 171 » » » » // Nothing was buffered, but we still couldn't append an entry. The
 172 » » » » // current entry is too large by itself, so we must fail.
 173 » » » » return fmt.Errorf("single row entry exceeds maximum size (%d > %d)", len(r.Values[0]), bigTableRowMaxBytes)
174 » » » }
175
176 » » » r.Index += types.MessageIndex(count)
177 » » » continue
178 » » }
179
180 » » // We successfully appended this entry, so advance.
181 » » r.Values = r.Values[1:]
182 » }
183
184 » // Flush any buffered rows.
185 » if _, err := rw.flush(s, s.raw, r.Index, r.Path); err != nil {
186 » » return err
187 » }
188 » return nil
124 } 189 }
125 190
126 func (s *btStorage) Get(r *storage.GetRequest, cb storage.GetCallback) error { 191 func (s *btStorage) Get(r storage.GetRequest, cb storage.GetCallback) error {
127 startKey := newRowKey(string(r.Path), int64(r.Index)) 192 startKey := newRowKey(string(r.Path), int64(r.Index))
128 » c := log.SetFields(s.ctx, log.Fields{ 193 » ctx := log.SetFields(s, log.Fields{
129 "path": r.Path, 194 "path": r.Path,
130 "index": r.Index, 195 "index": r.Index,
131 "startRowKey": startKey, 196 "startRowKey": startKey,
132 }) 197 })
133 198
134 » err := s.table.getLogData(c, startKey, r.Limit, false, func(rk *rowKey, data []byte) error { 199 » limit := r.Limit
200 » err := s.raw.getLogData(ctx, startKey, r.Limit, false, func(rk *rowKey, data []byte) error {
 135 // Does this key match our requested log stream? If not, we've moved past 201 // Does this key match our requested log stream? If not, we've moved past
136 // this stream's records and must stop iteration. 202 // this stream's records and must stop iteration.
137 if !rk.sharesPathWith(startKey) { 203 if !rk.sharesPathWith(startKey) {
138 return errStop 204 return errStop
139 } 205 }
140 206
141 » » // We have a row. Invoke our callback. 207 » » // We have a row. Split it into individual records.
142 » » if !cb(types.MessageIndex(rk.index), data) { 208 » » records, err := recordio.Split(data)
143 » » » return errStop 209 » » if err != nil {
210 » » » return storage.ErrBadData
211 » » }
212
 213 » » // Issue our callback for each row. Since we index the row on the LAST entry
 214 » » // in the row, count backwards to get the index of the first entry.
215 » » firstIndex := types.MessageIndex(rk.index - int64(len(records)) + 1)
216 » » if firstIndex < 0 {
217 » » » return storage.ErrBadData
218 » » }
219 » » for i, row := range records {
220 » » » index := firstIndex + types.MessageIndex(i)
221 » » » if index < r.Index {
222 » » » » // An offset was specified, and this row is befo re it, so skip.
223 » » » » continue
224 » » » }
225
226 » » » if !cb(index, row) {
227 » » » » return errStop
228 » » » }
229
230 » » » // Artificially apply limit within our row records.
231 » » » if limit > 0 {
232 » » » » limit--
233 » » » » if limit == 0 {
234 » » » » » return errStop
235 » » » » }
236 » » » }
144 } 237 }
145 return nil 238 return nil
146 }) 239 })
147 » if err != nil { 240
148 » » log.Fields{ 241 » switch err {
149 » » » log.ErrorKey: err, 242 » case nil, errStop:
150 » » » "project": s.Project, 243 » » return nil
151 » » » "zone": s.Zone, 244
152 » » » "cluster": s.Cluster, 245 » default:
153 » » » "table": s.LogTable, 246 » » log.WithError(err).Errorf(ctx, "Failed to retrieve row range.")
154 » » }.Errorf(c, "Failed to retrieve row range.")
155 return err 247 return err
156 } 248 }
157 return nil
158 } 249 }
159 250
160 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { 251 func (s *btStorage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) {
161 » c := log.SetFields(s.ctx, log.Fields{ 252 » ctx := log.SetFields(s, log.Fields{
162 "path": p, 253 "path": p,
163 }) 254 })
164 255
165 // Iterate through all log keys in the stream. Record the latest one. 256 // Iterate through all log keys in the stream. Record the latest one.
166 rk := newRowKey(string(p), 0) 257 rk := newRowKey(string(p), 0)
167 var latest *rowKey 258 var latest *rowKey
168 » err := s.table.getLogData(c, rk, 0, true, func(rk *rowKey, data []byte) error { 259 » err := s.raw.getLogData(ctx, rk, 0, true, func(rk *rowKey, data []byte) error {
169 latest = rk 260 latest = rk
170 return nil 261 return nil
171 }) 262 })
172 if err != nil { 263 if err != nil {
173 log.Fields{ 264 log.Fields{
174 log.ErrorKey: err, 265 log.ErrorKey: err,
175 "project": s.Project, 266 "project": s.Project,
176 "zone": s.Zone, 267 "zone": s.Zone,
177 "cluster": s.Cluster, 268 "cluster": s.Cluster,
178 "table": s.LogTable, 269 "table": s.LogTable,
179 » » }.Errorf(c, "Failed to scan for tail.") 270 » » }.Errorf(ctx, "Failed to scan for tail.")
180 } 271 }
181 272
182 if latest == nil { 273 if latest == nil {
183 // No rows for the specified stream. 274 // No rows for the specified stream.
184 return nil, 0, storage.ErrDoesNotExist 275 return nil, 0, storage.ErrDoesNotExist
185 } 276 }
186 277
187 // Fetch the latest row's data. 278 // Fetch the latest row's data.
188 var d []byte 279 var d []byte
189 » err = s.table.getLogData(c, latest, 1, false, func(rk *rowKey, data []by te) error { 280 » err = s.raw.getLogData(ctx, latest, 1, false, func(rk *rowKey, data []by te) error {
190 » » d = data 281 » » records, err := recordio.Split(data)
282 » » if err != nil || len(records) == 0 {
283 » » » return storage.ErrBadData
284 » » }
285 » » d = records[len(records)-1]
191 return errStop 286 return errStop
192 }) 287 })
193 » if err != nil { 288 » if err != nil && err != errStop {
194 log.Fields{ 289 log.Fields{
195 log.ErrorKey: err, 290 log.ErrorKey: err,
196 "project": s.Project, 291 "project": s.Project,
197 "zone": s.Zone, 292 "zone": s.Zone,
198 "cluster": s.Cluster, 293 "cluster": s.Cluster,
199 "table": s.LogTable, 294 "table": s.LogTable,
200 » » }.Errorf(c, "Failed to retrieve tail row.") 295 » » }.Errorf(ctx, "Failed to retrieve tail row.")
201 } 296 }
202 297
203 return d, types.MessageIndex(latest.index), nil 298 return d, types.MessageIndex(latest.index), nil
204 } 299 }
205 300
206 func (s *btStorage) getClient() (*bigtable.Client, error) { 301 // rowWriter facilitates writing several consecutive data values to a single
207 » if s.client == nil { 302 // BigTable row.
208 » » var err error 303 type rowWriter struct {
209 » » if s.client, err = bigtable.NewClient(s.ctx, s.Project, s.Zone, s.Cluster, s.ClientOptions...); err != nil { 304 » // buf is the current set of buffered data.
210 » » » return nil, fmt.Errorf("failed to create client: %s", er r) 305 » buf bytes.Buffer
211 » » } 306
212 » } 307 » // count is the number of rows in the writer.
213 » return s.client, nil 308 » count int
309
310 » // threshold is the maximum number of bytes that we can write.
311 » threshold int
214 } 312 }
215 313
216 func (s *btStorage) getAdminClient() (*bigtable.AdminClient, error) { 314 func (w *rowWriter) append(d []byte) (appended bool) {
217 » if s.adminClient == nil { 315 » origSize := w.buf.Len()
218 » » var err error 316 » defer func() {
 219 » » if s.adminClient, err = bigtable.NewAdminClient(s.ctx, s.Project, s.Zone, s.Cluster, s.ClientOptions...); err != nil { 317 » » // Restore our previous buffer state if we are reporting the write as
220 » » » return nil, fmt.Errorf("failed to create client: %s", er r) 318 » » // failed.
319 » » if !appended {
320 » » » w.buf.Truncate(origSize)
221 } 321 }
322 }()
323
324 // Serialize the next entry as a recordio blob.
325 if _, err := recordio.WriteFrame(&w.buf, d); err != nil {
326 return
222 } 327 }
223 » return s.adminClient, nil 328
329 » // If we have exceeded our threshold, report a failure.
330 » appended = (w.buf.Len() <= w.threshold)
331 » if appended {
332 » » w.count++
333 » }
334 » return
224 } 335 }
225 336
 226 // getLogTable returns a btTable instance. If one is not already configured, a 337 func (w *rowWriter) flush(ctx context.Context, raw btTable, index types.MessageIndex, path types.StreamPath) (int, error) {
227 // production instance will be generated and cached. 338 » flushCount := w.count
228 func (s *btStorage) getLogTable() (*bigtable.Table, error) { 339 » if flushCount == 0 {
229 » if s.logTable == nil { 340 » » return 0, nil
230 » » client, err := s.getClient()
231 » » if err != nil {
232 » » » return nil, err
233 » » }
234 » » s.logTable = client.Open(s.LogTable)
235 } 341 }
236 » return s.logTable, nil 342
343 » // Write the current set of buffered rows to the table. Index on the LAS T
344 » // row index.
345 » lastIndex := int64(index) + int64(flushCount) - 1
346 » rk := newRowKey(string(path), lastIndex)
347
348 » log.Fields{
349 » » "rowKey": rk,
350 » » "path": path,
351 » » "index": index,
352 » » "lastIndex": lastIndex,
353 » » "count": w.count,
354 » » "size": w.buf.Len(),
355 » }.Debugf(ctx, "Adding entries to BigTable.")
356 » if err := raw.putLogData(ctx, rk, w.buf.Bytes()); err != nil {
357 » » return 0, err
358 » }
359
360 » // Reset our buffer state.
361 » w.buf.Reset()
362 » w.count = 0
363 » return flushCount, nil
237 } 364 }
OLDNEW
« no previous file with comments | « server/logdog/storage/bigtable/initialize.go ('k') | server/logdog/storage/bigtable/storage_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698