Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(37)

Side by Side Diff: appengine/logdog/coordinator/endpoints/logs/get_test.go

Issue 1909073002: LogDog: Add project namespace to logs endpoint. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-storage
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "io" 11 "io"
12 "math" 12 "math"
13 "testing" 13 "testing"
14 "time" 14 "time"
15 15
16 "github.com/golang/protobuf/proto" 16 "github.com/golang/protobuf/proto"
17 "github.com/luci/gae/filter/featureBreaker" 17 "github.com/luci/gae/filter/featureBreaker"
18 "github.com/luci/gae/impl/memory" 18 "github.com/luci/gae/impl/memory"
19 ds "github.com/luci/gae/service/datastore" 19 ds "github.com/luci/gae/service/datastore"
20 "github.com/luci/luci-go/appengine/logdog/coordinator" 20 "github.com/luci/luci-go/appengine/logdog/coordinator"
21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" 22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1"
23 "github.com/luci/luci-go/common/clock/testclock" 23 "github.com/luci/luci-go/common/clock/testclock"
24 "github.com/luci/luci-go/common/config"
24 "github.com/luci/luci-go/common/gcloud/gs" 25 "github.com/luci/luci-go/common/gcloud/gs"
25 "github.com/luci/luci-go/common/iotools" 26 "github.com/luci/luci-go/common/iotools"
26 "github.com/luci/luci-go/common/logdog/types" 27 "github.com/luci/luci-go/common/logdog/types"
27 "github.com/luci/luci-go/common/proto/logdog/logpb" 28 "github.com/luci/luci-go/common/proto/logdog/logpb"
28 "github.com/luci/luci-go/common/recordio" 29 "github.com/luci/luci-go/common/recordio"
29 "github.com/luci/luci-go/server/auth" 30 "github.com/luci/luci-go/server/auth"
30 "github.com/luci/luci-go/server/auth/authtest" 31 "github.com/luci/luci-go/server/auth/authtest"
31 "github.com/luci/luci-go/server/logdog/archive" 32 "github.com/luci/luci-go/server/logdog/archive"
32 "github.com/luci/luci-go/server/logdog/storage" 33 "github.com/luci/luci-go/server/logdog/storage"
33 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" 34 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory"
(...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 151
151 func testGetImpl(t *testing.T, archived bool) { 152 func testGetImpl(t *testing.T, archived bool) {
152 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive d=%v)`, archived), t, func() { 153 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive d=%v)`, archived), t, func() {
153 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 154 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
154 c = memory.Use(c) 155 c = memory.Use(c)
155 156
156 fs := authtest.FakeState{} 157 fs := authtest.FakeState{}
157 c = auth.WithState(c, &fs) 158 c = auth.WithState(c, &fs)
158 159
159 ms := memoryStorage.Storage{} 160 ms := memoryStorage.Storage{}
161
160 gsc := testGSClient{} 162 gsc := testGSClient{}
161 svcStub := ct.Services{ 163 svcStub := ct.Services{
162 IS: func() (storage.Storage, error) { 164 IS: func() (storage.Storage, error) {
163 return &ms, nil 165 return &ms, nil
164 }, 166 },
165 GS: func() (gs.Client, error) { 167 GS: func() (gs.Client, error) {
166 return gsc, nil 168 return gsc, nil
167 }, 169 },
168 } 170 }
169 svcStub.InitConfig() 171 svcStub.InitConfig()
170 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis trators" 172 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis trators"
171 c = coordinator.WithServices(c, &svcStub) 173 c = coordinator.WithServices(c, &svcStub)
172 174
173 » » s := New() 175 » » svr := New()
176
177 » » // di is a datastore bound to the test project namespace.
178 » » const project = "test-project"
179 » » if err := coordinator.WithProjectNamespace(&c, config.ProjectNam e(project)); err != nil {
180 » » » panic(err)
181 » » }
182 » » di := ds.Get(c)
174 183
175 // Generate our test stream. 184 // Generate our test stream.
176 desc := ct.TestLogStreamDescriptor(c, "foo/bar") 185 desc := ct.TestLogStreamDescriptor(c, "foo/bar")
177 ls := ct.TestLogStream(c, desc) 186 ls := ct.TestLogStream(c, desc)
178 » » if err := ds.Get(c).Put(ls); err != nil { 187 » » if err := di.Put(ls); err != nil {
179 panic(err) 188 panic(err)
180 } 189 }
181 190
182 tc.Add(time.Second) 191 tc.Add(time.Second)
183 var entries []*logpb.LogEntry 192 var entries []*logpb.LogEntry
184 protobufs := map[uint64][]byte{} 193 protobufs := map[uint64][]byte{}
185 for _, v := range []int{0, 1, 2, 4, 5, 7} { 194 for _, v := range []int{0, 1, 2, 4, 5, 7} {
186 le := ct.TestLogEntry(c, ls, v) 195 le := ct.TestLogEntry(c, ls, v)
187 le.GetText().Lines = append(le.GetText().Lines, &logpb.T ext_Line{ 196 le.GetText().Lines = append(le.GetText().Lines, &logpb.T ext_Line{
188 Value: "another line of text", 197 Value: "another line of text",
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
226 size += recordio.FrameHeaderSize(int64(len(pb))) + len(pb) 235 size += recordio.FrameHeaderSize(int64(len(pb))) + len(pb)
227 } 236 }
228 if size > math.MaxInt32 { 237 if size > math.MaxInt32 {
229 panic(size) 238 panic(size)
230 } 239 }
231 return int32(size) 240 return int32(size)
232 } 241 }
233 242
234 Convey(`Testing Get requests (no logs)`, func() { 243 Convey(`Testing Get requests (no logs)`, func() {
235 req := logdog.GetRequest{ 244 req := logdog.GetRequest{
236 » » » » Path: string(ls.Path()), 245 » » » » Project: project,
246 » » » » Path: string(ls.Path()),
237 } 247 }
238 248
249 Convey(`Will succeed with no logs.`, func() {
250 resp, err := svr.Get(c, &req)
251
252 So(err, ShouldBeRPCOK)
253 So(resp, shouldHaveLogs)
254 })
255
239 Convey(`Will fail if the Path is not a stream path or a hash.`, func() { 256 Convey(`Will fail if the Path is not a stream path or a hash.`, func() {
240 req.Path = "not/a/full/stream/path" 257 req.Path = "not/a/full/stream/path"
241 » » » » _, err := s.Get(c, &req) 258 » » » » _, err := svr.Get(c, &req)
242 So(err, ShouldErrLike, "invalid path value") 259 So(err, ShouldErrLike, "invalid path value")
243 }) 260 })
244 261
245 Convey(`Will fail with Internal if the datastore Get() d oesn't work.`, func() { 262 Convey(`Will fail with Internal if the datastore Get() d oesn't work.`, func() {
246 c, fb := featureBreaker.FilterRDS(c, nil) 263 c, fb := featureBreaker.FilterRDS(c, nil)
247 fb.BreakFeatures(errors.New("testing error"), "G etMulti") 264 fb.BreakFeatures(errors.New("testing error"), "G etMulti")
248 265
249 » » » » _, err := s.Get(c, &req) 266 » » » » _, err := svr.Get(c, &req)
250 So(err, ShouldBeRPCInternal) 267 So(err, ShouldBeRPCInternal)
251 }) 268 })
252 269
253 » » » Convey(`Will fail with NotFound if the log stream does n ot exist.`, func() { 270 » » » Convey(`Will fail with NotFound if the log stream does n ot exist (different project).`, func() {
271 » » » » req.Project = "does-not-exist"
272 » » » » _, err := svr.Get(c, &req)
273 » » » » So(err, ShouldBeRPCNotFound)
274 » » » })
275
276 » » » Convey(`Will fail with NotFound if the log path does not exist (different path).`, func() {
254 req.Path = "testing/+/does/not/exist" 277 req.Path = "testing/+/does/not/exist"
255 » » » » _, err := s.Get(c, &req) 278 » » » » _, err := svr.Get(c, &req)
256 So(err, ShouldBeRPCNotFound) 279 So(err, ShouldBeRPCNotFound)
257 }) 280 })
258 }) 281 })
259 282
260 » » if !archived { 283 » » Convey(`Testing Tail requests (no logs)`, func() {
261 » » » // Add the logs to the in-memory temporary storage. 284 » » » req := logdog.TailRequest{
262 » » » for _, le := range entries { 285 » » » » Project: project,
263 » » » » err := ms.Put(storage.PutRequest{ 286 » » » » Path: string(ls.Path()),
264 » » » » » Path: ls.Path(),
265 » » » » » Index: types.MessageIndex(le.StreamInde x),
266 » » » » » Values: [][]byte{protobufs[le.StreamInde x]},
267 » » » » })
268 » » » » if err != nil {
269 » » » » » panic(fmt.Errorf("failed to Put() LogEnt ry: %v", err))
270 » » » » }
271 } 287 }
272 » » } else { 288
273 » » » // Archive this log stream. We will generate one index e ntry for every 289 » » » Convey(`Will succeed with no logs.`, func() {
274 » » » // 2 log entries. 290 » » » » resp, err := svr.Tail(c, &req)
275 » » » src := staticArchiveSource(entries) 291
276 » » » var lbuf, ibuf bytes.Buffer 292 » » » » So(err, ShouldBeRPCOK)
277 » » » m := archive.Manifest{ 293 » » » » So(resp, shouldHaveLogs)
278 » » » » Desc: desc, 294 » » » })
279 » » » » Source: &src, 295
280 » » » » LogWriter: &lbuf, 296 » » » Convey(`Will fail with NotFound if the log stream does n ot exist (different project).`, func() {
281 » » » » IndexWriter: &ibuf, 297 » » » » req.Project = "does-not-exist"
282 » » » » StreamIndexRange: 2, 298 » » » » _, err := svr.Tail(c, &req)
299 » » » » So(err, ShouldBeRPCNotFound)
300 » » » })
301
302 » » » Convey(`Will fail with NotFound if the log path does not exist (different path).`, func() {
303 » » » » req.Path = "testing/+/does/not/exist"
304 » » » » _, err := svr.Tail(c, &req)
305 » » » » So(err, ShouldBeRPCNotFound)
306 » » » })
307 » » })
308
309 » » Convey(`When testing log data is added`, func() {
310 » » » if !archived {
311 » » » » // Add the logs to the in-memory temporary stora ge.
312 » » » » for _, le := range entries {
313 » » » » » err := ms.Put(storage.PutRequest{
314 » » » » » » Project: project,
315 » » » » » » Path: ls.Path(),
316 » » » » » » Index: types.MessageIndex(le.S treamIndex),
317 » » » » » » Values: [][]byte{protobufs[le.S treamIndex]},
318 » » » » » })
319 » » » » » if err != nil {
320 » » » » » » panic(fmt.Errorf("failed to Put( ) LogEntry: %v", err))
321 » » » » » }
322 » » » » }
323 » » » } else {
324 » » » » // Archive this log stream. We will generate one index entry for every
325 » » » » // 2 log entries.
326 » » » » src := staticArchiveSource(entries)
327 » » » » var lbuf, ibuf bytes.Buffer
328 » » » » m := archive.Manifest{
329 » » » » » Desc: desc,
330 » » » » » Source: &src,
331 » » » » » LogWriter: &lbuf,
332 » » » » » IndexWriter: &ibuf,
333 » » » » » StreamIndexRange: 2,
334 » » » » }
335 » » » » if err := archive.Archive(m); err != nil {
336 » » » » » panic(err)
337 » » » » }
338
339 » » » » now := tc.Now().UTC()
340
341 » » » » gsc.put("gs://testbucket/stream", lbuf.Bytes())
342 » » » » gsc.put("gs://testbucket/index", ibuf.Bytes())
343 » » » » ls.State = coordinator.LSArchived
344 » » » » ls.TerminatedTime = now
345 » » » » ls.ArchivedTime = now
346 » » » » ls.ArchiveStreamURL = "gs://testbucket/stream"
347 » » » » ls.ArchiveIndexURL = "gs://testbucket/index"
283 } 348 }
284 » » » if err := archive.Archive(m); err != nil { 349 » » » if err := di.Put(ls); err != nil {
285 panic(err) 350 panic(err)
286 } 351 }
287 352
288 » » » now := tc.Now().UTC() 353 » » » Convey(`Testing Get requests`, func() {
289 354 » » » » req := logdog.GetRequest{
290 » » » gsc.put("gs://testbucket/stream", lbuf.Bytes()) 355 » » » » » Project: project,
291 » » » gsc.put("gs://testbucket/index", ibuf.Bytes()) 356 » » » » » Path: string(ls.Path()),
292 » » » ls.State = coordinator.LSArchived 357 » » » » }
293 » » » ls.TerminatedTime = now 358
294 » » » ls.ArchivedTime = now 359 » » » » Convey(`When the log stream is purged`, func() {
295 » » » ls.ArchiveStreamURL = "gs://testbucket/stream" 360 » » » » » ls.Purged = true
296 » » » ls.ArchiveIndexURL = "gs://testbucket/index" 361 » » » » » if err := di.Put(ls); err != nil {
297 » » }
298 » » if err := ds.Get(c).Put(ls); err != nil {
299 » » » panic(err)
300 » » }
301
302 » » Convey(`Testing Get requests`, func() {
303 » » » req := logdog.GetRequest{
304 » » » » Path: string(ls.Path()),
305 » » » }
306
307 » » » Convey(`When the log stream is purged`, func() {
308 » » » » ls.Purged = true
309 » » » » if err := ds.Get(c).Put(ls); err != nil {
310 » » » » » panic(err)
311 » » » » }
312
313 » » » » Convey(`Will return NotFound if the user is not an administrator.`, func() {
314 » » » » » _, err := s.Get(c, &req)
315 » » » » » So(err, ShouldBeRPCNotFound)
316 » » » » })
317
318 » » » » Convey(`Will process the request if the user is an administrator.`, func() {
319 » » » » » fs.IdentityGroups = []string{"test-admin istrators"}
320
321 » » » » » resp, err := s.Get(c, &req)
322 » » » » » So(err, ShouldBeRPCOK)
323 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
324 » » » » })
325 » » » })
326
327 » » » Convey(`Will return empty if no records were requested.` , func() {
328 » » » » req.LogCount = -1
329 » » » » req.State = false
330
331 » » » » resp, err := s.Get(c, &req)
332 » » » » So(err, ShouldBeRPCOK)
333 » » » » So(resp.Logs, ShouldHaveLength, 0)
334 » » » })
335
336 » » » Convey(`Will successfully retrieve a stream path.`, func () {
337 » » » » resp, err := s.Get(c, &req)
338 » » » » So(err, ShouldBeRPCOK)
339 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
340 » » » })
341
342 » » » Convey(`Will successfully retrieve a stream path offset at 4.`, func() {
343 » » » » req.Index = 4
344
345 » » » » resp, err := s.Get(c, &req)
346 » » » » So(err, ShouldBeRPCOK)
347 » » » » So(resp, shouldHaveLogs, 4, 5)
348 » » » })
349
350 » » » Convey(`Will retrieve no logs for contiguous offset 6.`, func() {
351 » » » » req.Index = 6
352
353 » » » » resp, err := s.Get(c, &req)
354 » » » » So(err, ShouldBeRPCOK)
355 » » » » So(len(resp.Logs), ShouldEqual, 0)
356 » » » })
357
358 » » » Convey(`Will retrieve log 7 for non-contiguous offset 6. `, func() {
359 » » » » req.NonContiguous = true
360 » » » » req.Index = 6
361
362 » » » » resp, err := s.Get(c, &req)
363 » » » » So(err, ShouldBeRPCOK)
364 » » » » So(resp, shouldHaveLogs, 7)
365 » » » })
366
367 » » » Convey(`With a byte limit of 1, will still return at lea st one log entry.`, func() {
368 » » » » req.ByteCount = 1
369
370 » » » » resp, err := s.Get(c, &req)
371 » » » » So(err, ShouldBeRPCOK)
372 » » » » So(resp, shouldHaveLogs, 0)
373 » » » })
374
375 » » » Convey(`With a byte limit of sizeof(0), will return log entry 0.`, func() {
376 » » » » req.ByteCount = frameSize(0)
377
378 » » » » resp, err := s.Get(c, &req)
379 » » » » So(err, ShouldBeRPCOK)
380 » » » » So(resp, shouldHaveLogs, 0)
381 » » » })
382
383 » » » Convey(`With a byte limit of sizeof(0)+1, will return lo g entry 0.`, func() {
384 » » » » req.ByteCount = frameSize(0) + 1
385
386 » » » » resp, err := s.Get(c, &req)
387 » » » » So(err, ShouldBeRPCOK)
388 » » » » So(resp, shouldHaveLogs, 0)
389 » » » })
390
391 » » » Convey(`With a byte limit of sizeof({0, 1}), will return log entries {0, 1}.`, func() {
392 » » » » req.ByteCount = frameSize(0, 1)
393
394 » » » » resp, err := s.Get(c, &req)
395 » » » » So(err, ShouldBeRPCOK)
396 » » » » So(resp, shouldHaveLogs, 0, 1)
397 » » » })
398
399 » » » Convey(`With a byte limit of sizeof({0, 1, 2}), will ret urn log entries {0, 1, 2}.`, func() {
400 » » » » req.ByteCount = frameSize(0, 1, 2)
401
402 » » » » resp, err := s.Get(c, &req)
403 » » » » So(err, ShouldBeRPCOK)
404 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
405 » » » })
406
407 » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1, will r eturn log entries {0, 1, 2}.`, func() {
408 » » » » req.ByteCount = frameSize(0, 1, 2) + 1
409
410 » » » » resp, err := s.Get(c, &req)
411 » » » » So(err, ShouldBeRPCOK)
412 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
413 » » » })
414
415 » » » Convey(`Will successfully retrieve a stream path hash.`, func() {
416 » » » » req.Path = ls.HashID
417 » » » » resp, err := s.Get(c, &req)
418 » » » » So(err, ShouldBeRPCOK)
419 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
420 » » » })
421
422 » » » Convey(`When requesting state`, func() {
423 » » » » req.State = true
424 » » » » req.LogCount = -1
425
426 » » » » Convey(`Will successfully retrieve stream state. `, func() {
427 » » » » » resp, err := s.Get(c, &req)
428 » » » » » So(err, ShouldBeRPCOK)
429 » » » » » So(resp.State, ShouldResemble, loadLogSt reamState(ls))
430 » » » » » So(len(resp.Logs), ShouldEqual, 0)
431 » » » » })
432
433 » » » » Convey(`Will return Internal if the protobuf des criptor data is corrupt.`, func() {
434 » » » » » ls.SetDSValidate(false)
435 » » » » » ls.Descriptor = []byte{0x00} // Invalid protobuf, zero tag.
436 » » » » » if err := ds.Get(c).Put(ls); err != nil {
437 panic(err) 362 panic(err)
438 } 363 }
439 364
440 » » » » » _, err := s.Get(c, &req) 365 » » » » » Convey(`Will return NotFound if the user is not an administrator.`, func() {
366 » » » » » » _, err := svr.Get(c, &req)
367 » » » » » » So(err, ShouldBeRPCNotFound)
368 » » » » » })
369
370 » » » » » Convey(`Will process the request if the user is an administrator.`, func() {
371 » » » » » » fs.IdentityGroups = []string{"te st-administrators"}
372
373 » » » » » » resp, err := svr.Get(c, &req)
374 » » » » » » So(err, ShouldBeRPCOK)
375 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 )
376 » » » » » })
377 » » » » })
378
379 » » » » Convey(`Will return empty if no records were req uested.`, func() {
380 » » » » » req.LogCount = -1
381 » » » » » req.State = false
382
383 » » » » » resp, err := svr.Get(c, &req)
384 » » » » » So(err, ShouldBeRPCOK)
385 » » » » » So(resp.Logs, ShouldHaveLength, 0)
386 » » » » })
387
388 » » » » Convey(`Will successfully retrieve a stream path .`, func() {
389 » » » » » resp, err := svr.Get(c, &req)
390 » » » » » So(err, ShouldBeRPCOK)
391 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
392 » » » » })
393
394 » » » » Convey(`Will successfully retrieve a stream path offset at 4.`, func() {
395 » » » » » req.Index = 4
396
397 » » » » » resp, err := svr.Get(c, &req)
398 » » » » » So(err, ShouldBeRPCOK)
399 » » » » » So(resp, shouldHaveLogs, 4, 5)
400 » » » » })
401
402 » » » » Convey(`Will retrieve no logs for contiguous off set 6.`, func() {
403 » » » » » req.Index = 6
404
405 » » » » » resp, err := svr.Get(c, &req)
406 » » » » » So(err, ShouldBeRPCOK)
407 » » » » » So(len(resp.Logs), ShouldEqual, 0)
408 » » » » })
409
410 » » » » Convey(`Will retrieve log 7 for non-contiguous o ffset 6.`, func() {
411 » » » » » req.NonContiguous = true
412 » » » » » req.Index = 6
413
414 » » » » » resp, err := svr.Get(c, &req)
415 » » » » » So(err, ShouldBeRPCOK)
416 » » » » » So(resp, shouldHaveLogs, 7)
417 » » » » })
418
419 » » » » Convey(`With a byte limit of 1, will still retur n at least one log entry.`, func() {
420 » » » » » req.ByteCount = 1
421
422 » » » » » resp, err := svr.Get(c, &req)
423 » » » » » So(err, ShouldBeRPCOK)
424 » » » » » So(resp, shouldHaveLogs, 0)
425 » » » » })
426
427 » » » » Convey(`With a byte limit of sizeof(0), will ret urn log entry 0.`, func() {
428 » » » » » req.ByteCount = frameSize(0)
429
430 » » » » » resp, err := svr.Get(c, &req)
431 » » » » » So(err, ShouldBeRPCOK)
432 » » » » » So(resp, shouldHaveLogs, 0)
433 » » » » })
434
435 » » » » Convey(`With a byte limit of sizeof(0)+1, will r eturn log entry 0.`, func() {
436 » » » » » req.ByteCount = frameSize(0) + 1
437
438 » » » » » resp, err := svr.Get(c, &req)
439 » » » » » So(err, ShouldBeRPCOK)
440 » » » » » So(resp, shouldHaveLogs, 0)
441 » » » » })
442
443 » » » » Convey(`With a byte limit of sizeof({0, 1}), wil l return log entries {0, 1}.`, func() {
444 » » » » » req.ByteCount = frameSize(0, 1)
445
446 » » » » » resp, err := svr.Get(c, &req)
447 » » » » » So(err, ShouldBeRPCOK)
448 » » » » » So(resp, shouldHaveLogs, 0, 1)
449 » » » » })
450
451 » » » » Convey(`With a byte limit of sizeof({0, 1, 2}), will return log entries {0, 1, 2}.`, func() {
452 » » » » » req.ByteCount = frameSize(0, 1, 2)
453
454 » » » » » resp, err := svr.Get(c, &req)
455 » » » » » So(err, ShouldBeRPCOK)
456 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
457 » » » » })
458
459 » » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1 , will return log entries {0, 1, 2}.`, func() {
460 » » » » » req.ByteCount = frameSize(0, 1, 2) + 1
461
462 » » » » » resp, err := svr.Get(c, &req)
463 » » » » » So(err, ShouldBeRPCOK)
464 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
465 » » » » })
466
467 » » » » Convey(`Will successfully retrieve a stream path hash.`, func() {
468 » » » » » req.Path = ls.HashID
469 » » » » » resp, err := svr.Get(c, &req)
470 » » » » » So(err, ShouldBeRPCOK)
471 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
472 » » » » })
473
474 » » » » Convey(`When requesting state`, func() {
475 » » » » » req.State = true
476 » » » » » req.LogCount = -1
477
478 » » » » » Convey(`Will successfully retrieve strea m state.`, func() {
479 » » » » » » resp, err := svr.Get(c, &req)
480 » » » » » » So(err, ShouldBeRPCOK)
481 » » » » » » So(resp.State, ShouldResemble, l oadLogStreamState(ls))
482 » » » » » » So(len(resp.Logs), ShouldEqual, 0)
483 » » » » » })
484
485 » » » » » Convey(`Will return Internal if the prot obuf descriptor data is corrupt.`, func() {
486 » » » » » » ls.SetDSValidate(false)
487 » » » » » » ls.Descriptor = []byte{0x00} // Invalid protobuf, zero tag.
488 » » » » » » if err := di.Put(ls); err != nil {
489 » » » » » » » panic(err)
490 » » » » » » }
491
492 » » » » » » _, err := svr.Get(c, &req)
493 » » » » » » So(err, ShouldBeRPCInternal)
494 » » » » » })
495 » » » » })
496
497 » » » » Convey(`Will return Internal if the protobuf log entry data is corrupt.`, func() {
498 » » » » » if archived {
499 » » » » » » // Corrupt the archive datastrea m.
500 » » » » » » stream := gsc.get("gs://testbuck et/stream")
501 » » » » » » zeroRecords(stream)
502 » » » » » } else {
503 » » » » » » // Add corrupted entry to Storag e. Create a new entry here, since
504 » » » » » » // the storage will reject a dup licate/overwrite.
505 » » » » » » err := ms.Put(storage.PutRequest {
506 » » » » » » » Project: project,
507 » » » » » » » Path: types.StreamPat h(req.Path),
508 » » » » » » » Index: 666,
509 » » » » » » » Values: [][]byte{{0x00} }, // Invalid protobuf, zero tag.
510 » » » » » » })
511 » » » » » » if err != nil {
512 » » » » » » » panic(err)
513 » » » » » » }
514 » » » » » » req.Index = 666
515 » » » » » }
516
517 » » » » » _, err := svr.Get(c, &req)
441 So(err, ShouldBeRPCInternal) 518 So(err, ShouldBeRPCInternal)
442 }) 519 })
443 » » » }) 520
444 521 » » » » Convey(`Will successfully retrieve both logs and stream state.`, func() {
445 » » » Convey(`Will return Internal if the protobuf log entry d ata is corrupt.`, func() { 522 » » » » » req.State = true
446 » » » » if archived { 523
447 » » » » » // Corrupt the archive datastream. 524 » » » » » resp, err := svr.Get(c, &req)
448 » » » » » stream := gsc.get("gs://testbucket/strea m") 525 » » » » » So(err, ShouldBeRPCOK)
449 » » » » » zeroRecords(stream) 526 » » » » » So(resp.State, ShouldResemble, loadLogSt reamState(ls))
450 » » » » } else { 527 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
451 » » » » » // Add corrupted entry to Storage. Creat e a new entry here, since 528 » » » » })
452 » » » » » // the storage will reject a duplicate/o verwrite. 529
453 » » » » » err := ms.Put(storage.PutRequest{ 530 » » » » Convey(`Will return Internal if the Storage is n ot working.`, func() {
454 » » » » » » Path: types.StreamPath(req.Pat h), 531 » » » » » if archived {
455 » » » » » » Index: 666, 532 » » » » » » gsc["error"] = []byte("test erro r")
456 » » » » » » Values: [][]byte{{0x00}}, // Inv alid protobuf, zero tag. 533 » » » » » } else {
457 » » » » » }) 534 » » » » » » ms.Close()
458 » » » » » if err != nil {
459 » » » » » » panic(err)
460 } 535 }
461 » » » » » req.Index = 666 536
462 » » » » } 537 » » » » » _, err := svr.Get(c, &req)
463 538 » » » » » So(err, ShouldBeRPCInternal)
464 » » » » _, err := s.Get(c, &req) 539 » » » » })
465 » » » » So(err, ShouldBeRPCInternal) 540
466 » » » }) 541 » » » » Convey(`Will enforce a maximum count of 2.`, fun c() {
467 542 » » » » » req.LogCount = 2
468 » » » Convey(`Will successfully retrieve both logs and stream state.`, func() { 543 » » » » » resp, err := svr.Get(c, &req)
469 » » » » req.State = true 544 » » » » » So(err, ShouldBeRPCOK)
470 545 » » » » » So(resp, shouldHaveLogs, 0, 1)
471 » » » » resp, err := s.Get(c, &req) 546 » » » » })
472 » » » » So(err, ShouldBeRPCOK) 547
473 » » » » So(resp.State, ShouldResemble, loadLogStreamStat e(ls)) 548 » » » » Convey(`When requesting protobufs`, func() {
474 » » » » So(resp, shouldHaveLogs, 0, 1, 2) 549 » » » » » req.State = true
475 » » » }) 550
476 551 » » » » » resp, err := svr.Get(c, &req)
477 » » » Convey(`Will return Internal if the Storage is not worki ng.`, func() { 552 » » » » » So(err, ShouldBeRPCOK)
478 » » » » if archived { 553 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
479 » » » » » gsc["error"] = []byte("test error") 554
480 » » » » } else { 555 » » » » » // Confirm that this has protobufs.
481 » » » » » ms.Close() 556 » » » » » So(len(resp.Logs), ShouldEqual, 3)
482 » » » » } 557 » » » » » So(resp.Logs[0], ShouldNotBeNil)
483 558
484 » » » » _, err := s.Get(c, &req) 559 » » » » » // Confirm that there is a descriptor pr otobuf.
485 » » » » So(err, ShouldBeRPCInternal) 560 » » » » » So(resp.Desc, ShouldResemble, desc)
486 » » » }) 561
487 562 » » » » » // Confirm that the state was returned.
488 » » » Convey(`Will enforce a maximum count of 2.`, func() { 563 » » » » » So(resp.State, ShouldNotBeNil)
489 » » » » req.LogCount = 2 564 » » » » })
490 » » » » resp, err := s.Get(c, &req) 565
491 » » » » So(err, ShouldBeRPCOK) 566 » » » » Convey(`Will successfully retrieve all records i f non-contiguous is allowed.`, func() {
492 » » » » So(resp, shouldHaveLogs, 0, 1) 567 » » » » » req.NonContiguous = true
493 » » » }) 568 » » » » » resp, err := svr.Get(c, &req)
494 569 » » » » » So(err, ShouldBeRPCOK)
495 » » » Convey(`When requesting protobufs`, func() { 570 » » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 7)
496 » » » » req.State = true 571 » » » » })
497 572
498 » » » » resp, err := s.Get(c, &req) 573 » » » » Convey(`When newlines are not requested, does no t include delimiters.`, func() {
499 » » » » So(err, ShouldBeRPCOK) 574 » » » » » req.LogCount = 1
500 » » » » So(resp, shouldHaveLogs, 0, 1, 2) 575
501 576 » » » » » resp, err := svr.Get(c, &req)
502 » » » » // Confirm that this has protobufs. 577 » » » » » So(err, ShouldBeRPCOK)
503 » » » » So(len(resp.Logs), ShouldEqual, 3) 578 » » » » » So(resp, shouldHaveLogs, 0)
504 » » » » So(resp.Logs[0], ShouldNotBeNil) 579
505 580 » » » » » So(resp.Logs[0].GetText(), ShouldResembl e, &logpb.Text{
506 » » » » // Confirm that there is a descriptor protobuf. 581 » » » » » » Lines: []*logpb.Text_Line{
507 » » » » So(resp.Desc, ShouldResemble, desc) 582 » » » » » » » {"log entry #0", "\n"},
508 583 » » » » » » » {"another line of text", ""},
509 » » » » // Confirm that the state was returned. 584 » » » » » » },
510 » » » » So(resp.State, ShouldNotBeNil) 585 » » » » » })
511 » » » }) 586 » » » » })
512 587
513 » » » Convey(`Will successfully retrieve all records if non-co ntiguous is allowed.`, func() { 588 » » » » Convey(`Will get a Binary LogEntry`, func() {
514 » » » » req.NonContiguous = true 589 » » » » » req.Index = 4
515 » » » » resp, err := s.Get(c, &req) 590 » » » » » req.LogCount = 1
516 » » » » So(err, ShouldBeRPCOK) 591 » » » » » resp, err := svr.Get(c, &req)
517 » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 7) 592 » » » » » So(err, ShouldBeRPCOK)
518 » » » }) 593 » » » » » So(resp, shouldHaveLogs, 4)
519 594 » » » » » So(resp.Logs[0].GetBinary(), ShouldResem ble, &logpb.Binary{
520 » » » Convey(`When newlines are not requested, does not includ e delimiters.`, func() { 595 » » » » » » Data: []byte{0x00, 0x01, 0x02, 0 x03},
521 » » » » req.LogCount = 1 596 » » » » » })
522 597 » » » » })
523 » » » » resp, err := s.Get(c, &req) 598
524 » » » » So(err, ShouldBeRPCOK) 599 » » » » Convey(`Will get a Datagram LogEntry`, func() {
525 » » » » So(resp, shouldHaveLogs, 0) 600 » » » » » req.Index = 5
526 601 » » » » » req.LogCount = 1
527 » » » » So(resp.Logs[0].GetText(), ShouldResemble, &logp b.Text{ 602 » » » » » resp, err := svr.Get(c, &req)
528 » » » » » Lines: []*logpb.Text_Line{ 603 » » » » » So(err, ShouldBeRPCOK)
529 » » » » » » {"log entry #0", "\n"}, 604 » » » » » So(resp, shouldHaveLogs, 5)
530 » » » » » » {"another line of text", ""}, 605 » » » » » So(resp.Logs[0].GetDatagram(), ShouldRes emble, &logpb.Datagram{
531 » » » » » }, 606 » » » » » » Data: []byte{0x00, 0x01, 0x02, 0 x03},
532 » » » » }) 607 » » » » » » Partial: &logpb.Datagram_Partial {
533 » » » }) 608 » » » » » » » Index: 2,
534 609 » » » » » » » Size: 1024,
535 » » » Convey(`Will get a Binary LogEntry`, func() { 610 » » » » » » » Last: false,
536 » » » » req.Index = 4 611 » » » » » » },
537 » » » » req.LogCount = 1 612 » » » » » })
538 » » » » resp, err := s.Get(c, &req) 613 » » » » })
539 » » » » So(err, ShouldBeRPCOK) 614 » » » })
540 » » » » So(resp, shouldHaveLogs, 4) 615
541 » » » » So(resp.Logs[0].GetBinary(), ShouldResemble, &lo gpb.Binary{ 616 » » » Convey(`Testing tail requests`, func() {
542 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, 617 » » » » req := logdog.TailRequest{
543 » » » » }) 618 » » » » » Project: "test-project",
544 » » » }) 619 » » » » » Path: string(ls.Path()),
545 620 » » » » }
546 » » » Convey(`Will get a Datagram LogEntry`, func() { 621
547 » » » » req.Index = 5 622 » » » » Convey(`Will successfully retrieve a stream path .`, func() {
548 » » » » req.LogCount = 1 623 » » » » » resp, err := svr.Tail(c, &req)
549 » » » » resp, err := s.Get(c, &req) 624 » » » » » So(err, ShouldBeRPCOK)
550 » » » » So(err, ShouldBeRPCOK) 625 » » » » » So(resp, shouldHaveLogs, 7)
551 » » » » So(resp, shouldHaveLogs, 5) 626 » » » » })
552 » » » » So(resp.Logs[0].GetDatagram(), ShouldResemble, & logpb.Datagram{ 627
553 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, 628 » » » » Convey(`Will successfully retrieve a stream path hash and state.`, func() {
554 » » » » » Partial: &logpb.Datagram_Partial{ 629 » » » » » req.Path = ls.HashID
555 » » » » » » Index: 2, 630 » » » » » req.State = true
556 » » » » » » Size: 1024, 631
557 » » » » » » Last: false, 632 » » » » » resp, err := svr.Tail(c, &req)
558 » » » » » }, 633 » » » » » So(err, ShouldBeRPCOK)
559 » » » » }) 634 » » » » » So(resp, shouldHaveLogs, 7)
560 » » » }) 635 » » » » » So(resp.State, ShouldResemble, loadLogSt reamState(ls))
561 » » }) 636 » » » » })
562
563 » » Convey(`Testing tail requests`, func() {
564 » » » req := logdog.TailRequest{
565 » » » » Path: string(ls.Path()),
566 » » » }
567
568 » » » Convey(`Will successfully retrieve a stream path.`, func () {
569 » » » » resp, err := s.Tail(c, &req)
570 » » » » So(err, ShouldBeRPCOK)
571 » » » » So(resp, shouldHaveLogs, 7)
572 » » » })
573
574 » » » Convey(`Will successfully retrieve a stream path hash an d state.`, func() {
575 » » » » req.Path = ls.HashID
576 » » » » req.State = true
577
578 » » » » resp, err := s.Tail(c, &req)
579 » » » » So(err, ShouldBeRPCOK)
580 » » » » So(resp, shouldHaveLogs, 7)
581 » » » » So(resp.State, ShouldResemble, loadLogStreamStat e(ls))
582 }) 637 })
583 }) 638 })
584 }) 639 })
585 } 640 }
586 641
587 func TestGetIntermediate(t *testing.T) { 642 func TestGetIntermediate(t *testing.T) {
588 t.Parallel() 643 t.Parallel()
589 644
590 testGetImpl(t, false) 645 testGetImpl(t, false)
591 } 646 }
592 647
593 func TestGetArchived(t *testing.T) { 648 func TestGetArchived(t *testing.T) {
594 t.Parallel() 649 t.Parallel()
595 650
596 » testGetImpl(t, true) 651 » testGetImpl(t, false)
597 } 652 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/endpoints/logs/get.go ('k') | appengine/logdog/coordinator/endpoints/logs/list.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698