| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package bqlog provides a mechanism to asynchronously log rows to BigQuery. | 5 // Package bqlog provides a mechanism to asynchronously log rows to BigQuery. |
| 6 // | 6 // |
| 7 // It uses Pull Task Queues as a temporary buffer for rows. The main use case is | 7 // It uses Pull Task Queues as a temporary buffer for rows. The main use case is |
| 8 // to log events from online RPC handlers that are supposed to be fast and | 8 // to log events from online RPC handlers that are supposed to be fast and |
| 9 // reliable (and not depend on BigQuery latency or availability). If you need to | 9 // reliable (and not depend on BigQuery latency or availability). If you need to |
| 10 // upload a large number of events at once or you are doing some offline batch | 10 // upload a large number of events at once or you are doing some offline batch |
| (...skipping 560 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 571 code = gerr.Code | 571 code = gerr.Code |
| 572 status = fmt.Sprintf("http_%d", code) | 572 status = fmt.Sprintf("http_%d", code) |
| 573 } else if ctx.Err() != nil { | 573 } else if ctx.Err() != nil { |
| 574 status = "timeout" | 574 status = "timeout" |
| 575 } else if err != nil { | 575 } else if err != nil { |
| 576 status = "unknown" | 576 status = "unknown" |
| 577 } | 577 } |
| 578 dt := clock.Since(ctx, startTime) | 578 dt := clock.Since(ctx, startTime) |
| 579 bigQueryLatency.Add(ctx, float64(dt.Nanoseconds()/1e6), f.TableR
ef, "insertAll", status) | 579 bigQueryLatency.Add(ctx, float64(dt.Nanoseconds()/1e6), f.TableR
ef, "insertAll", status) |
| 580 if code >= 500 { | 580 if code >= 500 { |
| 581 » » » return errors.WrapTransient(err) | 581 » » » return retry.Tag.Apply(err) |
| 582 } | 582 } |
| 583 return err | 583 return err |
| 584 }, func(err error, wait time.Duration) { | 584 }, func(err error, wait time.Duration) { |
| 585 logging.Fields{ | 585 logging.Fields{ |
| 586 logging.ErrorKey: err, | 586 logging.ErrorKey: err, |
| 587 "wait": wait, | 587 "wait": wait, |
| 588 }.Warningf(ctx, "Failed to send data to BigQuery") | 588 }.Warningf(ctx, "Failed to send data to BigQuery") |
| 589 }) | 589 }) |
| 590 if err != nil { | 590 if err != nil { |
| 591 logging.WithError(err).Errorf(ctx, "Failed to send data to BigQu
ery") | 591 logging.WithError(err).Errorf(ctx, "Failed to send data to BigQu
ery") |
| 592 » » if !errors.IsTransient(err) && err != context.DeadlineExceeded { | 592 » » if !retry.Tag.In(err) && err != context.DeadlineExceeded { |
| 593 chunk.Done(ctx) | 593 chunk.Done(ctx) |
| 594 } | 594 } |
| 595 return 0, err | 595 return 0, err |
| 596 } | 596 } |
| 597 | 597 |
| 598 if success := len(rows) - len(lastResp.InsertErrors); success > 0 { | 598 if success := len(rows) - len(lastResp.InsertErrors); success > 0 { |
| 599 flushedEntryCount.Add(ctx, int64(success), f.TableRef, "ok") | 599 flushedEntryCount.Add(ctx, int64(success), f.TableRef, "ok") |
| 600 } | 600 } |
| 601 | 601 |
| 602 if len(lastResp.InsertErrors) != 0 { | 602 if len(lastResp.InsertErrors) != 0 { |
| (...skipping 24 matching lines...) Expand all Loading... |
| 627 func (f *asyncFlusher) retryParams() retry.Iterator { | 627 func (f *asyncFlusher) retryParams() retry.Iterator { |
| 628 return &retry.ExponentialBackoff{ | 628 return &retry.ExponentialBackoff{ |
| 629 Limited: retry.Limited{ | 629 Limited: retry.Limited{ |
| 630 Delay: 50 * time.Millisecond, | 630 Delay: 50 * time.Millisecond, |
| 631 Retries: 50, | 631 Retries: 50, |
| 632 MaxTotal: 45 * time.Second, | 632 MaxTotal: 45 * time.Second, |
| 633 }, | 633 }, |
| 634 Multiplier: 2, | 634 Multiplier: 2, |
| 635 } | 635 } |
| 636 } | 636 } |
| OLD | NEW |