Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(596)

Side by Side Diff: logdog/server/archivist/storageSource.go

Issue 2435883002: LogDog: Fix archival Get/Tail implementations. (Closed)
Patch Set: LogDog: Fix archival Get/Tail implementations. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "io" 8 "io"
9 9
10 "github.com/golang/protobuf/proto"
11 "github.com/luci/luci-go/common/config" 10 "github.com/luci/luci-go/common/config"
12 log "github.com/luci/luci-go/common/logging" 11 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/logdog/api/logpb" 12 "github.com/luci/luci-go/logdog/api/logpb"
14 "github.com/luci/luci-go/logdog/common/storage" 13 "github.com/luci/luci-go/logdog/common/storage"
15 "github.com/luci/luci-go/logdog/common/types" 14 "github.com/luci/luci-go/logdog/common/types"
16 "golang.org/x/net/context" 15 "golang.org/x/net/context"
17 ) 16 )
18 17
19 // storageSource is a renderer.Source that pulls log entries from intermediate 18 // storageSource is a renderer.Source that pulls log entries from intermediate
20 // storage via its storage.Storage instance. 19 // storage via its storage.Storage instance.
(...skipping 11 matching lines...) Expand all
32 } 31 }
33 32
34 func (s *storageSource) bufferEntries(start types.MessageIndex) error { 33 func (s *storageSource) bufferEntries(start types.MessageIndex) error {
35 bytes := 0 34 bytes := 0
36 35
37 req := storage.GetRequest{ 36 req := storage.GetRequest{
38 Project: s.project, 37 Project: s.project,
39 Path: s.path, 38 Path: s.path,
40 Index: start, 39 Index: start,
41 } 40 }
42 » return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { 41 » return s.st.Get(req, func(e *storage.Entry) bool {
43 » » le := logpb.LogEntry{} 42 » » le, err := e.GetLogEntry()
44 » » if err := proto.Unmarshal(d, &le); err != nil { 43 » » if err != nil {
45 » » » log.Fields{ 44 » » » log.WithError(err).Errorf(s, "Failed to unmarshal LogEnt ry.")
46 » » » » log.ErrorKey: err,
47 » » » » "streamIndex": idx,
48 » » » }.Errorf(s, "Failed to unmarshal LogEntry.")
49 return false 45 return false
50 } 46 }
51 » » s.buf = append(s.buf, &le) 47 » » s.buf = append(s.buf, le)
52 48
53 // Stop loading if we've reached or exceeded our buffer size. 49 // Stop loading if we've reached or exceeded our buffer size.
54 » » bytes += len(d) 50 » » bytes += len(e.D)
55 return bytes < storageBufferSize 51 return bytes < storageBufferSize
56 }) 52 })
57 } 53 }
58 54
59 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) { 55 func (s *storageSource) NextLogEntry() (*logpb.LogEntry, error) {
60 if len(s.buf) == 0 { 56 if len(s.buf) == 0 {
61 s.buf = s.buf[:0] 57 s.buf = s.buf[:0]
62 if err := s.bufferEntries(s.lastIndex + 1); err != nil { 58 if err := s.bufferEntries(s.lastIndex + 1); err != nil {
63 if err == storage.ErrDoesNotExist { 59 if err == storage.ErrDoesNotExist {
64 log.Warningf(s, "Archive target stream does not exist in intermediate storage.") 60 log.Warningf(s, "Archive target stream does not exist in intermediate storage.")
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
100 "index": sidx, 96 "index": sidx,
101 "terminalIndex": s.terminalIndex, 97 "terminalIndex": s.terminalIndex,
102 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") 98 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.")
103 return nil, io.EOF 99 return nil, io.EOF
104 } 100 }
105 101
106 s.lastIndex = sidx 102 s.lastIndex = sidx
107 s.logEntryCount++ 103 s.logEntryCount++
108 return le, nil 104 return le, nil
109 } 105 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698