Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(343)

Side by Side Diff: logdog/client/butler/butler.go

Issue 2456953003: LogDog: Update client/bootstrap to generate URLs. (Closed)
Patch Set: Winders Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/client/butler/bundler/bundler.go ('k') | logdog/client/butler/butler_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package butler 5 package butler
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "io" 10 "io"
(...skipping 295 matching lines...) Expand 10 before | Expand all | Expand 10 after
306 for { 306 for {
307 rc, config := streamServer.Next() 307 rc, config := streamServer.Next()
308 if rc == nil { 308 if rc == nil {
309 log.Debugf(ctx, "StreamServer returned nil strea m; terminating.") 309 log.Debugf(ctx, "StreamServer returned nil strea m; terminating.")
310 return 310 return
311 } 311 }
312 312
313 // Add this Stream to the Butler. 313 // Add this Stream to the Butler.
314 // 314 //
315 // We run this in a function so we can ensure cleanup on failure. 315 // We run this in a function so we can ensure cleanup on failure.
316 » » » if err := b.AddStream(rc, *config); err != nil { 316 » » » if err := b.AddStream(rc, config); err != nil {
317 log.Fields{ 317 log.Fields{
318 log.ErrorKey: err, 318 log.ErrorKey: err,
319 }.Errorf(ctx, "Failed to add stream.") 319 }.Errorf(ctx, "Failed to add stream.")
320 320
321 if err := rc.Close(); err != nil { 321 if err := rc.Close(); err != nil {
322 log.Fields{ 322 log.Fields{
323 log.ErrorKey: err, 323 log.ErrorKey: err,
324 }.Warningf(ctx, "Failed to close stream. ") 324 }.Warningf(ctx, "Failed to close stream. ")
325 } 325 }
326 } 326 }
(...skipping 13 matching lines...) Expand all
340 }() 340 }()
341 } 341 }
342 342
343 // AddStream adds a Stream to the Butler. This is goroutine-safe. 343 // AddStream adds a Stream to the Butler. This is goroutine-safe.
344 // 344 //
345 // If no error is returned, the Butler assumes ownership of the supplied stream. 345 // If no error is returned, the Butler assumes ownership of the supplied stream.
346 // The stream will be closed when processing is finished. 346 // The stream will be closed when processing is finished.
347 // 347 //
348 // If an error is occurred, the caller is still the owner of the stream and 348 // If an error is occurred, the caller is still the owner of the stream and
349 // is responsible for closing it. 349 // is responsible for closing it.
350 func (b *Butler) AddStream(rc io.ReadCloser, p streamproto.Properties) error { 350 func (b *Butler) AddStream(rc io.ReadCloser, p *streamproto.Properties) error {
351 » p = p.Clone()
351 if p.Timestamp == nil || p.Timestamp.Time().IsZero() { 352 if p.Timestamp == nil || p.Timestamp.Time().IsZero() {
352 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx)) 353 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx))
353 } 354 }
354 if err := p.Validate(); err != nil { 355 if err := p.Validate(); err != nil {
355 return err 356 return err
356 } 357 }
357 358
358 if p.Timeout > 0 { 359 if p.Timeout > 0 {
359 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok { 360 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok {
360 if err := rts.SetReadTimeout(p.Timeout); err != nil { 361 if err := rts.SetReadTimeout(p.Timeout); err != nil {
(...skipping 25 matching lines...) Expand all
386 case streamproto.TeeStderr: 387 case streamproto.TeeStderr:
387 if b.c.TeeStderr == nil { 388 if b.c.TeeStderr == nil {
388 return errors.New("butler: cannot tee through STDERR; no STDERR is configured") 389 return errors.New("butler: cannot tee through STDERR; no STDERR is configured")
389 } 390 }
390 reader = io.TeeReader(rc, b.c.TeeStderr) 391 reader = io.TeeReader(rc, b.c.TeeStderr)
391 392
392 default: 393 default:
393 return fmt.Errorf("invalid tee value: %v", p.Tee) 394 return fmt.Errorf("invalid tee value: %v", p.Tee)
394 } 395 }
395 396
396 p.Prefix = string(b.c.Prefix)
397 if err := b.registerStream(p.Name); err != nil { 397 if err := b.registerStream(p.Name); err != nil {
398 return err 398 return err
399 } 399 }
400 400
401 // Register this stream with our Bundler. It will take ownership of "p", so
402 // we should not use it after this point.
403 streamCtx := log.SetField(b.ctx, "stream", p.Name)
401 bs, err := b.bundler.Register(p) 404 bs, err := b.bundler.Register(p)
402 if err != nil { 405 if err != nil {
403 return err 406 return err
404 } 407 }
408 p = nil
405 409
406 b.streamC <- &stream{ 410 b.streamC <- &stream{
407 » » Context: log.SetField(b.ctx, "stream", p.Name), 411 » » Context: streamCtx,
408 r: reader, 412 r: reader,
409 c: rc, 413 c: rc,
410 bs: bs, 414 bs: bs,
411 } 415 }
412 return nil 416 return nil
413 } 417 }
414 418
415 func (b *Butler) runStreams(activateC chan struct{}) { 419 func (b *Butler) runStreams(activateC chan struct{}) {
416 streamFinishedC := make(chan struct{}) 420 streamFinishedC := make(chan struct{})
417 streamC := b.streamC 421 streamC := b.streamC
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
541 // shutdown prematurely, so this should be reasonably quick. 545 // shutdown prematurely, so this should be reasonably quick.
542 b.Activate() 546 b.Activate()
543 } 547 }
544 548
545 // Returns the configured Butler error. 549 // Returns the configured Butler error.
546 func (b *Butler) getRunErr() error { 550 func (b *Butler) getRunErr() error {
547 b.shutdownMu.Lock() 551 b.shutdownMu.Lock()
548 defer b.shutdownMu.Unlock() 552 defer b.shutdownMu.Unlock()
549 return b.runErr 553 return b.runErr
550 } 554 }
OLDNEW
« no previous file with comments | « logdog/client/butler/bundler/bundler.go ('k') | logdog/client/butler/butler_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698