OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "sync" | 9 "sync" |
10 "time" | 10 "time" |
11 | 11 |
| 12 "github.com/luci/luci-go/common/config" |
12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
13 "github.com/luci/luci-go/server/logdog/storage" | 14 "github.com/luci/luci-go/server/logdog/storage" |
14 ) | 15 ) |
15 | 16 |
16 type logStream struct { | 17 type logStream struct { |
17 logs map[types.MessageIndex][]byte | 18 logs map[types.MessageIndex][]byte |
18 latestIndex types.MessageIndex | 19 latestIndex types.MessageIndex |
19 } | 20 } |
20 | 21 |
21 type rec struct { | 22 type rec struct { |
22 index types.MessageIndex | 23 index types.MessageIndex |
23 data []byte | 24 data []byte |
24 } | 25 } |
25 | 26 |
| 27 type streamKey struct { |
| 28 project config.ProjectName |
| 29 path types.StreamPath |
| 30 } |
| 31 |
26 // Storage is an implementation of the storage.Storage interface that stores | 32 // Storage is an implementation of the storage.Storage interface that stores |
27 // data in memory. | 33 // data in memory. |
28 // | 34 // |
29 // This is intended for testing, and not intended to be performant. | 35 // This is intended for testing, and not intended to be performant. |
30 type Storage struct { | 36 type Storage struct { |
31 // MaxGetCount, if not zero, is the maximum number of records to retriev
e from | 37 // MaxGetCount, if not zero, is the maximum number of records to retriev
e from |
32 // a single Get request. | 38 // a single Get request. |
33 MaxGetCount int | 39 MaxGetCount int |
34 | 40 |
35 // MaxLogAge is the configured maximum log age. | 41 // MaxLogAge is the configured maximum log age. |
36 MaxLogAge time.Duration | 42 MaxLogAge time.Duration |
37 | 43 |
38 stateMu sync.Mutex | 44 stateMu sync.Mutex |
39 » streams map[types.StreamPath]*logStream | 45 » streams map[streamKey]*logStream |
40 closed bool | 46 closed bool |
41 err error | 47 err error |
42 } | 48 } |
43 | 49 |
44 var _ storage.Storage = (*Storage)(nil) | 50 var _ storage.Storage = (*Storage)(nil) |
45 | 51 |
46 // Close implements storage.Storage. | 52 // Close implements storage.Storage. |
47 func (s *Storage) Close() { | 53 func (s *Storage) Close() { |
48 s.run(func() error { | 54 s.run(func() error { |
49 s.closed = true | 55 s.closed = true |
50 return nil | 56 return nil |
51 }) | 57 }) |
52 } | 58 } |
53 | 59 |
54 // Config implements storage.Storage. | 60 // Config implements storage.Storage. |
55 func (s *Storage) Config(cfg storage.Config) error { | 61 func (s *Storage) Config(cfg storage.Config) error { |
56 return s.run(func() error { | 62 return s.run(func() error { |
57 s.MaxLogAge = cfg.MaxLogAge | 63 s.MaxLogAge = cfg.MaxLogAge |
58 return nil | 64 return nil |
59 }) | 65 }) |
60 } | 66 } |
61 | 67 |
62 // Put implements storage.Storage. | 68 // Put implements storage.Storage. |
63 func (s *Storage) Put(req storage.PutRequest) error { | 69 func (s *Storage) Put(req storage.PutRequest) error { |
64 return s.run(func() error { | 70 return s.run(func() error { |
65 » » ls := s.getLogStreamLocked(req.Path, true) | 71 » » ls := s.getLogStreamLocked(req.Project, req.Path, true) |
66 | 72 |
67 for i, v := range req.Values { | 73 for i, v := range req.Values { |
68 index := req.Index + types.MessageIndex(i) | 74 index := req.Index + types.MessageIndex(i) |
69 if _, ok := ls.logs[index]; ok { | 75 if _, ok := ls.logs[index]; ok { |
70 return storage.ErrExists | 76 return storage.ErrExists |
71 } | 77 } |
72 | 78 |
73 clone := make([]byte, len(v)) | 79 clone := make([]byte, len(v)) |
74 copy(clone, v) | 80 copy(clone, v) |
75 ls.logs[index] = clone | 81 ls.logs[index] = clone |
76 if index > ls.latestIndex { | 82 if index > ls.latestIndex { |
77 ls.latestIndex = index | 83 ls.latestIndex = index |
78 } | 84 } |
79 } | 85 } |
80 return nil | 86 return nil |
81 }) | 87 }) |
82 } | 88 } |
83 | 89 |
84 // Get implements storage.Storage. | 90 // Get implements storage.Storage. |
85 func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error { | 91 func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error { |
86 recs := []*rec(nil) | 92 recs := []*rec(nil) |
87 err := s.run(func() error { | 93 err := s.run(func() error { |
88 » » ls := s.getLogStreamLocked(req.Path, false) | 94 » » ls := s.getLogStreamLocked(req.Project, req.Path, false) |
89 if ls == nil { | 95 if ls == nil { |
90 return storage.ErrDoesNotExist | 96 return storage.ErrDoesNotExist |
91 } | 97 } |
92 | 98 |
93 limit := len(ls.logs) | 99 limit := len(ls.logs) |
94 if req.Limit > 0 && req.Limit < limit { | 100 if req.Limit > 0 && req.Limit < limit { |
95 limit = req.Limit | 101 limit = req.Limit |
96 } | 102 } |
97 if s.MaxGetCount > 0 && s.MaxGetCount < limit { | 103 if s.MaxGetCount > 0 && s.MaxGetCount < limit { |
98 limit = s.MaxGetCount | 104 limit = s.MaxGetCount |
(...skipping 29 matching lines...) Expand all Loading... |
128 } | 134 } |
129 if !cb(r.index, dataCopy) { | 135 if !cb(r.index, dataCopy) { |
130 break | 136 break |
131 } | 137 } |
132 } | 138 } |
133 | 139 |
134 return nil | 140 return nil |
135 } | 141 } |
136 | 142 |
137 // Tail implements storage.Storage. | 143 // Tail implements storage.Storage. |
138 func (s *Storage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { | 144 func (s *Storage) Tail(project config.ProjectName, path types.StreamPath) ([]byt
e, types.MessageIndex, error) { |
139 var r *rec | 145 var r *rec |
140 | 146 |
141 // Find the latest log, then return it. | 147 // Find the latest log, then return it. |
142 err := s.run(func() error { | 148 err := s.run(func() error { |
143 » » ls := s.getLogStreamLocked(p, false) | 149 » » ls := s.getLogStreamLocked(project, path, false) |
144 if ls == nil { | 150 if ls == nil { |
145 return storage.ErrDoesNotExist | 151 return storage.ErrDoesNotExist |
146 } | 152 } |
147 | 153 |
148 r = &rec{ | 154 r = &rec{ |
149 index: ls.latestIndex, | 155 index: ls.latestIndex, |
150 data: ls.logs[ls.latestIndex], | 156 data: ls.logs[ls.latestIndex], |
151 } | 157 } |
152 return nil | 158 return nil |
153 }) | 159 }) |
154 if err != nil { | 160 if err != nil { |
155 return nil, 0, err | 161 return nil, 0, err |
156 } | 162 } |
157 return r.data, r.index, nil | 163 return r.data, r.index, nil |
158 } | 164 } |
159 | 165 |
160 // Count returns the number of log records for the given stream. | 166 // Count returns the number of log records for the given stream. |
161 func (s *Storage) Count(p types.StreamPath) (c int) { | 167 func (s *Storage) Count(project config.ProjectName, path types.StreamPath) (c in
t) { |
162 s.run(func() error { | 168 s.run(func() error { |
163 » » st := s.streams[p] | 169 » » if st := s.getLogStreamLocked(project, path, false); st != nil { |
164 » » if st != nil { | |
165 c = len(st.logs) | 170 c = len(st.logs) |
166 } | 171 } |
167 return nil | 172 return nil |
168 }) | 173 }) |
169 return | 174 return |
170 } | 175 } |
171 | 176 |
172 // SetErr sets the storage's error value. If not nil, all operations will fail | 177 // SetErr sets the storage's error value. If not nil, all operations will fail |
173 // with this error. | 178 // with this error. |
174 func (s *Storage) SetErr(err error) { | 179 func (s *Storage) SetErr(err error) { |
175 s.stateMu.Lock() | 180 s.stateMu.Lock() |
176 defer s.stateMu.Unlock() | 181 defer s.stateMu.Unlock() |
177 s.err = err | 182 s.err = err |
178 } | 183 } |
179 | 184 |
180 func (s *Storage) run(f func() error) error { | 185 func (s *Storage) run(f func() error) error { |
181 s.stateMu.Lock() | 186 s.stateMu.Lock() |
182 defer s.stateMu.Unlock() | 187 defer s.stateMu.Unlock() |
183 | 188 |
184 if s.err != nil { | 189 if s.err != nil { |
185 return s.err | 190 return s.err |
186 } | 191 } |
187 if s.closed { | 192 if s.closed { |
188 return errors.New("storage is closed") | 193 return errors.New("storage is closed") |
189 } | 194 } |
190 return f() | 195 return f() |
191 } | 196 } |
192 | 197 |
193 func (s *Storage) getLogStreamLocked(p types.StreamPath, create bool) *logStream
{ | 198 func (s *Storage) getLogStreamLocked(project config.ProjectName, path types.Stre
amPath, create bool) *logStream { |
194 » ls := s.streams[p] | 199 » key := streamKey{ |
| 200 » » project: project, |
| 201 » » path: path, |
| 202 » } |
| 203 |
| 204 » ls := s.streams[key] |
195 if ls == nil && create { | 205 if ls == nil && create { |
196 ls = &logStream{ | 206 ls = &logStream{ |
197 logs: map[types.MessageIndex][]byte{}, | 207 logs: map[types.MessageIndex][]byte{}, |
198 latestIndex: -1, | 208 latestIndex: -1, |
199 } | 209 } |
200 | 210 |
201 if s.streams == nil { | 211 if s.streams == nil { |
202 » » » s.streams = map[types.StreamPath]*logStream{} | 212 » » » s.streams = map[streamKey]*logStream{} |
203 } | 213 } |
204 » » s.streams[p] = ls | 214 » » s.streams[key] = ls |
205 } | 215 } |
206 | 216 |
207 return ls | 217 return ls |
208 } | 218 } |
OLD | NEW |