Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: tokenserver/appengine/impl/utils/bqlog/bqlog.go

Issue 2951393002: [errors] de-specialize Transient in favor of Tags. (Closed)
Patch Set: more refactor Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package bqlog provides a mechanism to asynchronously log rows to BigQuery. 5 // Package bqlog provides a mechanism to asynchronously log rows to BigQuery.
6 // 6 //
7 // It uses Pull Task Queues as a temporary buffer for rows. The main use case is 7 // It uses Pull Task Queues as a temporary buffer for rows. The main use case is
8 // to log events from online RPC handlers that are supposed to be fast and 8 // to log events from online RPC handlers that are supposed to be fast and
9 // reliable (and not depend on BigQuery latency or availability). If you need to 9 // reliable (and not depend on BigQuery latency or availability). If you need to
10 // upload a large number of events at once or you are doing some offline batch 10 // upload a large number of events at once or you are doing some offline batch
(...skipping 26 matching lines...) Expand all
37 bigquery "google.golang.org/api/bigquery/v2" 37 bigquery "google.golang.org/api/bigquery/v2"
38 "google.golang.org/api/googleapi" 38 "google.golang.org/api/googleapi"
39 39
40 "github.com/luci/gae/service/info" 40 "github.com/luci/gae/service/info"
41 "github.com/luci/gae/service/taskqueue" 41 "github.com/luci/gae/service/taskqueue"
42 42
43 "github.com/luci/luci-go/common/clock" 43 "github.com/luci/luci-go/common/clock"
44 "github.com/luci/luci-go/common/errors" 44 "github.com/luci/luci-go/common/errors"
45 "github.com/luci/luci-go/common/logging" 45 "github.com/luci/luci-go/common/logging"
46 "github.com/luci/luci-go/common/retry" 46 "github.com/luci/luci-go/common/retry"
47 "github.com/luci/luci-go/common/retry/transient"
47 48
48 "github.com/luci/luci-go/common/tsmon/distribution" 49 "github.com/luci/luci-go/common/tsmon/distribution"
49 "github.com/luci/luci-go/common/tsmon/field" 50 "github.com/luci/luci-go/common/tsmon/field"
50 "github.com/luci/luci-go/common/tsmon/metric" 51 "github.com/luci/luci-go/common/tsmon/metric"
51 "github.com/luci/luci-go/common/tsmon/types" 52 "github.com/luci/luci-go/common/tsmon/types"
52 53
53 "github.com/luci/luci-go/server/auth" 54 "github.com/luci/luci-go/server/auth"
54 ) 55 )
55 56
56 const ( 57 const (
(...skipping 494 matching lines...) Expand 10 before | Expand all | Expand 10 after
551 } 552 }
552 553
553 if len(rows) == 0 { 554 if len(rows) == 0 {
554 chunk.Done(ctx) 555 chunk.Done(ctx)
555 return 0, nil 556 return 0, nil
556 } 557 }
557 558
558 // Now actually send all the entries with retries. 559 // Now actually send all the entries with retries.
559 var lastResp *bigquery.TableDataInsertAllResponse 560 var lastResp *bigquery.TableDataInsertAllResponse
560 taggedCtx := clock.Tag(ctx, "insert-retry") // used by tests 561 taggedCtx := clock.Tag(ctx, "insert-retry") // used by tests
561 » err := retry.Retry(taggedCtx, retry.TransientOnly(f.retryParams), func() error { 562 » err := retry.Retry(taggedCtx, transient.Only(f.retryParams), func() erro r {
562 startTime := clock.Now(ctx) 563 startTime := clock.Now(ctx)
563 var err error 564 var err error
564 lastResp, err = f.Insert(ctx, &bigquery.TableDataInsertAllReques t{ 565 lastResp, err = f.Insert(ctx, &bigquery.TableDataInsertAllReques t{
565 SkipInvalidRows: true, // they will be reported in lastR esp.InsertErrors 566 SkipInvalidRows: true, // they will be reported in lastR esp.InsertErrors
566 Rows: rows, 567 Rows: rows,
567 }) 568 })
568 code := 0 569 code := 0
569 status := "ok" 570 status := "ok"
570 if gerr, _ := err.(*googleapi.Error); gerr != nil { 571 if gerr, _ := err.(*googleapi.Error); gerr != nil {
571 code = gerr.Code 572 code = gerr.Code
572 status = fmt.Sprintf("http_%d", code) 573 status = fmt.Sprintf("http_%d", code)
573 } else if ctx.Err() != nil { 574 } else if ctx.Err() != nil {
574 status = "timeout" 575 status = "timeout"
575 } else if err != nil { 576 } else if err != nil {
576 status = "unknown" 577 status = "unknown"
577 } 578 }
578 dt := clock.Since(ctx, startTime) 579 dt := clock.Since(ctx, startTime)
579 bigQueryLatency.Add(ctx, float64(dt.Nanoseconds()/1e6), f.TableR ef, "insertAll", status) 580 bigQueryLatency.Add(ctx, float64(dt.Nanoseconds()/1e6), f.TableR ef, "insertAll", status)
580 if code >= 500 { 581 if code >= 500 {
581 » » » return errors.WrapTransient(err) 582 » » » return transient.Tag.Apply(err)
582 } 583 }
583 return err 584 return err
584 }, func(err error, wait time.Duration) { 585 }, func(err error, wait time.Duration) {
585 logging.Fields{ 586 logging.Fields{
586 logging.ErrorKey: err, 587 logging.ErrorKey: err,
587 "wait": wait, 588 "wait": wait,
588 }.Warningf(ctx, "Failed to send data to BigQuery") 589 }.Warningf(ctx, "Failed to send data to BigQuery")
589 }) 590 })
590 if err != nil { 591 if err != nil {
591 logging.WithError(err).Errorf(ctx, "Failed to send data to BigQu ery") 592 logging.WithError(err).Errorf(ctx, "Failed to send data to BigQu ery")
592 » » if !errors.IsTransient(err) && err != context.DeadlineExceeded { 593 » » if !transient.Tag.In(err) && err != context.DeadlineExceeded {
593 chunk.Done(ctx) 594 chunk.Done(ctx)
594 } 595 }
595 return 0, err 596 return 0, err
596 } 597 }
597 598
598 if success := len(rows) - len(lastResp.InsertErrors); success > 0 { 599 if success := len(rows) - len(lastResp.InsertErrors); success > 0 {
599 flushedEntryCount.Add(ctx, int64(success), f.TableRef, "ok") 600 flushedEntryCount.Add(ctx, int64(success), f.TableRef, "ok")
600 } 601 }
601 602
602 if len(lastResp.InsertErrors) != 0 { 603 if len(lastResp.InsertErrors) != 0 {
(...skipping 24 matching lines...) Expand all
627 func (f *asyncFlusher) retryParams() retry.Iterator { 628 func (f *asyncFlusher) retryParams() retry.Iterator {
628 return &retry.ExponentialBackoff{ 629 return &retry.ExponentialBackoff{
629 Limited: retry.Limited{ 630 Limited: retry.Limited{
630 Delay: 50 * time.Millisecond, 631 Delay: 50 * time.Millisecond,
631 Retries: 50, 632 Retries: 50,
632 MaxTotal: 45 * time.Second, 633 MaxTotal: 45 * time.Second,
633 }, 634 },
634 Multiplier: 2, 635 Multiplier: 2,
635 } 636 }
636 } 637 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698