| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package archive |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "fmt" |
| 10 "io" |
| 11 "io/ioutil" |
| 12 "testing" |
| 13 |
| 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/gs" |
| 16 "github.com/luci/luci-go/logdog/api/logpb" |
| 17 "github.com/luci/luci-go/logdog/common/archive" |
| 18 "github.com/luci/luci-go/logdog/common/renderer" |
| 19 "github.com/luci/luci-go/logdog/common/storage" |
| 20 |
| 21 cloudStorage "cloud.google.com/go/storage" |
| 22 "github.com/golang/protobuf/proto" |
| 23 "golang.org/x/net/context" |
| 24 |
| 25 . "github.com/luci/luci-go/common/testing/assertions" |
| 26 . "github.com/smartystreets/goconvey/convey" |
| 27 ) |
| 28 |
| 29 const ( |
| 30 testIndexURL = "gs://+/index" |
| 31 testStreamURL = "gs://+/stream" |
| 32 ) |
| 33 |
| 34 type logStreamGenerator struct { |
| 35 lines []string |
| 36 |
| 37 indexBuf bytes.Buffer |
| 38 streamBuf bytes.Buffer |
| 39 } |
| 40 |
| 41 func (g *logStreamGenerator) lineFromEntry(e *storage.Entry) string { |
| 42 le, err := e.GetLogEntry() |
| 43 if err != nil { |
| 44 panic(err) |
| 45 } |
| 46 |
| 47 text := le.GetText() |
| 48 if text == nil || len(text.Lines) != 1 { |
| 49 panic(fmt.Errorf("bad generated log entry: %#v", le)) |
| 50 } |
| 51 return text.Lines[0].Value |
| 52 } |
| 53 |
| 54 func (g *logStreamGenerator) generate(lines ...string) { |
| 55 logEntries := make([]*logpb.LogEntry, len(lines)) |
| 56 for i, line := range lines { |
| 57 logEntries[i] = &logpb.LogEntry{ |
| 58 PrefixIndex: uint64(i), |
| 59 StreamIndex: uint64(i), |
| 60 Content: &logpb.LogEntry_Text{ |
| 61 Text: &logpb.Text{ |
| 62 Lines: []*logpb.Text_Line{ |
| 63 {Value: line, Delimiter: "\n"}, |
| 64 }, |
| 65 }, |
| 66 }, |
| 67 } |
| 68 } |
| 69 |
| 70 g.lines = lines |
| 71 g.indexBuf.Reset() |
| 72 g.streamBuf.Reset() |
| 73 src := renderer.StaticSource(logEntries) |
| 74 err := archive.Archive(archive.Manifest{ |
| 75 Desc: &logpb.LogStreamDescriptor{ |
| 76 Prefix: "prefix", |
| 77 Name: "name", |
| 78 }, |
| 79 Source: &src, |
| 80 LogWriter: &g.streamBuf, |
| 81 IndexWriter: &g.indexBuf, |
| 82 }) |
| 83 if err != nil { |
| 84 panic(err) |
| 85 } |
| 86 } |
| 87 |
| 88 func (g *logStreamGenerator) pruneIndexHints() { |
| 89 g.modIndex(func(idx *logpb.LogIndex) { |
| 90 idx.LastPrefixIndex = 0 |
| 91 idx.LastStreamIndex = 0 |
| 92 idx.LogEntryCount = 0 |
| 93 }) |
| 94 } |
| 95 |
| 96 func (g *logStreamGenerator) sparseIndex(indices ...uint64) { |
| 97 idxMap := make(map[uint64]struct{}, len(indices)) |
| 98 for _, i := range indices { |
| 99 idxMap[i] = struct{}{} |
| 100 } |
| 101 |
| 102 g.modIndex(func(idx *logpb.LogIndex) { |
| 103 entries := make([]*logpb.LogIndex_Entry, 0, len(idx.Entries)) |
| 104 for _, entry := range idx.Entries { |
| 105 if _, ok := idxMap[entry.StreamIndex]; ok { |
| 106 entries = append(entries, entry) |
| 107 } |
| 108 } |
| 109 idx.Entries = entries |
| 110 }) |
| 111 } |
| 112 |
| 113 func (g *logStreamGenerator) modIndex(fn func(*logpb.LogIndex)) { |
| 114 var index logpb.LogIndex |
| 115 if err := proto.Unmarshal(g.indexBuf.Bytes(), &index); err != nil { |
| 116 panic(err) |
| 117 } |
| 118 fn(&index) |
| 119 data, err := proto.Marshal(&index) |
| 120 if err != nil { |
| 121 panic(err) |
| 122 } |
| 123 g.indexBuf.Reset() |
| 124 if _, err := g.indexBuf.Write(data); err != nil { |
| 125 panic(err) |
| 126 } |
| 127 } |
| 128 |
| 129 type errReader struct { |
| 130 io.Reader |
| 131 err error |
| 132 } |
| 133 |
| 134 func (r *errReader) Read(d []byte) (int, error) { |
| 135 if r.err != nil { |
| 136 return 0, r.err |
| 137 } |
| 138 return r.Reader.Read(d) |
| 139 } |
| 140 |
| 141 type fakeGSClient struct { |
| 142 gs.Client |
| 143 |
| 144 index []byte |
| 145 stream []byte |
| 146 |
| 147 closed bool |
| 148 |
| 149 err error |
| 150 indexErr error |
| 151 streamErr error |
| 152 } |
| 153 |
| 154 func (c *fakeGSClient) assertNotClosed() { |
| 155 if c.closed { |
| 156 panic(errors.New("client is closed")) |
| 157 } |
| 158 } |
| 159 |
| 160 func (c *fakeGSClient) load(g *logStreamGenerator) { |
| 161 c.index = append([]byte{}, g.indexBuf.Bytes()...) |
| 162 c.stream = append([]byte{}, g.streamBuf.Bytes()...) |
| 163 } |
| 164 |
| 165 func (c *fakeGSClient) Close() error { |
| 166 c.assertNotClosed() |
| 167 c.closed = true |
| 168 return nil |
| 169 } |
| 170 |
| 171 func (c *fakeGSClient) NewReader(p gs.Path, offset, length int64) (io.ReadCloser
, error) { |
| 172 c.assertNotClosed() |
| 173 |
| 174 // If we have a client-level error, return it. |
| 175 if c.err != nil { |
| 176 return nil, c.err |
| 177 } |
| 178 |
| 179 var ( |
| 180 data []byte |
| 181 readerErr error |
| 182 ) |
| 183 switch string(p) { |
| 184 case testIndexURL: |
| 185 data, readerErr = c.index, c.indexErr |
| 186 case testStreamURL: |
| 187 data, readerErr = c.stream, c.streamErr |
| 188 default: |
| 189 return nil, cloudStorage.ErrObjectNotExist |
| 190 } |
| 191 |
| 192 if offset >= 0 { |
| 193 if offset >= int64(len(data)) { |
| 194 offset = int64(len(data)) |
| 195 } |
| 196 data = data[offset:] |
| 197 } |
| 198 |
| 199 if length >= 0 { |
| 200 if length > int64(len(data)) { |
| 201 length = int64(len(data)) |
| 202 } |
| 203 data = data[:length] |
| 204 } |
| 205 return ioutil.NopCloser(&errReader{bytes.NewReader(data), readerErr}), n
il |
| 206 } |
| 207 |
| 208 func TestArchiveStorage(t *testing.T) { |
| 209 t.Parallel() |
| 210 |
| 211 Convey(`A testing archive instance`, t, func() { |
| 212 var ( |
| 213 c = context.Background() |
| 214 client fakeGSClient |
| 215 gen logStreamGenerator |
| 216 ) |
| 217 defer client.Close() |
| 218 |
| 219 opts := Options{ |
| 220 IndexURL: testIndexURL, |
| 221 StreamURL: testStreamURL, |
| 222 Client: &client, |
| 223 } |
| 224 st, err := New(c, opts) |
| 225 So(err, ShouldBeNil) |
| 226 defer st.Close() |
| 227 |
| 228 stImpl := st.(*storageImpl) |
| 229 |
| 230 Convey(`Will fail to configure with ErrReadOnly`, func() { |
| 231 So(st.Config(storage.Config{}), ShouldEqual, storage.Err
ReadOnly) |
| 232 }) |
| 233 |
| 234 Convey(`Will fail to Put with ErrReadOnly`, func() { |
| 235 So(st.Put(storage.PutRequest{}), ShouldEqual, storage.Er
rReadOnly) |
| 236 }) |
| 237 |
| 238 Convey(`Given a stream with 5 log entries`, func() { |
| 239 gen.generate("foo", "bar", "baz", "qux", "quux") |
| 240 |
| 241 // Basic test cases. |
| 242 for _, tc := range []struct { |
| 243 title string |
| 244 mod func() |
| 245 }{ |
| 246 {`Complete index`, func() {}}, |
| 247 {`Empty index protobuf`, func() { gen.sparseInde
x() }}, |
| 248 {`No index provided`, func() { stImpl.indexPath
= "" }}, |
| 249 {`Invalid index path`, func() { stImpl.indexPath
= "does-not-exist" }}, |
| 250 {`Sparse index with a start and terminal entry`,
func() { gen.sparseIndex(0, 2, 4) }}, |
| 251 {`Sparse index with a terminal entry`, func() {
gen.sparseIndex(1, 3, 4) }}, |
| 252 {`Sparse index missing a terminal entry`, func()
{ gen.sparseIndex(1, 3) }}, |
| 253 } { |
| 254 Convey(fmt.Sprintf(`Test Case: %q`, tc.title), f
unc() { |
| 255 tc.mod() |
| 256 |
| 257 // Run through per-testcase variant set. |
| 258 for _, variant := range []struct { |
| 259 title string |
| 260 mod func() |
| 261 }{ |
| 262 {"with hints", func() {}}, |
| 263 {"without hints", func() { gen.p
runeIndexHints() }}, |
| 264 } { |
| 265 Convey(variant.title, func() { |
| 266 variant.mod() |
| 267 client.load(&gen) |
| 268 |
| 269 var entries []string |
| 270 collect := func(e *stora
ge.Entry) bool { |
| 271 entries = append
(entries, gen.lineFromEntry(e)) |
| 272 return true |
| 273 } |
| 274 |
| 275 Convey(`Can Get [0..]`,
func() { |
| 276 So(st.Get(storag
e.GetRequest{}, collect), ShouldBeNil) |
| 277 So(entries, Shou
ldResemble, gen.lines) |
| 278 }) |
| 279 |
| 280 Convey(`Can Get [1..].`,
func() { |
| 281 So(st.Get(storag
e.GetRequest{Index: 1}, collect), ShouldBeNil) |
| 282 So(entries, Shou
ldResemble, gen.lines[1:]) |
| 283 }) |
| 284 |
| 285 Convey(`Can Get [1..2].`
, func() { |
| 286 So(st.Get(storag
e.GetRequest{Index: 1, Limit: 2}, collect), ShouldBeNil) |
| 287 So(entries, Shou
ldResemble, gen.lines[1:3]) |
| 288 }) |
| 289 |
| 290 Convey(`Can Get [5..].`,
func() { |
| 291 So(st.Get(storag
e.GetRequest{Index: 5}, collect), ShouldBeNil) |
| 292 So(entries, Shou
ldHaveLength, 0) |
| 293 }) |
| 294 |
| 295 Convey(`Can Get [4].`, f
unc() { |
| 296 So(st.Get(storag
e.GetRequest{Index: 4, Limit: 1}, collect), ShouldBeNil) |
| 297 So(entries, Shou
ldResemble, gen.lines[4:]) |
| 298 }) |
| 299 |
| 300 Convey(`Can tail.`, func
() { |
| 301 e, err := st.Tai
l("", "") |
| 302 So(err, ShouldBe
Nil) |
| 303 So(gen.lineFromE
ntry(e), ShouldEqual, gen.lines[len(gen.lines)-1]) |
| 304 }) |
| 305 }) |
| 306 } |
| 307 }) |
| 308 } |
| 309 }) |
| 310 |
| 311 // Individual error test cases. |
| 312 for _, tc := range []struct { |
| 313 title string |
| 314 fn func() error |
| 315 }{ |
| 316 {"Get", func() error { return st.Get(storage.GetRequest{
}, nil) }}, |
| 317 {"Tail", func() (err error) { |
| 318 _, err = st.Tail("", "") |
| 319 return |
| 320 }}, |
| 321 } { |
| 322 Convey(fmt.Sprintf("Error case: %q", tc.title), func() { |
| 323 Convey(`With missing log stream returns ErrDoesN
otExist.`, func() { |
| 324 stImpl.streamPath = "does-not-exist" |
| 325 |
| 326 So(st.Get(storage.GetRequest{}, nil), Sh
ouldEqual, storage.ErrDoesNotExist) |
| 327 }) |
| 328 |
| 329 Convey(`With a client error returns that error.`
, func() { |
| 330 client.err = errors.New("test error") |
| 331 |
| 332 So(errors.Unwrap(tc.fn()), ShouldEqual,
client.err) |
| 333 }) |
| 334 |
| 335 Convey(`With an index reader error returns that
error.`, func() { |
| 336 client.indexErr = errors.New("test error
") |
| 337 |
| 338 So(errors.Unwrap(tc.fn()), ShouldEqual,
client.indexErr) |
| 339 }) |
| 340 |
| 341 Convey(`With an stream reader error returns that
error.`, func() { |
| 342 client.streamErr = errors.New("test erro
r") |
| 343 |
| 344 So(errors.Unwrap(tc.fn()), ShouldEqual,
client.streamErr) |
| 345 }) |
| 346 |
| 347 Convey(`With junk index data returns an error.`,
func() { |
| 348 client.index = []byte{0x00} |
| 349 |
| 350 So(tc.fn(), ShouldErrLike, "failed to un
marshal index") |
| 351 }) |
| 352 |
| 353 Convey(`With junk stream data returns an error.`
, func() { |
| 354 client.stream = []byte{0x00, 0x01, 0xff} |
| 355 |
| 356 So(tc.fn(), ShouldErrLike, "failed to un
marshal") |
| 357 }) |
| 358 }) |
| 359 } |
| 360 |
| 361 Convey(`Tail with no log entries returns ErrDoesNotExist.`, func
() { |
| 362 client.load(&gen) |
| 363 |
| 364 _, err := st.Tail("", "") |
| 365 So(err, ShouldEqual, storage.ErrDoesNotExist) |
| 366 }) |
| 367 }) |
| 368 } |
| OLD | NEW |