Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: logdog/common/archive/index.go

Issue 2435883002: LogDog: Fix archival Get/Tail implementations. (Closed)
Patch Set: LogDog: Fix archival Get/Tail implementations. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package archive 5 package archive
6 6
7 import ( 7 import (
8 "io" 8 "io"
9 9
10 "github.com/golang/protobuf/proto" 10 "github.com/golang/protobuf/proto"
11 "github.com/luci/luci-go/logdog/api/logpb" 11 "github.com/luci/luci-go/logdog/api/logpb"
12 ) 12 )
13 13
14 // indexBuilder is a stateful engine that constructs an archival index. 14 // indexBuilder is a stateful engine that constructs an archival index.
15 type indexBuilder struct { 15 type indexBuilder struct {
16 *Manifest 16 *Manifest
17 index logpb.LogIndex 17 index logpb.LogIndex
18 18
19 lastPrefixIndex uint64 19 lastPrefixIndex uint64
20 lastStreamIndex uint64 20 lastStreamIndex uint64
21 lastBytes uint64 21 lastBytes uint64
22 22
23 latestBufferedEntry *logpb.LogIndex_Entry
24
23 sizeFunc func(proto.Message) int 25 sizeFunc func(proto.Message) int
24 } 26 }
25 27
26 func (i *indexBuilder) addLogEntry(le *logpb.LogEntry, offset int64) { 28 func (i *indexBuilder) addLogEntry(le *logpb.LogEntry, offset int64) {
27 // Only calculate the size if we actually use it. 29 // Only calculate the size if we actually use it.
28 if i.ByteRange > 0 { 30 if i.ByteRange > 0 {
29 i.lastBytes += uint64(i.size(le)) 31 i.lastBytes += uint64(i.size(le))
30 } 32 }
31 33
32 // Update our stream properties. 34 // Update our stream properties.
33 i.index.LastPrefixIndex = le.PrefixIndex 35 i.index.LastPrefixIndex = le.PrefixIndex
34 i.index.LastStreamIndex = le.StreamIndex 36 i.index.LastStreamIndex = le.StreamIndex
35 i.index.LogEntryCount++ 37 i.index.LogEntryCount++
36 38
39 entry := logpb.LogIndex_Entry{
40 Sequence: le.Sequence,
41 PrefixIndex: le.PrefixIndex,
42 StreamIndex: le.StreamIndex,
43 Offset: uint64(offset),
44 TimeOffset: le.TimeOffset,
45 }
46
37 // Do we index this LogEntry? 47 // Do we index this LogEntry?
38 if len(i.index.Entries) > 0 { 48 if len(i.index.Entries) > 0 {
39 if !((i.StreamIndexRange > 0 && (le.StreamIndex-i.lastStreamInde x) >= uint64(i.StreamIndexRange)) || 49 if !((i.StreamIndexRange > 0 && (le.StreamIndex-i.lastStreamInde x) >= uint64(i.StreamIndexRange)) ||
40 (i.PrefixIndexRange > 0 && (le.PrefixIndex-i.lastPrefixI ndex) >= uint64(i.PrefixIndexRange)) || 50 (i.PrefixIndexRange > 0 && (le.PrefixIndex-i.lastPrefixI ndex) >= uint64(i.PrefixIndexRange)) ||
41 (i.ByteRange > 0 && i.lastBytes >= uint64(i.ByteRange))) { 51 (i.ByteRange > 0 && i.lastBytes >= uint64(i.ByteRange))) {
42 » » » // Not going to index this entry. 52 » » » // Not going to index this entry. Buffer it as a termina tor.
53 » » » i.latestBufferedEntry = &entry
43 return 54 return
44 } 55 }
45 56
46 i.lastBytes = 0 57 i.lastBytes = 0
47 } 58 }
48 59
49 » i.index.Entries = append(i.index.Entries, &logpb.LogIndex_Entry{ 60 » i.index.Entries = append(i.index.Entries, &entry)
50 » » Sequence: le.Sequence, 61 » i.latestBufferedEntry = nil
51 » » PrefixIndex: le.PrefixIndex,
52 » » StreamIndex: le.StreamIndex,
53 » » Offset: uint64(offset),
54 » » TimeOffset: le.TimeOffset,
55 » })
56 62
57 // Update our counters. 63 // Update our counters.
58 i.lastStreamIndex = le.StreamIndex 64 i.lastStreamIndex = le.StreamIndex
59 i.lastPrefixIndex = le.PrefixIndex 65 i.lastPrefixIndex = le.PrefixIndex
60 } 66 }
61 67
62 func (i *indexBuilder) emit(w io.Writer) error { 68 func (i *indexBuilder) emit(w io.Writer) error {
69 // Always include the last stream entry in the index.
70 if i.latestBufferedEntry != nil {
71 i.index.Entries = append(i.index.Entries, i.latestBufferedEntry)
72 }
73
63 d, err := proto.Marshal(&i.index) 74 d, err := proto.Marshal(&i.index)
64 if err != nil { 75 if err != nil {
65 return err 76 return err
66 } 77 }
67 78
68 if _, err := w.Write(d); err != nil { 79 if _, err := w.Write(d); err != nil {
69 return err 80 return err
70 } 81 }
71 return nil 82 return nil
72 } 83 }
73 84
74 func (i *indexBuilder) size(pb proto.Message) int { 85 func (i *indexBuilder) size(pb proto.Message) int {
75 if f := i.sizeFunc; f != nil { 86 if f := i.sizeFunc; f != nil {
76 return f(pb) 87 return f(pb)
77 } 88 }
78 return proto.Size(pb) 89 return proto.Size(pb)
79 } 90 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698