| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package annotee | 5 package annotee |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bufio" | 8 "bufio" |
| 9 "bytes" | 9 "bytes" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 96 Execution *annotation.Execution | 96 Execution *annotation.Execution |
| 97 | 97 |
| 98 // MetadataUpdateInterval is the amount of time to wait after stream met
adata | 98 // MetadataUpdateInterval is the amount of time to wait after stream met
adata |
| 99 // updates to push the updated metadata protobuf to the metadata stream. | 99 // updates to push the updated metadata protobuf to the metadata stream. |
| 100 // | 100 // |
| 101 // - If this is < 0, metadata will only be pushed at the beginning
and end of | 101 // - If this is < 0, metadata will only be pushed at the beginning
and end of |
| 102 // a step. | 102 // a step. |
| 103 // - If this equals 0, metadata will be pushed every time it's upda
ted. | 103 // - If this equals 0, metadata will be pushed every time it's upda
ted. |
| 104 // - If this is 0, DefaultMetadataUpdateInterval will be used. | 104 // - If this is 0, DefaultMetadataUpdateInterval will be used. |
| 105 MetadataUpdateInterval time.Duration | 105 MetadataUpdateInterval time.Duration |
| 106 |
| 107 // Offline specifies whether parsing happens not at the same time as |
| 108 // emitting. If true and CURRENT_TIMESTAMP annotations are not provided |
| 109 // then step start/end times are left empty. |
| 110 Offline bool |
| 106 } | 111 } |
| 107 | 112 |
| 108 // Processor consumes data from a list of Stream entries and interacts with the | 113 // Processor consumes data from a list of Stream entries and interacts with the |
| 109 // supplied Client instance. | 114 // supplied Client instance. |
| 110 // | 115 // |
| 111 // A Processor must be instantiated with New. | 116 // A Processor must be instantiated with New. |
| 112 type Processor struct { | 117 type Processor struct { |
| 113 ctx context.Context | 118 ctx context.Context |
| 114 o *Options | 119 o *Options |
| 115 | 120 |
| 116 astate *annotation.State | 121 astate *annotation.State |
| 117 stepHandlers map[string]*stepHandler | 122 stepHandlers map[string]*stepHandler |
| 118 } | 123 } |
| 119 | 124 |
| 120 // New instantiates a new Processor. | 125 // New instantiates a new Processor. |
| 121 func New(c context.Context, o Options) *Processor { | 126 func New(c context.Context, o Options) *Processor { |
| 122 p := Processor{ | 127 p := Processor{ |
| 123 ctx: c, | 128 ctx: c, |
| 124 o: &o, | 129 o: &o, |
| 125 | 130 |
| 126 stepHandlers: make(map[string]*stepHandler), | 131 stepHandlers: make(map[string]*stepHandler), |
| 127 } | 132 } |
| 128 p.astate = &annotation.State{ | 133 p.astate = &annotation.State{ |
| 129 LogNameBase: o.Base, | 134 LogNameBase: o.Base, |
| 130 Callbacks: &annotationCallbacks{&p}, | 135 Callbacks: &annotationCallbacks{&p}, |
| 131 Execution: o.Execution, | 136 Execution: o.Execution, |
| 132 Clock: clock.Get(c), | 137 Clock: clock.Get(c), |
| 138 Offline: o.Offline, |
| 133 } | 139 } |
| 134 return &p | 140 return &p |
| 135 } | 141 } |
| 136 | 142 |
| 137 // RunStreams executes the Processor, consuming data from its configured streams | 143 // RunStreams executes the Processor, consuming data from its configured streams |
| 138 // and forwarding it to LogDog. Run will block until all streams have | 144 // and forwarding it to LogDog. Run will block until all streams have |
| 139 // terminated. | 145 // terminated. |
| 140 // | 146 // |
| 141 // If a stream terminates with an error, or if there is an error processing the | 147 // If a stream terminates with an error, or if there is an error processing the |
| 142 // stream data, Run will return an error. If multiple Streams fail with errors, | 148 // stream data, Run will return an error. If multiple Streams fail with errors, |
| (...skipping 304 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 447 sendLatest() | 453 sendLatest() |
| 448 } | 454 } |
| 449 } | 455 } |
| 450 } | 456 } |
| 451 | 457 |
| 452 func (h *stepHandler) finish() { | 458 func (h *stepHandler) finish() { |
| 453 if h.closed { | 459 if h.closed { |
| 454 return | 460 return |
| 455 } | 461 } |
| 456 | 462 |
| 457 » // Close the handler. This may send one last annotation to summarize the | 463 » // This may send one last annotation to summarize the |
| 458 // state if closing changed it. | 464 // state if closing changed it. |
| 459 » if h.step.Close() { | 465 » if h.step.Close(nil) { |
| 460 // Manually mark it updated, since Close callbacks will have unr
egistered | 466 // Manually mark it updated, since Close callbacks will have unr
egistered |
| 461 // us from the standard Updated() reporting loop. | 467 // us from the standard Updated() reporting loop. |
| 462 h.updated() | 468 h.updated() |
| 463 } | 469 } |
| 464 | 470 |
| 465 // Close and reap our meter goroutine. | 471 // Close and reap our meter goroutine. |
| 466 close(h.annotationC) | 472 close(h.annotationC) |
| 467 <-h.annotationFinishedC | 473 <-h.annotationFinishedC |
| 468 | 474 |
| 469 // Close all streams associated with this handler. | 475 // Close all streams associated with this handler. |
| (...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 619 return "" | 625 return "" |
| 620 } | 626 } |
| 621 return strings.TrimSpace(line[3 : len(line)-3]) | 627 return strings.TrimSpace(line[3 : len(line)-3]) |
| 622 } | 628 } |
| 623 | 629 |
| 624 func buildAnnotation(name string, params ...string) string { | 630 func buildAnnotation(name string, params ...string) string { |
| 625 v := make([]string, 1, 1+len(params)) | 631 v := make([]string, 1, 1+len(params)) |
| 626 v[0] = name | 632 v[0] = name |
| 627 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" | 633 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" |
| 628 } | 634 } |
| OLD | NEW |