Index: server/logdog/storage/memory/memory.go |
diff --git a/server/logdog/storage/memory/memory.go b/server/logdog/storage/memory/memory.go |
index bc320d922f25657ee601772dc9d698e1204f7cf7..40eeaaf357ba8796a1ad3115d98e7f5b6b58a54c 100644 |
--- a/server/logdog/storage/memory/memory.go |
+++ b/server/logdog/storage/memory/memory.go |
@@ -9,6 +9,7 @@ import ( |
"sync" |
"time" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/logdog/types" |
"github.com/luci/luci-go/server/logdog/storage" |
) |
@@ -23,6 +24,11 @@ type rec struct { |
data []byte |
} |
+type streamKey struct { |
+ project config.ProjectName |
+ path types.StreamPath |
+} |
+ |
// Storage is an implementation of the storage.Storage interface that stores |
// data in memory. |
// |
@@ -36,7 +42,7 @@ type Storage struct { |
MaxLogAge time.Duration |
stateMu sync.Mutex |
- streams map[types.StreamPath]*logStream |
+ streams map[streamKey]*logStream |
closed bool |
err error |
} |
@@ -62,7 +68,7 @@ func (s *Storage) Config(cfg storage.Config) error { |
// Put implements storage.Storage. |
func (s *Storage) Put(req storage.PutRequest) error { |
return s.run(func() error { |
- ls := s.getLogStreamLocked(req.Path, true) |
+ ls := s.getLogStreamLocked(req.Project, req.Path, true) |
for i, v := range req.Values { |
index := req.Index + types.MessageIndex(i) |
@@ -85,7 +91,7 @@ func (s *Storage) Put(req storage.PutRequest) error { |
func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error { |
recs := []*rec(nil) |
err := s.run(func() error { |
- ls := s.getLogStreamLocked(req.Path, false) |
+ ls := s.getLogStreamLocked(req.Project, req.Path, false) |
if ls == nil { |
return storage.ErrDoesNotExist |
} |
@@ -135,12 +141,12 @@ func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error { |
} |
// Tail implements storage.Storage. |
-func (s *Storage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { |
+func (s *Storage) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) { |
var r *rec |
// Find the latest log, then return it. |
err := s.run(func() error { |
- ls := s.getLogStreamLocked(p, false) |
+ ls := s.getLogStreamLocked(project, path, false) |
if ls == nil { |
return storage.ErrDoesNotExist |
} |
@@ -158,10 +164,9 @@ func (s *Storage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { |
} |
// Count returns the number of log records for the given stream. |
-func (s *Storage) Count(p types.StreamPath) (c int) { |
+func (s *Storage) Count(project config.ProjectName, path types.StreamPath) (c int) { |
s.run(func() error { |
- st := s.streams[p] |
- if st != nil { |
+ if st := s.getLogStreamLocked(project, path, false); st != nil { |
c = len(st.logs) |
} |
return nil |
@@ -190,8 +195,13 @@ func (s *Storage) run(f func() error) error { |
return f() |
} |
-func (s *Storage) getLogStreamLocked(p types.StreamPath, create bool) *logStream { |
- ls := s.streams[p] |
+func (s *Storage) getLogStreamLocked(project config.ProjectName, path types.StreamPath, create bool) *logStream { |
+ key := streamKey{ |
+ project: project, |
+ path: path, |
+ } |
+ |
+ ls := s.streams[key] |
if ls == nil && create { |
ls = &logStream{ |
logs: map[types.MessageIndex][]byte{}, |
@@ -199,9 +209,9 @@ func (s *Storage) getLogStreamLocked(p types.StreamPath, create bool) *logStream |
} |
if s.streams == nil { |
- s.streams = map[types.StreamPath]*logStream{} |
+ s.streams = map[streamKey]*logStream{} |
} |
- s.streams[p] = ls |
+ s.streams[key] = ls |
} |
return ls |