Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| (...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 156 | 156 |
| 157 // be is the Bundle entry. | 157 // be is the Bundle entry. |
| 158 be *logpb.ButlerLogBundle_Entry | 158 be *logpb.ButlerLogBundle_Entry |
| 159 // path is the constructed path of the stream being processed. | 159 // path is the constructed path of the stream being processed. |
| 160 path types.StreamPath | 160 path types.StreamPath |
| 161 } | 161 } |
| 162 | 162 |
| 163 // processLogStream processes an individual set of log messages belonging to the | 163 // processLogStream processes an individual set of log messages belonging to the |
| 164 // same log stream. | 164 // same log stream. |
| 165 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error { | 165 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error { |
| 166 // If this bundle has neither log entries nor a terminal index, it is ju nk and | |
| 167 // must be discarded. | |
| 168 // | |
| 169 // This is more important than a basic optimization, as it enforces that no | |
| 170 // zero-entry log streams can be ingested. Either some entries exist, or there | |
| 171 // is a promise of a terminal entry. | |
| 172 if len(h.be.Logs) == 0 && !h.be.Terminal { | |
|
dnj
2016/04/11 17:20:04
Quick short-circuit here. Nothing really interesti
| |
| 173 log.Warningf(ctx, "Bundle entry is non-terminal and contains no logs; discarding.") | |
| 174 return nil | |
| 175 } | |
| 176 | |
| 166 if err := h.be.Desc.Validate(true); err != nil { | 177 if err := h.be.Desc.Validate(true); err != nil { |
| 167 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.") | 178 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.") |
| 168 return err | 179 return err |
| 169 } | 180 } |
| 170 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D esc.Name)) | 181 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D esc.Name)) |
| 171 ctx = log.SetField(ctx, "path", h.path) | 182 ctx = log.SetField(ctx, "path", h.path) |
| 172 | 183 |
| 173 if len(h.be.Secret) == 0 { | 184 if len(h.be.Secret) == 0 { |
| 174 log.Errorf(ctx, "Missing secret.") | 185 log.Errorf(ctx, "Missing secret.") |
| 175 return errors.New("missing stream secret") | 186 return errors.New("missing stream secret") |
| (...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 326 for _, e := range merr { | 337 for _, e := range merr { |
| 327 if hasTransientError(e) { | 338 if hasTransientError(e) { |
| 328 return true | 339 return true |
| 329 } | 340 } |
| 330 } | 341 } |
| 331 return false | 342 return false |
| 332 } | 343 } |
| 333 | 344 |
| 334 return errors.IsTransient(err) | 345 return errors.IsTransient(err) |
| 335 } | 346 } |
| OLD | NEW |