OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package bqlog | 5 package bqlog |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "encoding/gob" | 9 "encoding/gob" |
10 "encoding/json" | 10 "encoding/json" |
11 "fmt" | 11 "fmt" |
12 "math/rand" | 12 "math/rand" |
13 "sync" | 13 "sync" |
14 "testing" | 14 "testing" |
15 "time" | 15 "time" |
16 | 16 |
17 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
18 bigquery "google.golang.org/api/bigquery/v2" | 18 bigquery "google.golang.org/api/bigquery/v2" |
19 | 19 |
20 "github.com/luci/gae/filter/featureBreaker" | 20 "github.com/luci/gae/filter/featureBreaker" |
21 "github.com/luci/gae/service/taskqueue" | 21 "github.com/luci/gae/service/taskqueue" |
22 "github.com/luci/luci-go/appengine/gaetesting" | 22 "github.com/luci/luci-go/appengine/gaetesting" |
23 "github.com/luci/luci-go/common/clock" | 23 "github.com/luci/luci-go/common/clock" |
24 "github.com/luci/luci-go/common/clock/testclock" | 24 "github.com/luci/luci-go/common/clock/testclock" |
25 "github.com/luci/luci-go/common/data/rand/mathrand" | 25 "github.com/luci/luci-go/common/data/rand/mathrand" |
26 "github.com/luci/luci-go/common/data/stringset" | 26 "github.com/luci/luci-go/common/data/stringset" |
27 "github.com/luci/luci-go/common/errors" | 27 "github.com/luci/luci-go/common/errors" |
| 28 "github.com/luci/luci-go/common/retry/transient" |
28 | 29 |
29 . "github.com/smartystreets/goconvey/convey" | 30 . "github.com/smartystreets/goconvey/convey" |
30 ) | 31 ) |
31 | 32 |
32 var testingLog = Log{ | 33 var testingLog = Log{ |
33 QueueName: "pull-queue", | 34 QueueName: "pull-queue", |
34 ProjectID: "projectID", | 35 ProjectID: "projectID", |
35 DatasetID: "datasetID", | 36 DatasetID: "datasetID", |
36 TableID: "tableID", | 37 TableID: "tableID", |
37 } | 38 } |
(...skipping 221 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
259 | 260 |
260 for i := 0; i < 20; i++ { | 261 for i := 0; i < 20; i++ { |
261 err := testingLog.Insert(ctx, Entry{ | 262 err := testingLog.Insert(ctx, Entry{ |
262 Data: map[string]interface{}{"i": i}, | 263 Data: map[string]interface{}{"i": i}, |
263 }) | 264 }) |
264 So(err, ShouldBeNil) | 265 So(err, ShouldBeNil) |
265 tc.Add(time.Millisecond) // emulate passage of t
ime to sort entries | 266 tc.Add(time.Millisecond) // emulate passage of t
ime to sort entries |
266 } | 267 } |
267 | 268 |
268 testingLog.insertMock = func(_ context.Context, r *bigqu
ery.TableDataInsertAllRequest) (*bigquery.TableDataInsertAllResponse, error) { | 269 testingLog.insertMock = func(_ context.Context, r *bigqu
ery.TableDataInsertAllRequest) (*bigquery.TableDataInsertAllResponse, error) { |
269 » » » » return nil, errors.WrapTransient(fmt.Errorf("omg
, transient error")) | 270 » » » » return nil, errors.New("omg, transient error", t
ransient.Tag) |
270 } | 271 } |
271 | 272 |
272 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ | 273 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ |
273 if testclock.HasTags(t, "insert-retry") { | 274 if testclock.HasTags(t, "insert-retry") { |
274 tc.Add(d) | 275 tc.Add(d) |
275 } | 276 } |
276 }) | 277 }) |
277 | 278 |
278 count, err := testingLog.Flush(ctx) | 279 count, err := testingLog.Flush(ctx) |
279 So(err.Error(), ShouldEqual, "omg, transient error (and
2 other errors)") | 280 So(err.Error(), ShouldEqual, "omg, transient error (and
2 other errors)") |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
323 | 324 |
324 func mockInsertAll(l *Log, reqs *[]*bigquery.TableDataInsertAllRequest) { | 325 func mockInsertAll(l *Log, reqs *[]*bigquery.TableDataInsertAllRequest) { |
325 lock := sync.Mutex{} | 326 lock := sync.Mutex{} |
326 l.insertMock = func(ctx context.Context, r *bigquery.TableDataInsertAllR
equest) (*bigquery.TableDataInsertAllResponse, error) { | 327 l.insertMock = func(ctx context.Context, r *bigquery.TableDataInsertAllR
equest) (*bigquery.TableDataInsertAllResponse, error) { |
327 lock.Lock() | 328 lock.Lock() |
328 defer lock.Unlock() | 329 defer lock.Unlock() |
329 *reqs = append(*reqs, r) | 330 *reqs = append(*reqs, r) |
330 return &bigquery.TableDataInsertAllResponse{}, nil | 331 return &bigquery.TableDataInsertAllResponse{}, nil |
331 } | 332 } |
332 } | 333 } |
OLD | NEW |