| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package archive | 5 package archive |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "io" | 8 "io" |
| 9 | 9 |
| 10 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
| 11 "github.com/luci/luci-go/logdog/api/logpb" | 11 "github.com/luci/luci-go/logdog/api/logpb" |
| 12 ) | 12 ) |
| 13 | 13 |
| 14 // indexBuilder is a stateful engine that constructs an archival index. | 14 // indexBuilder is a stateful engine that constructs an archival index. |
| 15 type indexBuilder struct { | 15 type indexBuilder struct { |
| 16 *Manifest | 16 *Manifest |
| 17 index logpb.LogIndex | 17 index logpb.LogIndex |
| 18 | 18 |
| 19 lastPrefixIndex uint64 | 19 lastPrefixIndex uint64 |
| 20 lastStreamIndex uint64 | 20 lastStreamIndex uint64 |
| 21 lastBytes uint64 | 21 lastBytes uint64 |
| 22 | 22 |
| 23 latestBufferedEntry *logpb.LogIndex_Entry |
| 24 |
| 23 sizeFunc func(proto.Message) int | 25 sizeFunc func(proto.Message) int |
| 24 } | 26 } |
| 25 | 27 |
| 26 func (i *indexBuilder) addLogEntry(le *logpb.LogEntry, offset int64) { | 28 func (i *indexBuilder) addLogEntry(le *logpb.LogEntry, offset int64) { |
| 27 // Only calculate the size if we actually use it. | 29 // Only calculate the size if we actually use it. |
| 28 if i.ByteRange > 0 { | 30 if i.ByteRange > 0 { |
| 29 i.lastBytes += uint64(i.size(le)) | 31 i.lastBytes += uint64(i.size(le)) |
| 30 } | 32 } |
| 31 | 33 |
| 32 // Update our stream properties. | 34 // Update our stream properties. |
| 33 i.index.LastPrefixIndex = le.PrefixIndex | 35 i.index.LastPrefixIndex = le.PrefixIndex |
| 34 i.index.LastStreamIndex = le.StreamIndex | 36 i.index.LastStreamIndex = le.StreamIndex |
| 35 i.index.LogEntryCount++ | 37 i.index.LogEntryCount++ |
| 36 | 38 |
| 39 entry := logpb.LogIndex_Entry{ |
| 40 Sequence: le.Sequence, |
| 41 PrefixIndex: le.PrefixIndex, |
| 42 StreamIndex: le.StreamIndex, |
| 43 Offset: uint64(offset), |
| 44 TimeOffset: le.TimeOffset, |
| 45 } |
| 46 |
| 37 // Do we index this LogEntry? | 47 // Do we index this LogEntry? |
| 38 if len(i.index.Entries) > 0 { | 48 if len(i.index.Entries) > 0 { |
| 39 if !((i.StreamIndexRange > 0 && (le.StreamIndex-i.lastStreamInde
x) >= uint64(i.StreamIndexRange)) || | 49 if !((i.StreamIndexRange > 0 && (le.StreamIndex-i.lastStreamInde
x) >= uint64(i.StreamIndexRange)) || |
| 40 (i.PrefixIndexRange > 0 && (le.PrefixIndex-i.lastPrefixI
ndex) >= uint64(i.PrefixIndexRange)) || | 50 (i.PrefixIndexRange > 0 && (le.PrefixIndex-i.lastPrefixI
ndex) >= uint64(i.PrefixIndexRange)) || |
| 41 (i.ByteRange > 0 && i.lastBytes >= uint64(i.ByteRange)))
{ | 51 (i.ByteRange > 0 && i.lastBytes >= uint64(i.ByteRange)))
{ |
| 42 » » » // Not going to index this entry. | 52 » » » // Not going to index this entry. Buffer it as a termina
tor. |
| 53 » » » i.latestBufferedEntry = &entry |
| 43 return | 54 return |
| 44 } | 55 } |
| 45 | 56 |
| 46 i.lastBytes = 0 | 57 i.lastBytes = 0 |
| 47 } | 58 } |
| 48 | 59 |
| 49 » i.index.Entries = append(i.index.Entries, &logpb.LogIndex_Entry{ | 60 » i.index.Entries = append(i.index.Entries, &entry) |
| 50 » » Sequence: le.Sequence, | 61 » i.latestBufferedEntry = nil |
| 51 » » PrefixIndex: le.PrefixIndex, | |
| 52 » » StreamIndex: le.StreamIndex, | |
| 53 » » Offset: uint64(offset), | |
| 54 » » TimeOffset: le.TimeOffset, | |
| 55 » }) | |
| 56 | 62 |
| 57 // Update our counters. | 63 // Update our counters. |
| 58 i.lastStreamIndex = le.StreamIndex | 64 i.lastStreamIndex = le.StreamIndex |
| 59 i.lastPrefixIndex = le.PrefixIndex | 65 i.lastPrefixIndex = le.PrefixIndex |
| 60 } | 66 } |
| 61 | 67 |
| 62 func (i *indexBuilder) emit(w io.Writer) error { | 68 func (i *indexBuilder) emit(w io.Writer) error { |
| 69 // Always include the last stream entry in the index. |
| 70 if i.latestBufferedEntry != nil { |
| 71 i.index.Entries = append(i.index.Entries, i.latestBufferedEntry) |
| 72 } |
| 73 |
| 63 d, err := proto.Marshal(&i.index) | 74 d, err := proto.Marshal(&i.index) |
| 64 if err != nil { | 75 if err != nil { |
| 65 return err | 76 return err |
| 66 } | 77 } |
| 67 | 78 |
| 68 if _, err := w.Write(d); err != nil { | 79 if _, err := w.Write(d); err != nil { |
| 69 return err | 80 return err |
| 70 } | 81 } |
| 71 return nil | 82 return nil |
| 72 } | 83 } |
| 73 | 84 |
| 74 func (i *indexBuilder) size(pb proto.Message) int { | 85 func (i *indexBuilder) size(pb proto.Message) int { |
| 75 if f := i.sizeFunc; f != nil { | 86 if f := i.sizeFunc; f != nil { |
| 76 return f(pb) | 87 return f(pb) |
| 77 } | 88 } |
| 78 return proto.Size(pb) | 89 return proto.Size(pb) |
| 79 } | 90 } |
| OLD | NEW |