| OLD | NEW | 
|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 package logs | 5 package logs | 
| 6 | 6 | 
| 7 import ( | 7 import ( | 
| 8         "bytes" | 8         "bytes" | 
| 9         "errors" | 9         "errors" | 
| 10         "fmt" | 10         "fmt" | 
| 11         "io" | 11         "io" | 
| 12         "math" | 12         "math" | 
| 13         "testing" | 13         "testing" | 
| 14         "time" | 14         "time" | 
| 15 | 15 | 
| 16         "github.com/golang/protobuf/proto" | 16         "github.com/golang/protobuf/proto" | 
| 17         "github.com/luci/gae/filter/featureBreaker" | 17         "github.com/luci/gae/filter/featureBreaker" | 
| 18         "github.com/luci/gae/impl/memory" | 18         "github.com/luci/gae/impl/memory" | 
| 19         ds "github.com/luci/gae/service/datastore" | 19         ds "github.com/luci/gae/service/datastore" | 
| 20         "github.com/luci/luci-go/appengine/logdog/coordinator" | 20         "github.com/luci/luci-go/appengine/logdog/coordinator" | 
| 21         ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
     " | 21         ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
     " | 
| 22         "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 22         "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 
| 23         "github.com/luci/luci-go/common/clock/testclock" | 23         "github.com/luci/luci-go/common/clock/testclock" | 
|  | 24         "github.com/luci/luci-go/common/config" | 
| 24         "github.com/luci/luci-go/common/gcloud/gs" | 25         "github.com/luci/luci-go/common/gcloud/gs" | 
| 25         "github.com/luci/luci-go/common/iotools" | 26         "github.com/luci/luci-go/common/iotools" | 
| 26         "github.com/luci/luci-go/common/logdog/types" | 27         "github.com/luci/luci-go/common/logdog/types" | 
| 27         "github.com/luci/luci-go/common/proto/logdog/logpb" | 28         "github.com/luci/luci-go/common/proto/logdog/logpb" | 
| 28         "github.com/luci/luci-go/common/recordio" | 29         "github.com/luci/luci-go/common/recordio" | 
| 29         "github.com/luci/luci-go/server/auth" | 30         "github.com/luci/luci-go/server/auth" | 
| 30         "github.com/luci/luci-go/server/auth/authtest" | 31         "github.com/luci/luci-go/server/auth/authtest" | 
| 31         "github.com/luci/luci-go/server/logdog/archive" | 32         "github.com/luci/luci-go/server/logdog/archive" | 
| 32         "github.com/luci/luci-go/server/logdog/storage" | 33         "github.com/luci/luci-go/server/logdog/storage" | 
| 33         memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" | 34         memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" | 
| (...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 150 | 151 | 
| 151 func testGetImpl(t *testing.T, archived bool) { | 152 func testGetImpl(t *testing.T, archived bool) { | 
| 152         Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
     d=%v)`, archived), t, func() { | 153         Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
     d=%v)`, archived), t, func() { | 
| 153                 c, tc := testclock.UseTime(context.Background(), testclock.TestT
     imeLocal) | 154                 c, tc := testclock.UseTime(context.Background(), testclock.TestT
     imeLocal) | 
| 154                 c = memory.Use(c) | 155                 c = memory.Use(c) | 
| 155 | 156 | 
| 156                 fs := authtest.FakeState{} | 157                 fs := authtest.FakeState{} | 
| 157                 c = auth.WithState(c, &fs) | 158                 c = auth.WithState(c, &fs) | 
| 158 | 159 | 
| 159                 ms := memoryStorage.Storage{} | 160                 ms := memoryStorage.Storage{} | 
|  | 161 | 
| 160                 gsc := testGSClient{} | 162                 gsc := testGSClient{} | 
| 161                 svcStub := ct.Services{ | 163                 svcStub := ct.Services{ | 
| 162                         IS: func() (storage.Storage, error) { | 164                         IS: func() (storage.Storage, error) { | 
| 163                                 return &ms, nil | 165                                 return &ms, nil | 
| 164                         }, | 166                         }, | 
| 165                         GS: func() (gs.Client, error) { | 167                         GS: func() (gs.Client, error) { | 
| 166                                 return gsc, nil | 168                                 return gsc, nil | 
| 167                         }, | 169                         }, | 
| 168                 } | 170                 } | 
| 169                 svcStub.InitConfig() | 171                 svcStub.InitConfig() | 
| 170                 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
     trators" | 172                 svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
     trators" | 
| 171                 c = coordinator.WithServices(c, &svcStub) | 173                 c = coordinator.WithServices(c, &svcStub) | 
| 172 | 174 | 
| 173 »       »       s := New() | 175 »       »       svr := New() | 
|  | 176 | 
|  | 177 »       »       // di is a datastore bound to the test project namespace. | 
|  | 178 »       »       const project = "test-project" | 
|  | 179 »       »       if err := coordinator.WithProjectNamespace(&c, config.ProjectNam
     e(project)); err != nil { | 
|  | 180 »       »       »       panic(err) | 
|  | 181 »       »       } | 
|  | 182 »       »       di := ds.Get(c) | 
| 174 | 183 | 
| 175                 // Generate our test stream. | 184                 // Generate our test stream. | 
| 176                 desc := ct.TestLogStreamDescriptor(c, "foo/bar") | 185                 desc := ct.TestLogStreamDescriptor(c, "foo/bar") | 
| 177                 ls := ct.TestLogStream(c, desc) | 186                 ls := ct.TestLogStream(c, desc) | 
| 178 »       »       if err := ds.Get(c).Put(ls); err != nil { | 187 »       »       if err := di.Put(ls); err != nil { | 
| 179                         panic(err) | 188                         panic(err) | 
| 180                 } | 189                 } | 
| 181 | 190 | 
| 182                 tc.Add(time.Second) | 191                 tc.Add(time.Second) | 
| 183                 var entries []*logpb.LogEntry | 192                 var entries []*logpb.LogEntry | 
| 184                 protobufs := map[uint64][]byte{} | 193                 protobufs := map[uint64][]byte{} | 
| 185                 for _, v := range []int{0, 1, 2, 4, 5, 7} { | 194                 for _, v := range []int{0, 1, 2, 4, 5, 7} { | 
| 186                         le := ct.TestLogEntry(c, ls, v) | 195                         le := ct.TestLogEntry(c, ls, v) | 
| 187                         le.GetText().Lines = append(le.GetText().Lines, &logpb.T
     ext_Line{ | 196                         le.GetText().Lines = append(le.GetText().Lines, &logpb.T
     ext_Line{ | 
| 188                                 Value: "another line of text", | 197                                 Value: "another line of text", | 
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 226                                 size += recordio.FrameHeaderSize(int64(len(pb)))
      + len(pb) | 235                                 size += recordio.FrameHeaderSize(int64(len(pb)))
      + len(pb) | 
| 227                         } | 236                         } | 
| 228                         if size > math.MaxInt32 { | 237                         if size > math.MaxInt32 { | 
| 229                                 panic(size) | 238                                 panic(size) | 
| 230                         } | 239                         } | 
| 231                         return int32(size) | 240                         return int32(size) | 
| 232                 } | 241                 } | 
| 233 | 242 | 
| 234                 Convey(`Testing Get requests (no logs)`, func() { | 243                 Convey(`Testing Get requests (no logs)`, func() { | 
| 235                         req := logdog.GetRequest{ | 244                         req := logdog.GetRequest{ | 
| 236 »       »       »       »       Path: string(ls.Path()), | 245 »       »       »       »       Project: project, | 
|  | 246 »       »       »       »       Path:    string(ls.Path()), | 
| 237                         } | 247                         } | 
| 238 | 248 | 
|  | 249                         Convey(`Will succeed with no logs.`, func() { | 
|  | 250                                 resp, err := svr.Get(c, &req) | 
|  | 251 | 
|  | 252                                 So(err, ShouldBeRPCOK) | 
|  | 253                                 So(resp, shouldHaveLogs) | 
|  | 254                         }) | 
|  | 255 | 
| 239                         Convey(`Will fail if the Path is not a stream path or a 
     hash.`, func() { | 256                         Convey(`Will fail if the Path is not a stream path or a 
     hash.`, func() { | 
| 240                                 req.Path = "not/a/full/stream/path" | 257                                 req.Path = "not/a/full/stream/path" | 
| 241 »       »       »       »       _, err := s.Get(c, &req) | 258 »       »       »       »       _, err := svr.Get(c, &req) | 
| 242                                 So(err, ShouldErrLike, "invalid path value") | 259                                 So(err, ShouldErrLike, "invalid path value") | 
| 243                         }) | 260                         }) | 
| 244 | 261 | 
| 245                         Convey(`Will fail with Internal if the datastore Get() d
     oesn't work.`, func() { | 262                         Convey(`Will fail with Internal if the datastore Get() d
     oesn't work.`, func() { | 
| 246                                 c, fb := featureBreaker.FilterRDS(c, nil) | 263                                 c, fb := featureBreaker.FilterRDS(c, nil) | 
| 247                                 fb.BreakFeatures(errors.New("testing error"), "G
     etMulti") | 264                                 fb.BreakFeatures(errors.New("testing error"), "G
     etMulti") | 
| 248 | 265 | 
| 249 »       »       »       »       _, err := s.Get(c, &req) | 266 »       »       »       »       _, err := svr.Get(c, &req) | 
| 250                                 So(err, ShouldBeRPCInternal) | 267                                 So(err, ShouldBeRPCInternal) | 
| 251                         }) | 268                         }) | 
| 252 | 269 | 
| 253 »       »       »       Convey(`Will fail with NotFound if the log stream does n
     ot exist.`, func() { | 270 »       »       »       Convey(`Will fail with NotFound if the log stream does n
     ot exist (different project).`, func() { | 
|  | 271 »       »       »       »       req.Project = "does-not-exist" | 
|  | 272 »       »       »       »       _, err := svr.Get(c, &req) | 
|  | 273 »       »       »       »       So(err, ShouldBeRPCNotFound) | 
|  | 274 »       »       »       }) | 
|  | 275 | 
|  | 276 »       »       »       Convey(`Will fail with NotFound if the log path does not
      exist (different path).`, func() { | 
| 254                                 req.Path = "testing/+/does/not/exist" | 277                                 req.Path = "testing/+/does/not/exist" | 
| 255 »       »       »       »       _, err := s.Get(c, &req) | 278 »       »       »       »       _, err := svr.Get(c, &req) | 
| 256                                 So(err, ShouldBeRPCNotFound) | 279                                 So(err, ShouldBeRPCNotFound) | 
| 257                         }) | 280                         }) | 
| 258                 }) | 281                 }) | 
| 259 | 282 | 
| 260 »       »       if !archived { | 283 »       »       Convey(`Testing Tail requests (no logs)`, func() { | 
| 261 »       »       »       // Add the logs to the in-memory temporary storage. | 284 »       »       »       req := logdog.TailRequest{ | 
| 262 »       »       »       for _, le := range entries { | 285 »       »       »       »       Project: project, | 
| 263 »       »       »       »       err := ms.Put(storage.PutRequest{ | 286 »       »       »       »       Path:    string(ls.Path()), | 
| 264 »       »       »       »       »       Path:   ls.Path(), |  | 
| 265 »       »       »       »       »       Index:  types.MessageIndex(le.StreamInde
     x), |  | 
| 266 »       »       »       »       »       Values: [][]byte{protobufs[le.StreamInde
     x]}, |  | 
| 267 »       »       »       »       }) |  | 
| 268 »       »       »       »       if err != nil { |  | 
| 269 »       »       »       »       »       panic(fmt.Errorf("failed to Put() LogEnt
     ry: %v", err)) |  | 
| 270 »       »       »       »       } |  | 
| 271                         } | 287                         } | 
| 272 »       »       } else { | 288 | 
| 273 »       »       »       // Archive this log stream. We will generate one index e
     ntry for every | 289 »       »       »       Convey(`Will succeed with no logs.`, func() { | 
| 274 »       »       »       // 2 log entries. | 290 »       »       »       »       resp, err := svr.Tail(c, &req) | 
| 275 »       »       »       src := staticArchiveSource(entries) | 291 | 
| 276 »       »       »       var lbuf, ibuf bytes.Buffer | 292 »       »       »       »       So(err, ShouldBeRPCOK) | 
| 277 »       »       »       m := archive.Manifest{ | 293 »       »       »       »       So(resp, shouldHaveLogs) | 
| 278 »       »       »       »       Desc:             desc, | 294 »       »       »       }) | 
| 279 »       »       »       »       Source:           &src, | 295 | 
| 280 »       »       »       »       LogWriter:        &lbuf, | 296 »       »       »       Convey(`Will fail with NotFound if the log stream does n
     ot exist (different project).`, func() { | 
| 281 »       »       »       »       IndexWriter:      &ibuf, | 297 »       »       »       »       req.Project = "does-not-exist" | 
| 282 »       »       »       »       StreamIndexRange: 2, | 298 »       »       »       »       _, err := svr.Tail(c, &req) | 
|  | 299 »       »       »       »       So(err, ShouldBeRPCNotFound) | 
|  | 300 »       »       »       }) | 
|  | 301 | 
|  | 302 »       »       »       Convey(`Will fail with NotFound if the log path does not
      exist (different path).`, func() { | 
|  | 303 »       »       »       »       req.Path = "testing/+/does/not/exist" | 
|  | 304 »       »       »       »       _, err := svr.Tail(c, &req) | 
|  | 305 »       »       »       »       So(err, ShouldBeRPCNotFound) | 
|  | 306 »       »       »       }) | 
|  | 307 »       »       }) | 
|  | 308 | 
|  | 309 »       »       Convey(`When testing log data is added`, func() { | 
|  | 310 »       »       »       if !archived { | 
|  | 311 »       »       »       »       // Add the logs to the in-memory temporary stora
     ge. | 
|  | 312 »       »       »       »       for _, le := range entries { | 
|  | 313 »       »       »       »       »       err := ms.Put(storage.PutRequest{ | 
|  | 314 »       »       »       »       »       »       Project: project, | 
|  | 315 »       »       »       »       »       »       Path:    ls.Path(), | 
|  | 316 »       »       »       »       »       »       Index:   types.MessageIndex(le.S
     treamIndex), | 
|  | 317 »       »       »       »       »       »       Values:  [][]byte{protobufs[le.S
     treamIndex]}, | 
|  | 318 »       »       »       »       »       }) | 
|  | 319 »       »       »       »       »       if err != nil { | 
|  | 320 »       »       »       »       »       »       panic(fmt.Errorf("failed to Put(
     ) LogEntry: %v", err)) | 
|  | 321 »       »       »       »       »       } | 
|  | 322 »       »       »       »       } | 
|  | 323 »       »       »       } else { | 
|  | 324 »       »       »       »       // Archive this log stream. We will generate one
      index entry for every | 
|  | 325 »       »       »       »       // 2 log entries. | 
|  | 326 »       »       »       »       src := staticArchiveSource(entries) | 
|  | 327 »       »       »       »       var lbuf, ibuf bytes.Buffer | 
|  | 328 »       »       »       »       m := archive.Manifest{ | 
|  | 329 »       »       »       »       »       Desc:             desc, | 
|  | 330 »       »       »       »       »       Source:           &src, | 
|  | 331 »       »       »       »       »       LogWriter:        &lbuf, | 
|  | 332 »       »       »       »       »       IndexWriter:      &ibuf, | 
|  | 333 »       »       »       »       »       StreamIndexRange: 2, | 
|  | 334 »       »       »       »       } | 
|  | 335 »       »       »       »       if err := archive.Archive(m); err != nil { | 
|  | 336 »       »       »       »       »       panic(err) | 
|  | 337 »       »       »       »       } | 
|  | 338 | 
|  | 339 »       »       »       »       now := tc.Now().UTC() | 
|  | 340 | 
|  | 341 »       »       »       »       gsc.put("gs://testbucket/stream", lbuf.Bytes()) | 
|  | 342 »       »       »       »       gsc.put("gs://testbucket/index", ibuf.Bytes()) | 
|  | 343 »       »       »       »       ls.State = coordinator.LSArchived | 
|  | 344 »       »       »       »       ls.TerminatedTime = now | 
|  | 345 »       »       »       »       ls.ArchivedTime = now | 
|  | 346 »       »       »       »       ls.ArchiveStreamURL = "gs://testbucket/stream" | 
|  | 347 »       »       »       »       ls.ArchiveIndexURL = "gs://testbucket/index" | 
| 283                         } | 348                         } | 
| 284 »       »       »       if err := archive.Archive(m); err != nil { | 349 »       »       »       if err := di.Put(ls); err != nil { | 
| 285                                 panic(err) | 350                                 panic(err) | 
| 286                         } | 351                         } | 
| 287 | 352 | 
| 288 »       »       »       now := tc.Now().UTC() | 353 »       »       »       Convey(`Testing Get requests`, func() { | 
| 289 | 354 »       »       »       »       req := logdog.GetRequest{ | 
| 290 »       »       »       gsc.put("gs://testbucket/stream", lbuf.Bytes()) | 355 »       »       »       »       »       Project: project, | 
| 291 »       »       »       gsc.put("gs://testbucket/index", ibuf.Bytes()) | 356 »       »       »       »       »       Path:    string(ls.Path()), | 
| 292 »       »       »       ls.State = coordinator.LSArchived | 357 »       »       »       »       } | 
| 293 »       »       »       ls.TerminatedTime = now | 358 | 
| 294 »       »       »       ls.ArchivedTime = now | 359 »       »       »       »       Convey(`When the log stream is purged`, func() { | 
| 295 »       »       »       ls.ArchiveStreamURL = "gs://testbucket/stream" | 360 »       »       »       »       »       ls.Purged = true | 
| 296 »       »       »       ls.ArchiveIndexURL = "gs://testbucket/index" | 361 »       »       »       »       »       if err := di.Put(ls); err != nil { | 
| 297 »       »       } |  | 
| 298 »       »       if err := ds.Get(c).Put(ls); err != nil { |  | 
| 299 »       »       »       panic(err) |  | 
| 300 »       »       } |  | 
| 301 |  | 
| 302 »       »       Convey(`Testing Get requests`, func() { |  | 
| 303 »       »       »       req := logdog.GetRequest{ |  | 
| 304 »       »       »       »       Path: string(ls.Path()), |  | 
| 305 »       »       »       } |  | 
| 306 |  | 
| 307 »       »       »       Convey(`When the log stream is purged`, func() { |  | 
| 308 »       »       »       »       ls.Purged = true |  | 
| 309 »       »       »       »       if err := ds.Get(c).Put(ls); err != nil { |  | 
| 310 »       »       »       »       »       panic(err) |  | 
| 311 »       »       »       »       } |  | 
| 312 |  | 
| 313 »       »       »       »       Convey(`Will return NotFound if the user is not 
     an administrator.`, func() { |  | 
| 314 »       »       »       »       »       _, err := s.Get(c, &req) |  | 
| 315 »       »       »       »       »       So(err, ShouldBeRPCNotFound) |  | 
| 316 »       »       »       »       }) |  | 
| 317 |  | 
| 318 »       »       »       »       Convey(`Will process the request if the user is 
     an administrator.`, func() { |  | 
| 319 »       »       »       »       »       fs.IdentityGroups = []string{"test-admin
     istrators"} |  | 
| 320 |  | 
| 321 »       »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 322 »       »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 323 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) |  | 
| 324 »       »       »       »       }) |  | 
| 325 »       »       »       }) |  | 
| 326 |  | 
| 327 »       »       »       Convey(`Will return empty if no records were requested.`
     , func() { |  | 
| 328 »       »       »       »       req.LogCount = -1 |  | 
| 329 »       »       »       »       req.State = false |  | 
| 330 |  | 
| 331 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 332 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 333 »       »       »       »       So(resp.Logs, ShouldHaveLength, 0) |  | 
| 334 »       »       »       }) |  | 
| 335 |  | 
| 336 »       »       »       Convey(`Will successfully retrieve a stream path.`, func
     () { |  | 
| 337 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 338 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 339 »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) |  | 
| 340 »       »       »       }) |  | 
| 341 |  | 
| 342 »       »       »       Convey(`Will successfully retrieve a stream path offset 
     at 4.`, func() { |  | 
| 343 »       »       »       »       req.Index = 4 |  | 
| 344 |  | 
| 345 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 346 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 347 »       »       »       »       So(resp, shouldHaveLogs, 4, 5) |  | 
| 348 »       »       »       }) |  | 
| 349 |  | 
| 350 »       »       »       Convey(`Will retrieve no logs for contiguous offset 6.`,
      func() { |  | 
| 351 »       »       »       »       req.Index = 6 |  | 
| 352 |  | 
| 353 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 354 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 355 »       »       »       »       So(len(resp.Logs), ShouldEqual, 0) |  | 
| 356 »       »       »       }) |  | 
| 357 |  | 
| 358 »       »       »       Convey(`Will retrieve log 7 for non-contiguous offset 6.
     `, func() { |  | 
| 359 »       »       »       »       req.NonContiguous = true |  | 
| 360 »       »       »       »       req.Index = 6 |  | 
| 361 |  | 
| 362 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 363 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 364 »       »       »       »       So(resp, shouldHaveLogs, 7) |  | 
| 365 »       »       »       }) |  | 
| 366 |  | 
| 367 »       »       »       Convey(`With a byte limit of 1, will still return at lea
     st one log entry.`, func() { |  | 
| 368 »       »       »       »       req.ByteCount = 1 |  | 
| 369 |  | 
| 370 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 371 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 372 »       »       »       »       So(resp, shouldHaveLogs, 0) |  | 
| 373 »       »       »       }) |  | 
| 374 |  | 
| 375 »       »       »       Convey(`With a byte limit of sizeof(0), will return log 
     entry 0.`, func() { |  | 
| 376 »       »       »       »       req.ByteCount = frameSize(0) |  | 
| 377 |  | 
| 378 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 379 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 380 »       »       »       »       So(resp, shouldHaveLogs, 0) |  | 
| 381 »       »       »       }) |  | 
| 382 |  | 
| 383 »       »       »       Convey(`With a byte limit of sizeof(0)+1, will return lo
     g entry 0.`, func() { |  | 
| 384 »       »       »       »       req.ByteCount = frameSize(0) + 1 |  | 
| 385 |  | 
| 386 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 387 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 388 »       »       »       »       So(resp, shouldHaveLogs, 0) |  | 
| 389 »       »       »       }) |  | 
| 390 |  | 
| 391 »       »       »       Convey(`With a byte limit of sizeof({0, 1}), will return
      log entries {0, 1}.`, func() { |  | 
| 392 »       »       »       »       req.ByteCount = frameSize(0, 1) |  | 
| 393 |  | 
| 394 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 395 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 396 »       »       »       »       So(resp, shouldHaveLogs, 0, 1) |  | 
| 397 »       »       »       }) |  | 
| 398 |  | 
| 399 »       »       »       Convey(`With a byte limit of sizeof({0, 1, 2}), will ret
     urn log entries {0, 1, 2}.`, func() { |  | 
| 400 »       »       »       »       req.ByteCount = frameSize(0, 1, 2) |  | 
| 401 |  | 
| 402 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 403 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 404 »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) |  | 
| 405 »       »       »       }) |  | 
| 406 |  | 
| 407 »       »       »       Convey(`With a byte limit of sizeof({0, 1, 2})+1, will r
     eturn log entries {0, 1, 2}.`, func() { |  | 
| 408 »       »       »       »       req.ByteCount = frameSize(0, 1, 2) + 1 |  | 
| 409 |  | 
| 410 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 411 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 412 »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) |  | 
| 413 »       »       »       }) |  | 
| 414 |  | 
| 415 »       »       »       Convey(`Will successfully retrieve a stream path hash.`,
      func() { |  | 
| 416 »       »       »       »       req.Path = ls.HashID |  | 
| 417 »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 418 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 419 »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) |  | 
| 420 »       »       »       }) |  | 
| 421 |  | 
| 422 »       »       »       Convey(`When requesting state`, func() { |  | 
| 423 »       »       »       »       req.State = true |  | 
| 424 »       »       »       »       req.LogCount = -1 |  | 
| 425 |  | 
| 426 »       »       »       »       Convey(`Will successfully retrieve stream state.
     `, func() { |  | 
| 427 »       »       »       »       »       resp, err := s.Get(c, &req) |  | 
| 428 »       »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 429 »       »       »       »       »       So(resp.State, ShouldResemble, loadLogSt
     reamState(ls)) |  | 
| 430 »       »       »       »       »       So(len(resp.Logs), ShouldEqual, 0) |  | 
| 431 »       »       »       »       }) |  | 
| 432 |  | 
| 433 »       »       »       »       Convey(`Will return Internal if the protobuf des
     criptor data is corrupt.`, func() { |  | 
| 434 »       »       »       »       »       ls.SetDSValidate(false) |  | 
| 435 »       »       »       »       »       ls.Descriptor = []byte{0x00} // Invalid 
     protobuf, zero tag. |  | 
| 436 »       »       »       »       »       if err := ds.Get(c).Put(ls); err != nil 
     { |  | 
| 437                                                 panic(err) | 362                                                 panic(err) | 
| 438                                         } | 363                                         } | 
| 439 | 364 | 
| 440 »       »       »       »       »       _, err := s.Get(c, &req) | 365 »       »       »       »       »       Convey(`Will return NotFound if the user
      is not an administrator.`, func() { | 
|  | 366 »       »       »       »       »       »       _, err := svr.Get(c, &req) | 
|  | 367 »       »       »       »       »       »       So(err, ShouldBeRPCNotFound) | 
|  | 368 »       »       »       »       »       }) | 
|  | 369 | 
|  | 370 »       »       »       »       »       Convey(`Will process the request if the 
     user is an administrator.`, func() { | 
|  | 371 »       »       »       »       »       »       fs.IdentityGroups = []string{"te
     st-administrators"} | 
|  | 372 | 
|  | 373 »       »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 374 »       »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 375 »       »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2
     ) | 
|  | 376 »       »       »       »       »       }) | 
|  | 377 »       »       »       »       }) | 
|  | 378 | 
|  | 379 »       »       »       »       Convey(`Will return empty if no records were req
     uested.`, func() { | 
|  | 380 »       »       »       »       »       req.LogCount = -1 | 
|  | 381 »       »       »       »       »       req.State = false | 
|  | 382 | 
|  | 383 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 384 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 385 »       »       »       »       »       So(resp.Logs, ShouldHaveLength, 0) | 
|  | 386 »       »       »       »       }) | 
|  | 387 | 
|  | 388 »       »       »       »       Convey(`Will successfully retrieve a stream path
     .`, func() { | 
|  | 389 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 390 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 391 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 
|  | 392 »       »       »       »       }) | 
|  | 393 | 
|  | 394 »       »       »       »       Convey(`Will successfully retrieve a stream path
      offset at 4.`, func() { | 
|  | 395 »       »       »       »       »       req.Index = 4 | 
|  | 396 | 
|  | 397 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 398 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 399 »       »       »       »       »       So(resp, shouldHaveLogs, 4, 5) | 
|  | 400 »       »       »       »       }) | 
|  | 401 | 
|  | 402 »       »       »       »       Convey(`Will retrieve no logs for contiguous off
     set 6.`, func() { | 
|  | 403 »       »       »       »       »       req.Index = 6 | 
|  | 404 | 
|  | 405 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 406 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 407 »       »       »       »       »       So(len(resp.Logs), ShouldEqual, 0) | 
|  | 408 »       »       »       »       }) | 
|  | 409 | 
|  | 410 »       »       »       »       Convey(`Will retrieve log 7 for non-contiguous o
     ffset 6.`, func() { | 
|  | 411 »       »       »       »       »       req.NonContiguous = true | 
|  | 412 »       »       »       »       »       req.Index = 6 | 
|  | 413 | 
|  | 414 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 415 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 416 »       »       »       »       »       So(resp, shouldHaveLogs, 7) | 
|  | 417 »       »       »       »       }) | 
|  | 418 | 
|  | 419 »       »       »       »       Convey(`With a byte limit of 1, will still retur
     n at least one log entry.`, func() { | 
|  | 420 »       »       »       »       »       req.ByteCount = 1 | 
|  | 421 | 
|  | 422 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 423 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 424 »       »       »       »       »       So(resp, shouldHaveLogs, 0) | 
|  | 425 »       »       »       »       }) | 
|  | 426 | 
|  | 427 »       »       »       »       Convey(`With a byte limit of sizeof(0), will ret
     urn log entry 0.`, func() { | 
|  | 428 »       »       »       »       »       req.ByteCount = frameSize(0) | 
|  | 429 | 
|  | 430 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 431 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 432 »       »       »       »       »       So(resp, shouldHaveLogs, 0) | 
|  | 433 »       »       »       »       }) | 
|  | 434 | 
|  | 435 »       »       »       »       Convey(`With a byte limit of sizeof(0)+1, will r
     eturn log entry 0.`, func() { | 
|  | 436 »       »       »       »       »       req.ByteCount = frameSize(0) + 1 | 
|  | 437 | 
|  | 438 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 439 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 440 »       »       »       »       »       So(resp, shouldHaveLogs, 0) | 
|  | 441 »       »       »       »       }) | 
|  | 442 | 
|  | 443 »       »       »       »       Convey(`With a byte limit of sizeof({0, 1}), wil
     l return log entries {0, 1}.`, func() { | 
|  | 444 »       »       »       »       »       req.ByteCount = frameSize(0, 1) | 
|  | 445 | 
|  | 446 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 447 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 448 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1) | 
|  | 449 »       »       »       »       }) | 
|  | 450 | 
|  | 451 »       »       »       »       Convey(`With a byte limit of sizeof({0, 1, 2}), 
     will return log entries {0, 1, 2}.`, func() { | 
|  | 452 »       »       »       »       »       req.ByteCount = frameSize(0, 1, 2) | 
|  | 453 | 
|  | 454 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 455 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 456 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 
|  | 457 »       »       »       »       }) | 
|  | 458 | 
|  | 459 »       »       »       »       Convey(`With a byte limit of sizeof({0, 1, 2})+1
     , will return log entries {0, 1, 2}.`, func() { | 
|  | 460 »       »       »       »       »       req.ByteCount = frameSize(0, 1, 2) + 1 | 
|  | 461 | 
|  | 462 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 463 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 464 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 
|  | 465 »       »       »       »       }) | 
|  | 466 | 
|  | 467 »       »       »       »       Convey(`Will successfully retrieve a stream path
      hash.`, func() { | 
|  | 468 »       »       »       »       »       req.Path = ls.HashID | 
|  | 469 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 470 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 471 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 
|  | 472 »       »       »       »       }) | 
|  | 473 | 
|  | 474 »       »       »       »       Convey(`When requesting state`, func() { | 
|  | 475 »       »       »       »       »       req.State = true | 
|  | 476 »       »       »       »       »       req.LogCount = -1 | 
|  | 477 | 
|  | 478 »       »       »       »       »       Convey(`Will successfully retrieve strea
     m state.`, func() { | 
|  | 479 »       »       »       »       »       »       resp, err := svr.Get(c, &req) | 
|  | 480 »       »       »       »       »       »       So(err, ShouldBeRPCOK) | 
|  | 481 »       »       »       »       »       »       So(resp.State, ShouldResemble, l
     oadLogStreamState(ls)) | 
|  | 482 »       »       »       »       »       »       So(len(resp.Logs), ShouldEqual, 
     0) | 
|  | 483 »       »       »       »       »       }) | 
|  | 484 | 
|  | 485 »       »       »       »       »       Convey(`Will return Internal if the prot
     obuf descriptor data is corrupt.`, func() { | 
|  | 486 »       »       »       »       »       »       ls.SetDSValidate(false) | 
|  | 487 »       »       »       »       »       »       ls.Descriptor = []byte{0x00} // 
     Invalid protobuf, zero tag. | 
|  | 488 »       »       »       »       »       »       if err := di.Put(ls); err != nil
      { | 
|  | 489 »       »       »       »       »       »       »       panic(err) | 
|  | 490 »       »       »       »       »       »       } | 
|  | 491 | 
|  | 492 »       »       »       »       »       »       _, err := svr.Get(c, &req) | 
|  | 493 »       »       »       »       »       »       So(err, ShouldBeRPCInternal) | 
|  | 494 »       »       »       »       »       }) | 
|  | 495 »       »       »       »       }) | 
|  | 496 | 
|  | 497 »       »       »       »       Convey(`Will return Internal if the protobuf log
      entry data is corrupt.`, func() { | 
|  | 498 »       »       »       »       »       if archived { | 
|  | 499 »       »       »       »       »       »       // Corrupt the archive datastrea
     m. | 
|  | 500 »       »       »       »       »       »       stream := gsc.get("gs://testbuck
     et/stream") | 
|  | 501 »       »       »       »       »       »       zeroRecords(stream) | 
|  | 502 »       »       »       »       »       } else { | 
|  | 503 »       »       »       »       »       »       // Add corrupted entry to Storag
     e. Create a new entry here, since | 
|  | 504 »       »       »       »       »       »       // the storage will reject a dup
     licate/overwrite. | 
|  | 505 »       »       »       »       »       »       err := ms.Put(storage.PutRequest
     { | 
|  | 506 »       »       »       »       »       »       »       Project: project, | 
|  | 507 »       »       »       »       »       »       »       Path:    types.StreamPat
     h(req.Path), | 
|  | 508 »       »       »       »       »       »       »       Index:   666, | 
|  | 509 »       »       »       »       »       »       »       Values:  [][]byte{{0x00}
     }, // Invalid protobuf, zero tag. | 
|  | 510 »       »       »       »       »       »       }) | 
|  | 511 »       »       »       »       »       »       if err != nil { | 
|  | 512 »       »       »       »       »       »       »       panic(err) | 
|  | 513 »       »       »       »       »       »       } | 
|  | 514 »       »       »       »       »       »       req.Index = 666 | 
|  | 515 »       »       »       »       »       } | 
|  | 516 | 
|  | 517 »       »       »       »       »       _, err := svr.Get(c, &req) | 
| 441                                         So(err, ShouldBeRPCInternal) | 518                                         So(err, ShouldBeRPCInternal) | 
| 442                                 }) | 519                                 }) | 
| 443 »       »       »       }) | 520 | 
| 444 | 521 »       »       »       »       Convey(`Will successfully retrieve both logs and
      stream state.`, func() { | 
| 445 »       »       »       Convey(`Will return Internal if the protobuf log entry d
     ata is corrupt.`, func() { | 522 »       »       »       »       »       req.State = true | 
| 446 »       »       »       »       if archived { | 523 | 
| 447 »       »       »       »       »       // Corrupt the archive datastream. | 524 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
| 448 »       »       »       »       »       stream := gsc.get("gs://testbucket/strea
     m") | 525 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 449 »       »       »       »       »       zeroRecords(stream) | 526 »       »       »       »       »       So(resp.State, ShouldResemble, loadLogSt
     reamState(ls)) | 
| 450 »       »       »       »       } else { | 527 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 
| 451 »       »       »       »       »       // Add corrupted entry to Storage. Creat
     e a new entry here, since | 528 »       »       »       »       }) | 
| 452 »       »       »       »       »       // the storage will reject a duplicate/o
     verwrite. | 529 | 
| 453 »       »       »       »       »       err := ms.Put(storage.PutRequest{ | 530 »       »       »       »       Convey(`Will return Internal if the Storage is n
     ot working.`, func() { | 
| 454 »       »       »       »       »       »       Path:   types.StreamPath(req.Pat
     h), | 531 »       »       »       »       »       if archived { | 
| 455 »       »       »       »       »       »       Index:  666, | 532 »       »       »       »       »       »       gsc["error"] = []byte("test erro
     r") | 
| 456 »       »       »       »       »       »       Values: [][]byte{{0x00}}, // Inv
     alid protobuf, zero tag. | 533 »       »       »       »       »       } else { | 
| 457 »       »       »       »       »       }) | 534 »       »       »       »       »       »       ms.Close() | 
| 458 »       »       »       »       »       if err != nil { |  | 
| 459 »       »       »       »       »       »       panic(err) |  | 
| 460                                         } | 535                                         } | 
| 461 »       »       »       »       »       req.Index = 666 | 536 | 
| 462 »       »       »       »       } | 537 »       »       »       »       »       _, err := svr.Get(c, &req) | 
| 463 | 538 »       »       »       »       »       So(err, ShouldBeRPCInternal) | 
| 464 »       »       »       »       _, err := s.Get(c, &req) | 539 »       »       »       »       }) | 
| 465 »       »       »       »       So(err, ShouldBeRPCInternal) | 540 | 
| 466 »       »       »       }) | 541 »       »       »       »       Convey(`Will enforce a maximum count of 2.`, fun
     c() { | 
| 467 | 542 »       »       »       »       »       req.LogCount = 2 | 
| 468 »       »       »       Convey(`Will successfully retrieve both logs and stream 
     state.`, func() { | 543 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
| 469 »       »       »       »       req.State = true | 544 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 470 | 545 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1) | 
| 471 »       »       »       »       resp, err := s.Get(c, &req) | 546 »       »       »       »       }) | 
| 472 »       »       »       »       So(err, ShouldBeRPCOK) | 547 | 
| 473 »       »       »       »       So(resp.State, ShouldResemble, loadLogStreamStat
     e(ls)) | 548 »       »       »       »       Convey(`When requesting protobufs`, func() { | 
| 474 »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 549 »       »       »       »       »       req.State = true | 
| 475 »       »       »       }) | 550 | 
| 476 | 551 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
| 477 »       »       »       Convey(`Will return Internal if the Storage is not worki
     ng.`, func() { | 552 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 478 »       »       »       »       if archived { | 553 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 
| 479 »       »       »       »       »       gsc["error"] = []byte("test error") | 554 | 
| 480 »       »       »       »       } else { | 555 »       »       »       »       »       // Confirm that this has protobufs. | 
| 481 »       »       »       »       »       ms.Close() | 556 »       »       »       »       »       So(len(resp.Logs), ShouldEqual, 3) | 
| 482 »       »       »       »       } | 557 »       »       »       »       »       So(resp.Logs[0], ShouldNotBeNil) | 
| 483 | 558 | 
| 484 »       »       »       »       _, err := s.Get(c, &req) | 559 »       »       »       »       »       // Confirm that there is a descriptor pr
     otobuf. | 
| 485 »       »       »       »       So(err, ShouldBeRPCInternal) | 560 »       »       »       »       »       So(resp.Desc, ShouldResemble, desc) | 
| 486 »       »       »       }) | 561 | 
| 487 | 562 »       »       »       »       »       // Confirm that the state was returned. | 
| 488 »       »       »       Convey(`Will enforce a maximum count of 2.`, func() { | 563 »       »       »       »       »       So(resp.State, ShouldNotBeNil) | 
| 489 »       »       »       »       req.LogCount = 2 | 564 »       »       »       »       }) | 
| 490 »       »       »       »       resp, err := s.Get(c, &req) | 565 | 
| 491 »       »       »       »       So(err, ShouldBeRPCOK) | 566 »       »       »       »       Convey(`Will successfully retrieve all records i
     f non-contiguous is allowed.`, func() { | 
| 492 »       »       »       »       So(resp, shouldHaveLogs, 0, 1) | 567 »       »       »       »       »       req.NonContiguous = true | 
| 493 »       »       »       }) | 568 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
| 494 | 569 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 495 »       »       »       Convey(`When requesting protobufs`, func() { | 570 »       »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 
     7) | 
| 496 »       »       »       »       req.State = true | 571 »       »       »       »       }) | 
| 497 | 572 | 
| 498 »       »       »       »       resp, err := s.Get(c, &req) | 573 »       »       »       »       Convey(`When newlines are not requested, does no
     t include delimiters.`, func() { | 
| 499 »       »       »       »       So(err, ShouldBeRPCOK) | 574 »       »       »       »       »       req.LogCount = 1 | 
| 500 »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2) | 575 | 
| 501 | 576 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
| 502 »       »       »       »       // Confirm that this has protobufs. | 577 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 503 »       »       »       »       So(len(resp.Logs), ShouldEqual, 3) | 578 »       »       »       »       »       So(resp, shouldHaveLogs, 0) | 
| 504 »       »       »       »       So(resp.Logs[0], ShouldNotBeNil) | 579 | 
| 505 | 580 »       »       »       »       »       So(resp.Logs[0].GetText(), ShouldResembl
     e, &logpb.Text{ | 
| 506 »       »       »       »       // Confirm that there is a descriptor protobuf. | 581 »       »       »       »       »       »       Lines: []*logpb.Text_Line{ | 
| 507 »       »       »       »       So(resp.Desc, ShouldResemble, desc) | 582 »       »       »       »       »       »       »       {"log entry #0", "\n"}, | 
| 508 | 583 »       »       »       »       »       »       »       {"another line of text",
      ""}, | 
| 509 »       »       »       »       // Confirm that the state was returned. | 584 »       »       »       »       »       »       }, | 
| 510 »       »       »       »       So(resp.State, ShouldNotBeNil) | 585 »       »       »       »       »       }) | 
| 511 »       »       »       }) | 586 »       »       »       »       }) | 
| 512 | 587 | 
| 513 »       »       »       Convey(`Will successfully retrieve all records if non-co
     ntiguous is allowed.`, func() { | 588 »       »       »       »       Convey(`Will get a Binary LogEntry`, func() { | 
| 514 »       »       »       »       req.NonContiguous = true | 589 »       »       »       »       »       req.Index = 4 | 
| 515 »       »       »       »       resp, err := s.Get(c, &req) | 590 »       »       »       »       »       req.LogCount = 1 | 
| 516 »       »       »       »       So(err, ShouldBeRPCOK) | 591 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
| 517 »       »       »       »       So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 7) | 592 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 518 »       »       »       }) | 593 »       »       »       »       »       So(resp, shouldHaveLogs, 4) | 
| 519 | 594 »       »       »       »       »       So(resp.Logs[0].GetBinary(), ShouldResem
     ble, &logpb.Binary{ | 
| 520 »       »       »       Convey(`When newlines are not requested, does not includ
     e delimiters.`, func() { | 595 »       »       »       »       »       »       Data: []byte{0x00, 0x01, 0x02, 0
     x03}, | 
| 521 »       »       »       »       req.LogCount = 1 | 596 »       »       »       »       »       }) | 
| 522 | 597 »       »       »       »       }) | 
| 523 »       »       »       »       resp, err := s.Get(c, &req) | 598 | 
| 524 »       »       »       »       So(err, ShouldBeRPCOK) | 599 »       »       »       »       Convey(`Will get a Datagram LogEntry`, func() { | 
| 525 »       »       »       »       So(resp, shouldHaveLogs, 0) | 600 »       »       »       »       »       req.Index = 5 | 
| 526 | 601 »       »       »       »       »       req.LogCount = 1 | 
| 527 »       »       »       »       So(resp.Logs[0].GetText(), ShouldResemble, &logp
     b.Text{ | 602 »       »       »       »       »       resp, err := svr.Get(c, &req) | 
| 528 »       »       »       »       »       Lines: []*logpb.Text_Line{ | 603 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 529 »       »       »       »       »       »       {"log entry #0", "\n"}, | 604 »       »       »       »       »       So(resp, shouldHaveLogs, 5) | 
| 530 »       »       »       »       »       »       {"another line of text", ""}, | 605 »       »       »       »       »       So(resp.Logs[0].GetDatagram(), ShouldRes
     emble, &logpb.Datagram{ | 
| 531 »       »       »       »       »       }, | 606 »       »       »       »       »       »       Data: []byte{0x00, 0x01, 0x02, 0
     x03}, | 
| 532 »       »       »       »       }) | 607 »       »       »       »       »       »       Partial: &logpb.Datagram_Partial
     { | 
| 533 »       »       »       }) | 608 »       »       »       »       »       »       »       Index: 2, | 
| 534 | 609 »       »       »       »       »       »       »       Size:  1024, | 
| 535 »       »       »       Convey(`Will get a Binary LogEntry`, func() { | 610 »       »       »       »       »       »       »       Last:  false, | 
| 536 »       »       »       »       req.Index = 4 | 611 »       »       »       »       »       »       }, | 
| 537 »       »       »       »       req.LogCount = 1 | 612 »       »       »       »       »       }) | 
| 538 »       »       »       »       resp, err := s.Get(c, &req) | 613 »       »       »       »       }) | 
| 539 »       »       »       »       So(err, ShouldBeRPCOK) | 614 »       »       »       }) | 
| 540 »       »       »       »       So(resp, shouldHaveLogs, 4) | 615 | 
| 541 »       »       »       »       So(resp.Logs[0].GetBinary(), ShouldResemble, &lo
     gpb.Binary{ | 616 »       »       »       Convey(`Testing tail requests`, func() { | 
| 542 »       »       »       »       »       Data: []byte{0x00, 0x01, 0x02, 0x03}, | 617 »       »       »       »       req := logdog.TailRequest{ | 
| 543 »       »       »       »       }) | 618 »       »       »       »       »       Project: "test-project", | 
| 544 »       »       »       }) | 619 »       »       »       »       »       Path:    string(ls.Path()), | 
| 545 | 620 »       »       »       »       } | 
| 546 »       »       »       Convey(`Will get a Datagram LogEntry`, func() { | 621 | 
| 547 »       »       »       »       req.Index = 5 | 622 »       »       »       »       Convey(`Will successfully retrieve a stream path
     .`, func() { | 
| 548 »       »       »       »       req.LogCount = 1 | 623 »       »       »       »       »       resp, err := svr.Tail(c, &req) | 
| 549 »       »       »       »       resp, err := s.Get(c, &req) | 624 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 550 »       »       »       »       So(err, ShouldBeRPCOK) | 625 »       »       »       »       »       So(resp, shouldHaveLogs, 7) | 
| 551 »       »       »       »       So(resp, shouldHaveLogs, 5) | 626 »       »       »       »       }) | 
| 552 »       »       »       »       So(resp.Logs[0].GetDatagram(), ShouldResemble, &
     logpb.Datagram{ | 627 | 
| 553 »       »       »       »       »       Data: []byte{0x00, 0x01, 0x02, 0x03}, | 628 »       »       »       »       Convey(`Will successfully retrieve a stream path
      hash and state.`, func() { | 
| 554 »       »       »       »       »       Partial: &logpb.Datagram_Partial{ | 629 »       »       »       »       »       req.Path = ls.HashID | 
| 555 »       »       »       »       »       »       Index: 2, | 630 »       »       »       »       »       req.State = true | 
| 556 »       »       »       »       »       »       Size:  1024, | 631 | 
| 557 »       »       »       »       »       »       Last:  false, | 632 »       »       »       »       »       resp, err := svr.Tail(c, &req) | 
| 558 »       »       »       »       »       }, | 633 »       »       »       »       »       So(err, ShouldBeRPCOK) | 
| 559 »       »       »       »       }) | 634 »       »       »       »       »       So(resp, shouldHaveLogs, 7) | 
| 560 »       »       »       }) | 635 »       »       »       »       »       So(resp.State, ShouldResemble, loadLogSt
     reamState(ls)) | 
| 561 »       »       }) | 636 »       »       »       »       }) | 
| 562 |  | 
| 563 »       »       Convey(`Testing tail requests`, func() { |  | 
| 564 »       »       »       req := logdog.TailRequest{ |  | 
| 565 »       »       »       »       Path: string(ls.Path()), |  | 
| 566 »       »       »       } |  | 
| 567 |  | 
| 568 »       »       »       Convey(`Will successfully retrieve a stream path.`, func
     () { |  | 
| 569 »       »       »       »       resp, err := s.Tail(c, &req) |  | 
| 570 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 571 »       »       »       »       So(resp, shouldHaveLogs, 7) |  | 
| 572 »       »       »       }) |  | 
| 573 |  | 
| 574 »       »       »       Convey(`Will successfully retrieve a stream path hash an
     d state.`, func() { |  | 
| 575 »       »       »       »       req.Path = ls.HashID |  | 
| 576 »       »       »       »       req.State = true |  | 
| 577 |  | 
| 578 »       »       »       »       resp, err := s.Tail(c, &req) |  | 
| 579 »       »       »       »       So(err, ShouldBeRPCOK) |  | 
| 580 »       »       »       »       So(resp, shouldHaveLogs, 7) |  | 
| 581 »       »       »       »       So(resp.State, ShouldResemble, loadLogStreamStat
     e(ls)) |  | 
| 582                         }) | 637                         }) | 
| 583                 }) | 638                 }) | 
| 584         }) | 639         }) | 
| 585 } | 640 } | 
| 586 | 641 | 
| 587 func TestGetIntermediate(t *testing.T) { | 642 func TestGetIntermediate(t *testing.T) { | 
| 588         t.Parallel() | 643         t.Parallel() | 
| 589 | 644 | 
| 590         testGetImpl(t, false) | 645         testGetImpl(t, false) | 
| 591 } | 646 } | 
| 592 | 647 | 
| 593 func TestGetArchived(t *testing.T) { | 648 func TestGetArchived(t *testing.T) { | 
| 594         t.Parallel() | 649         t.Parallel() | 
| 595 | 650 | 
| 596 »       testGetImpl(t, true) | 651 »       testGetImpl(t, false) | 
| 597 } | 652 } | 
| OLD | NEW | 
|---|