OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package logs | 5 package logs |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "errors" | 9 "errors" |
10 "fmt" | 10 "fmt" |
11 "io" | 11 "io" |
12 "math" | 12 "math" |
13 "testing" | 13 "testing" |
14 "time" | 14 "time" |
15 | 15 |
16 "github.com/golang/protobuf/proto" | 16 "github.com/golang/protobuf/proto" |
17 "github.com/luci/gae/filter/featureBreaker" | 17 "github.com/luci/gae/filter/featureBreaker" |
18 "github.com/luci/gae/impl/memory" | 18 "github.com/luci/gae/impl/memory" |
19 ds "github.com/luci/gae/service/datastore" | 19 ds "github.com/luci/gae/service/datastore" |
20 "github.com/luci/luci-go/appengine/logdog/coordinator" | 20 "github.com/luci/luci-go/appengine/logdog/coordinator" |
21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
23 "github.com/luci/luci-go/common/clock/testclock" | 23 "github.com/luci/luci-go/common/clock/testclock" |
| 24 "github.com/luci/luci-go/common/config" |
24 "github.com/luci/luci-go/common/gcloud/gs" | 25 "github.com/luci/luci-go/common/gcloud/gs" |
25 "github.com/luci/luci-go/common/iotools" | 26 "github.com/luci/luci-go/common/iotools" |
26 "github.com/luci/luci-go/common/logdog/types" | 27 "github.com/luci/luci-go/common/logdog/types" |
27 "github.com/luci/luci-go/common/proto/logdog/logpb" | 28 "github.com/luci/luci-go/common/proto/logdog/logpb" |
28 "github.com/luci/luci-go/common/recordio" | 29 "github.com/luci/luci-go/common/recordio" |
29 "github.com/luci/luci-go/server/auth" | 30 "github.com/luci/luci-go/server/auth" |
30 "github.com/luci/luci-go/server/auth/authtest" | 31 "github.com/luci/luci-go/server/auth/authtest" |
31 "github.com/luci/luci-go/server/logdog/archive" | 32 "github.com/luci/luci-go/server/logdog/archive" |
32 "github.com/luci/luci-go/server/logdog/storage" | 33 "github.com/luci/luci-go/server/logdog/storage" |
33 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" | 34 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" |
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
150 | 151 |
151 func testGetImpl(t *testing.T, archived bool) { | 152 func testGetImpl(t *testing.T, archived bool) { |
152 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { | 153 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { |
153 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 154 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
154 c = memory.Use(c) | 155 c = memory.Use(c) |
155 | 156 |
156 fs := authtest.FakeState{} | 157 fs := authtest.FakeState{} |
157 c = auth.WithState(c, &fs) | 158 c = auth.WithState(c, &fs) |
158 | 159 |
159 ms := memoryStorage.Storage{} | 160 ms := memoryStorage.Storage{} |
| 161 |
160 gsc := testGSClient{} | 162 gsc := testGSClient{} |
161 svcStub := ct.Services{ | 163 svcStub := ct.Services{ |
162 IS: func() (storage.Storage, error) { | 164 IS: func() (storage.Storage, error) { |
163 return &ms, nil | 165 return &ms, nil |
164 }, | 166 }, |
165 GS: func() (gs.Client, error) { | 167 GS: func() (gs.Client, error) { |
166 return gsc, nil | 168 return gsc, nil |
167 }, | 169 }, |
168 } | 170 } |
169 svcStub.InitConfig() | 171 svcStub.InitConfig() |
170 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
trators" | 172 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
trators" |
171 c = coordinator.WithServices(c, &svcStub) | 173 c = coordinator.WithServices(c, &svcStub) |
172 | 174 |
173 » » s := New() | 175 » » svr := New() |
| 176 |
| 177 » » // di is a datastore bound to the test project namespace. |
| 178 » » const project = "test-project" |
| 179 » » if err := coordinator.WithProjectNamespace(&c, config.ProjectNam
e(project)); err != nil { |
| 180 » » » panic(err) |
| 181 » » } |
| 182 » » di := ds.Get(c) |
174 | 183 |
175 // Generate our test stream. | 184 // Generate our test stream. |
176 desc := ct.TestLogStreamDescriptor(c, "foo/bar") | 185 desc := ct.TestLogStreamDescriptor(c, "foo/bar") |
177 ls := ct.TestLogStream(c, desc) | 186 ls := ct.TestLogStream(c, desc) |
178 » » if err := ds.Get(c).Put(ls); err != nil { | 187 » » if err := di.Put(ls); err != nil { |
179 panic(err) | 188 panic(err) |
180 } | 189 } |
181 | 190 |
182 tc.Add(time.Second) | 191 tc.Add(time.Second) |
183 var entries []*logpb.LogEntry | 192 var entries []*logpb.LogEntry |
184 protobufs := map[uint64][]byte{} | 193 protobufs := map[uint64][]byte{} |
185 for _, v := range []int{0, 1, 2, 4, 5, 7} { | 194 for _, v := range []int{0, 1, 2, 4, 5, 7} { |
186 le := ct.TestLogEntry(c, ls, v) | 195 le := ct.TestLogEntry(c, ls, v) |
187 le.GetText().Lines = append(le.GetText().Lines, &logpb.T
ext_Line{ | 196 le.GetText().Lines = append(le.GetText().Lines, &logpb.T
ext_Line{ |
188 Value: "another line of text", | 197 Value: "another line of text", |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
226 size += recordio.FrameHeaderSize(int64(len(pb)))
+ len(pb) | 235 size += recordio.FrameHeaderSize(int64(len(pb)))
+ len(pb) |
227 } | 236 } |
228 if size > math.MaxInt32 { | 237 if size > math.MaxInt32 { |
229 panic(size) | 238 panic(size) |
230 } | 239 } |
231 return int32(size) | 240 return int32(size) |
232 } | 241 } |
233 | 242 |
234 Convey(`Testing Get requests (no logs)`, func() { | 243 Convey(`Testing Get requests (no logs)`, func() { |
235 req := logdog.GetRequest{ | 244 req := logdog.GetRequest{ |
236 » » » » Path: string(ls.Path()), | 245 » » » » Project: project, |
| 246 » » » » Path: string(ls.Path()), |
237 } | 247 } |
238 | 248 |
| 249 Convey(`Will succeed with no logs.`, func() { |
| 250 resp, err := svr.Get(c, &req) |
| 251 |
| 252 So(err, ShouldBeRPCOK) |
| 253 So(resp, shouldHaveLogs) |
| 254 }) |
| 255 |
239 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { | 256 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { |
240 req.Path = "not/a/full/stream/path" | 257 req.Path = "not/a/full/stream/path" |
241 » » » » _, err := s.Get(c, &req) | 258 » » » » _, err := svr.Get(c, &req) |
242 So(err, ShouldErrLike, "invalid path value") | 259 So(err, ShouldErrLike, "invalid path value") |
243 }) | 260 }) |
244 | 261 |
245 Convey(`Will fail with Internal if the datastore Get() d
oesn't work.`, func() { | 262 Convey(`Will fail with Internal if the datastore Get() d
oesn't work.`, func() { |
246 c, fb := featureBreaker.FilterRDS(c, nil) | 263 c, fb := featureBreaker.FilterRDS(c, nil) |
247 fb.BreakFeatures(errors.New("testing error"), "G
etMulti") | 264 fb.BreakFeatures(errors.New("testing error"), "G
etMulti") |
248 | 265 |
249 » » » » _, err := s.Get(c, &req) | 266 » » » » _, err := svr.Get(c, &req) |
250 So(err, ShouldBeRPCInternal) | 267 So(err, ShouldBeRPCInternal) |
251 }) | 268 }) |
252 | 269 |
253 » » » Convey(`Will fail with NotFound if the log stream does n
ot exist.`, func() { | 270 » » » Convey(`Will fail with NotFound if the log stream does n
ot exist (different project).`, func() { |
| 271 » » » » req.Project = "does-not-exist" |
| 272 » » » » _, err := svr.Get(c, &req) |
| 273 » » » » So(err, ShouldBeRPCNotFound) |
| 274 » » » }) |
| 275 |
| 276 » » » Convey(`Will fail with NotFound if the log path does not
exist (different path).`, func() { |
254 req.Path = "testing/+/does/not/exist" | 277 req.Path = "testing/+/does/not/exist" |
255 » » » » _, err := s.Get(c, &req) | 278 » » » » _, err := svr.Get(c, &req) |
256 So(err, ShouldBeRPCNotFound) | 279 So(err, ShouldBeRPCNotFound) |
257 }) | 280 }) |
258 }) | 281 }) |
259 | 282 |
260 » » if !archived { | 283 » » Convey(`Testing Tail requests (no logs)`, func() { |
261 » » » // Add the logs to the in-memory temporary storage. | 284 » » » req := logdog.TailRequest{ |
262 » » » for _, le := range entries { | 285 » » » » Project: project, |
263 » » » » err := ms.Put(storage.PutRequest{ | 286 » » » » Path: string(ls.Path()), |
264 » » » » » Path: ls.Path(), | |
265 » » » » » Index: types.MessageIndex(le.StreamInde
x), | |
266 » » » » » Values: [][]byte{protobufs[le.StreamInde
x]}, | |
267 » » » » }) | |
268 » » » » if err != nil { | |
269 » » » » » panic(fmt.Errorf("failed to Put() LogEnt
ry: %v", err)) | |
270 » » » » } | |
271 } | 287 } |
272 » » } else { | 288 |
273 » » » // Archive this log stream. We will generate one index e
ntry for every | 289 » » » Convey(`Will succeed with no logs.`, func() { |
274 » » » // 2 log entries. | 290 » » » » resp, err := svr.Tail(c, &req) |
275 » » » src := staticArchiveSource(entries) | 291 |
276 » » » var lbuf, ibuf bytes.Buffer | 292 » » » » So(err, ShouldBeRPCOK) |
277 » » » m := archive.Manifest{ | 293 » » » » So(resp, shouldHaveLogs) |
278 » » » » Desc: desc, | 294 » » » }) |
279 » » » » Source: &src, | 295 |
280 » » » » LogWriter: &lbuf, | 296 » » » Convey(`Will fail with NotFound if the log stream does n
ot exist (different project).`, func() { |
281 » » » » IndexWriter: &ibuf, | 297 » » » » req.Project = "does-not-exist" |
282 » » » » StreamIndexRange: 2, | 298 » » » » _, err := svr.Tail(c, &req) |
| 299 » » » » So(err, ShouldBeRPCNotFound) |
| 300 » » » }) |
| 301 |
| 302 » » » Convey(`Will fail with NotFound if the log path does not
exist (different path).`, func() { |
| 303 » » » » req.Path = "testing/+/does/not/exist" |
| 304 » » » » _, err := svr.Tail(c, &req) |
| 305 » » » » So(err, ShouldBeRPCNotFound) |
| 306 » » » }) |
| 307 » » }) |
| 308 |
| 309 » » Convey(`When testing log data is added`, func() { |
| 310 » » » if !archived { |
| 311 » » » » // Add the logs to the in-memory temporary stora
ge. |
| 312 » » » » for _, le := range entries { |
| 313 » » » » » err := ms.Put(storage.PutRequest{ |
| 314 » » » » » » Project: project, |
| 315 » » » » » » Path: ls.Path(), |
| 316 » » » » » » Index: types.MessageIndex(le.S
treamIndex), |
| 317 » » » » » » Values: [][]byte{protobufs[le.S
treamIndex]}, |
| 318 » » » » » }) |
| 319 » » » » » if err != nil { |
| 320 » » » » » » panic(fmt.Errorf("failed to Put(
) LogEntry: %v", err)) |
| 321 » » » » » } |
| 322 » » » » } |
| 323 » » » } else { |
| 324 » » » » // Archive this log stream. We will generate one
index entry for every |
| 325 » » » » // 2 log entries. |
| 326 » » » » src := staticArchiveSource(entries) |
| 327 » » » » var lbuf, ibuf bytes.Buffer |
| 328 » » » » m := archive.Manifest{ |
| 329 » » » » » Desc: desc, |
| 330 » » » » » Source: &src, |
| 331 » » » » » LogWriter: &lbuf, |
| 332 » » » » » IndexWriter: &ibuf, |
| 333 » » » » » StreamIndexRange: 2, |
| 334 » » » » } |
| 335 » » » » if err := archive.Archive(m); err != nil { |
| 336 » » » » » panic(err) |
| 337 » » » » } |
| 338 |
| 339 » » » » now := tc.Now().UTC() |
| 340 |
| 341 » » » » gsc.put("gs://testbucket/stream", lbuf.Bytes()) |
| 342 » » » » gsc.put("gs://testbucket/index", ibuf.Bytes()) |
| 343 » » » » ls.State = coordinator.LSArchived |
| 344 » » » » ls.TerminatedTime = now |
| 345 » » » » ls.ArchivedTime = now |
| 346 » » » » ls.ArchiveStreamURL = "gs://testbucket/stream" |
| 347 » » » » ls.ArchiveIndexURL = "gs://testbucket/index" |
283 } | 348 } |
284 » » » if err := archive.Archive(m); err != nil { | 349 » » » if err := di.Put(ls); err != nil { |
285 panic(err) | 350 panic(err) |
286 } | 351 } |
287 | 352 |
288 » » » now := tc.Now().UTC() | 353 » » » Convey(`Testing Get requests`, func() { |
289 | 354 » » » » req := logdog.GetRequest{ |
290 » » » gsc.put("gs://testbucket/stream", lbuf.Bytes()) | 355 » » » » » Project: project, |
291 » » » gsc.put("gs://testbucket/index", ibuf.Bytes()) | 356 » » » » » Path: string(ls.Path()), |
292 » » » ls.State = coordinator.LSArchived | 357 » » » » } |
293 » » » ls.TerminatedTime = now | 358 |
294 » » » ls.ArchivedTime = now | 359 » » » » Convey(`When the log stream is purged`, func() { |
295 » » » ls.ArchiveStreamURL = "gs://testbucket/stream" | 360 » » » » » ls.Purged = true |
296 » » » ls.ArchiveIndexURL = "gs://testbucket/index" | 361 » » » » » if err := di.Put(ls); err != nil { |
297 » » } | |
298 » » if err := ds.Get(c).Put(ls); err != nil { | |
299 » » » panic(err) | |
300 » » } | |
301 | |
302 » » Convey(`Testing Get requests`, func() { | |
303 » » » req := logdog.GetRequest{ | |
304 » » » » Path: string(ls.Path()), | |
305 » » » } | |
306 | |
307 » » » Convey(`When the log stream is purged`, func() { | |
308 » » » » ls.Purged = true | |
309 » » » » if err := ds.Get(c).Put(ls); err != nil { | |
310 » » » » » panic(err) | |
311 » » » » } | |
312 | |
313 » » » » Convey(`Will return NotFound if the user is not
an administrator.`, func() { | |
314 » » » » » _, err := s.Get(c, &req) | |
315 » » » » » So(err, ShouldBeRPCNotFound) | |
316 » » » » }) | |
317 | |
318 » » » » Convey(`Will process the request if the user is
an administrator.`, func() { | |
319 » » » » » fs.IdentityGroups = []string{"test-admin
istrators"} | |
320 | |
321 » » » » » resp, err := s.Get(c, &req) | |
322 » » » » » So(err, ShouldBeRPCOK) | |
323 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
324 » » » » }) | |
325 » » » }) | |
326 | |
327 » » » Convey(`Will return empty if no records were requested.`
, func() { | |
328 » » » » req.LogCount = -1 | |
329 » » » » req.State = false | |
330 | |
331 » » » » resp, err := s.Get(c, &req) | |
332 » » » » So(err, ShouldBeRPCOK) | |
333 » » » » So(resp.Logs, ShouldHaveLength, 0) | |
334 » » » }) | |
335 | |
336 » » » Convey(`Will successfully retrieve a stream path.`, func
() { | |
337 » » » » resp, err := s.Get(c, &req) | |
338 » » » » So(err, ShouldBeRPCOK) | |
339 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
340 » » » }) | |
341 | |
342 » » » Convey(`Will successfully retrieve a stream path offset
at 4.`, func() { | |
343 » » » » req.Index = 4 | |
344 | |
345 » » » » resp, err := s.Get(c, &req) | |
346 » » » » So(err, ShouldBeRPCOK) | |
347 » » » » So(resp, shouldHaveLogs, 4, 5) | |
348 » » » }) | |
349 | |
350 » » » Convey(`Will retrieve no logs for contiguous offset 6.`,
func() { | |
351 » » » » req.Index = 6 | |
352 | |
353 » » » » resp, err := s.Get(c, &req) | |
354 » » » » So(err, ShouldBeRPCOK) | |
355 » » » » So(len(resp.Logs), ShouldEqual, 0) | |
356 » » » }) | |
357 | |
358 » » » Convey(`Will retrieve log 7 for non-contiguous offset 6.
`, func() { | |
359 » » » » req.NonContiguous = true | |
360 » » » » req.Index = 6 | |
361 | |
362 » » » » resp, err := s.Get(c, &req) | |
363 » » » » So(err, ShouldBeRPCOK) | |
364 » » » » So(resp, shouldHaveLogs, 7) | |
365 » » » }) | |
366 | |
367 » » » Convey(`With a byte limit of 1, will still return at lea
st one log entry.`, func() { | |
368 » » » » req.ByteCount = 1 | |
369 | |
370 » » » » resp, err := s.Get(c, &req) | |
371 » » » » So(err, ShouldBeRPCOK) | |
372 » » » » So(resp, shouldHaveLogs, 0) | |
373 » » » }) | |
374 | |
375 » » » Convey(`With a byte limit of sizeof(0), will return log
entry 0.`, func() { | |
376 » » » » req.ByteCount = frameSize(0) | |
377 | |
378 » » » » resp, err := s.Get(c, &req) | |
379 » » » » So(err, ShouldBeRPCOK) | |
380 » » » » So(resp, shouldHaveLogs, 0) | |
381 » » » }) | |
382 | |
383 » » » Convey(`With a byte limit of sizeof(0)+1, will return lo
g entry 0.`, func() { | |
384 » » » » req.ByteCount = frameSize(0) + 1 | |
385 | |
386 » » » » resp, err := s.Get(c, &req) | |
387 » » » » So(err, ShouldBeRPCOK) | |
388 » » » » So(resp, shouldHaveLogs, 0) | |
389 » » » }) | |
390 | |
391 » » » Convey(`With a byte limit of sizeof({0, 1}), will return
log entries {0, 1}.`, func() { | |
392 » » » » req.ByteCount = frameSize(0, 1) | |
393 | |
394 » » » » resp, err := s.Get(c, &req) | |
395 » » » » So(err, ShouldBeRPCOK) | |
396 » » » » So(resp, shouldHaveLogs, 0, 1) | |
397 » » » }) | |
398 | |
399 » » » Convey(`With a byte limit of sizeof({0, 1, 2}), will ret
urn log entries {0, 1, 2}.`, func() { | |
400 » » » » req.ByteCount = frameSize(0, 1, 2) | |
401 | |
402 » » » » resp, err := s.Get(c, &req) | |
403 » » » » So(err, ShouldBeRPCOK) | |
404 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
405 » » » }) | |
406 | |
407 » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1, will r
eturn log entries {0, 1, 2}.`, func() { | |
408 » » » » req.ByteCount = frameSize(0, 1, 2) + 1 | |
409 | |
410 » » » » resp, err := s.Get(c, &req) | |
411 » » » » So(err, ShouldBeRPCOK) | |
412 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
413 » » » }) | |
414 | |
415 » » » Convey(`Will successfully retrieve a stream path hash.`,
func() { | |
416 » » » » req.Path = ls.HashID | |
417 » » » » resp, err := s.Get(c, &req) | |
418 » » » » So(err, ShouldBeRPCOK) | |
419 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | |
420 » » » }) | |
421 | |
422 » » » Convey(`When requesting state`, func() { | |
423 » » » » req.State = true | |
424 » » » » req.LogCount = -1 | |
425 | |
426 » » » » Convey(`Will successfully retrieve stream state.
`, func() { | |
427 » » » » » resp, err := s.Get(c, &req) | |
428 » » » » » So(err, ShouldBeRPCOK) | |
429 » » » » » So(resp.State, ShouldResemble, loadLogSt
reamState(ls)) | |
430 » » » » » So(len(resp.Logs), ShouldEqual, 0) | |
431 » » » » }) | |
432 | |
433 » » » » Convey(`Will return Internal if the protobuf des
criptor data is corrupt.`, func() { | |
434 » » » » » ls.SetDSValidate(false) | |
435 » » » » » ls.Descriptor = []byte{0x00} // Invalid
protobuf, zero tag. | |
436 » » » » » if err := ds.Get(c).Put(ls); err != nil
{ | |
437 panic(err) | 362 panic(err) |
438 } | 363 } |
439 | 364 |
440 » » » » » _, err := s.Get(c, &req) | 365 » » » » » Convey(`Will return NotFound if the user
is not an administrator.`, func() { |
| 366 » » » » » » _, err := svr.Get(c, &req) |
| 367 » » » » » » So(err, ShouldBeRPCNotFound) |
| 368 » » » » » }) |
| 369 |
| 370 » » » » » Convey(`Will process the request if the
user is an administrator.`, func() { |
| 371 » » » » » » fs.IdentityGroups = []string{"te
st-administrators"} |
| 372 |
| 373 » » » » » » resp, err := svr.Get(c, &req) |
| 374 » » » » » » So(err, ShouldBeRPCOK) |
| 375 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) |
| 376 » » » » » }) |
| 377 » » » » }) |
| 378 |
| 379 » » » » Convey(`Will return empty if no records were req
uested.`, func() { |
| 380 » » » » » req.LogCount = -1 |
| 381 » » » » » req.State = false |
| 382 |
| 383 » » » » » resp, err := svr.Get(c, &req) |
| 384 » » » » » So(err, ShouldBeRPCOK) |
| 385 » » » » » So(resp.Logs, ShouldHaveLength, 0) |
| 386 » » » » }) |
| 387 |
| 388 » » » » Convey(`Will successfully retrieve a stream path
.`, func() { |
| 389 » » » » » resp, err := svr.Get(c, &req) |
| 390 » » » » » So(err, ShouldBeRPCOK) |
| 391 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 392 » » » » }) |
| 393 |
| 394 » » » » Convey(`Will successfully retrieve a stream path
offset at 4.`, func() { |
| 395 » » » » » req.Index = 4 |
| 396 |
| 397 » » » » » resp, err := svr.Get(c, &req) |
| 398 » » » » » So(err, ShouldBeRPCOK) |
| 399 » » » » » So(resp, shouldHaveLogs, 4, 5) |
| 400 » » » » }) |
| 401 |
| 402 » » » » Convey(`Will retrieve no logs for contiguous off
set 6.`, func() { |
| 403 » » » » » req.Index = 6 |
| 404 |
| 405 » » » » » resp, err := svr.Get(c, &req) |
| 406 » » » » » So(err, ShouldBeRPCOK) |
| 407 » » » » » So(len(resp.Logs), ShouldEqual, 0) |
| 408 » » » » }) |
| 409 |
| 410 » » » » Convey(`Will retrieve log 7 for non-contiguous o
ffset 6.`, func() { |
| 411 » » » » » req.NonContiguous = true |
| 412 » » » » » req.Index = 6 |
| 413 |
| 414 » » » » » resp, err := svr.Get(c, &req) |
| 415 » » » » » So(err, ShouldBeRPCOK) |
| 416 » » » » » So(resp, shouldHaveLogs, 7) |
| 417 » » » » }) |
| 418 |
| 419 » » » » Convey(`With a byte limit of 1, will still retur
n at least one log entry.`, func() { |
| 420 » » » » » req.ByteCount = 1 |
| 421 |
| 422 » » » » » resp, err := svr.Get(c, &req) |
| 423 » » » » » So(err, ShouldBeRPCOK) |
| 424 » » » » » So(resp, shouldHaveLogs, 0) |
| 425 » » » » }) |
| 426 |
| 427 » » » » Convey(`With a byte limit of sizeof(0), will ret
urn log entry 0.`, func() { |
| 428 » » » » » req.ByteCount = frameSize(0) |
| 429 |
| 430 » » » » » resp, err := svr.Get(c, &req) |
| 431 » » » » » So(err, ShouldBeRPCOK) |
| 432 » » » » » So(resp, shouldHaveLogs, 0) |
| 433 » » » » }) |
| 434 |
| 435 » » » » Convey(`With a byte limit of sizeof(0)+1, will r
eturn log entry 0.`, func() { |
| 436 » » » » » req.ByteCount = frameSize(0) + 1 |
| 437 |
| 438 » » » » » resp, err := svr.Get(c, &req) |
| 439 » » » » » So(err, ShouldBeRPCOK) |
| 440 » » » » » So(resp, shouldHaveLogs, 0) |
| 441 » » » » }) |
| 442 |
| 443 » » » » Convey(`With a byte limit of sizeof({0, 1}), wil
l return log entries {0, 1}.`, func() { |
| 444 » » » » » req.ByteCount = frameSize(0, 1) |
| 445 |
| 446 » » » » » resp, err := svr.Get(c, &req) |
| 447 » » » » » So(err, ShouldBeRPCOK) |
| 448 » » » » » So(resp, shouldHaveLogs, 0, 1) |
| 449 » » » » }) |
| 450 |
| 451 » » » » Convey(`With a byte limit of sizeof({0, 1, 2}),
will return log entries {0, 1, 2}.`, func() { |
| 452 » » » » » req.ByteCount = frameSize(0, 1, 2) |
| 453 |
| 454 » » » » » resp, err := svr.Get(c, &req) |
| 455 » » » » » So(err, ShouldBeRPCOK) |
| 456 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 457 » » » » }) |
| 458 |
| 459 » » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1
, will return log entries {0, 1, 2}.`, func() { |
| 460 » » » » » req.ByteCount = frameSize(0, 1, 2) + 1 |
| 461 |
| 462 » » » » » resp, err := svr.Get(c, &req) |
| 463 » » » » » So(err, ShouldBeRPCOK) |
| 464 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 465 » » » » }) |
| 466 |
| 467 » » » » Convey(`Will successfully retrieve a stream path
hash.`, func() { |
| 468 » » » » » req.Path = ls.HashID |
| 469 » » » » » resp, err := svr.Get(c, &req) |
| 470 » » » » » So(err, ShouldBeRPCOK) |
| 471 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 472 » » » » }) |
| 473 |
| 474 » » » » Convey(`When requesting state`, func() { |
| 475 » » » » » req.State = true |
| 476 » » » » » req.LogCount = -1 |
| 477 |
| 478 » » » » » Convey(`Will successfully retrieve strea
m state.`, func() { |
| 479 » » » » » » resp, err := svr.Get(c, &req) |
| 480 » » » » » » So(err, ShouldBeRPCOK) |
| 481 » » » » » » So(resp.State, ShouldResemble, l
oadLogStreamState(ls)) |
| 482 » » » » » » So(len(resp.Logs), ShouldEqual,
0) |
| 483 » » » » » }) |
| 484 |
| 485 » » » » » Convey(`Will return Internal if the prot
obuf descriptor data is corrupt.`, func() { |
| 486 » » » » » » ls.SetDSValidate(false) |
| 487 » » » » » » ls.Descriptor = []byte{0x00} //
Invalid protobuf, zero tag. |
| 488 » » » » » » if err := di.Put(ls); err != nil
{ |
| 489 » » » » » » » panic(err) |
| 490 » » » » » » } |
| 491 |
| 492 » » » » » » _, err := svr.Get(c, &req) |
| 493 » » » » » » So(err, ShouldBeRPCInternal) |
| 494 » » » » » }) |
| 495 » » » » }) |
| 496 |
| 497 » » » » Convey(`Will return Internal if the protobuf log
entry data is corrupt.`, func() { |
| 498 » » » » » if archived { |
| 499 » » » » » » // Corrupt the archive datastrea
m. |
| 500 » » » » » » stream := gsc.get("gs://testbuck
et/stream") |
| 501 » » » » » » zeroRecords(stream) |
| 502 » » » » » } else { |
| 503 » » » » » » // Add corrupted entry to Storag
e. Create a new entry here, since |
| 504 » » » » » » // the storage will reject a dup
licate/overwrite. |
| 505 » » » » » » err := ms.Put(storage.PutRequest
{ |
| 506 » » » » » » » Project: project, |
| 507 » » » » » » » Path: types.StreamPat
h(req.Path), |
| 508 » » » » » » » Index: 666, |
| 509 » » » » » » » Values: [][]byte{{0x00}
}, // Invalid protobuf, zero tag. |
| 510 » » » » » » }) |
| 511 » » » » » » if err != nil { |
| 512 » » » » » » » panic(err) |
| 513 » » » » » » } |
| 514 » » » » » » req.Index = 666 |
| 515 » » » » » } |
| 516 |
| 517 » » » » » _, err := svr.Get(c, &req) |
441 So(err, ShouldBeRPCInternal) | 518 So(err, ShouldBeRPCInternal) |
442 }) | 519 }) |
443 » » » }) | 520 |
444 | 521 » » » » Convey(`Will successfully retrieve both logs and
stream state.`, func() { |
445 » » » Convey(`Will return Internal if the protobuf log entry d
ata is corrupt.`, func() { | 522 » » » » » req.State = true |
446 » » » » if archived { | 523 |
447 » » » » » // Corrupt the archive datastream. | 524 » » » » » resp, err := svr.Get(c, &req) |
448 » » » » » stream := gsc.get("gs://testbucket/strea
m") | 525 » » » » » So(err, ShouldBeRPCOK) |
449 » » » » » zeroRecords(stream) | 526 » » » » » So(resp.State, ShouldResemble, loadLogSt
reamState(ls)) |
450 » » » » } else { | 527 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
451 » » » » » // Add corrupted entry to Storage. Creat
e a new entry here, since | 528 » » » » }) |
452 » » » » » // the storage will reject a duplicate/o
verwrite. | 529 |
453 » » » » » err := ms.Put(storage.PutRequest{ | 530 » » » » Convey(`Will return Internal if the Storage is n
ot working.`, func() { |
454 » » » » » » Path: types.StreamPath(req.Pat
h), | 531 » » » » » if archived { |
455 » » » » » » Index: 666, | 532 » » » » » » gsc["error"] = []byte("test erro
r") |
456 » » » » » » Values: [][]byte{{0x00}}, // Inv
alid protobuf, zero tag. | 533 » » » » » } else { |
457 » » » » » }) | 534 » » » » » » ms.Close() |
458 » » » » » if err != nil { | |
459 » » » » » » panic(err) | |
460 } | 535 } |
461 » » » » » req.Index = 666 | 536 |
462 » » » » } | 537 » » » » » _, err := svr.Get(c, &req) |
463 | 538 » » » » » So(err, ShouldBeRPCInternal) |
464 » » » » _, err := s.Get(c, &req) | 539 » » » » }) |
465 » » » » So(err, ShouldBeRPCInternal) | 540 |
466 » » » }) | 541 » » » » Convey(`Will enforce a maximum count of 2.`, fun
c() { |
467 | 542 » » » » » req.LogCount = 2 |
468 » » » Convey(`Will successfully retrieve both logs and stream
state.`, func() { | 543 » » » » » resp, err := svr.Get(c, &req) |
469 » » » » req.State = true | 544 » » » » » So(err, ShouldBeRPCOK) |
470 | 545 » » » » » So(resp, shouldHaveLogs, 0, 1) |
471 » » » » resp, err := s.Get(c, &req) | 546 » » » » }) |
472 » » » » So(err, ShouldBeRPCOK) | 547 |
473 » » » » So(resp.State, ShouldResemble, loadLogStreamStat
e(ls)) | 548 » » » » Convey(`When requesting protobufs`, func() { |
474 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | 549 » » » » » req.State = true |
475 » » » }) | 550 |
476 | 551 » » » » » resp, err := svr.Get(c, &req) |
477 » » » Convey(`Will return Internal if the Storage is not worki
ng.`, func() { | 552 » » » » » So(err, ShouldBeRPCOK) |
478 » » » » if archived { | 553 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
479 » » » » » gsc["error"] = []byte("test error") | 554 |
480 » » » » } else { | 555 » » » » » // Confirm that this has protobufs. |
481 » » » » » ms.Close() | 556 » » » » » So(len(resp.Logs), ShouldEqual, 3) |
482 » » » » } | 557 » » » » » So(resp.Logs[0], ShouldNotBeNil) |
483 | 558 |
484 » » » » _, err := s.Get(c, &req) | 559 » » » » » // Confirm that there is a descriptor pr
otobuf. |
485 » » » » So(err, ShouldBeRPCInternal) | 560 » » » » » So(resp.Desc, ShouldResemble, desc) |
486 » » » }) | 561 |
487 | 562 » » » » » // Confirm that the state was returned. |
488 » » » Convey(`Will enforce a maximum count of 2.`, func() { | 563 » » » » » So(resp.State, ShouldNotBeNil) |
489 » » » » req.LogCount = 2 | 564 » » » » }) |
490 » » » » resp, err := s.Get(c, &req) | 565 |
491 » » » » So(err, ShouldBeRPCOK) | 566 » » » » Convey(`Will successfully retrieve all records i
f non-contiguous is allowed.`, func() { |
492 » » » » So(resp, shouldHaveLogs, 0, 1) | 567 » » » » » req.NonContiguous = true |
493 » » » }) | 568 » » » » » resp, err := svr.Get(c, &req) |
494 | 569 » » » » » So(err, ShouldBeRPCOK) |
495 » » » Convey(`When requesting protobufs`, func() { | 570 » » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5,
7) |
496 » » » » req.State = true | 571 » » » » }) |
497 | 572 |
498 » » » » resp, err := s.Get(c, &req) | 573 » » » » Convey(`When newlines are not requested, does no
t include delimiters.`, func() { |
499 » » » » So(err, ShouldBeRPCOK) | 574 » » » » » req.LogCount = 1 |
500 » » » » So(resp, shouldHaveLogs, 0, 1, 2) | 575 |
501 | 576 » » » » » resp, err := svr.Get(c, &req) |
502 » » » » // Confirm that this has protobufs. | 577 » » » » » So(err, ShouldBeRPCOK) |
503 » » » » So(len(resp.Logs), ShouldEqual, 3) | 578 » » » » » So(resp, shouldHaveLogs, 0) |
504 » » » » So(resp.Logs[0], ShouldNotBeNil) | 579 |
505 | 580 » » » » » So(resp.Logs[0].GetText(), ShouldResembl
e, &logpb.Text{ |
506 » » » » // Confirm that there is a descriptor protobuf. | 581 » » » » » » Lines: []*logpb.Text_Line{ |
507 » » » » So(resp.Desc, ShouldResemble, desc) | 582 » » » » » » » {"log entry #0", "\n"}, |
508 | 583 » » » » » » » {"another line of text",
""}, |
509 » » » » // Confirm that the state was returned. | 584 » » » » » » }, |
510 » » » » So(resp.State, ShouldNotBeNil) | 585 » » » » » }) |
511 » » » }) | 586 » » » » }) |
512 | 587 |
513 » » » Convey(`Will successfully retrieve all records if non-co
ntiguous is allowed.`, func() { | 588 » » » » Convey(`Will get a Binary LogEntry`, func() { |
514 » » » » req.NonContiguous = true | 589 » » » » » req.Index = 4 |
515 » » » » resp, err := s.Get(c, &req) | 590 » » » » » req.LogCount = 1 |
516 » » » » So(err, ShouldBeRPCOK) | 591 » » » » » resp, err := svr.Get(c, &req) |
517 » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 7) | 592 » » » » » So(err, ShouldBeRPCOK) |
518 » » » }) | 593 » » » » » So(resp, shouldHaveLogs, 4) |
519 | 594 » » » » » So(resp.Logs[0].GetBinary(), ShouldResem
ble, &logpb.Binary{ |
520 » » » Convey(`When newlines are not requested, does not includ
e delimiters.`, func() { | 595 » » » » » » Data: []byte{0x00, 0x01, 0x02, 0
x03}, |
521 » » » » req.LogCount = 1 | 596 » » » » » }) |
522 | 597 » » » » }) |
523 » » » » resp, err := s.Get(c, &req) | 598 |
524 » » » » So(err, ShouldBeRPCOK) | 599 » » » » Convey(`Will get a Datagram LogEntry`, func() { |
525 » » » » So(resp, shouldHaveLogs, 0) | 600 » » » » » req.Index = 5 |
526 | 601 » » » » » req.LogCount = 1 |
527 » » » » So(resp.Logs[0].GetText(), ShouldResemble, &logp
b.Text{ | 602 » » » » » resp, err := svr.Get(c, &req) |
528 » » » » » Lines: []*logpb.Text_Line{ | 603 » » » » » So(err, ShouldBeRPCOK) |
529 » » » » » » {"log entry #0", "\n"}, | 604 » » » » » So(resp, shouldHaveLogs, 5) |
530 » » » » » » {"another line of text", ""}, | 605 » » » » » So(resp.Logs[0].GetDatagram(), ShouldRes
emble, &logpb.Datagram{ |
531 » » » » » }, | 606 » » » » » » Data: []byte{0x00, 0x01, 0x02, 0
x03}, |
532 » » » » }) | 607 » » » » » » Partial: &logpb.Datagram_Partial
{ |
533 » » » }) | 608 » » » » » » » Index: 2, |
534 | 609 » » » » » » » Size: 1024, |
535 » » » Convey(`Will get a Binary LogEntry`, func() { | 610 » » » » » » » Last: false, |
536 » » » » req.Index = 4 | 611 » » » » » » }, |
537 » » » » req.LogCount = 1 | 612 » » » » » }) |
538 » » » » resp, err := s.Get(c, &req) | 613 » » » » }) |
539 » » » » So(err, ShouldBeRPCOK) | 614 » » » }) |
540 » » » » So(resp, shouldHaveLogs, 4) | 615 |
541 » » » » So(resp.Logs[0].GetBinary(), ShouldResemble, &lo
gpb.Binary{ | 616 » » » Convey(`Testing tail requests`, func() { |
542 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, | 617 » » » » req := logdog.TailRequest{ |
543 » » » » }) | 618 » » » » » Project: "test-project", |
544 » » » }) | 619 » » » » » Path: string(ls.Path()), |
545 | 620 » » » » } |
546 » » » Convey(`Will get a Datagram LogEntry`, func() { | 621 |
547 » » » » req.Index = 5 | 622 » » » » Convey(`Will successfully retrieve a stream path
.`, func() { |
548 » » » » req.LogCount = 1 | 623 » » » » » resp, err := svr.Tail(c, &req) |
549 » » » » resp, err := s.Get(c, &req) | 624 » » » » » So(err, ShouldBeRPCOK) |
550 » » » » So(err, ShouldBeRPCOK) | 625 » » » » » So(resp, shouldHaveLogs, 7) |
551 » » » » So(resp, shouldHaveLogs, 5) | 626 » » » » }) |
552 » » » » So(resp.Logs[0].GetDatagram(), ShouldResemble, &
logpb.Datagram{ | 627 |
553 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, | 628 » » » » Convey(`Will successfully retrieve a stream path
hash and state.`, func() { |
554 » » » » » Partial: &logpb.Datagram_Partial{ | 629 » » » » » req.Path = ls.HashID |
555 » » » » » » Index: 2, | 630 » » » » » req.State = true |
556 » » » » » » Size: 1024, | 631 |
557 » » » » » » Last: false, | 632 » » » » » resp, err := svr.Tail(c, &req) |
558 » » » » » }, | 633 » » » » » So(err, ShouldBeRPCOK) |
559 » » » » }) | 634 » » » » » So(resp, shouldHaveLogs, 7) |
560 » » » }) | 635 » » » » » So(resp.State, ShouldResemble, loadLogSt
reamState(ls)) |
561 » » }) | 636 » » » » }) |
562 | |
563 » » Convey(`Testing tail requests`, func() { | |
564 » » » req := logdog.TailRequest{ | |
565 » » » » Path: string(ls.Path()), | |
566 » » » } | |
567 | |
568 » » » Convey(`Will successfully retrieve a stream path.`, func
() { | |
569 » » » » resp, err := s.Tail(c, &req) | |
570 » » » » So(err, ShouldBeRPCOK) | |
571 » » » » So(resp, shouldHaveLogs, 7) | |
572 » » » }) | |
573 | |
574 » » » Convey(`Will successfully retrieve a stream path hash an
d state.`, func() { | |
575 » » » » req.Path = ls.HashID | |
576 » » » » req.State = true | |
577 | |
578 » » » » resp, err := s.Tail(c, &req) | |
579 » » » » So(err, ShouldBeRPCOK) | |
580 » » » » So(resp, shouldHaveLogs, 7) | |
581 » » » » So(resp.State, ShouldResemble, loadLogStreamStat
e(ls)) | |
582 }) | 637 }) |
583 }) | 638 }) |
584 }) | 639 }) |
585 } | 640 } |
586 | 641 |
587 func TestGetIntermediate(t *testing.T) { | 642 func TestGetIntermediate(t *testing.T) { |
588 t.Parallel() | 643 t.Parallel() |
589 | 644 |
590 testGetImpl(t, false) | 645 testGetImpl(t, false) |
591 } | 646 } |
592 | 647 |
593 func TestGetArchived(t *testing.T) { | 648 func TestGetArchived(t *testing.T) { |
594 t.Parallel() | 649 t.Parallel() |
595 | 650 |
596 » testGetImpl(t, true) | 651 » testGetImpl(t, false) |
597 } | 652 } |
OLD | NEW |