Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: server/logdog/storage/memory/memory.go

Issue 1909943003: LogDog: Add project support to Storage. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-services
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "sync" 9 "sync"
10 "time" 10 "time"
11 11
12 "github.com/luci/luci-go/common/config"
12 "github.com/luci/luci-go/common/logdog/types" 13 "github.com/luci/luci-go/common/logdog/types"
13 "github.com/luci/luci-go/server/logdog/storage" 14 "github.com/luci/luci-go/server/logdog/storage"
14 ) 15 )
15 16
16 type logStream struct { 17 type logStream struct {
17 logs map[types.MessageIndex][]byte 18 logs map[types.MessageIndex][]byte
18 latestIndex types.MessageIndex 19 latestIndex types.MessageIndex
19 } 20 }
20 21
21 type rec struct { 22 type rec struct {
22 index types.MessageIndex 23 index types.MessageIndex
23 data []byte 24 data []byte
24 } 25 }
25 26
27 type streamKey struct {
28 project config.ProjectName
29 path types.StreamPath
30 }
31
26 // Storage is an implementation of the storage.Storage interface that stores 32 // Storage is an implementation of the storage.Storage interface that stores
27 // data in memory. 33 // data in memory.
28 // 34 //
29 // This is intended for testing, and not intended to be performant. 35 // This is intended for testing, and not intended to be performant.
30 type Storage struct { 36 type Storage struct {
31 // MaxGetCount, if not zero, is the maximum number of records to retriev e from 37 // MaxGetCount, if not zero, is the maximum number of records to retriev e from
32 // a single Get request. 38 // a single Get request.
33 MaxGetCount int 39 MaxGetCount int
34 40
35 // MaxLogAge is the configured maximum log age. 41 // MaxLogAge is the configured maximum log age.
36 MaxLogAge time.Duration 42 MaxLogAge time.Duration
37 43
38 stateMu sync.Mutex 44 stateMu sync.Mutex
39 » streams map[types.StreamPath]*logStream 45 » streams map[streamKey]*logStream
40 closed bool 46 closed bool
41 err error 47 err error
42 } 48 }
43 49
44 var _ storage.Storage = (*Storage)(nil) 50 var _ storage.Storage = (*Storage)(nil)
45 51
46 // Close implements storage.Storage. 52 // Close implements storage.Storage.
47 func (s *Storage) Close() { 53 func (s *Storage) Close() {
48 s.run(func() error { 54 s.run(func() error {
49 s.closed = true 55 s.closed = true
50 return nil 56 return nil
51 }) 57 })
52 } 58 }
53 59
54 // Config implements storage.Storage. 60 // Config implements storage.Storage.
55 func (s *Storage) Config(cfg storage.Config) error { 61 func (s *Storage) Config(cfg storage.Config) error {
56 return s.run(func() error { 62 return s.run(func() error {
57 s.MaxLogAge = cfg.MaxLogAge 63 s.MaxLogAge = cfg.MaxLogAge
58 return nil 64 return nil
59 }) 65 })
60 } 66 }
61 67
62 // Put implements storage.Storage. 68 // Put implements storage.Storage.
63 func (s *Storage) Put(req storage.PutRequest) error { 69 func (s *Storage) Put(req storage.PutRequest) error {
64 return s.run(func() error { 70 return s.run(func() error {
65 » » ls := s.getLogStreamLocked(req.Path, true) 71 » » ls := s.getLogStreamLocked(req.Project, req.Path, true)
66 72
67 for i, v := range req.Values { 73 for i, v := range req.Values {
68 index := req.Index + types.MessageIndex(i) 74 index := req.Index + types.MessageIndex(i)
69 if _, ok := ls.logs[index]; ok { 75 if _, ok := ls.logs[index]; ok {
70 return storage.ErrExists 76 return storage.ErrExists
71 } 77 }
72 78
73 clone := make([]byte, len(v)) 79 clone := make([]byte, len(v))
74 copy(clone, v) 80 copy(clone, v)
75 ls.logs[index] = clone 81 ls.logs[index] = clone
76 if index > ls.latestIndex { 82 if index > ls.latestIndex {
77 ls.latestIndex = index 83 ls.latestIndex = index
78 } 84 }
79 } 85 }
80 return nil 86 return nil
81 }) 87 })
82 } 88 }
83 89
84 // Get implements storage.Storage. 90 // Get implements storage.Storage.
85 func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error { 91 func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error {
86 recs := []*rec(nil) 92 recs := []*rec(nil)
87 err := s.run(func() error { 93 err := s.run(func() error {
88 » » ls := s.getLogStreamLocked(req.Path, false) 94 » » ls := s.getLogStreamLocked(req.Project, req.Path, false)
89 if ls == nil { 95 if ls == nil {
90 return storage.ErrDoesNotExist 96 return storage.ErrDoesNotExist
91 } 97 }
92 98
93 limit := len(ls.logs) 99 limit := len(ls.logs)
94 if req.Limit > 0 && req.Limit < limit { 100 if req.Limit > 0 && req.Limit < limit {
95 limit = req.Limit 101 limit = req.Limit
96 } 102 }
97 if s.MaxGetCount > 0 && s.MaxGetCount < limit { 103 if s.MaxGetCount > 0 && s.MaxGetCount < limit {
98 limit = s.MaxGetCount 104 limit = s.MaxGetCount
(...skipping 29 matching lines...) Expand all
128 } 134 }
129 if !cb(r.index, dataCopy) { 135 if !cb(r.index, dataCopy) {
130 break 136 break
131 } 137 }
132 } 138 }
133 139
134 return nil 140 return nil
135 } 141 }
136 142
137 // Tail implements storage.Storage. 143 // Tail implements storage.Storage.
138 func (s *Storage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { 144 func (s *Storage) Tail(project config.ProjectName, path types.StreamPath) ([]byt e, types.MessageIndex, error) {
139 var r *rec 145 var r *rec
140 146
141 // Find the latest log, then return it. 147 // Find the latest log, then return it.
142 err := s.run(func() error { 148 err := s.run(func() error {
143 » » ls := s.getLogStreamLocked(p, false) 149 » » ls := s.getLogStreamLocked(project, path, false)
144 if ls == nil { 150 if ls == nil {
145 return storage.ErrDoesNotExist 151 return storage.ErrDoesNotExist
146 } 152 }
147 153
148 r = &rec{ 154 r = &rec{
149 index: ls.latestIndex, 155 index: ls.latestIndex,
150 data: ls.logs[ls.latestIndex], 156 data: ls.logs[ls.latestIndex],
151 } 157 }
152 return nil 158 return nil
153 }) 159 })
154 if err != nil { 160 if err != nil {
155 return nil, 0, err 161 return nil, 0, err
156 } 162 }
157 return r.data, r.index, nil 163 return r.data, r.index, nil
158 } 164 }
159 165
160 // Count returns the number of log records for the given stream. 166 // Count returns the number of log records for the given stream.
161 func (s *Storage) Count(p types.StreamPath) (c int) { 167 func (s *Storage) Count(project config.ProjectName, path types.StreamPath) (c in t) {
162 s.run(func() error { 168 s.run(func() error {
163 » » st := s.streams[p] 169 » » if st := s.getLogStreamLocked(project, path, false); st != nil {
164 » » if st != nil {
165 c = len(st.logs) 170 c = len(st.logs)
166 } 171 }
167 return nil 172 return nil
168 }) 173 })
169 return 174 return
170 } 175 }
171 176
172 // SetErr sets the storage's error value. If not nil, all operations will fail 177 // SetErr sets the storage's error value. If not nil, all operations will fail
173 // with this error. 178 // with this error.
174 func (s *Storage) SetErr(err error) { 179 func (s *Storage) SetErr(err error) {
175 s.stateMu.Lock() 180 s.stateMu.Lock()
176 defer s.stateMu.Unlock() 181 defer s.stateMu.Unlock()
177 s.err = err 182 s.err = err
178 } 183 }
179 184
180 func (s *Storage) run(f func() error) error { 185 func (s *Storage) run(f func() error) error {
181 s.stateMu.Lock() 186 s.stateMu.Lock()
182 defer s.stateMu.Unlock() 187 defer s.stateMu.Unlock()
183 188
184 if s.err != nil { 189 if s.err != nil {
185 return s.err 190 return s.err
186 } 191 }
187 if s.closed { 192 if s.closed {
188 return errors.New("storage is closed") 193 return errors.New("storage is closed")
189 } 194 }
190 return f() 195 return f()
191 } 196 }
192 197
193 func (s *Storage) getLogStreamLocked(p types.StreamPath, create bool) *logStream { 198 func (s *Storage) getLogStreamLocked(project config.ProjectName, path types.Stre amPath, create bool) *logStream {
194 » ls := s.streams[p] 199 » key := streamKey{
200 » » project: project,
201 » » path: path,
202 » }
203
204 » ls := s.streams[key]
195 if ls == nil && create { 205 if ls == nil && create {
196 ls = &logStream{ 206 ls = &logStream{
197 logs: map[types.MessageIndex][]byte{}, 207 logs: map[types.MessageIndex][]byte{},
198 latestIndex: -1, 208 latestIndex: -1,
199 } 209 }
200 210
201 if s.streams == nil { 211 if s.streams == nil {
202 » » » s.streams = map[types.StreamPath]*logStream{} 212 » » » s.streams = map[streamKey]*logStream{}
203 } 213 }
204 » » s.streams[p] = ls 214 » » s.streams[key] = ls
205 } 215 }
206 216
207 return ls 217 return ls
208 } 218 }
OLDNEW
« no previous file with comments | « server/logdog/storage/bigtable/storage_test.go ('k') | server/logdog/storage/memory/memory_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698