| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package butler | 5 package butler |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "io" | 11 "io" |
| 12 "io/ioutil" |
| 12 "sync" | 13 "sync" |
| 13 "testing" | 14 "testing" |
| 15 "time" |
| 14 | 16 |
| 15 "github.com/luci/luci-go/common/clock/testclock" | 17 "github.com/luci/luci-go/common/clock/testclock" |
| 18 "github.com/luci/luci-go/common/proto/google" |
| 16 . "github.com/luci/luci-go/common/testing/assertions" | 19 . "github.com/luci/luci-go/common/testing/assertions" |
| 17 "github.com/luci/luci-go/logdog/api/logpb" | 20 "github.com/luci/luci-go/logdog/api/logpb" |
| 18 "github.com/luci/luci-go/logdog/client/butler/output" | 21 "github.com/luci/luci-go/logdog/client/butler/output" |
| 19 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" | 22 "github.com/luci/luci-go/logdog/client/butlerlib/streamproto" |
| 20 "github.com/luci/luci-go/logdog/common/types" | 23 "github.com/luci/luci-go/logdog/common/types" |
| 24 |
| 25 "golang.org/x/net/context" |
| 26 |
| 21 . "github.com/smartystreets/goconvey/convey" | 27 . "github.com/smartystreets/goconvey/convey" |
| 22 "golang.org/x/net/context" | |
| 23 ) | 28 ) |
| 24 | 29 |
| 25 type testOutput struct { | 30 type testOutput struct { |
| 26 sync.Mutex | 31 sync.Mutex |
| 27 | 32 |
| 28 err error | 33 err error |
| 29 maxSize int | 34 maxSize int |
| 30 streams map[string][]*logpb.LogEntry | 35 streams map[string][]*logpb.LogEntry |
| 31 terminal map[string]struct{} | 36 terminal map[string]struct{} |
| 32 closed bool | 37 closed bool |
| (...skipping 359 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 392 | 397 |
| 393 So(teeStdout.String(), ShouldEqual, "Hello, STDO
UT") | 398 So(teeStdout.String(), ShouldEqual, "Hello, STDO
UT") |
| 394 So(to.logs("stdout"), shouldHaveTextLogs, "Hello
, STDOUT") | 399 So(to.logs("stdout"), shouldHaveTextLogs, "Hello
, STDOUT") |
| 395 So(to.isTerminal("stdout"), ShouldBeTrue) | 400 So(to.isTerminal("stdout"), ShouldBeTrue) |
| 396 | 401 |
| 397 So(teeStderr.String(), ShouldEqual, "Hello, STDE
RR") | 402 So(teeStderr.String(), ShouldEqual, "Hello, STDE
RR") |
| 398 So(to.logs("stderr"), shouldHaveTextLogs, "Hello
, STDERR") | 403 So(to.logs("stderr"), shouldHaveTextLogs, "Hello
, STDERR") |
| 399 So(to.isTerminal("stderr"), ShouldBeTrue) | 404 So(to.isTerminal("stderr"), ShouldBeTrue) |
| 400 }) | 405 }) |
| 401 | 406 |
| 407 Convey(`Can apply global tags.`, func() { |
| 408 conf.GlobalTags = streamproto.TagMap{ |
| 409 "foo": "bar", |
| 410 "baz": "qux", |
| 411 } |
| 412 props := streamproto.Properties{ |
| 413 LogStreamDescriptor: &logpb.LogStreamDes
criptor{ |
| 414 Name: "stdout", |
| 415 ContentType: "test/data", |
| 416 Timestamp: google.NewTimestamp
(time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)), |
| 417 }, |
| 418 } |
| 419 |
| 420 b := mkb(c, conf) |
| 421 defer func() { |
| 422 b.Activate() |
| 423 b.Wait() |
| 424 }() |
| 425 |
| 426 Convey(`Applies global tags, but allows the stre
am to override.`, func() { |
| 427 props.Tags = map[string]string{ |
| 428 "baz": "override", |
| 429 } |
| 430 |
| 431 So(b.AddStream(ioutil.NopCloser(&bytes.B
uffer{}), &props), ShouldBeNil) |
| 432 So(b.bundler.GetStreamDescs(), ShouldRes
emble, map[string]*logpb.LogStreamDescriptor{ |
| 433 "stdout": { |
| 434 Name: "stdout", |
| 435 ContentType: "test/data"
, |
| 436 Timestamp: props.Times
tamp, |
| 437 Tags: map[string]string{ |
| 438 "foo": "bar", |
| 439 "baz": "override
", |
| 440 }, |
| 441 }, |
| 442 }) |
| 443 }) |
| 444 |
| 445 Convey(`Will apply global tags if the stream has
none (nil).`, func() { |
| 446 So(b.AddStream(ioutil.NopCloser(&bytes.B
uffer{}), &props), ShouldBeNil) |
| 447 So(b.bundler.GetStreamDescs(), ShouldRes
emble, map[string]*logpb.LogStreamDescriptor{ |
| 448 "stdout": { |
| 449 Name: "stdout", |
| 450 ContentType: "test/data"
, |
| 451 Timestamp: props.Times
tamp, |
| 452 Tags: map[string]string{ |
| 453 "foo": "bar", |
| 454 "baz": "qux", |
| 455 }, |
| 456 }, |
| 457 }) |
| 458 }) |
| 459 }) |
| 460 |
| 402 Convey(`Run with 256 streams, stream{0..256} will deplet
e and finish.`, func() { | 461 Convey(`Run with 256 streams, stream{0..256} will deplet
e and finish.`, func() { |
| 403 b := mkb(c, conf) | 462 b := mkb(c, conf) |
| 404 streams := make([]*testStream, 256) | 463 streams := make([]*testStream, 256) |
| 405 for i := range streams { | 464 for i := range streams { |
| 406 streams[i] = newTestStream(func(p *strea
mproto.Properties) { | 465 streams[i] = newTestStream(func(p *strea
mproto.Properties) { |
| 407 p.Name = fmt.Sprintf("stream%d",
i) | 466 p.Name = fmt.Sprintf("stream%d",
i) |
| 408 }) | 467 }) |
| 409 } | 468 } |
| 410 | 469 |
| 411 for _, s := range streams { | 470 for _, s := range streams { |
| (...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 539 tss.onNext = func() { | 598 tss.onNext = func() { |
| 540 panic("test panic") | 599 panic("test panic") |
| 541 } | 600 } |
| 542 | 601 |
| 543 b := mkb(c, conf) | 602 b := mkb(c, conf) |
| 544 b.AddStreamServer(tss) | 603 b.AddStreamServer(tss) |
| 545 So(b.Wait(), ShouldErrLike, "test panic") | 604 So(b.Wait(), ShouldErrLike, "test panic") |
| 546 }) | 605 }) |
| 547 }) | 606 }) |
| 548 } | 607 } |
| OLD | NEW |