Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Side by Side Diff: server/internal/logdog/archivist/storageSource.go

Issue 1909053003: LogDog: Add project namespacing to Archivist. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-logs
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/internal/logdog/archivist/archivist_test.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "github.com/golang/protobuf/proto" 8 "github.com/golang/protobuf/proto"
9 "github.com/luci/luci-go/common/config"
9 "github.com/luci/luci-go/common/logdog/types" 10 "github.com/luci/luci-go/common/logdog/types"
10 log "github.com/luci/luci-go/common/logging" 11 log "github.com/luci/luci-go/common/logging"
11 "github.com/luci/luci-go/common/proto/logdog/logpb" 12 "github.com/luci/luci-go/common/proto/logdog/logpb"
12 "github.com/luci/luci-go/server/logdog/archive" 13 "github.com/luci/luci-go/server/logdog/archive"
13 "github.com/luci/luci-go/server/logdog/storage" 14 "github.com/luci/luci-go/server/logdog/storage"
14 "golang.org/x/net/context" 15 "golang.org/x/net/context"
15 ) 16 )
16 17
17 // storageSource is an archive.LogEntrySource that pulls log entries from 18 // storageSource is an archive.LogEntrySource that pulls log entries from
18 // intermediate storage via its storage.Storage instance. 19 // intermediate storage via its storage.Storage instance.
19 type storageSource struct { 20 type storageSource struct {
20 context.Context 21 context.Context
21 22
22 st storage.Storage // the storage instance to read from 23 st storage.Storage // the storage instance to read from
24 project config.ProjectName // the path of the log stream
23 path types.StreamPath // the path of the log stream 25 path types.StreamPath // the path of the log stream
24 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this 26 terminalIndex types.MessageIndex // if >= 0, discard logs beyond this
25 27
26 buf []*logpb.LogEntry 28 buf []*logpb.LogEntry
27 lastIndex types.MessageIndex 29 lastIndex types.MessageIndex
28 logEntryCount int64 30 logEntryCount int64
29 } 31 }
30 32
31 func (s *storageSource) bufferEntries(start types.MessageIndex) error { 33 func (s *storageSource) bufferEntries(start types.MessageIndex) error {
32 bytes := 0 34 bytes := 0
33 35
34 req := storage.GetRequest{ 36 req := storage.GetRequest{
35 » » Path: s.path, 37 » » Project: s.project,
36 » » Index: start, 38 » » Path: s.path,
39 » » Index: start,
37 } 40 }
38 return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { 41 return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool {
39 le := logpb.LogEntry{} 42 le := logpb.LogEntry{}
40 if err := proto.Unmarshal(d, &le); err != nil { 43 if err := proto.Unmarshal(d, &le); err != nil {
41 log.Fields{ 44 log.Fields{
42 log.ErrorKey: err, 45 log.ErrorKey: err,
43 "streamIndex": idx, 46 "streamIndex": idx,
44 }.Errorf(s, "Failed to unmarshal LogEntry.") 47 }.Errorf(s, "Failed to unmarshal LogEntry.")
45 return false 48 return false
46 } 49 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
96 "index": sidx, 99 "index": sidx,
97 "terminalIndex": s.terminalIndex, 100 "terminalIndex": s.terminalIndex,
98 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") 101 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.")
99 return nil, archive.ErrEndOfStream 102 return nil, archive.ErrEndOfStream
100 } 103 }
101 104
102 s.lastIndex = sidx 105 s.lastIndex = sidx
103 s.logEntryCount++ 106 s.logEntryCount++
104 return le, nil 107 return le, nil
105 } 108 }
OLDNEW
« no previous file with comments | « server/internal/logdog/archivist/archivist_test.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698