| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package bqlog | 5 package bqlog |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/gob" | 9 "encoding/gob" |
| 10 "encoding/json" | 10 "encoding/json" |
| 11 "fmt" | 11 "fmt" |
| 12 "math/rand" | 12 "math/rand" |
| 13 "sync" | 13 "sync" |
| 14 "testing" | 14 "testing" |
| 15 "time" | 15 "time" |
| 16 | 16 |
| 17 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
| 18 bigquery "google.golang.org/api/bigquery/v2" | 18 bigquery "google.golang.org/api/bigquery/v2" |
| 19 | 19 |
| 20 "github.com/luci/gae/filter/featureBreaker" | 20 "github.com/luci/gae/filter/featureBreaker" |
| 21 "github.com/luci/gae/service/taskqueue" | 21 "github.com/luci/gae/service/taskqueue" |
| 22 "github.com/luci/luci-go/appengine/gaetesting" | 22 "github.com/luci/luci-go/appengine/gaetesting" |
| 23 "github.com/luci/luci-go/common/clock" | 23 "github.com/luci/luci-go/common/clock" |
| 24 "github.com/luci/luci-go/common/clock/testclock" | 24 "github.com/luci/luci-go/common/clock/testclock" |
| 25 "github.com/luci/luci-go/common/data/rand/mathrand" | 25 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 26 "github.com/luci/luci-go/common/data/stringset" | 26 "github.com/luci/luci-go/common/data/stringset" |
| 27 "github.com/luci/luci-go/common/errors" | 27 "github.com/luci/luci-go/common/errors" |
| 28 "github.com/luci/luci-go/common/retry" |
| 28 | 29 |
| 29 . "github.com/smartystreets/goconvey/convey" | 30 . "github.com/smartystreets/goconvey/convey" |
| 30 ) | 31 ) |
| 31 | 32 |
| 32 var testingLog = Log{ | 33 var testingLog = Log{ |
| 33 QueueName: "pull-queue", | 34 QueueName: "pull-queue", |
| 34 ProjectID: "projectID", | 35 ProjectID: "projectID", |
| 35 DatasetID: "datasetID", | 36 DatasetID: "datasetID", |
| 36 TableID: "tableID", | 37 TableID: "tableID", |
| 37 } | 38 } |
| (...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 259 | 260 |
| 260 for i := 0; i < 20; i++ { | 261 for i := 0; i < 20; i++ { |
| 261 err := testingLog.Insert(ctx, Entry{ | 262 err := testingLog.Insert(ctx, Entry{ |
| 262 Data: map[string]interface{}{"i": i}, | 263 Data: map[string]interface{}{"i": i}, |
| 263 }) | 264 }) |
| 264 So(err, ShouldBeNil) | 265 So(err, ShouldBeNil) |
| 265 tc.Add(time.Millisecond) // emulate passage of t
ime to sort entries | 266 tc.Add(time.Millisecond) // emulate passage of t
ime to sort entries |
| 266 } | 267 } |
| 267 | 268 |
| 268 testingLog.insertMock = func(_ context.Context, r *bigqu
ery.TableDataInsertAllRequest) (*bigquery.TableDataInsertAllResponse, error) { | 269 testingLog.insertMock = func(_ context.Context, r *bigqu
ery.TableDataInsertAllRequest) (*bigquery.TableDataInsertAllResponse, error) { |
| 269 » » » » return nil, errors.WrapTransient(fmt.Errorf("omg
, transient error")) | 270 » » » » return nil, errors.New("omg, transient error", r
etry.Tag) |
| 270 } | 271 } |
| 271 | 272 |
| 272 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ | 273 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ |
| 273 if testclock.HasTags(t, "insert-retry") { | 274 if testclock.HasTags(t, "insert-retry") { |
| 274 tc.Add(d) | 275 tc.Add(d) |
| 275 } | 276 } |
| 276 }) | 277 }) |
| 277 | 278 |
| 278 count, err := testingLog.Flush(ctx) | 279 count, err := testingLog.Flush(ctx) |
| 279 So(err.Error(), ShouldEqual, "omg, transient error (and
2 other errors)") | 280 So(err.Error(), ShouldEqual, "omg, transient error (and
2 other errors)") |
| (...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 323 | 324 |
| 324 func mockInsertAll(l *Log, reqs *[]*bigquery.TableDataInsertAllRequest) { | 325 func mockInsertAll(l *Log, reqs *[]*bigquery.TableDataInsertAllRequest) { |
| 325 lock := sync.Mutex{} | 326 lock := sync.Mutex{} |
| 326 l.insertMock = func(ctx context.Context, r *bigquery.TableDataInsertAllR
equest) (*bigquery.TableDataInsertAllResponse, error) { | 327 l.insertMock = func(ctx context.Context, r *bigquery.TableDataInsertAllR
equest) (*bigquery.TableDataInsertAllResponse, error) { |
| 327 lock.Lock() | 328 lock.Lock() |
| 328 defer lock.Unlock() | 329 defer lock.Unlock() |
| 329 *reqs = append(*reqs, r) | 330 *reqs = append(*reqs, r) |
| 330 return &bigquery.TableDataInsertAllResponse{}, nil | 331 return &bigquery.TableDataInsertAllResponse{}, nil |
| 331 } | 332 } |
| 332 } | 333 } |
| OLD | NEW |