| Index: server/logdog/storage/memory/memory.go
|
| diff --git a/server/logdog/storage/memory/memory.go b/server/logdog/storage/memory/memory.go
|
| index bc320d922f25657ee601772dc9d698e1204f7cf7..40eeaaf357ba8796a1ad3115d98e7f5b6b58a54c 100644
|
| --- a/server/logdog/storage/memory/memory.go
|
| +++ b/server/logdog/storage/memory/memory.go
|
| @@ -9,6 +9,7 @@ import (
|
| "sync"
|
| "time"
|
|
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| "github.com/luci/luci-go/server/logdog/storage"
|
| )
|
| @@ -23,6 +24,11 @@ type rec struct {
|
| data []byte
|
| }
|
|
|
| +type streamKey struct {
|
| + project config.ProjectName
|
| + path types.StreamPath
|
| +}
|
| +
|
| // Storage is an implementation of the storage.Storage interface that stores
|
| // data in memory.
|
| //
|
| @@ -36,7 +42,7 @@ type Storage struct {
|
| MaxLogAge time.Duration
|
|
|
| stateMu sync.Mutex
|
| - streams map[types.StreamPath]*logStream
|
| + streams map[streamKey]*logStream
|
| closed bool
|
| err error
|
| }
|
| @@ -62,7 +68,7 @@ func (s *Storage) Config(cfg storage.Config) error {
|
| // Put implements storage.Storage.
|
| func (s *Storage) Put(req storage.PutRequest) error {
|
| return s.run(func() error {
|
| - ls := s.getLogStreamLocked(req.Path, true)
|
| + ls := s.getLogStreamLocked(req.Project, req.Path, true)
|
|
|
| for i, v := range req.Values {
|
| index := req.Index + types.MessageIndex(i)
|
| @@ -85,7 +91,7 @@ func (s *Storage) Put(req storage.PutRequest) error {
|
| func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error {
|
| recs := []*rec(nil)
|
| err := s.run(func() error {
|
| - ls := s.getLogStreamLocked(req.Path, false)
|
| + ls := s.getLogStreamLocked(req.Project, req.Path, false)
|
| if ls == nil {
|
| return storage.ErrDoesNotExist
|
| }
|
| @@ -135,12 +141,12 @@ func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error {
|
| }
|
|
|
| // Tail implements storage.Storage.
|
| -func (s *Storage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) {
|
| +func (s *Storage) Tail(project config.ProjectName, path types.StreamPath) ([]byte, types.MessageIndex, error) {
|
| var r *rec
|
|
|
| // Find the latest log, then return it.
|
| err := s.run(func() error {
|
| - ls := s.getLogStreamLocked(p, false)
|
| + ls := s.getLogStreamLocked(project, path, false)
|
| if ls == nil {
|
| return storage.ErrDoesNotExist
|
| }
|
| @@ -158,10 +164,9 @@ func (s *Storage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) {
|
| }
|
|
|
| // Count returns the number of log records for the given stream.
|
| -func (s *Storage) Count(p types.StreamPath) (c int) {
|
| +func (s *Storage) Count(project config.ProjectName, path types.StreamPath) (c int) {
|
| s.run(func() error {
|
| - st := s.streams[p]
|
| - if st != nil {
|
| + if st := s.getLogStreamLocked(project, path, false); st != nil {
|
| c = len(st.logs)
|
| }
|
| return nil
|
| @@ -190,8 +195,13 @@ func (s *Storage) run(f func() error) error {
|
| return f()
|
| }
|
|
|
| -func (s *Storage) getLogStreamLocked(p types.StreamPath, create bool) *logStream {
|
| - ls := s.streams[p]
|
| +func (s *Storage) getLogStreamLocked(project config.ProjectName, path types.StreamPath, create bool) *logStream {
|
| + key := streamKey{
|
| + project: project,
|
| + path: path,
|
| + }
|
| +
|
| + ls := s.streams[key]
|
| if ls == nil && create {
|
| ls = &logStream{
|
| logs: map[types.MessageIndex][]byte{},
|
| @@ -199,9 +209,9 @@ func (s *Storage) getLogStreamLocked(p types.StreamPath, create bool) *logStream
|
| }
|
|
|
| if s.streams == nil {
|
| - s.streams = map[types.StreamPath]*logStream{}
|
| + s.streams = map[streamKey]*logStream{}
|
| }
|
| - s.streams[p] = ls
|
| + s.streams[key] = ls
|
| }
|
|
|
| return ls
|
|
|