| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package butler | 5 package butler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "io" | 10 "io" |
| (...skipping 295 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 306 for { | 306 for { |
| 307 rc, config := streamServer.Next() | 307 rc, config := streamServer.Next() |
| 308 if rc == nil { | 308 if rc == nil { |
| 309 log.Debugf(ctx, "StreamServer returned nil strea
m; terminating.") | 309 log.Debugf(ctx, "StreamServer returned nil strea
m; terminating.") |
| 310 return | 310 return |
| 311 } | 311 } |
| 312 | 312 |
| 313 // Add this Stream to the Butler. | 313 // Add this Stream to the Butler. |
| 314 // | 314 // |
| 315 // We run this in a function so we can ensure cleanup on
failure. | 315 // We run this in a function so we can ensure cleanup on
failure. |
| 316 » » » if err := b.AddStream(rc, *config); err != nil { | 316 » » » if err := b.AddStream(rc, config); err != nil { |
| 317 log.Fields{ | 317 log.Fields{ |
| 318 log.ErrorKey: err, | 318 log.ErrorKey: err, |
| 319 }.Errorf(ctx, "Failed to add stream.") | 319 }.Errorf(ctx, "Failed to add stream.") |
| 320 | 320 |
| 321 if err := rc.Close(); err != nil { | 321 if err := rc.Close(); err != nil { |
| 322 log.Fields{ | 322 log.Fields{ |
| 323 log.ErrorKey: err, | 323 log.ErrorKey: err, |
| 324 }.Warningf(ctx, "Failed to close stream.
") | 324 }.Warningf(ctx, "Failed to close stream.
") |
| 325 } | 325 } |
| 326 } | 326 } |
| (...skipping 13 matching lines...) Expand all Loading... |
| 340 }() | 340 }() |
| 341 } | 341 } |
| 342 | 342 |
| 343 // AddStream adds a Stream to the Butler. This is goroutine-safe. | 343 // AddStream adds a Stream to the Butler. This is goroutine-safe. |
| 344 // | 344 // |
| 345 // If no error is returned, the Butler assumes ownership of the supplied stream. | 345 // If no error is returned, the Butler assumes ownership of the supplied stream. |
| 346 // The stream will be closed when processing is finished. | 346 // The stream will be closed when processing is finished. |
| 347 // | 347 // |
| 348 // If an error is occurred, the caller is still the owner of the stream and | 348 // If an error is occurred, the caller is still the owner of the stream and |
| 349 // is responsible for closing it. | 349 // is responsible for closing it. |
| 350 func (b *Butler) AddStream(rc io.ReadCloser, p streamproto.Properties) error { | 350 func (b *Butler) AddStream(rc io.ReadCloser, p *streamproto.Properties) error { |
| 351 » p = p.Clone() |
| 351 if p.Timestamp == nil || p.Timestamp.Time().IsZero() { | 352 if p.Timestamp == nil || p.Timestamp.Time().IsZero() { |
| 352 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx)) | 353 p.Timestamp = google.NewTimestamp(clock.Now(b.ctx)) |
| 353 } | 354 } |
| 354 if err := p.Validate(); err != nil { | 355 if err := p.Validate(); err != nil { |
| 355 return err | 356 return err |
| 356 } | 357 } |
| 357 | 358 |
| 358 if p.Timeout > 0 { | 359 if p.Timeout > 0 { |
| 359 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok { | 360 if rts, ok := rc.(iotools.ReadTimeoutSetter); ok { |
| 360 if err := rts.SetReadTimeout(p.Timeout); err != nil { | 361 if err := rts.SetReadTimeout(p.Timeout); err != nil { |
| (...skipping 25 matching lines...) Expand all Loading... |
| 386 case streamproto.TeeStderr: | 387 case streamproto.TeeStderr: |
| 387 if b.c.TeeStderr == nil { | 388 if b.c.TeeStderr == nil { |
| 388 return errors.New("butler: cannot tee through STDERR; no
STDERR is configured") | 389 return errors.New("butler: cannot tee through STDERR; no
STDERR is configured") |
| 389 } | 390 } |
| 390 reader = io.TeeReader(rc, b.c.TeeStderr) | 391 reader = io.TeeReader(rc, b.c.TeeStderr) |
| 391 | 392 |
| 392 default: | 393 default: |
| 393 return fmt.Errorf("invalid tee value: %v", p.Tee) | 394 return fmt.Errorf("invalid tee value: %v", p.Tee) |
| 394 } | 395 } |
| 395 | 396 |
| 396 p.Prefix = string(b.c.Prefix) | |
| 397 if err := b.registerStream(p.Name); err != nil { | 397 if err := b.registerStream(p.Name); err != nil { |
| 398 return err | 398 return err |
| 399 } | 399 } |
| 400 | 400 |
| 401 // Register this stream with our Bundler. It will take ownership of "p",
so |
| 402 // we should not use it after this point. |
| 403 streamCtx := log.SetField(b.ctx, "stream", p.Name) |
| 401 bs, err := b.bundler.Register(p) | 404 bs, err := b.bundler.Register(p) |
| 402 if err != nil { | 405 if err != nil { |
| 403 return err | 406 return err |
| 404 } | 407 } |
| 408 p = nil |
| 405 | 409 |
| 406 b.streamC <- &stream{ | 410 b.streamC <- &stream{ |
| 407 » » Context: log.SetField(b.ctx, "stream", p.Name), | 411 » » Context: streamCtx, |
| 408 r: reader, | 412 r: reader, |
| 409 c: rc, | 413 c: rc, |
| 410 bs: bs, | 414 bs: bs, |
| 411 } | 415 } |
| 412 return nil | 416 return nil |
| 413 } | 417 } |
| 414 | 418 |
| 415 func (b *Butler) runStreams(activateC chan struct{}) { | 419 func (b *Butler) runStreams(activateC chan struct{}) { |
| 416 streamFinishedC := make(chan struct{}) | 420 streamFinishedC := make(chan struct{}) |
| 417 streamC := b.streamC | 421 streamC := b.streamC |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 541 // shutdown prematurely, so this should be reasonably quick. | 545 // shutdown prematurely, so this should be reasonably quick. |
| 542 b.Activate() | 546 b.Activate() |
| 543 } | 547 } |
| 544 | 548 |
| 545 // Returns the configured Butler error. | 549 // Returns the configured Butler error. |
| 546 func (b *Butler) getRunErr() error { | 550 func (b *Butler) getRunErr() error { |
| 547 b.shutdownMu.Lock() | 551 b.shutdownMu.Lock() |
| 548 defer b.shutdownMu.Unlock() | 552 defer b.shutdownMu.Unlock() |
| 549 return b.runErr | 553 return b.runErr |
| 550 } | 554 } |
| OLD | NEW |