| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "io" | 11 "io" |
| 12 "math" | 12 "math" |
| 13 "testing" | 13 "testing" |
| 14 "time" | 14 "time" |
| 15 | 15 |
| 16 "github.com/golang/protobuf/proto" | 16 "github.com/golang/protobuf/proto" |
| 17 "github.com/luci/gae/filter/featureBreaker" | 17 "github.com/luci/gae/filter/featureBreaker" |
| 18 "github.com/luci/gae/impl/memory" | 18 "github.com/luci/gae/impl/memory" |
| 19 ds "github.com/luci/gae/service/datastore" | 19 ds "github.com/luci/gae/service/datastore" |
| 20 "github.com/luci/luci-go/appengine/logdog/coordinator" | 20 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
| 23 "github.com/luci/luci-go/common/clock/testclock" | 23 "github.com/luci/luci-go/common/clock/testclock" |
| 24 "github.com/luci/luci-go/common/config" |
| 24 "github.com/luci/luci-go/common/gcloud/gs" | 25 "github.com/luci/luci-go/common/gcloud/gs" |
| 25 "github.com/luci/luci-go/common/iotools" | 26 "github.com/luci/luci-go/common/iotools" |
| 26 "github.com/luci/luci-go/common/logdog/types" | 27 "github.com/luci/luci-go/common/logdog/types" |
| 27 "github.com/luci/luci-go/common/proto/logdog/logpb" | 28 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 28 "github.com/luci/luci-go/common/recordio" | 29 "github.com/luci/luci-go/common/recordio" |
| 29 "github.com/luci/luci-go/server/auth" | 30 "github.com/luci/luci-go/server/auth" |
| 30 "github.com/luci/luci-go/server/auth/authtest" | 31 "github.com/luci/luci-go/server/auth/authtest" |
| 31 "github.com/luci/luci-go/server/logdog/archive" | 32 "github.com/luci/luci-go/server/logdog/archive" |
| 32 "github.com/luci/luci-go/server/logdog/storage" | 33 "github.com/luci/luci-go/server/logdog/storage" |
| 33 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" | 34 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" |
| (...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 150 | 151 |
| 151 func testGetImpl(t *testing.T, archived bool) { | 152 func testGetImpl(t *testing.T, archived bool) { |
| 152 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { | 153 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { |
| 153 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 154 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 154 c = memory.Use(c) | 155 c = memory.Use(c) |
| 155 | 156 |
| 156 fs := authtest.FakeState{} | 157 fs := authtest.FakeState{} |
| 157 c = auth.WithState(c, &fs) | 158 c = auth.WithState(c, &fs) |
| 158 | 159 |
| 159 ms := memoryStorage.Storage{} | 160 ms := memoryStorage.Storage{} |
| 161 |
| 160 gsc := testGSClient{} | 162 gsc := testGSClient{} |
| 161 svcStub := ct.Services{ | 163 svcStub := ct.Services{ |
| 162 IS: func() (storage.Storage, error) { | 164 IS: func() (storage.Storage, error) { |
| 163 return &ms, nil | 165 return &ms, nil |
| 164 }, | 166 }, |
| 165 GS: func() (gs.Client, error) { | 167 GS: func() (gs.Client, error) { |
| 166 return gsc, nil | 168 return gsc, nil |
| 167 }, | 169 }, |
| 168 } | 170 } |
| 169 svcStub.InitConfig() | 171 svcStub.InitConfig() |
| 170 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
trators" | 172 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
trators" |
| 171 c = coordinator.WithServices(c, &svcStub) | 173 c = coordinator.WithServices(c, &svcStub) |
| 172 | 174 |
| 173 » » s := New() | 175 » » svr := New() |
| 176 |
| 177 » » // di is a datastore bound to the test project namespace. |
| 178 » » const project = "test-project" |
| 179 » » if err := coordinator.WithProjectNamespace(&c, config.ProjectNam
e(project)); err != nil { |
| 180 » » » panic(err) |
| 181 » » } |
| 182 » » di := ds.Get(c) |
| 174 | 183 |
| 175 // Generate our test stream. | 184 // Generate our test stream. |
| 176 desc := ct.TestLogStreamDescriptor(c, "foo/bar") | 185 desc := ct.TestLogStreamDescriptor(c, "foo/bar") |
| 177 ls := ct.TestLogStream(c, desc) | 186 ls := ct.TestLogStream(c, desc) |
| 178 » » if err := ds.Get(c).Put(ls); err != nil { | 187 » » if err := di.Put(ls); err != nil { |
| 179 panic(err) | 188 panic(err) |
| 180 } | 189 } |
| 181 | 190 |
| 182 tc.Add(time.Second) | 191 tc.Add(time.Second) |
| 183 var entries []*logpb.LogEntry | 192 var entries []*logpb.LogEntry |
| 184 protobufs := map[uint64][]byte{} | 193 protobufs := map[uint64][]byte{} |
| 185 for _, v := range []int{0, 1, 2, 4, 5, 7} { | 194 for _, v := range []int{0, 1, 2, 4, 5, 7} { |
| 186 le := ct.TestLogEntry(c, ls, v) | 195 le := ct.TestLogEntry(c, ls, v) |
| 187 le.GetText().Lines = append(le.GetText().Lines, &logpb.T
ext_Line{ | 196 le.GetText().Lines = append(le.GetText().Lines, &logpb.T
ext_Line{ |
| 188 Value: "another line of text", | 197 Value: "another line of text", |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 226 size += recordio.FrameHeaderSize(int64(len(pb)))
+ len(pb) | 235 size += recordio.FrameHeaderSize(int64(len(pb)))
+ len(pb) |
| 227 } | 236 } |
| 228 if size > math.MaxInt32 { | 237 if size > math.MaxInt32 { |
| 229 panic(size) | 238 panic(size) |
| 230 } | 239 } |
| 231 return int32(size) | 240 return int32(size) |
| 232 } | 241 } |
| 233 | 242 |
| 234 Convey(`Testing Get requests (no logs)`, func() { | 243 Convey(`Testing Get requests (no logs)`, func() { |
| 235 req := logdog.GetRequest{ | 244 req := logdog.GetRequest{ |
| 236 » » » » Path: string(ls.Path()), | 245 » » » » Project: project, |
| 246 » » » » Path: string(ls.Path()), |
| 237 } | 247 } |
| 238 | 248 |
| 249 Convey(`Will succeed with no logs.`, func() { |
| 250 resp, err := svr.Get(c, &req) |
| 251 |
| 252 So(err, ShouldBeRPCOK) |
| 253 So(resp, shouldHaveLogs) |
| 254 }) |
| 255 |
| 239 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { | 256 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { |
| 240 req.Path = "not/a/full/stream/path" | 257 req.Path = "not/a/full/stream/path" |
| 241 » » » » _, err := s.Get(c, &req) | 258 » » » » _, err := svr.Get(c, &req) |
| 242 So(err, ShouldErrLike, "invalid path value") | 259 So(err, ShouldErrLike, "invalid path value") |
| 243 }) | 260 }) |
| 244 | 261 |
| 245 Convey(`Will fail with Internal if the datastore Get() d
oesn't work.`, func() { | 262 Convey(`Will fail with Internal if the datastore Get() d
oesn't work.`, func() { |
| 246 c, fb := featureBreaker.FilterRDS(c, nil) | 263 c, fb := featureBreaker.FilterRDS(c, nil) |
| 247 fb.BreakFeatures(errors.New("testing error"), "G
etMulti") | 264 fb.BreakFeatures(errors.New("testing error"), "G
etMulti") |
| 248 | 265 |
| 249 » » » » _, err := s.Get(c, &req) | 266 » » » » _, err := svr.Get(c, &req) |
| 250 So(err, ShouldBeRPCInternal) | 267 So(err, ShouldBeRPCInternal) |
| 251 }) | 268 }) |
| 252 | 269 |
| 253 » » » Convey(`Will fail with NotFound if the log stream does n
ot exist.`, func() { | 270 » » » Convey(`Will fail with NotFound if the log stream does n
ot exist (different project).`, func() { |
| 271 » » » » req.Project = "does-not-exist" |
| 272 » » » » _, err := svr.Get(c, &req) |
| 273 » » » » So(err, ShouldBeRPCNotFound) |
| 274 » » » }) |
| 275 |
| 276 » » » Convey(`Will fail with NotFound if the log path does not
exist (different path).`, func() { |
| 254 req.Path = "testing/+/does/not/exist" | 277 req.Path = "testing/+/does/not/exist" |
| 255 » » » » _, err := s.Get(c, &req) | 278 » » » » _, err := svr.Get(c, &req) |
| 256 So(err, ShouldBeRPCNotFound) | 279 So(err, ShouldBeRPCNotFound) |
| 257 }) | 280 }) |
| 258 }) | 281 }) |
| 259 | 282 |
| 260 » » if !archived { | 283 » » Convey(`Testing Tail requests (no logs)`, func() { |
| 261 » » » // Add the logs to the in-memory temporary storage. | 284 » » » req := logdog.TailRequest{ |
| 262 » » » for _, le := range entries { | 285 » » » » Project: project, |
| 263 » » » » err := ms.Put(storage.PutRequest{ | 286 » » » » Path: string(ls.Path()), |
| 264 » » » » » Path: ls.Path(), | |
| 265 » » » » » Index: types.MessageIndex(le.StreamInde
x), | |
| 266 » » » » » Values: [][]byte{protobufs[le.StreamInde
x]}, | |
| 267 » » » » }) | |
| 268 » » » » if err != nil { | |
| 269 » » » » » panic(fmt.Errorf("failed to Put() LogEnt
ry: %v", err)) | |
| 270 » » » » } | |
| 271 } | 287 } |
| 272 » » } else { | 288 |
| 273 » » » // Archive this log stream. We will generate one index e
ntry for every | 289 » » » Convey(`Will succeed with no logs.`, func() { |
| 274 » » » // 2 log entries. | 290 » » » » resp, err := svr.Tail(c, &req) |
| 275 » » » src := staticArchiveSource(entries) | 291 |
| 276 » » » var lbuf, ibuf bytes.Buffer | 292 » » » » So(err, ShouldBeRPCOK) |
| 277 » » » m := archive.Manifest{ | 293 » » » » So(resp, shouldHaveLogs) |
| 278 » » » » Desc: desc, | 294 » » » }) |
| 279 » » » » Source: &src, | 295 |
| 280 » » » » LogWriter: &lbuf, | 296 » » » Convey(`Will fail with NotFound if the log stream does n
ot exist (different project).`, func() { |
| 281 » » » » IndexWriter: &ibuf, | 297 » » » » req.Project = "does-not-exist" |
| 282 » » » » StreamIndexRange: 2, | 298 » » » » _, err := svr.Tail(c, &req) |
| 299 » » » » So(err, ShouldBeRPCNotFound) |
| 300 » » » }) |
| 301 |
| 302 » » » Convey(`Will fail with NotFound if the log path does not
exist (different path).`, func() { |
| 303 » » » » req.Path = "testing/+/does/not/exist" |
| 304 » » » » _, err := svr.Tail(c, &req) |
| 305 » » » » So(err, ShouldBeRPCNotFound) |
| 306 » » » }) |
| 307 » » }) |
| 308 |
| 309 » » Convey(`When testing log data is added`, func() { |
| 310 » » » if !archived { |
| 311 » » » » // Add the logs to the in-memory temporary stora
ge. |
| 312 » » » » for _, le := range entries { |
| 313 » » » » » err := ms.Put(storage.PutRequest{ |
| 314 » » » » » » Project: project, |
| 315 » » » » » » Path: ls.Path(), |
| 316 » » » » » » Index: types.MessageIndex(le.S
treamIndex), |
| 317 » » » » » » Values: [][]byte{protobufs[le.S
treamIndex]}, |
| 318 » » » » » }) |
| 319 » » » » » if err != nil { |
| 320 » » » » » » panic(fmt.Errorf("failed to Put(
) LogEntry: %v", err)) |
| 321 » » » » » } |
| 322 » » » » } |
| 323 » » » } else { |
| 324 » » » » // Archive this log stream. We will generate one
index entry for every |
| 325 » » » » // 2 log entries. |
| 326 » » » » src := staticArchiveSource(entries) |
| 327 » » » » var lbuf, ibuf bytes.Buffer |
| 328 » » » » m := archive.Manifest{ |
| 329 » » » » » Desc: desc, |
| 330 » » » » » Source: &src, |
| 331 » » » » » LogWriter: &lbuf, |
| 332 » » » » » IndexWriter: &ibuf, |
| 333 » » » » » StreamIndexRange: 2, |
| 334 » » » » } |
| 335 » » » » if err := archive.Archive(m); err != nil { |
| 336 » » » » » panic(err) |
| 337 » » » » } |
| 338 |
| 339 » » » » now := tc.Now().UTC() |
| 340 |
| 341 » » » » gsc.put("gs://testbucket/stream", lbuf.Bytes()) |
| 342 » » » » gsc.put("gs://testbucket/index", ibuf.Bytes()) |
| 343 » » » » ls.State = coordinator.LSArchived |
| 344 » » » » ls.TerminatedTime = now |
| 345 » » » » ls.ArchivedTime = now |
| 346 » » » » ls.ArchiveStreamURL = "gs://testbucket/stream" |
| 347 » » » » ls.ArchiveIndexURL = "gs://testbucket/index" |
| 283 } | 348 } |
| 284 » » » if err := archive.Archive(m); err != nil { | 349 » » » if err := di.Put(ls); err != nil { |
| 285 panic(err) | 350 panic(err) |
| 286 } | 351 } |
| 287 | 352 |
| 288 » » » now := tc.Now().UTC() | 353 » » » Convey(`Testing Get requests`, func() { |
| 289 | 354 » » » » req := logdog.GetRequest{ |
| 290 » » » gsc.put("gs://testbucket/stream", lbuf.Bytes()) | 355 » » » » » Project: project, |
| 291 » » » gsc.put("gs://testbucket/index", ibuf.Bytes()) | 356 » » » » » Path: string(ls.Path()), |
| 292 » » » ls.State = coordinator.LSArchived | 357 » » » » } |
| 293 » » » ls.TerminatedTime = now | 358 |
| 294 » » » ls.ArchivedTime = now | 359 » » » » Convey(`When the log stream is purged`, func() { |
| 295 » » » ls.ArchiveStreamURL = "gs://testbucket/stream" | 360 » » » » » ls.Purged = true |
| 296 » » » ls.ArchiveIndexURL = "gs://testbucket/index" | 361 » » » » » if err := di.Put(ls); err != nil { |
| 297 » » } | |
| 298 » » if err := ds.Get(c).Put(ls); err != nil { | |
| 299 » » » panic(err) | |
| 300 » » } | |
| 301 | |
| 302 » » Convey(`Testing Get requests`, func() { | |
| 303 » » » req := logdog.GetRequest{ | |
| 304 » » » » Path: string(ls.Path()), | |
| 305 » » » } | |
| 306 | |
| 307 » » » Convey(`When the log stream is purged`, func() { | |
| 308 » » » » ls.Purged = true | |
| 309 » » » » if err := ds.Get(c).Put(ls); err != nil { | |
| 310 » » » » » panic(err) | |
| 311 » » » » } | |
| 312 | |
| 313 » » » » Convey(`Will return NotFound if the user is not
an administrator.`, func() { | |
| 314 » » » » » _, err := s.Get(c, &req) | |
| 315 » » » » » So(err, ShouldBeRPCNotFound) | |
| 316 » » » » }) | |
| 317 | |
| 318 » » » » Convey(`Will process the request if the user is
an administrator.`, func() { | |
| 319 » » » » » fs.IdentityGroups = []string{"test-admin
istrators"} | |
| 320 | |
| 321 » » » » » resp, err := s.Get(c, &req) | |
| 322 » » » » » So(err, ShouldBeRPCOK) | |
| 323 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
| 324 » » » » }) | |
| 325 » » » }) | |
| 326 | |
| 327 » » » Convey(`Will return empty if no records were requested.`
, func() { | |
| 328 » » » » req.LogCount = -1 | |
| 329 » » » » req.State = false | |
| 330 | |
| 331 » » » » resp, err := s.Get(c, &req) | |
| 332 » » » » So(err, ShouldBeRPCOK) | |
| 333 » » » » So(resp.Logs, ShouldHaveLength, 0) | |
| 334 » » » }) | |
| 335 | |
| 336 » » » Convey(`Will successfully retrieve a stream path.`, func
() { | |
| 337 » » » » resp, err := s.Get(c, &req) | |
| 338 » » » » So(err, ShouldBeRPCOK) | |
| 339 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
| 340 » » » }) | |
| 341 | |
| 342 » » » Convey(`Will successfully retrieve a stream path offset
at 4.`, func() { | |
| 343 » » » » req.Index = 4 | |
| 344 | |
| 345 » » » » resp, err := s.Get(c, &req) | |
| 346 » » » » So(err, ShouldBeRPCOK) | |
| 347 » » » » So(resp, shouldHaveLogs, 4, 5) | |
| 348 » » » }) | |
| 349 | |
| 350 » » » Convey(`Will retrieve no logs for contiguous offset 6.`,
func() { | |
| 351 » » » » req.Index = 6 | |
| 352 | |
| 353 » » » » resp, err := s.Get(c, &req) | |
| 354 » » » » So(err, ShouldBeRPCOK) | |
| 355 » » » » So(len(resp.Logs), ShouldEqual, 0) | |
| 356 » » » }) | |
| 357 | |
| 358 » » » Convey(`Will retrieve log 7 for non-contiguous offset 6.
`, func() { | |
| 359 » » » » req.NonContiguous = true | |
| 360 » » » » req.Index = 6 | |
| 361 | |
| 362 » » » » resp, err := s.Get(c, &req) | |
| 363 » » » » So(err, ShouldBeRPCOK) | |
| 364 » » » » So(resp, shouldHaveLogs, 7) | |
| 365 » » » }) | |
| 366 | |
| 367 » » » Convey(`With a byte limit of 1, will still return at lea
st one log entry.`, func() { | |
| 368 » » » » req.ByteCount = 1 | |
| 369 | |
| 370 » » » » resp, err := s.Get(c, &req) | |
| 371 » » » » So(err, ShouldBeRPCOK) | |
| 372 » » » » So(resp, shouldHaveLogs, 0) | |
| 373 » » » }) | |
| 374 | |
| 375 » » » Convey(`With a byte limit of sizeof(0), will return log
entry 0.`, func() { | |
| 376 » » » » req.ByteCount = frameSize(0) | |
| 377 | |
| 378 » » » » resp, err := s.Get(c, &req) | |
| 379 » » » » So(err, ShouldBeRPCOK) | |
| 380 » » » » So(resp, shouldHaveLogs, 0) | |
| 381 » » » }) | |
| 382 | |
| 383 » » » Convey(`With a byte limit of sizeof(0)+1, will return lo
g entry 0.`, func() { | |
| 384 » » » » req.ByteCount = frameSize(0) + 1 | |
| 385 | |
| 386 » » » » resp, err := s.Get(c, &req) | |
| 387 » » » » So(err, ShouldBeRPCOK) | |
| 388 » » » » So(resp, shouldHaveLogs, 0) | |
| 389 » » » }) | |
| 390 | |
| 391 » » » Convey(`With a byte limit of sizeof({0, 1}), will return
log entries {0, 1}.`, func() { | |
| 392 » » » » req.ByteCount = frameSize(0, 1) | |
| 393 | |
| 394 » » » » resp, err := s.Get(c, &req) | |
| 395 » » » » So(err, ShouldBeRPCOK) | |
| 396 » » » » So(resp, shouldHaveLogs, 0, 1) | |
| 397 » » » }) | |
| 398 | |
| 399 » » » Convey(`With a byte limit of sizeof({0, 1, 2}), will ret
urn log entries {0, 1, 2}.`, func() { | |
| 400 » » » » req.ByteCount = frameSize(0, 1, 2) | |
| 401 | |
| 402 » » » » resp, err := s.Get(c, &req) | |
| 403 » » » » So(err, ShouldBeRPCOK) | |
| 404 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
| 405 » » » }) | |
| 406 | |
| 407 » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1, will r
eturn log entries {0, 1, 2}.`, func() { | |
| 408 » » » » req.ByteCount = frameSize(0, 1, 2) + 1 | |
| 409 | |
| 410 » » » » resp, err := s.Get(c, &req) | |
| 411 » » » » So(err, ShouldBeRPCOK) | |
| 412 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
| 413 » » » }) | |
| 414 | |
| 415 » » » Convey(`Will successfully retrieve a stream path hash.`,
func() { | |
| 416 » » » » req.Path = ls.HashID | |
| 417 » » » » resp, err := s.Get(c, &req) | |
| 418 » » » » So(err, ShouldBeRPCOK) | |
| 419 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
| 420 » » » }) | |
| 421 | |
| 422 » » » Convey(`When requesting state`, func() { | |
| 423 » » » » req.State = true | |
| 424 » » » » req.LogCount = -1 | |
| 425 | |
| 426 » » » » Convey(`Will successfully retrieve stream state.
`, func() { | |
| 427 » » » » » resp, err := s.Get(c, &req) | |
| 428 » » » » » So(err, ShouldBeRPCOK) | |
| 429 » » » » » So(resp.State, ShouldResemble, loadLogSt
reamState(ls)) | |
| 430 » » » » » So(len(resp.Logs), ShouldEqual, 0) | |
| 431 » » » » }) | |
| 432 | |
| 433 » » » » Convey(`Will return Internal if the protobuf des
criptor data is corrupt.`, func() { | |
| 434 » » » » » ls.SetDSValidate(false) | |
| 435 » » » » » ls.Descriptor = []byte{0x00} // Invalid
protobuf, zero tag. | |
| 436 » » » » » if err := ds.Get(c).Put(ls); err != nil
{ | |
| 437 panic(err) | 362 panic(err) |
| 438 } | 363 } |
| 439 | 364 |
| 440 » » » » » _, err := s.Get(c, &req) | 365 » » » » » Convey(`Will return NotFound if the user
is not an administrator.`, func() { |
| 366 » » » » » » _, err := svr.Get(c, &req) |
| 367 » » » » » » So(err, ShouldBeRPCNotFound) |
| 368 » » » » » }) |
| 369 |
| 370 » » » » » Convey(`Will process the request if the
user is an administrator.`, func() { |
| 371 » » » » » » fs.IdentityGroups = []string{"te
st-administrators"} |
| 372 |
| 373 » » » » » » resp, err := svr.Get(c, &req) |
| 374 » » » » » » So(err, ShouldBeRPCOK) |
| 375 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) |
| 376 » » » » » }) |
| 377 » » » » }) |
| 378 |
| 379 » » » » Convey(`Will return empty if no records were req
uested.`, func() { |
| 380 » » » » » req.LogCount = -1 |
| 381 » » » » » req.State = false |
| 382 |
| 383 » » » » » resp, err := svr.Get(c, &req) |
| 384 » » » » » So(err, ShouldBeRPCOK) |
| 385 » » » » » So(resp.Logs, ShouldHaveLength, 0) |
| 386 » » » » }) |
| 387 |
| 388 » » » » Convey(`Will successfully retrieve a stream path
.`, func() { |
| 389 » » » » » resp, err := svr.Get(c, &req) |
| 390 » » » » » So(err, ShouldBeRPCOK) |
| 391 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 392 » » » » }) |
| 393 |
| 394 » » » » Convey(`Will successfully retrieve a stream path
offset at 4.`, func() { |
| 395 » » » » » req.Index = 4 |
| 396 |
| 397 » » » » » resp, err := svr.Get(c, &req) |
| 398 » » » » » So(err, ShouldBeRPCOK) |
| 399 » » » » » So(resp, shouldHaveLogs, 4, 5) |
| 400 » » » » }) |
| 401 |
| 402 » » » » Convey(`Will retrieve no logs for contiguous off
set 6.`, func() { |
| 403 » » » » » req.Index = 6 |
| 404 |
| 405 » » » » » resp, err := svr.Get(c, &req) |
| 406 » » » » » So(err, ShouldBeRPCOK) |
| 407 » » » » » So(len(resp.Logs), ShouldEqual, 0) |
| 408 » » » » }) |
| 409 |
| 410 » » » » Convey(`Will retrieve log 7 for non-contiguous o
ffset 6.`, func() { |
| 411 » » » » » req.NonContiguous = true |
| 412 » » » » » req.Index = 6 |
| 413 |
| 414 » » » » » resp, err := svr.Get(c, &req) |
| 415 » » » » » So(err, ShouldBeRPCOK) |
| 416 » » » » » So(resp, shouldHaveLogs, 7) |
| 417 » » » » }) |
| 418 |
| 419 » » » » Convey(`With a byte limit of 1, will still retur
n at least one log entry.`, func() { |
| 420 » » » » » req.ByteCount = 1 |
| 421 |
| 422 » » » » » resp, err := svr.Get(c, &req) |
| 423 » » » » » So(err, ShouldBeRPCOK) |
| 424 » » » » » So(resp, shouldHaveLogs, 0) |
| 425 » » » » }) |
| 426 |
| 427 » » » » Convey(`With a byte limit of sizeof(0), will ret
urn log entry 0.`, func() { |
| 428 » » » » » req.ByteCount = frameSize(0) |
| 429 |
| 430 » » » » » resp, err := svr.Get(c, &req) |
| 431 » » » » » So(err, ShouldBeRPCOK) |
| 432 » » » » » So(resp, shouldHaveLogs, 0) |
| 433 » » » » }) |
| 434 |
| 435 » » » » Convey(`With a byte limit of sizeof(0)+1, will r
eturn log entry 0.`, func() { |
| 436 » » » » » req.ByteCount = frameSize(0) + 1 |
| 437 |
| 438 » » » » » resp, err := svr.Get(c, &req) |
| 439 » » » » » So(err, ShouldBeRPCOK) |
| 440 » » » » » So(resp, shouldHaveLogs, 0) |
| 441 » » » » }) |
| 442 |
| 443 » » » » Convey(`With a byte limit of sizeof({0, 1}), wil
l return log entries {0, 1}.`, func() { |
| 444 » » » » » req.ByteCount = frameSize(0, 1) |
| 445 |
| 446 » » » » » resp, err := svr.Get(c, &req) |
| 447 » » » » » So(err, ShouldBeRPCOK) |
| 448 » » » » » So(resp, shouldHaveLogs, 0, 1) |
| 449 » » » » }) |
| 450 |
| 451 » » » » Convey(`With a byte limit of sizeof({0, 1, 2}),
will return log entries {0, 1, 2}.`, func() { |
| 452 » » » » » req.ByteCount = frameSize(0, 1, 2) |
| 453 |
| 454 » » » » » resp, err := svr.Get(c, &req) |
| 455 » » » » » So(err, ShouldBeRPCOK) |
| 456 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 457 » » » » }) |
| 458 |
| 459 » » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1
, will return log entries {0, 1, 2}.`, func() { |
| 460 » » » » » req.ByteCount = frameSize(0, 1, 2) + 1 |
| 461 |
| 462 » » » » » resp, err := svr.Get(c, &req) |
| 463 » » » » » So(err, ShouldBeRPCOK) |
| 464 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 465 » » » » }) |
| 466 |
| 467 » » » » Convey(`Will successfully retrieve a stream path
hash.`, func() { |
| 468 » » » » » req.Path = ls.HashID |
| 469 » » » » » resp, err := svr.Get(c, &req) |
| 470 » » » » » So(err, ShouldBeRPCOK) |
| 471 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 472 » » » » }) |
| 473 |
| 474 » » » » Convey(`When requesting state`, func() { |
| 475 » » » » » req.State = true |
| 476 » » » » » req.LogCount = -1 |
| 477 |
| 478 » » » » » Convey(`Will successfully retrieve strea
m state.`, func() { |
| 479 » » » » » » resp, err := svr.Get(c, &req) |
| 480 » » » » » » So(err, ShouldBeRPCOK) |
| 481 » » » » » » So(resp.State, ShouldResemble, l
oadLogStreamState(ls)) |
| 482 » » » » » » So(len(resp.Logs), ShouldEqual,
0) |
| 483 » » » » » }) |
| 484 |
| 485 » » » » » Convey(`Will return Internal if the prot
obuf descriptor data is corrupt.`, func() { |
| 486 » » » » » » ls.SetDSValidate(false) |
| 487 » » » » » » ls.Descriptor = []byte{0x00} //
Invalid protobuf, zero tag. |
| 488 » » » » » » if err := di.Put(ls); err != nil
{ |
| 489 » » » » » » » panic(err) |
| 490 » » » » » » } |
| 491 |
| 492 » » » » » » _, err := svr.Get(c, &req) |
| 493 » » » » » » So(err, ShouldBeRPCInternal) |
| 494 » » » » » }) |
| 495 » » » » }) |
| 496 |
| 497 » » » » Convey(`Will return Internal if the protobuf log
entry data is corrupt.`, func() { |
| 498 » » » » » if archived { |
| 499 » » » » » » // Corrupt the archive datastrea
m. |
| 500 » » » » » » stream := gsc.get("gs://testbuck
et/stream") |
| 501 » » » » » » zeroRecords(stream) |
| 502 » » » » » } else { |
| 503 » » » » » » // Add corrupted entry to Storag
e. Create a new entry here, since |
| 504 » » » » » » // the storage will reject a dup
licate/overwrite. |
| 505 » » » » » » err := ms.Put(storage.PutRequest
{ |
| 506 » » » » » » » Project: project, |
| 507 » » » » » » » Path: types.StreamPat
h(req.Path), |
| 508 » » » » » » » Index: 666, |
| 509 » » » » » » » Values: [][]byte{{0x00}
}, // Invalid protobuf, zero tag. |
| 510 » » » » » » }) |
| 511 » » » » » » if err != nil { |
| 512 » » » » » » » panic(err) |
| 513 » » » » » » } |
| 514 » » » » » » req.Index = 666 |
| 515 » » » » » } |
| 516 |
| 517 » » » » » _, err := svr.Get(c, &req) |
| 441 So(err, ShouldBeRPCInternal) | 518 So(err, ShouldBeRPCInternal) |
| 442 }) | 519 }) |
| 443 » » » }) | 520 |
| 444 | 521 » » » » Convey(`Will successfully retrieve both logs and
stream state.`, func() { |
| 445 » » » Convey(`Will return Internal if the protobuf log entry d
ata is corrupt.`, func() { | 522 » » » » » req.State = true |
| 446 » » » » if archived { | 523 |
| 447 » » » » » // Corrupt the archive datastream. | 524 » » » » » resp, err := svr.Get(c, &req) |
| 448 » » » » » stream := gsc.get("gs://testbucket/strea
m") | 525 » » » » » So(err, ShouldBeRPCOK) |
| 449 » » » » » zeroRecords(stream) | 526 » » » » » So(resp.State, ShouldResemble, loadLogSt
reamState(ls)) |
| 450 » » » » } else { | 527 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 451 » » » » » // Add corrupted entry to Storage. Creat
e a new entry here, since | 528 » » » » }) |
| 452 » » » » » // the storage will reject a duplicate/o
verwrite. | 529 |
| 453 » » » » » err := ms.Put(storage.PutRequest{ | 530 » » » » Convey(`Will return Internal if the Storage is n
ot working.`, func() { |
| 454 » » » » » » Path: types.StreamPath(req.Pat
h), | 531 » » » » » if archived { |
| 455 » » » » » » Index: 666, | 532 » » » » » » gsc["error"] = []byte("test erro
r") |
| 456 » » » » » » Values: [][]byte{{0x00}}, // Inv
alid protobuf, zero tag. | 533 » » » » » } else { |
| 457 » » » » » }) | 534 » » » » » » ms.Close() |
| 458 » » » » » if err != nil { | |
| 459 » » » » » » panic(err) | |
| 460 } | 535 } |
| 461 » » » » » req.Index = 666 | 536 |
| 462 » » » » } | 537 » » » » » _, err := svr.Get(c, &req) |
| 463 | 538 » » » » » So(err, ShouldBeRPCInternal) |
| 464 » » » » _, err := s.Get(c, &req) | 539 » » » » }) |
| 465 » » » » So(err, ShouldBeRPCInternal) | 540 |
| 466 » » » }) | 541 » » » » Convey(`Will enforce a maximum count of 2.`, fun
c() { |
| 467 | 542 » » » » » req.LogCount = 2 |
| 468 » » » Convey(`Will successfully retrieve both logs and stream
state.`, func() { | 543 » » » » » resp, err := svr.Get(c, &req) |
| 469 » » » » req.State = true | 544 » » » » » So(err, ShouldBeRPCOK) |
| 470 | 545 » » » » » So(resp, shouldHaveLogs, 0, 1) |
| 471 » » » » resp, err := s.Get(c, &req) | 546 » » » » }) |
| 472 » » » » So(err, ShouldBeRPCOK) | 547 |
| 473 » » » » So(resp.State, ShouldResemble, loadLogStreamStat
e(ls)) | 548 » » » » Convey(`When requesting protobufs`, func() { |
| 474 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | 549 » » » » » req.State = true |
| 475 » » » }) | 550 |
| 476 | 551 » » » » » resp, err := svr.Get(c, &req) |
| 477 » » » Convey(`Will return Internal if the Storage is not worki
ng.`, func() { | 552 » » » » » So(err, ShouldBeRPCOK) |
| 478 » » » » if archived { | 553 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 479 » » » » » gsc["error"] = []byte("test error") | 554 |
| 480 » » » » } else { | 555 » » » » » // Confirm that this has protobufs. |
| 481 » » » » » ms.Close() | 556 » » » » » So(len(resp.Logs), ShouldEqual, 3) |
| 482 » » » » } | 557 » » » » » So(resp.Logs[0], ShouldNotBeNil) |
| 483 | 558 |
| 484 » » » » _, err := s.Get(c, &req) | 559 » » » » » // Confirm that there is a descriptor pr
otobuf. |
| 485 » » » » So(err, ShouldBeRPCInternal) | 560 » » » » » So(resp.Desc, ShouldResemble, desc) |
| 486 » » » }) | 561 |
| 487 | 562 » » » » » // Confirm that the state was returned. |
| 488 » » » Convey(`Will enforce a maximum count of 2.`, func() { | 563 » » » » » So(resp.State, ShouldNotBeNil) |
| 489 » » » » req.LogCount = 2 | 564 » » » » }) |
| 490 » » » » resp, err := s.Get(c, &req) | 565 |
| 491 » » » » So(err, ShouldBeRPCOK) | 566 » » » » Convey(`Will successfully retrieve all records i
f non-contiguous is allowed.`, func() { |
| 492 » » » » So(resp, shouldHaveLogs, 0, 1) | 567 » » » » » req.NonContiguous = true |
| 493 » » » }) | 568 » » » » » resp, err := svr.Get(c, &req) |
| 494 | 569 » » » » » So(err, ShouldBeRPCOK) |
| 495 » » » Convey(`When requesting protobufs`, func() { | 570 » » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5,
7) |
| 496 » » » » req.State = true | 571 » » » » }) |
| 497 | 572 |
| 498 » » » » resp, err := s.Get(c, &req) | 573 » » » » Convey(`When newlines are not requested, does no
t include delimiters.`, func() { |
| 499 » » » » So(err, ShouldBeRPCOK) | 574 » » » » » req.LogCount = 1 |
| 500 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | 575 |
| 501 | 576 » » » » » resp, err := svr.Get(c, &req) |
| 502 » » » » // Confirm that this has protobufs. | 577 » » » » » So(err, ShouldBeRPCOK) |
| 503 » » » » So(len(resp.Logs), ShouldEqual, 3) | 578 » » » » » So(resp, shouldHaveLogs, 0) |
| 504 » » » » So(resp.Logs[0], ShouldNotBeNil) | 579 |
| 505 | 580 » » » » » So(resp.Logs[0].GetText(), ShouldResembl
e, &logpb.Text{ |
| 506 » » » » // Confirm that there is a descriptor protobuf. | 581 » » » » » » Lines: []*logpb.Text_Line{ |
| 507 » » » » So(resp.Desc, ShouldResemble, desc) | 582 » » » » » » » {"log entry #0", "\n"}, |
| 508 | 583 » » » » » » » {"another line of text",
""}, |
| 509 » » » » // Confirm that the state was returned. | 584 » » » » » » }, |
| 510 » » » » So(resp.State, ShouldNotBeNil) | 585 » » » » » }) |
| 511 » » » }) | 586 » » » » }) |
| 512 | 587 |
| 513 » » » Convey(`Will successfully retrieve all records if non-co
ntiguous is allowed.`, func() { | 588 » » » » Convey(`Will get a Binary LogEntry`, func() { |
| 514 » » » » req.NonContiguous = true | 589 » » » » » req.Index = 4 |
| 515 » » » » resp, err := s.Get(c, &req) | 590 » » » » » req.LogCount = 1 |
| 516 » » » » So(err, ShouldBeRPCOK) | 591 » » » » » resp, err := svr.Get(c, &req) |
| 517 » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 7) | 592 » » » » » So(err, ShouldBeRPCOK) |
| 518 » » » }) | 593 » » » » » So(resp, shouldHaveLogs, 4) |
| 519 | 594 » » » » » So(resp.Logs[0].GetBinary(), ShouldResem
ble, &logpb.Binary{ |
| 520 » » » Convey(`When newlines are not requested, does not includ
e delimiters.`, func() { | 595 » » » » » » Data: []byte{0x00, 0x01, 0x02, 0
x03}, |
| 521 » » » » req.LogCount = 1 | 596 » » » » » }) |
| 522 | 597 » » » » }) |
| 523 » » » » resp, err := s.Get(c, &req) | 598 |
| 524 » » » » So(err, ShouldBeRPCOK) | 599 » » » » Convey(`Will get a Datagram LogEntry`, func() { |
| 525 » » » » So(resp, shouldHaveLogs, 0) | 600 » » » » » req.Index = 5 |
| 526 | 601 » » » » » req.LogCount = 1 |
| 527 » » » » So(resp.Logs[0].GetText(), ShouldResemble, &logp
b.Text{ | 602 » » » » » resp, err := svr.Get(c, &req) |
| 528 » » » » » Lines: []*logpb.Text_Line{ | 603 » » » » » So(err, ShouldBeRPCOK) |
| 529 » » » » » » {"log entry #0", "\n"}, | 604 » » » » » So(resp, shouldHaveLogs, 5) |
| 530 » » » » » » {"another line of text", ""}, | 605 » » » » » So(resp.Logs[0].GetDatagram(), ShouldRes
emble, &logpb.Datagram{ |
| 531 » » » » » }, | 606 » » » » » » Data: []byte{0x00, 0x01, 0x02, 0
x03}, |
| 532 » » » » }) | 607 » » » » » » Partial: &logpb.Datagram_Partial
{ |
| 533 » » » }) | 608 » » » » » » » Index: 2, |
| 534 | 609 » » » » » » » Size: 1024, |
| 535 » » » Convey(`Will get a Binary LogEntry`, func() { | 610 » » » » » » » Last: false, |
| 536 » » » » req.Index = 4 | 611 » » » » » » }, |
| 537 » » » » req.LogCount = 1 | 612 » » » » » }) |
| 538 » » » » resp, err := s.Get(c, &req) | 613 » » » » }) |
| 539 » » » » So(err, ShouldBeRPCOK) | 614 » » » }) |
| 540 » » » » So(resp, shouldHaveLogs, 4) | 615 |
| 541 » » » » So(resp.Logs[0].GetBinary(), ShouldResemble, &lo
gpb.Binary{ | 616 » » » Convey(`Testing tail requests`, func() { |
| 542 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, | 617 » » » » req := logdog.TailRequest{ |
| 543 » » » » }) | 618 » » » » » Project: "test-project", |
| 544 » » » }) | 619 » » » » » Path: string(ls.Path()), |
| 545 | 620 » » » » } |
| 546 » » » Convey(`Will get a Datagram LogEntry`, func() { | 621 |
| 547 » » » » req.Index = 5 | 622 » » » » Convey(`Will successfully retrieve a stream path
.`, func() { |
| 548 » » » » req.LogCount = 1 | 623 » » » » » resp, err := svr.Tail(c, &req) |
| 549 » » » » resp, err := s.Get(c, &req) | 624 » » » » » So(err, ShouldBeRPCOK) |
| 550 » » » » So(err, ShouldBeRPCOK) | 625 » » » » » So(resp, shouldHaveLogs, 7) |
| 551 » » » » So(resp, shouldHaveLogs, 5) | 626 » » » » }) |
| 552 » » » » So(resp.Logs[0].GetDatagram(), ShouldResemble, &
logpb.Datagram{ | 627 |
| 553 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, | 628 » » » » Convey(`Will successfully retrieve a stream path
hash and state.`, func() { |
| 554 » » » » » Partial: &logpb.Datagram_Partial{ | 629 » » » » » req.Path = ls.HashID |
| 555 » » » » » » Index: 2, | 630 » » » » » req.State = true |
| 556 » » » » » » Size: 1024, | 631 |
| 557 » » » » » » Last: false, | 632 » » » » » resp, err := svr.Tail(c, &req) |
| 558 » » » » » }, | 633 » » » » » So(err, ShouldBeRPCOK) |
| 559 » » » » }) | 634 » » » » » So(resp, shouldHaveLogs, 7) |
| 560 » » » }) | 635 » » » » » So(resp.State, ShouldResemble, loadLogSt
reamState(ls)) |
| 561 » » }) | 636 » » » » }) |
| 562 | |
| 563 » » Convey(`Testing tail requests`, func() { | |
| 564 » » » req := logdog.TailRequest{ | |
| 565 » » » » Path: string(ls.Path()), | |
| 566 » » » } | |
| 567 | |
| 568 » » » Convey(`Will successfully retrieve a stream path.`, func
() { | |
| 569 » » » » resp, err := s.Tail(c, &req) | |
| 570 » » » » So(err, ShouldBeRPCOK) | |
| 571 » » » » So(resp, shouldHaveLogs, 7) | |
| 572 » » » }) | |
| 573 | |
| 574 » » » Convey(`Will successfully retrieve a stream path hash an
d state.`, func() { | |
| 575 » » » » req.Path = ls.HashID | |
| 576 » » » » req.State = true | |
| 577 | |
| 578 » » » » resp, err := s.Tail(c, &req) | |
| 579 » » » » So(err, ShouldBeRPCOK) | |
| 580 » » » » So(resp, shouldHaveLogs, 7) | |
| 581 » » » » So(resp.State, ShouldResemble, loadLogStreamStat
e(ls)) | |
| 582 }) | 637 }) |
| 583 }) | 638 }) |
| 584 }) | 639 }) |
| 585 } | 640 } |
| 586 | 641 |
| 587 func TestGetIntermediate(t *testing.T) { | 642 func TestGetIntermediate(t *testing.T) { |
| 588 t.Parallel() | 643 t.Parallel() |
| 589 | 644 |
| 590 testGetImpl(t, false) | 645 testGetImpl(t, false) |
| 591 } | 646 } |
| 592 | 647 |
| 593 func TestGetArchived(t *testing.T) { | 648 func TestGetArchived(t *testing.T) { |
| 594 t.Parallel() | 649 t.Parallel() |
| 595 | 650 |
| 596 » testGetImpl(t, true) | 651 » testGetImpl(t, false) |
| 597 } | 652 } |
| OLD | NEW |