Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Side by Side Diff: server/internal/logdog/archivist/storageSource.go

Issue 1874563005: Archivist asserts completeness through keys scan. (Closed) Base URL: https://github.com/luci/luci-go@logdog-storage-keysonly
Patch Set: Remove unused members. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "github.com/golang/protobuf/proto" 8 "github.com/golang/protobuf/proto"
9 "github.com/luci/luci-go/common/logdog/types" 9 "github.com/luci/luci-go/common/logdog/types"
10 log "github.com/luci/luci-go/common/logging" 10 log "github.com/luci/luci-go/common/logging"
11 "github.com/luci/luci-go/common/proto/logdog/logpb" 11 "github.com/luci/luci-go/common/proto/logdog/logpb"
12 "github.com/luci/luci-go/server/logdog/archive" 12 "github.com/luci/luci-go/server/logdog/archive"
13 "github.com/luci/luci-go/server/logdog/storage" 13 "github.com/luci/luci-go/server/logdog/storage"
14 "golang.org/x/net/context" 14 "golang.org/x/net/context"
15 ) 15 )
16 16
17 // storageSource is an archive.LogEntrySource that pulls log entries from 17 // storageSource is an archive.LogEntrySource that pulls log entries from
18 // intermediate storage via its storage.Storage instance. 18 // intermediate storage via its storage.Storage instance.
19 type storageSource struct { 19 type storageSource struct {
20 context.Context 20 context.Context
21 21
22 st storage.Storage // the storage instance to read from 22 st storage.Storage // the storage instance to read from
23 path types.StreamPath // the path of the log stream 23 path types.StreamPath // the path of the log stream
24 contiguous bool // if true, enforce contiguous entries
25 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this 24 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
26 25
27 » buf []*logpb.LogEntry 26 » buf []*logpb.LogEntry
28 » lastIndex types.MessageIndex 27 » lastIndex types.MessageIndex
29 » logEntryCount int64 28 » logEntryCount int64
30 » hasMissingEntries bool // true if some log entries were missing.
31 } 29 }
32 30
33 func (s *storageSource) bufferEntries(start types.MessageIndex) error { 31 func (s *storageSource) bufferEntries(start types.MessageIndex) error {
34 bytes := 0 32 bytes := 0
35 33
36 req := storage.GetRequest{ 34 req := storage.GetRequest{
37 Path: s.path, 35 Path: s.path,
38 Index: start, 36 Index: start,
39 } 37 }
40 return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { 38 return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool {
(...skipping 12 matching lines...) Expand all
53 return bytes < storageBufferSize 51 return bytes < storageBufferSize
54 }) 52 })
55 } 53 }
56 54
57 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { 55 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
58 if len(s.buf) == 0 { 56 if len(s.buf) == 0 {
59 s.buf = s.buf[:0] 57 s.buf = s.buf[:0]
60 if err := s.bufferEntries(s.lastIndex + 1); err != nil { 58 if err := s.bufferEntries(s.lastIndex + 1); err != nil {
61 if err == storage.ErrDoesNotExist { 59 if err == storage.ErrDoesNotExist {
62 log.Warningf(s, "Archive target stream does not exist in intermediate storage.") 60 log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
63 if s.terminalIndex >= 0 {
64 s.hasMissingEntries = true
65 }
66 return nil, archive.ErrEndOfStream 61 return nil, archive.ErrEndOfStream
67 } 62 }
68 63
69 log.WithError(err).Errorf(s, "Failed to retrieve log str eam from storage.") 64 log.WithError(err).Errorf(s, "Failed to retrieve log str eam from storage.")
70 return nil, err 65 return nil, err
71 } 66 }
72 } 67 }
73 68
74 // If we have no more buffered entries, we have exhausted our log stream . 69 // If we have no more buffered entries, we have exhausted our log stream .
75 if len(s.buf) == 0 { 70 if len(s.buf) == 0 {
76 // If we have a terminal index, but we didn't actually emit that index, 71 // If we have a terminal index, but we didn't actually emit that index,
77 // mark that we have missing entries. 72 // mark that we have missing entries.
78 if s.terminalIndex >= 0 && s.lastIndex != s.terminalIndex { 73 if s.terminalIndex >= 0 && s.lastIndex != s.terminalIndex {
79 log.Fields{ 74 log.Fields{
80 "terminalIndex": s.terminalIndex, 75 "terminalIndex": s.terminalIndex,
81 "lastIndex": s.lastIndex, 76 "lastIndex": s.lastIndex,
82 }.Warningf(s, "Log stream stopped before terminal index. ") 77 }.Warningf(s, "Log stream stopped before terminal index. ")
83 s.hasMissingEntries = true
84 } else { 78 } else {
85 log.Fields{ 79 log.Fields{
86 "lastIndex": s.lastIndex, 80 "lastIndex": s.lastIndex,
87 }.Debugf(s, "Encountered end of stream.") 81 }.Debugf(s, "Encountered end of stream.")
88 } 82 }
89 83
90 return nil, archive.ErrEndOfStream 84 return nil, archive.ErrEndOfStream
91 } 85 }
92 86
93 // Pop the next log entry and advance the stream. 87 // Pop the next log entry and advance the stream.
94 var le *logpb.LogEntry 88 var le *logpb.LogEntry
95 le, s.buf = s.buf[0], s.buf[1:] 89 le, s.buf = s.buf[0], s.buf[1:]
96 90
97 // If we're enforcing a contiguous log stream, error if this LogEntry is not
98 // contiguous.
99 sidx := types.MessageIndex(le.StreamIndex)
100 nidx := (s.lastIndex + 1)
101 if sidx != nidx {
102 s.hasMissingEntries = true
103 }
104 if s.contiguous && s.hasMissingEntries {
105 log.Fields{
106 "index": sidx,
107 "nextIndex": nidx,
108 }.Warningf(s, "Non-contiguous log stream while enforcing.")
109 return nil, archive.ErrEndOfStream
110 }
111
112 // If we're enforcing a maximum terminal index, return end of stream if this 91 // If we're enforcing a maximum terminal index, return end of stream if this
113 // LogEntry exceeds that index. 92 // LogEntry exceeds that index.
93 sidx := types.MessageIndex(le.StreamIndex)
114 if s.terminalIndex >= 0 && sidx > s.terminalIndex { 94 if s.terminalIndex >= 0 && sidx > s.terminalIndex {
115 log.Fields{ 95 log.Fields{
116 "index": sidx, 96 "index": sidx,
117 "terminalIndex": s.terminalIndex, 97 "terminalIndex": s.terminalIndex,
118 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") 98 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.")
119 return nil, archive.ErrEndOfStream 99 return nil, archive.ErrEndOfStream
120 } 100 }
121 101
122 s.lastIndex = sidx 102 s.lastIndex = sidx
123 s.logEntryCount++ 103 s.logEntryCount++
124 return le, nil 104 return le, nil
125 } 105 }
OLDNEW
« no previous file with comments | « server/internal/logdog/archivist/archivist_test.go ('k') | server/logdog/storage/bigtable/bigtable.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698