Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(840)

Side by Side Diff: server/internal/logdog/archivist/storageSource.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package archivist
6
7 import (
8 "github.com/golang/protobuf/proto"
9 "github.com/luci/luci-go/common/logdog/types"
10 log "github.com/luci/luci-go/common/logging"
11 "github.com/luci/luci-go/common/proto/logdog/logpb"
12 "github.com/luci/luci-go/server/logdog/archive"
13 "github.com/luci/luci-go/server/logdog/storage"
14 "golang.org/x/net/context"
15 )
16
17 // storageSource is an archive.LogEntrySource that pulls log entries from
18 // intermediate storage via its storage.Storage instance.
19 type storageSource struct {
dnj 2016/04/11 17:20:04 This was pulled out of archivist.go. Nothing reall
20 context.Context
21
22 st storage.Storage // the storage instance to read from
23 path types.StreamPath // the path of the log stream
24 contiguous bool // if true, enforce contiguous entries
25 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
26
27 buf []*logpb.LogEntry
28 lastIndex types.MessageIndex
29 logEntryCount int64
30 hasMissingEntries bool // true if some log entries were missing.
31 }
32
33 func (s *storageSource) bufferEntries(start types.MessageIndex) error {
34 bytes := 0
35
36 req := storage.GetRequest{
37 Path: s.path,
38 Index: start,
39 }
40 return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool {
41 le := logpb.LogEntry{}
42 if err := proto.Unmarshal(d, &le); err != nil {
43 log.Fields{
44 log.ErrorKey: err,
45 "streamIndex": idx,
46 }.Errorf(s, "Failed to unmarshal LogEntry.")
47 return false
48 }
49 s.buf = append(s.buf, &le)
50
51 // Stop loading if we've reached or exceeded our buffer size.
52 bytes += len(d)
53 return bytes < storageBufferSize
54 })
55 }
56
57 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
58 if len(s.buf) == 0 {
59 s.buf = s.buf[:0]
60 if err := s.bufferEntries(s.lastIndex + 1); err != nil {
61 if err == storage.ErrDoesNotExist {
62 log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
63 if s.terminalIndex >= 0 {
64 s.hasMissingEntries = true
65 }
66 return nil, archive.ErrEndOfStream
67 }
68
69 log.WithError(err).Errorf(s, "Failed to retrieve log str eam from storage.")
70 return nil, err
71 }
72 }
73
74 // If we have no more buffered entries, we have exhausted our log stream .
75 if len(s.buf) == 0 {
76 // If we have a terminal index, but we didn't actually emit that index,
77 // mark that we have missing entries.
78 if s.terminalIndex >= 0 && s.lastIndex != s.terminalIndex {
79 log.Fields{
80 "terminalIndex": s.terminalIndex,
81 "lastIndex": s.lastIndex,
82 }.Warningf(s, "Log stream stopped before terminal index. ")
83 s.hasMissingEntries = true
84 } else {
85 log.Fields{
86 "lastIndex": s.lastIndex,
87 }.Debugf(s, "Encountered end of stream.")
88 }
89
90 return nil, archive.ErrEndOfStream
91 }
92
93 // Pop the next log entry and advance the stream.
94 var le *logpb.LogEntry
95 le, s.buf = s.buf[0], s.buf[1:]
96
97 // If we're enforcing a contiguous log stream, error if this LogEntry is not
98 // contiguous.
99 sidx := types.MessageIndex(le.StreamIndex)
100 nidx := (s.lastIndex + 1)
101 if sidx != nidx {
102 s.hasMissingEntries = true
103 }
104 if s.contiguous && s.hasMissingEntries {
105 log.Fields{
106 "index": sidx,
107 "nextIndex": nidx,
108 }.Warningf(s, "Non-contiguous log stream while enforcing.")
109 return nil, archive.ErrEndOfStream
110 }
111
112 // If we're enforcing a maximum terminal index, return end of stream if this
113 // LogEntry exceeds that index.
114 if s.terminalIndex >= 0 && sidx > s.terminalIndex {
115 log.Fields{
116 "index": sidx,
117 "terminalIndex": s.terminalIndex,
118 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.")
119 return nil, archive.ErrEndOfStream
120 }
121
122 s.lastIndex = sidx
123 s.logEntryCount++
124 return le, nil
125 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698