OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 // Package bqlog provides a mechanism to asynchronously log rows to BigQuery. | 5 // Package bqlog provides a mechanism to asynchronously log rows to BigQuery. |
6 // | 6 // |
7 // It uses Pull Task Queues as a temporary buffer for rows. The main use case is | 7 // It uses Pull Task Queues as a temporary buffer for rows. The main use case is |
8 // to log events from online RPC handlers that are supposed to be fast and | 8 // to log events from online RPC handlers that are supposed to be fast and |
9 // reliable (and not depend on BigQuery latency or availability). If you need to | 9 // reliable (and not depend on BigQuery latency or availability). If you need to |
10 // upload a large number of events at once or you are doing some offline batch | 10 // upload a large number of events at once or you are doing some offline batch |
(...skipping 26 matching lines...) Expand all Loading... |
37 bigquery "google.golang.org/api/bigquery/v2" | 37 bigquery "google.golang.org/api/bigquery/v2" |
38 "google.golang.org/api/googleapi" | 38 "google.golang.org/api/googleapi" |
39 | 39 |
40 "github.com/luci/gae/service/info" | 40 "github.com/luci/gae/service/info" |
41 "github.com/luci/gae/service/taskqueue" | 41 "github.com/luci/gae/service/taskqueue" |
42 | 42 |
43 "github.com/luci/luci-go/common/clock" | 43 "github.com/luci/luci-go/common/clock" |
44 "github.com/luci/luci-go/common/errors" | 44 "github.com/luci/luci-go/common/errors" |
45 "github.com/luci/luci-go/common/logging" | 45 "github.com/luci/luci-go/common/logging" |
46 "github.com/luci/luci-go/common/retry" | 46 "github.com/luci/luci-go/common/retry" |
| 47 "github.com/luci/luci-go/common/retry/transient" |
47 | 48 |
48 "github.com/luci/luci-go/common/tsmon/distribution" | 49 "github.com/luci/luci-go/common/tsmon/distribution" |
49 "github.com/luci/luci-go/common/tsmon/field" | 50 "github.com/luci/luci-go/common/tsmon/field" |
50 "github.com/luci/luci-go/common/tsmon/metric" | 51 "github.com/luci/luci-go/common/tsmon/metric" |
51 "github.com/luci/luci-go/common/tsmon/types" | 52 "github.com/luci/luci-go/common/tsmon/types" |
52 | 53 |
53 "github.com/luci/luci-go/server/auth" | 54 "github.com/luci/luci-go/server/auth" |
54 ) | 55 ) |
55 | 56 |
56 const ( | 57 const ( |
(...skipping 494 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
551 } | 552 } |
552 | 553 |
553 if len(rows) == 0 { | 554 if len(rows) == 0 { |
554 chunk.Done(ctx) | 555 chunk.Done(ctx) |
555 return 0, nil | 556 return 0, nil |
556 } | 557 } |
557 | 558 |
558 // Now actually send all the entries with retries. | 559 // Now actually send all the entries with retries. |
559 var lastResp *bigquery.TableDataInsertAllResponse | 560 var lastResp *bigquery.TableDataInsertAllResponse |
560 taggedCtx := clock.Tag(ctx, "insert-retry") // used by tests | 561 taggedCtx := clock.Tag(ctx, "insert-retry") // used by tests |
561 » err := retry.Retry(taggedCtx, retry.TransientOnly(f.retryParams), func()
error { | 562 » err := retry.Retry(taggedCtx, transient.Only(f.retryParams), func() erro
r { |
562 startTime := clock.Now(ctx) | 563 startTime := clock.Now(ctx) |
563 var err error | 564 var err error |
564 lastResp, err = f.Insert(ctx, &bigquery.TableDataInsertAllReques
t{ | 565 lastResp, err = f.Insert(ctx, &bigquery.TableDataInsertAllReques
t{ |
565 SkipInvalidRows: true, // they will be reported in lastR
esp.InsertErrors | 566 SkipInvalidRows: true, // they will be reported in lastR
esp.InsertErrors |
566 Rows: rows, | 567 Rows: rows, |
567 }) | 568 }) |
568 code := 0 | 569 code := 0 |
569 status := "ok" | 570 status := "ok" |
570 if gerr, _ := err.(*googleapi.Error); gerr != nil { | 571 if gerr, _ := err.(*googleapi.Error); gerr != nil { |
571 code = gerr.Code | 572 code = gerr.Code |
572 status = fmt.Sprintf("http_%d", code) | 573 status = fmt.Sprintf("http_%d", code) |
573 } else if ctx.Err() != nil { | 574 } else if ctx.Err() != nil { |
574 status = "timeout" | 575 status = "timeout" |
575 } else if err != nil { | 576 } else if err != nil { |
576 status = "unknown" | 577 status = "unknown" |
577 } | 578 } |
578 dt := clock.Since(ctx, startTime) | 579 dt := clock.Since(ctx, startTime) |
579 bigQueryLatency.Add(ctx, float64(dt.Nanoseconds()/1e6), f.TableR
ef, "insertAll", status) | 580 bigQueryLatency.Add(ctx, float64(dt.Nanoseconds()/1e6), f.TableR
ef, "insertAll", status) |
580 if code >= 500 { | 581 if code >= 500 { |
581 » » » return errors.WrapTransient(err) | 582 » » » return transient.Tag.Apply(err) |
582 } | 583 } |
583 return err | 584 return err |
584 }, func(err error, wait time.Duration) { | 585 }, func(err error, wait time.Duration) { |
585 logging.Fields{ | 586 logging.Fields{ |
586 logging.ErrorKey: err, | 587 logging.ErrorKey: err, |
587 "wait": wait, | 588 "wait": wait, |
588 }.Warningf(ctx, "Failed to send data to BigQuery") | 589 }.Warningf(ctx, "Failed to send data to BigQuery") |
589 }) | 590 }) |
590 if err != nil { | 591 if err != nil { |
591 logging.WithError(err).Errorf(ctx, "Failed to send data to BigQu
ery") | 592 logging.WithError(err).Errorf(ctx, "Failed to send data to BigQu
ery") |
592 » » if !errors.IsTransient(err) && err != context.DeadlineExceeded { | 593 » » if !transient.Tag.In(err) && err != context.DeadlineExceeded { |
593 chunk.Done(ctx) | 594 chunk.Done(ctx) |
594 } | 595 } |
595 return 0, err | 596 return 0, err |
596 } | 597 } |
597 | 598 |
598 if success := len(rows) - len(lastResp.InsertErrors); success > 0 { | 599 if success := len(rows) - len(lastResp.InsertErrors); success > 0 { |
599 flushedEntryCount.Add(ctx, int64(success), f.TableRef, "ok") | 600 flushedEntryCount.Add(ctx, int64(success), f.TableRef, "ok") |
600 } | 601 } |
601 | 602 |
602 if len(lastResp.InsertErrors) != 0 { | 603 if len(lastResp.InsertErrors) != 0 { |
(...skipping 24 matching lines...) Expand all Loading... |
627 func (f *asyncFlusher) retryParams() retry.Iterator { | 628 func (f *asyncFlusher) retryParams() retry.Iterator { |
628 return &retry.ExponentialBackoff{ | 629 return &retry.ExponentialBackoff{ |
629 Limited: retry.Limited{ | 630 Limited: retry.Limited{ |
630 Delay: 50 * time.Millisecond, | 631 Delay: 50 * time.Millisecond, |
631 Retries: 50, | 632 Retries: 50, |
632 MaxTotal: 45 * time.Second, | 633 MaxTotal: 45 * time.Second, |
633 }, | 634 }, |
634 Multiplier: 2, | 635 Multiplier: 2, |
635 } | 636 } |
636 } | 637 } |
OLD | NEW |