| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "testing" | 10 "testing" |
| 11 "time" | 11 "time" |
| 12 | 12 |
| 13 "github.com/luci/gae/impl/memory" | 13 "github.com/luci/gae/impl/memory" |
| 14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 15 "github.com/luci/luci-go/common/clock" | |
| 16 "github.com/luci/luci-go/common/clock/testclock" | 15 "github.com/luci/luci-go/common/clock/testclock" |
| 17 "github.com/luci/luci-go/common/logdog/types" | 16 "github.com/luci/luci-go/common/logdog/types" |
| 18 "github.com/luci/luci-go/common/proto/google" | 17 "github.com/luci/luci-go/common/proto/google" |
| 19 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 20 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
| 21 | 20 |
| 22 . "github.com/luci/luci-go/common/testing/assertions" | 21 . "github.com/luci/luci-go/common/testing/assertions" |
| 23 . "github.com/smartystreets/goconvey/convey" | 22 . "github.com/smartystreets/goconvey/convey" |
| 24 ) | 23 ) |
| 25 | 24 |
| (...skipping 28 matching lines...) Expand all Loading... |
| 54 ps := make(ds.PropertySlice, len(values)) | 53 ps := make(ds.PropertySlice, len(values)) |
| 55 for i, v := range values { | 54 for i, v := range values { |
| 56 ps[i] = ds.MkProperty(v) | 55 ps[i] = ds.MkProperty(v) |
| 57 } | 56 } |
| 58 return ps | 57 return ps |
| 59 } | 58 } |
| 60 | 59 |
| 61 func TestLogStream(t *testing.T) { | 60 func TestLogStream(t *testing.T) { |
| 62 t.Parallel() | 61 t.Parallel() |
| 63 | 62 |
| 64 » Convey(`A LogStream with invalid tags will fail to encode.`, t, func() { | 63 » Convey(`When creating a LogStream`, t, func() { |
| 65 » » ls := &LogStream{ | 64 » » Convey(`Can create keyed on hash.`, func() { |
| 66 » » » Prefix: "foo", | 65 » » » ls, err := NewLogStream("0123456789abcdef0123456789ABCDE
F0123456789abcdef0123456789ABCDEF") |
| 67 » » » Name: "bar", | 66 » » » So(err, ShouldBeNil) |
| 68 » » » Tags: TagMap{ | 67 » » » So(ls, ShouldNotBeNil) |
| 69 » » » » "!!!invalid key!!!": "value", | 68 » » }) |
| 70 » » » }, | 69 |
| 71 » » } | 70 » » Convey(`Will fail to create keyed on an invalid hash-length stri
ng.`, func() { |
| 72 » » _, err := ls.Save(true) | 71 » » » _, err := NewLogStream("0123456789abcdef!@#$%^&*()ABCDEF
0123456789abcdef0123456789ABCDEF") |
| 73 » » So(err, ShouldErrLike, "failed to encode tags") | 72 » » » So(err, ShouldErrLike, "invalid path") |
| 73 » » }) |
| 74 |
| 75 » » Convey(`Can create keyed on path.`, func() { |
| 76 » » » ls, err := NewLogStream("a/b/+/c/d") |
| 77 » » » So(err, ShouldBeNil) |
| 78 » » » So(ls, ShouldNotBeNil) |
| 79 » » }) |
| 80 |
| 81 » » Convey(`Will fail to create keyed on neither a path nor hash.`,
func() { |
| 82 » » » _, err := NewLogStream("") |
| 83 » » » So(err, ShouldNotBeNil) |
| 84 » » }) |
| 74 }) | 85 }) |
| 75 | 86 |
| 76 » Convey(`A LogStream will skip invalid tags when loading.`, t, func() { | 87 » Convey(`A testing log stream`, t, func() { |
| 77 » » ls := &LogStream{ | |
| 78 » » » Prefix: "foo", | |
| 79 » » » Name: "bar", | |
| 80 » » } | |
| 81 » » pmap := ds.PropertyMap{ | |
| 82 » » » "_Tags": sps(encodeKey("!!!invalid key!!!")), | |
| 83 » » } | |
| 84 » » So(ls.Load(pmap), ShouldBeNil) | |
| 85 » » So(ls.Tags, ShouldResemble, TagMap(nil)) | |
| 86 » }) | |
| 87 | |
| 88 » Convey(`With a testing configuration`, t, func() { | |
| 89 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 88 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 90 c = memory.Use(c) | 89 c = memory.Use(c) |
| 91 di := ds.Get(c) | 90 di := ds.Get(c) |
| 92 di.Testable().AutoIndex(true) | 91 di.Testable().AutoIndex(true) |
| 93 di.Testable().Consistent(true) | 92 di.Testable().Consistent(true) |
| 94 | 93 |
| 94 now := ds.RoundTime(tc.Now().UTC()) |
| 95 |
| 96 ls := LogStream{ |
| 97 Prefix: "testing", |
| 98 Name: "log/stream", |
| 99 State: LSStreaming, |
| 100 TerminalIndex: -1, |
| 101 Secret: bytes.Repeat([]byte{0x6F}, types.StreamSe
cretLength), |
| 102 Created: now.UTC(), |
| 103 ContentType: string(types.ContentTypeText), |
| 104 } |
| 105 ls.recalculateHashID() |
| 106 |
| 95 desc := logpb.LogStreamDescriptor{ | 107 desc := logpb.LogStreamDescriptor{ |
| 96 Prefix: "testing", | 108 Prefix: "testing", |
| 97 Name: "log/stream", | 109 Name: "log/stream", |
| 98 StreamType: logpb.StreamType_TEXT, | 110 StreamType: logpb.StreamType_TEXT, |
| 99 ContentType: "application/text", | 111 ContentType: "application/text", |
| 100 » » » Timestamp: google.NewTimestamp(clock.Now(c)), | 112 » » » Timestamp: google.NewTimestamp(now), |
| 101 Tags: map[string]string{ | 113 Tags: map[string]string{ |
| 102 "foo": "bar", | 114 "foo": "bar", |
| 103 "baz": "qux", | 115 "baz": "qux", |
| 104 "quux": "", | 116 "quux": "", |
| 105 }, | 117 }, |
| 106 } | 118 } |
| 107 | 119 |
| 108 » » Convey(`Can create a LogStream keyed on hash.`, func() { | 120 » » Convey(`Will skip invalid tags when loading.`, func() { |
| 109 » » » ls, err := NewLogStream("0123456789abcdef0123456789ABCDE
F0123456789abcdef0123456789ABCDEF") | 121 » » » pmap, err := ls.Save(false) |
| 110 So(err, ShouldBeNil) | 122 So(err, ShouldBeNil) |
| 111 » » » So(ls, ShouldNotBeNil) | 123 » » » pmap["_Tags"] = sps(encodeKey("!!!invalid key!!!")) |
| 124 |
| 125 » » » So(ls.Load(pmap), ShouldBeNil) |
| 126 » » » So(ls.Tags, ShouldResemble, TagMap(nil)) |
| 112 }) | 127 }) |
| 113 | 128 |
| 114 » » Convey(`Will fail to create a LogStream keyed on an invalid hash
-length string.`, func() { | 129 » » Convey(`With invalid tags will fail to encode.`, func() { |
| 115 » » » _, err := NewLogStream("0123456789abcdef!@#$%^&*()ABCDEF
0123456789abcdef0123456789ABCDEF") | 130 » » » ls.Tags = TagMap{ |
| 116 » » » So(err, ShouldErrLike, "invalid path") | 131 » » » » "!!!invalid key!!!": "value", |
| 132 » » » } |
| 133 |
| 134 » » » ls.SetDSValidate(false) |
| 135 » » » _, err := ls.Save(true) |
| 136 » » » So(err, ShouldErrLike, "failed to encode tags") |
| 117 }) | 137 }) |
| 118 | 138 |
| 119 » » Convey(`Can create a LogStream keyed on path.`, func() { | 139 » » Convey(`Can populate the LogStream with descriptor state.`, func
() { |
| 120 » » » ls, err := NewLogStream("a/b/+/c/d") | 140 » » » So(ls.LoadDescriptor(&desc), ShouldBeNil) |
| 121 » » » So(err, ShouldBeNil) | 141 » » » So(ls.Validate(), ShouldBeNil) |
| 122 » » » So(ls, ShouldNotBeNil) | |
| 123 » » }) | |
| 124 | 142 |
| 125 » » Convey(`Will fail to create a LogStream keyed on neither a path
nor hash.`, func() { | 143 » » » Convey(`Will not validate`, func() { |
| 126 » » » _, err := NewLogStream("") | 144 » » » » Convey(`Without a valid Prefix`, func() { |
| 127 » » » So(err, ShouldNotBeNil) | 145 » » » » » ls.Prefix = "" |
| 128 » » }) | 146 » » » » » ls.recalculateHashID() |
| 129 | 147 |
| 130 » » Convey(`Can create a new LogStream`, func() { | 148 » » » » » So(ls.Validate(), ShouldErrLike, "invali
d prefix") |
| 131 » » » ls, err := NewLogStream(string(desc.Path())) | 149 » » » » }) |
| 132 » » » So(err, ShouldBeNil) | 150 » » » » Convey(`Without a valid Name`, func() { |
| 151 » » » » » ls.Name = "" |
| 152 » » » » » ls.recalculateHashID() |
| 133 | 153 |
| 134 » » » Convey(`Can populate the LogStream with descriptor state
.`, func() { | 154 » » » » » So(ls.Validate(), ShouldErrLike, "invali
d name") |
| 135 » » » » ls.Created = ds.RoundTime(clock.Now(c).UTC()) | 155 » » » » }) |
| 136 » » » » ls.Updated = ds.RoundTime(clock.Now(c).UTC()) | 156 » » » » Convey(`Without a valid stream secret`, func() { |
| 137 » » » » ls.Secret = bytes.Repeat([]byte{0x6F}, types.Str
eamSecretLength) | 157 » » » » » ls.Secret = nil |
| 138 » » » » So(ls.LoadDescriptor(&desc), ShouldBeNil) | 158 » » » » » So(ls.Validate(), ShouldErrLike, "invali
d secret length") |
| 139 » » » » So(ls.Validate(), ShouldBeNil) | 159 » » » » }) |
| 160 » » » » Convey(`Without a valid content type`, func() { |
| 161 » » » » » ls.ContentType = "" |
| 162 » » » » » So(ls.Validate(), ShouldErrLike, "empty
content type") |
| 163 » » » » }) |
| 164 » » » » Convey(`Without a valid created time`, func() { |
| 165 » » » » » ls.Created = time.Time{} |
| 166 » » » » » So(ls.Validate(), ShouldErrLike, "create
d time is not set") |
| 167 » » » » }) |
| 168 » » » » Convey(`With a terminal index, will not validate
without a TerminatedTime.`, func() { |
| 169 » » » » » ls.State = LSArchiveTasked |
| 170 » » » » » ls.TerminalIndex = 1337 |
| 171 » » » » » So(ls.Validate(), ShouldErrLike, "missin
g terminated time") |
| 140 | 172 |
| 141 » » » » Convey(`Will not validate`, func() { | 173 » » » » » ls.TerminatedTime = now |
| 142 » » » » » Convey(`Without a valid Prefix`, func()
{ | 174 » » » » » So(ls.Validate(), ShouldBeNil) |
| 143 » » » » » » ls.Prefix = "" | |
| 144 » » » » » » So(ls.Validate(), ShouldErrLike,
"invalid prefix") | |
| 145 » » » » » }) | |
| 146 » » » » » Convey(`Without a valid prefix`, func()
{ | |
| 147 » » » » » » ls.Name = "" | |
| 148 » » » » » » So(ls.Validate(), ShouldErrLike,
"invalid name") | |
| 149 » » » » » }) | |
| 150 » » » » » Convey(`Without a valid stream secret`,
func() { | |
| 151 » » » » » » ls.Secret = nil | |
| 152 » » » » » » So(ls.Validate(), ShouldErrLike,
"invalid secret length") | |
| 153 » » » » » }) | |
| 154 » » » » » Convey(`Without a valid content type`, f
unc() { | |
| 155 » » » » » » ls.ContentType = "" | |
| 156 » » » » » » So(ls.Validate(), ShouldErrLike,
"empty content type") | |
| 157 » » » » » }) | |
| 158 » » » » » Convey(`Without a valid created time`, f
unc() { | |
| 159 » » » » » » ls.Created = time.Time{} | |
| 160 » » » » » » So(ls.Validate(), ShouldErrLike,
"created time is not set") | |
| 161 » » » » » }) | |
| 162 » » » » » Convey(`Without a valid updated time`, f
unc() { | |
| 163 » » » » » » ls.Updated = time.Time{} | |
| 164 » » » » » » So(ls.Validate(), ShouldErrLike,
"updated time is not set") | |
| 165 » » » » » }) | |
| 166 » » » » » Convey(`With an updated time before the
created time`, func() { | |
| 167 » » » » » » ls.Updated = ls.Created.Add(-tim
e.Second) | |
| 168 » » » » » » So(ls.Validate(), ShouldErrLike,
"updated time must be >= created time") | |
| 169 » » » » » }) | |
| 170 » » » » » Convey(`Without a valid stream type`, fu
nc() { | |
| 171 » » » » » » ls.StreamType = -1 | |
| 172 » » » » » » So(ls.Validate(), ShouldErrLike,
"unsupported stream type") | |
| 173 » » » » » }) | |
| 174 » » » » » Convey(`Without an invalid tag: empty ke
y`, func() { | |
| 175 » » » » » » ls.Tags[""] = "empty" | |
| 176 » » » » » » So(ls.Validate(), ShouldErrLike,
"invalid tag") | |
| 177 » » » » » }) | |
| 178 » » » » » Convey(`Without an invalid tag: bad key`
, func() { | |
| 179 » » » » » » ls.Tags["!"] = "bad-value" | |
| 180 » » » » » » So(ls.Validate(), ShouldErrLike,
"invalid tag") | |
| 181 » » » » » }) | |
| 182 » » » » » Convey(`With an invalid descriptor proto
buf`, func() { | |
| 183 » » » » » » ls.Descriptor = []byte{0x00} //
Invalid tag, "0". | |
| 184 » » » » » » So(ls.Validate(), ShouldErrLike,
"could not unmarshal descriptor") | |
| 185 » » » » » }) | |
| 186 }) | 175 }) |
| 176 Convey(`When archived, will not validate without
an ArchivedTime.`, func() { |
| 177 ls.State = LSArchived |
| 178 So(ls.Validate(), ShouldErrLike, "missin
g terminated time") |
| 187 | 179 |
| 188 » » » » Convey(`Can write the LogStream to the Datastore
.`, func() { | 180 » » » » » ls.TerminatedTime = now |
| 189 » » » » » So(di.Put(ls), ShouldBeNil) | 181 » » » » » So(ls.Validate(), ShouldErrLike, "missin
g archived time") |
| 190 | 182 |
| 191 » » » » » Convey(`Can read the LogStream back from
the Datastore.`, func() { | 183 » » » » » ls.ArchivedTime = now |
| 192 » » » » » » ls2 := LogStreamFromID(ls.HashID
()) | 184 » » » » » So(ls.Validate(), ShouldBeNil) |
| 193 » » » » » » So(di.Get(ls2), ShouldBeNil) | 185 » » » » }) |
| 194 » » » » » » So(ls2, ShouldResemble, ls) | 186 » » » » Convey(`Without a valid stream type`, func() { |
| 195 » » » » » }) | 187 » » » » » ls.StreamType = -1 |
| 188 » » » » » So(ls.Validate(), ShouldErrLike, "unsupp
orted stream type") |
| 189 » » » » }) |
| 190 » » » » Convey(`Without an invalid tag: empty key`, func
() { |
| 191 » » » » » ls.Tags[""] = "empty" |
| 192 » » » » » So(ls.Validate(), ShouldErrLike, "invali
d tag") |
| 193 » » » » }) |
| 194 » » » » Convey(`Without an invalid tag: bad key`, func()
{ |
| 195 » » » » » ls.Tags["!"] = "bad-value" |
| 196 » » » » » So(ls.Validate(), ShouldErrLike, "invali
d tag") |
| 197 » » » » }) |
| 198 » » » » Convey(`With an invalid descriptor protobuf`, fu
nc() { |
| 199 » » » » » ls.Descriptor = []byte{0x00} // Invalid
tag, "0". |
| 200 » » » » » So(ls.Validate(), ShouldErrLike, "could
not unmarshal descriptor") |
| 196 }) | 201 }) |
| 197 }) | 202 }) |
| 198 | 203 |
| 199 » » » Convey(`Will refuse to populate from an invalid descript
or.`, func() { | 204 » » » Convey(`Can write the LogStream to the Datastore.`, func
() { |
| 200 » » » » desc.Name = "" | 205 » » » » So(di.Put(&ls), ShouldBeNil) |
| 201 » » » » So(ls.LoadDescriptor(&desc), ShouldErrLike, "inv
alid descriptor") | 206 |
| 207 » » » » Convey(`Can read the LogStream back from the Dat
astore via prefix/name.`, func() { |
| 208 » » » » » ls2 := LogStreamFromPath(ls.Path()) |
| 209 » » » » » So(di.Get(ls2), ShouldBeNil) |
| 210 » » » » » So(ls2, ShouldResemble, &ls) |
| 211 » » » » }) |
| 212 |
| 213 » » » » Convey(`Can read the LogStream back from the Dat
astore via hash.`, func() { |
| 214 » » » » » ls2 := LogStreamFromID(ls.HashID) |
| 215 » » » » » So(di.Get(ls2), ShouldBeNil) |
| 216 » » » » » So(ls2, ShouldResemble, &ls) |
| 217 » » » » }) |
| 218 |
| 219 » » » » Convey(`Can read the LogStream back from the Dat
astore.`, func() { |
| 220 » » » » » var ls2 LogStream |
| 221 » » » » » ds.PopulateKey(&ls2, ds.Get(c).KeyForObj
(&ls)) |
| 222 » » » » » So(di.Get(&ls2), ShouldBeNil) |
| 223 » » » » » So(ls2, ShouldResemble, ls) |
| 224 » » » » }) |
| 202 }) | 225 }) |
| 203 }) | 226 }) |
| 204 | 227 |
| 228 Convey(`Will refuse to populate from an invalid descriptor.`, fu
nc() { |
| 229 desc.StreamType = -1 |
| 230 So(ls.LoadDescriptor(&desc), ShouldErrLike, "invalid des
criptor") |
| 231 }) |
| 232 |
| 205 Convey(`Writing multiple LogStream entries`, func() { | 233 Convey(`Writing multiple LogStream entries`, func() { |
| 206 times := map[string]time.Time{} | 234 times := map[string]time.Time{} |
| 207 streamNames := []string{ | 235 streamNames := []string{ |
| 208 "foo/bar", | 236 "foo/bar", |
| 209 "foo/bar/baz", | 237 "foo/bar/baz", |
| 210 "baz/qux", | 238 "baz/qux", |
| 211 "cat/dog", | 239 "cat/dog", |
| 212 "cat/bird/dog", | 240 "cat/bird/dog", |
| 213 "bird/plane", | 241 "bird/plane", |
| 214 } | 242 } |
| 215 » » » for _, name := range streamNames { | 243 » » » for i, name := range streamNames { |
| 216 » » » » desc := desc | 244 » » » » lsCopy := ls |
| 217 » » » » desc.Name = name | 245 » » » » lsCopy.Name = name |
| 246 » » » » lsCopy.Created = ds.RoundTime(now.Add(time.Durat
ion(i) * time.Second)) |
| 247 » » » » lsCopy.recalculateHashID() |
| 218 | 248 |
| 219 » » » » ls, err := NewLogStream(string(desc.Path())) | 249 » » » » descCopy := desc |
| 220 » » » » So(err, ShouldBeNil) | 250 » » » » descCopy.Name = name |
| 221 | 251 |
| 222 » » » » ls.Secret = bytes.Repeat([]byte{0x55}, types.Str
eamSecretLength) | 252 » » » » if err := lsCopy.LoadDescriptor(&descCopy); err
!= nil { |
| 223 » » » » So(ls.LoadDescriptor(&desc), ShouldBeNil) | 253 » » » » » panic(err) |
| 224 » » » » ls.Created = clock.Now(c).UTC() | 254 » » » » } |
| 225 » » » » ls.Updated = ls.Created | 255 » » » » So(di.Put(&lsCopy), ShouldBeNil) |
| 226 » » » » So(di.Put(ls), ShouldBeNil) | |
| 227 | 256 |
| 228 » » » » times[name] = clock.Now(c) | 257 » » » » times[name] = lsCopy.Created |
| 229 » » » » tc.Add(time.Second) | |
| 230 } | 258 } |
| 231 | 259 |
| 232 Convey(`When querying LogStream by -Created`, func() { | 260 Convey(`When querying LogStream by -Created`, func() { |
| 233 q := ds.NewQuery("LogStream").Order("-Created") | 261 q := ds.NewQuery("LogStream").Order("-Created") |
| 234 | 262 |
| 235 Convey(`LogStream path queries`, func() { | 263 Convey(`LogStream path queries`, func() { |
| 236 Convey(`A query for "foo/bar" should ret
urn "foo/bar".`, func() { | 264 Convey(`A query for "foo/bar" should ret
urn "foo/bar".`, func() { |
| 237 q, err := AddLogStreamPathFilter
(q, "**/+/foo/bar") | 265 q, err := AddLogStreamPathFilter
(q, "**/+/foo/bar") |
| 238 So(err, ShouldBeNil) | 266 So(err, ShouldBeNil) |
| 239 | 267 |
| (...skipping 173 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 413 "_C": sps("PF:1:foo", "PR:1:baz", "PR:3:bar"), | 441 "_C": sps("PF:1:foo", "PR:1:baz", "PR:3:bar"), |
| 414 }) | 442 }) |
| 415 }) | 443 }) |
| 416 | 444 |
| 417 Convey(`Will error if more than one greedy glob is present.`, fu
nc() { | 445 Convey(`Will error if more than one greedy glob is present.`, fu
nc() { |
| 418 _, err := AddLogStreamPathFilter(q, "*/foo/**/bar/**") | 446 _, err := AddLogStreamPathFilter(q, "*/foo/**/bar/**") |
| 419 So(err, ShouldErrLike, "cannot have more than one greedy
glob") | 447 So(err, ShouldErrLike, "cannot have more than one greedy
glob") |
| 420 }) | 448 }) |
| 421 }) | 449 }) |
| 422 } | 450 } |
| OLD | NEW |