| Index: logdog/common/storage/archive/storage_test.go
|
| diff --git a/logdog/common/storage/archive/storage_test.go b/logdog/common/storage/archive/storage_test.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..c774c3e45c1f0bc5bfc3f382463a2178f3dbd3d4
|
| --- /dev/null
|
| +++ b/logdog/common/storage/archive/storage_test.go
|
| @@ -0,0 +1,368 @@
|
| +// Copyright 2016 The LUCI Authors. All rights reserved.
|
| +// Use of this source code is governed under the Apache License, Version 2.0
|
| +// that can be found in the LICENSE file.
|
| +
|
| +package archive
|
| +
|
| +import (
|
| + "bytes"
|
| + "fmt"
|
| + "io"
|
| + "io/ioutil"
|
| + "testing"
|
| +
|
| + "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/gcloud/gs"
|
| + "github.com/luci/luci-go/logdog/api/logpb"
|
| + "github.com/luci/luci-go/logdog/common/archive"
|
| + "github.com/luci/luci-go/logdog/common/renderer"
|
| + "github.com/luci/luci-go/logdog/common/storage"
|
| +
|
| + cloudStorage "cloud.google.com/go/storage"
|
| + "github.com/golang/protobuf/proto"
|
| + "golang.org/x/net/context"
|
| +
|
| + . "github.com/luci/luci-go/common/testing/assertions"
|
| + . "github.com/smartystreets/goconvey/convey"
|
| +)
|
| +
|
| +const (
|
| + testIndexURL = "gs://+/index"
|
| + testStreamURL = "gs://+/stream"
|
| +)
|
| +
|
| +type logStreamGenerator struct {
|
| + lines []string
|
| +
|
| + indexBuf bytes.Buffer
|
| + streamBuf bytes.Buffer
|
| +}
|
| +
|
| +func (g *logStreamGenerator) lineFromEntry(e *storage.Entry) string {
|
| + le, err := e.GetLogEntry()
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| +
|
| + text := le.GetText()
|
| + if text == nil || len(text.Lines) != 1 {
|
| + panic(fmt.Errorf("bad generated log entry: %#v", le))
|
| + }
|
| + return text.Lines[0].Value
|
| +}
|
| +
|
| +func (g *logStreamGenerator) generate(lines ...string) {
|
| + logEntries := make([]*logpb.LogEntry, len(lines))
|
| + for i, line := range lines {
|
| + logEntries[i] = &logpb.LogEntry{
|
| + PrefixIndex: uint64(i),
|
| + StreamIndex: uint64(i),
|
| + Content: &logpb.LogEntry_Text{
|
| + Text: &logpb.Text{
|
| + Lines: []*logpb.Text_Line{
|
| + {Value: line, Delimiter: "\n"},
|
| + },
|
| + },
|
| + },
|
| + }
|
| + }
|
| +
|
| + g.lines = lines
|
| + g.indexBuf.Reset()
|
| + g.streamBuf.Reset()
|
| + src := renderer.StaticSource(logEntries)
|
| + err := archive.Archive(archive.Manifest{
|
| + Desc: &logpb.LogStreamDescriptor{
|
| + Prefix: "prefix",
|
| + Name: "name",
|
| + },
|
| + Source: &src,
|
| + LogWriter: &g.streamBuf,
|
| + IndexWriter: &g.indexBuf,
|
| + })
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| +}
|
| +
|
| +func (g *logStreamGenerator) pruneIndexHints() {
|
| + g.modIndex(func(idx *logpb.LogIndex) {
|
| + idx.LastPrefixIndex = 0
|
| + idx.LastStreamIndex = 0
|
| + idx.LogEntryCount = 0
|
| + })
|
| +}
|
| +
|
| +func (g *logStreamGenerator) sparseIndex(indices ...uint64) {
|
| + idxMap := make(map[uint64]struct{}, len(indices))
|
| + for _, i := range indices {
|
| + idxMap[i] = struct{}{}
|
| + }
|
| +
|
| + g.modIndex(func(idx *logpb.LogIndex) {
|
| + entries := make([]*logpb.LogIndex_Entry, 0, len(idx.Entries))
|
| + for _, entry := range idx.Entries {
|
| + if _, ok := idxMap[entry.StreamIndex]; ok {
|
| + entries = append(entries, entry)
|
| + }
|
| + }
|
| + idx.Entries = entries
|
| + })
|
| +}
|
| +
|
| +func (g *logStreamGenerator) modIndex(fn func(*logpb.LogIndex)) {
|
| + var index logpb.LogIndex
|
| + if err := proto.Unmarshal(g.indexBuf.Bytes(), &index); err != nil {
|
| + panic(err)
|
| + }
|
| + fn(&index)
|
| + data, err := proto.Marshal(&index)
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| + g.indexBuf.Reset()
|
| + if _, err := g.indexBuf.Write(data); err != nil {
|
| + panic(err)
|
| + }
|
| +}
|
| +
|
| +type errReader struct {
|
| + io.Reader
|
| + err error
|
| +}
|
| +
|
| +func (r *errReader) Read(d []byte) (int, error) {
|
| + if r.err != nil {
|
| + return 0, r.err
|
| + }
|
| + return r.Reader.Read(d)
|
| +}
|
| +
|
| +type fakeGSClient struct {
|
| + gs.Client
|
| +
|
| + index []byte
|
| + stream []byte
|
| +
|
| + closed bool
|
| +
|
| + err error
|
| + indexErr error
|
| + streamErr error
|
| +}
|
| +
|
| +func (c *fakeGSClient) assertNotClosed() {
|
| + if c.closed {
|
| + panic(errors.New("client is closed"))
|
| + }
|
| +}
|
| +
|
| +func (c *fakeGSClient) load(g *logStreamGenerator) {
|
| + c.index = append([]byte{}, g.indexBuf.Bytes()...)
|
| + c.stream = append([]byte{}, g.streamBuf.Bytes()...)
|
| +}
|
| +
|
| +func (c *fakeGSClient) Close() error {
|
| + c.assertNotClosed()
|
| + c.closed = true
|
| + return nil
|
| +}
|
| +
|
| +func (c *fakeGSClient) NewReader(p gs.Path, offset, length int64) (io.ReadCloser, error) {
|
| + c.assertNotClosed()
|
| +
|
| + // If we have a client-level error, return it.
|
| + if c.err != nil {
|
| + return nil, c.err
|
| + }
|
| +
|
| + var (
|
| + data []byte
|
| + readerErr error
|
| + )
|
| + switch string(p) {
|
| + case testIndexURL:
|
| + data, readerErr = c.index, c.indexErr
|
| + case testStreamURL:
|
| + data, readerErr = c.stream, c.streamErr
|
| + default:
|
| + return nil, cloudStorage.ErrObjectNotExist
|
| + }
|
| +
|
| + if offset >= 0 {
|
| + if offset >= int64(len(data)) {
|
| + offset = int64(len(data))
|
| + }
|
| + data = data[offset:]
|
| + }
|
| +
|
| + if length >= 0 {
|
| + if length > int64(len(data)) {
|
| + length = int64(len(data))
|
| + }
|
| + data = data[:length]
|
| + }
|
| + return ioutil.NopCloser(&errReader{bytes.NewReader(data), readerErr}), nil
|
| +}
|
| +
|
| +func TestArchiveStorage(t *testing.T) {
|
| + t.Parallel()
|
| +
|
| + Convey(`A testing archive instance`, t, func() {
|
| + var (
|
| + c = context.Background()
|
| + client fakeGSClient
|
| + gen logStreamGenerator
|
| + )
|
| + defer client.Close()
|
| +
|
| + opts := Options{
|
| + IndexURL: testIndexURL,
|
| + StreamURL: testStreamURL,
|
| + Client: &client,
|
| + }
|
| + st, err := New(c, opts)
|
| + So(err, ShouldBeNil)
|
| + defer st.Close()
|
| +
|
| + stImpl := st.(*storageImpl)
|
| +
|
| + Convey(`Will fail to configure with ErrReadOnly`, func() {
|
| + So(st.Config(storage.Config{}), ShouldEqual, storage.ErrReadOnly)
|
| + })
|
| +
|
| + Convey(`Will fail to Put with ErrReadOnly`, func() {
|
| + So(st.Put(storage.PutRequest{}), ShouldEqual, storage.ErrReadOnly)
|
| + })
|
| +
|
| + Convey(`Given a stream with 5 log entries`, func() {
|
| + gen.generate("foo", "bar", "baz", "qux", "quux")
|
| +
|
| + // Basic test cases.
|
| + for _, tc := range []struct {
|
| + title string
|
| + mod func()
|
| + }{
|
| + {`Complete index`, func() {}},
|
| + {`Empty index protobuf`, func() { gen.sparseIndex() }},
|
| + {`No index provided`, func() { stImpl.indexPath = "" }},
|
| + {`Invalid index path`, func() { stImpl.indexPath = "does-not-exist" }},
|
| + {`Sparse index with a start and terminal entry`, func() { gen.sparseIndex(0, 2, 4) }},
|
| + {`Sparse index with a terminal entry`, func() { gen.sparseIndex(1, 3, 4) }},
|
| + {`Sparse index missing a terminal entry`, func() { gen.sparseIndex(1, 3) }},
|
| + } {
|
| + Convey(fmt.Sprintf(`Test Case: %q`, tc.title), func() {
|
| + tc.mod()
|
| +
|
| + // Run through per-testcase variant set.
|
| + for _, variant := range []struct {
|
| + title string
|
| + mod func()
|
| + }{
|
| + {"with hints", func() {}},
|
| + {"without hints", func() { gen.pruneIndexHints() }},
|
| + } {
|
| + Convey(variant.title, func() {
|
| + variant.mod()
|
| + client.load(&gen)
|
| +
|
| + var entries []string
|
| + collect := func(e *storage.Entry) bool {
|
| + entries = append(entries, gen.lineFromEntry(e))
|
| + return true
|
| + }
|
| +
|
| + Convey(`Can Get [0..]`, func() {
|
| + So(st.Get(storage.GetRequest{}, collect), ShouldBeNil)
|
| + So(entries, ShouldResemble, gen.lines)
|
| + })
|
| +
|
| + Convey(`Can Get [1..].`, func() {
|
| + So(st.Get(storage.GetRequest{Index: 1}, collect), ShouldBeNil)
|
| + So(entries, ShouldResemble, gen.lines[1:])
|
| + })
|
| +
|
| + Convey(`Can Get [1..2].`, func() {
|
| + So(st.Get(storage.GetRequest{Index: 1, Limit: 2}, collect), ShouldBeNil)
|
| + So(entries, ShouldResemble, gen.lines[1:3])
|
| + })
|
| +
|
| + Convey(`Can Get [5..].`, func() {
|
| + So(st.Get(storage.GetRequest{Index: 5}, collect), ShouldBeNil)
|
| + So(entries, ShouldHaveLength, 0)
|
| + })
|
| +
|
| + Convey(`Can Get [4].`, func() {
|
| + So(st.Get(storage.GetRequest{Index: 4, Limit: 1}, collect), ShouldBeNil)
|
| + So(entries, ShouldResemble, gen.lines[4:])
|
| + })
|
| +
|
| + Convey(`Can tail.`, func() {
|
| + e, err := st.Tail("", "")
|
| + So(err, ShouldBeNil)
|
| + So(gen.lineFromEntry(e), ShouldEqual, gen.lines[len(gen.lines)-1])
|
| + })
|
| + })
|
| + }
|
| + })
|
| + }
|
| + })
|
| +
|
| + // Individual error test cases.
|
| + for _, tc := range []struct {
|
| + title string
|
| + fn func() error
|
| + }{
|
| + {"Get", func() error { return st.Get(storage.GetRequest{}, nil) }},
|
| + {"Tail", func() (err error) {
|
| + _, err = st.Tail("", "")
|
| + return
|
| + }},
|
| + } {
|
| + Convey(fmt.Sprintf("Error case: %q", tc.title), func() {
|
| + Convey(`With missing log stream returns ErrDoesNotExist.`, func() {
|
| + stImpl.streamPath = "does-not-exist"
|
| +
|
| + So(st.Get(storage.GetRequest{}, nil), ShouldEqual, storage.ErrDoesNotExist)
|
| + })
|
| +
|
| + Convey(`With a client error returns that error.`, func() {
|
| + client.err = errors.New("test error")
|
| +
|
| + So(errors.Unwrap(tc.fn()), ShouldEqual, client.err)
|
| + })
|
| +
|
| + Convey(`With an index reader error returns that error.`, func() {
|
| + client.indexErr = errors.New("test error")
|
| +
|
| + So(errors.Unwrap(tc.fn()), ShouldEqual, client.indexErr)
|
| + })
|
| +
|
| + Convey(`With an stream reader error returns that error.`, func() {
|
| + client.streamErr = errors.New("test error")
|
| +
|
| + So(errors.Unwrap(tc.fn()), ShouldEqual, client.streamErr)
|
| + })
|
| +
|
| + Convey(`With junk index data returns an error.`, func() {
|
| + client.index = []byte{0x00}
|
| +
|
| + So(tc.fn(), ShouldErrLike, "failed to unmarshal index")
|
| + })
|
| +
|
| + Convey(`With junk stream data returns an error.`, func() {
|
| + client.stream = []byte{0x00, 0x01, 0xff}
|
| +
|
| + So(tc.fn(), ShouldErrLike, "failed to unmarshal")
|
| + })
|
| + })
|
| + }
|
| +
|
| + Convey(`Tail with no log entries returns ErrDoesNotExist.`, func() {
|
| + client.load(&gen)
|
| +
|
| + _, err := st.Tail("", "")
|
| + So(err, ShouldEqual, storage.ErrDoesNotExist)
|
| + })
|
| + })
|
| +}
|
|
|