Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(142)

Side by Side Diff: logdog/common/storage/archive/storage_test.go

Issue 2435883002: LogDog: Fix archival Get/Tail implementations. (Closed)
Patch Set: LogDog: Fix archival Get/Tail implementations. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package archive
6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "testing"
13
14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/gcloud/gs"
16 "github.com/luci/luci-go/logdog/api/logpb"
17 "github.com/luci/luci-go/logdog/common/archive"
18 "github.com/luci/luci-go/logdog/common/renderer"
19 "github.com/luci/luci-go/logdog/common/storage"
20
21 cloudStorage "cloud.google.com/go/storage"
22 "github.com/golang/protobuf/proto"
23 "golang.org/x/net/context"
24
25 . "github.com/luci/luci-go/common/testing/assertions"
26 . "github.com/smartystreets/goconvey/convey"
27 )
28
29 const (
30 testIndexURL = "gs://+/index"
31 testStreamURL = "gs://+/stream"
32 )
33
34 type logStreamGenerator struct {
35 lines []string
36
37 indexBuf bytes.Buffer
38 streamBuf bytes.Buffer
39 }
40
41 func (g *logStreamGenerator) lineFromEntry(e *storage.Entry) string {
42 le, err := e.GetLogEntry()
43 if err != nil {
44 panic(err)
45 }
46
47 text := le.GetText()
48 if text == nil || len(text.Lines) != 1 {
49 panic(fmt.Errorf("bad generated log entry: %#v", le))
50 }
51 return text.Lines[0].Value
52 }
53
54 func (g *logStreamGenerator) generate(lines ...string) {
55 logEntries := make([]*logpb.LogEntry, len(lines))
56 for i, line := range lines {
57 logEntries[i] = &logpb.LogEntry{
58 PrefixIndex: uint64(i),
59 StreamIndex: uint64(i),
60 Content: &logpb.LogEntry_Text{
61 Text: &logpb.Text{
62 Lines: []*logpb.Text_Line{
63 {Value: line, Delimiter: "\n"},
64 },
65 },
66 },
67 }
68 }
69
70 g.lines = lines
71 g.indexBuf.Reset()
72 g.streamBuf.Reset()
73 src := renderer.StaticSource(logEntries)
74 err := archive.Archive(archive.Manifest{
75 Desc: &logpb.LogStreamDescriptor{
76 Prefix: "prefix",
77 Name: "name",
78 },
79 Source: &src,
80 LogWriter: &g.streamBuf,
81 IndexWriter: &g.indexBuf,
82 })
83 if err != nil {
84 panic(err)
85 }
86 }
87
88 func (g *logStreamGenerator) pruneIndexHints() {
89 g.modIndex(func(idx *logpb.LogIndex) {
90 idx.LastPrefixIndex = 0
91 idx.LastStreamIndex = 0
92 idx.LogEntryCount = 0
93 })
94 }
95
96 func (g *logStreamGenerator) sparseIndex(indices ...uint64) {
97 idxMap := make(map[uint64]struct{}, len(indices))
98 for _, i := range indices {
99 idxMap[i] = struct{}{}
100 }
101
102 g.modIndex(func(idx *logpb.LogIndex) {
103 entries := make([]*logpb.LogIndex_Entry, 0, len(idx.Entries))
104 for _, entry := range idx.Entries {
105 if _, ok := idxMap[entry.StreamIndex]; ok {
106 entries = append(entries, entry)
107 }
108 }
109 idx.Entries = entries
110 })
111 }
112
113 func (g *logStreamGenerator) modIndex(fn func(*logpb.LogIndex)) {
114 var index logpb.LogIndex
115 if err := proto.Unmarshal(g.indexBuf.Bytes(), &index); err != nil {
116 panic(err)
117 }
118 fn(&index)
119 data, err := proto.Marshal(&index)
120 if err != nil {
121 panic(err)
122 }
123 g.indexBuf.Reset()
124 if _, err := g.indexBuf.Write(data); err != nil {
125 panic(err)
126 }
127 }
128
129 type errReader struct {
130 io.Reader
131 err error
132 }
133
134 func (r *errReader) Read(d []byte) (int, error) {
135 if r.err != nil {
136 return 0, r.err
137 }
138 return r.Reader.Read(d)
139 }
140
141 type fakeGSClient struct {
142 gs.Client
143
144 index []byte
145 stream []byte
146
147 closed bool
148
149 err error
150 indexErr error
151 streamErr error
152 }
153
154 func (c *fakeGSClient) assertNotClosed() {
155 if c.closed {
156 panic(errors.New("client is closed"))
157 }
158 }
159
160 func (c *fakeGSClient) load(g *logStreamGenerator) {
161 c.index = append([]byte{}, g.indexBuf.Bytes()...)
162 c.stream = append([]byte{}, g.streamBuf.Bytes()...)
163 }
164
165 func (c *fakeGSClient) Close() error {
166 c.assertNotClosed()
167 c.closed = true
168 return nil
169 }
170
171 func (c *fakeGSClient) NewReader(p gs.Path, offset, length int64) (io.ReadCloser , error) {
172 c.assertNotClosed()
173
174 // If we have a client-level error, return it.
175 if c.err != nil {
176 return nil, c.err
177 }
178
179 var (
180 data []byte
181 readerErr error
182 )
183 switch string(p) {
184 case testIndexURL:
185 data, readerErr = c.index, c.indexErr
186 case testStreamURL:
187 data, readerErr = c.stream, c.streamErr
188 default:
189 return nil, cloudStorage.ErrObjectNotExist
190 }
191
192 if offset >= 0 {
193 if offset >= int64(len(data)) {
194 offset = int64(len(data))
195 }
196 data = data[offset:]
197 }
198
199 if length >= 0 {
200 if length > int64(len(data)) {
201 length = int64(len(data))
202 }
203 data = data[:length]
204 }
205 return ioutil.NopCloser(&errReader{bytes.NewReader(data), readerErr}), n il
206 }
207
208 func TestArchiveStorage(t *testing.T) {
209 t.Parallel()
210
211 Convey(`A testing archive instance`, t, func() {
212 var (
213 c = context.Background()
214 client fakeGSClient
215 gen logStreamGenerator
216 )
217 defer client.Close()
218
219 opts := Options{
220 IndexURL: testIndexURL,
221 StreamURL: testStreamURL,
222 Client: &client,
223 }
224 st, err := New(c, opts)
225 So(err, ShouldBeNil)
226 defer st.Close()
227
228 stImpl := st.(*storageImpl)
229
230 Convey(`Will fail to configure with ErrReadOnly`, func() {
231 So(st.Config(storage.Config{}), ShouldEqual, storage.Err ReadOnly)
232 })
233
234 Convey(`Will fail to Put with ErrReadOnly`, func() {
235 So(st.Put(storage.PutRequest{}), ShouldEqual, storage.Er rReadOnly)
236 })
237
238 Convey(`Given a stream with 5 log entries`, func() {
239 gen.generate("foo", "bar", "baz", "qux", "quux")
240
241 // Basic test cases.
242 for _, tc := range []struct {
243 title string
244 mod func()
245 }{
246 {`Complete index`, func() {}},
247 {`Empty index protobuf`, func() { gen.sparseInde x() }},
248 {`No index provided`, func() { stImpl.indexPath = "" }},
249 {`Invalid index path`, func() { stImpl.indexPath = "does-not-exist" }},
250 {`Sparse index with a start and terminal entry`, func() { gen.sparseIndex(0, 2, 4) }},
251 {`Sparse index with a terminal entry`, func() { gen.sparseIndex(1, 3, 4) }},
252 {`Sparse index missing a terminal entry`, func() { gen.sparseIndex(1, 3) }},
253 } {
254 Convey(fmt.Sprintf(`Test Case: %q`, tc.title), f unc() {
255 tc.mod()
256
257 // Run through per-testcase variant set.
258 for _, variant := range []struct {
259 title string
260 mod func()
261 }{
262 {"with hints", func() {}},
263 {"without hints", func() { gen.p runeIndexHints() }},
264 } {
265 Convey(variant.title, func() {
266 variant.mod()
267 client.load(&gen)
268
269 var entries []string
270 collect := func(e *stora ge.Entry) bool {
271 entries = append (entries, gen.lineFromEntry(e))
272 return true
273 }
274
275 Convey(`Can Get [0..]`, func() {
276 So(st.Get(storag e.GetRequest{}, collect), ShouldBeNil)
277 So(entries, Shou ldResemble, gen.lines)
278 })
279
280 Convey(`Can Get [1..].`, func() {
281 So(st.Get(storag e.GetRequest{Index: 1}, collect), ShouldBeNil)
282 So(entries, Shou ldResemble, gen.lines[1:])
283 })
284
285 Convey(`Can Get [1..2].` , func() {
286 So(st.Get(storag e.GetRequest{Index: 1, Limit: 2}, collect), ShouldBeNil)
287 So(entries, Shou ldResemble, gen.lines[1:3])
288 })
289
290 Convey(`Can Get [5..].`, func() {
291 So(st.Get(storag e.GetRequest{Index: 5}, collect), ShouldBeNil)
292 So(entries, Shou ldHaveLength, 0)
293 })
294
295 Convey(`Can Get [4].`, f unc() {
296 So(st.Get(storag e.GetRequest{Index: 4, Limit: 1}, collect), ShouldBeNil)
297 So(entries, Shou ldResemble, gen.lines[4:])
298 })
299
300 Convey(`Can tail.`, func () {
301 e, err := st.Tai l("", "")
302 So(err, ShouldBe Nil)
303 So(gen.lineFromE ntry(e), ShouldEqual, gen.lines[len(gen.lines)-1])
304 })
305 })
306 }
307 })
308 }
309 })
310
311 // Individual error test cases.
312 for _, tc := range []struct {
313 title string
314 fn func() error
315 }{
316 {"Get", func() error { return st.Get(storage.GetRequest{ }, nil) }},
317 {"Tail", func() (err error) {
318 _, err = st.Tail("", "")
319 return
320 }},
321 } {
322 Convey(fmt.Sprintf("Error case: %q", tc.title), func() {
323 Convey(`With missing log stream returns ErrDoesN otExist.`, func() {
324 stImpl.streamPath = "does-not-exist"
325
326 So(st.Get(storage.GetRequest{}, nil), Sh ouldEqual, storage.ErrDoesNotExist)
327 })
328
329 Convey(`With a client error returns that error.` , func() {
330 client.err = errors.New("test error")
331
332 So(errors.Unwrap(tc.fn()), ShouldEqual, client.err)
333 })
334
335 Convey(`With an index reader error returns that error.`, func() {
336 client.indexErr = errors.New("test error ")
337
338 So(errors.Unwrap(tc.fn()), ShouldEqual, client.indexErr)
339 })
340
341 Convey(`With an stream reader error returns that error.`, func() {
342 client.streamErr = errors.New("test erro r")
343
344 So(errors.Unwrap(tc.fn()), ShouldEqual, client.streamErr)
345 })
346
347 Convey(`With junk index data returns an error.`, func() {
348 client.index = []byte{0x00}
349
350 So(tc.fn(), ShouldErrLike, "failed to un marshal index")
351 })
352
353 Convey(`With junk stream data returns an error.` , func() {
354 client.stream = []byte{0x00, 0x01, 0xff}
355
356 So(tc.fn(), ShouldErrLike, "failed to un marshal")
357 })
358 })
359 }
360
361 Convey(`Tail with no log entries returns ErrDoesNotExist.`, func () {
362 client.load(&gen)
363
364 _, err := st.Tail("", "")
365 So(err, ShouldEqual, storage.ErrDoesNotExist)
366 })
367 })
368 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698