| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "sync" | 9 "sync" |
| 10 "time" | 10 "time" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/common/config" |
| 12 "github.com/luci/luci-go/common/logdog/types" | 13 "github.com/luci/luci-go/common/logdog/types" |
| 13 "github.com/luci/luci-go/server/logdog/storage" | 14 "github.com/luci/luci-go/server/logdog/storage" |
| 14 ) | 15 ) |
| 15 | 16 |
| 16 type logStream struct { | 17 type logStream struct { |
| 17 logs map[types.MessageIndex][]byte | 18 logs map[types.MessageIndex][]byte |
| 18 latestIndex types.MessageIndex | 19 latestIndex types.MessageIndex |
| 19 } | 20 } |
| 20 | 21 |
| 21 type rec struct { | 22 type rec struct { |
| 22 index types.MessageIndex | 23 index types.MessageIndex |
| 23 data []byte | 24 data []byte |
| 24 } | 25 } |
| 25 | 26 |
| 27 type streamKey struct { |
| 28 project config.ProjectName |
| 29 path types.StreamPath |
| 30 } |
| 31 |
| 26 // Storage is an implementation of the storage.Storage interface that stores | 32 // Storage is an implementation of the storage.Storage interface that stores |
| 27 // data in memory. | 33 // data in memory. |
| 28 // | 34 // |
| 29 // This is intended for testing, and not intended to be performant. | 35 // This is intended for testing, and not intended to be performant. |
| 30 type Storage struct { | 36 type Storage struct { |
| 31 // MaxGetCount, if not zero, is the maximum number of records to retriev
e from | 37 // MaxGetCount, if not zero, is the maximum number of records to retriev
e from |
| 32 // a single Get request. | 38 // a single Get request. |
| 33 MaxGetCount int | 39 MaxGetCount int |
| 34 | 40 |
| 35 // MaxLogAge is the configured maximum log age. | 41 // MaxLogAge is the configured maximum log age. |
| 36 MaxLogAge time.Duration | 42 MaxLogAge time.Duration |
| 37 | 43 |
| 38 stateMu sync.Mutex | 44 stateMu sync.Mutex |
| 39 » streams map[types.StreamPath]*logStream | 45 » streams map[streamKey]*logStream |
| 40 closed bool | 46 closed bool |
| 41 err error | 47 err error |
| 42 } | 48 } |
| 43 | 49 |
| 44 var _ storage.Storage = (*Storage)(nil) | 50 var _ storage.Storage = (*Storage)(nil) |
| 45 | 51 |
| 46 // Close implements storage.Storage. | 52 // Close implements storage.Storage. |
| 47 func (s *Storage) Close() { | 53 func (s *Storage) Close() { |
| 48 s.run(func() error { | 54 s.run(func() error { |
| 49 s.closed = true | 55 s.closed = true |
| 50 return nil | 56 return nil |
| 51 }) | 57 }) |
| 52 } | 58 } |
| 53 | 59 |
| 54 // Config implements storage.Storage. | 60 // Config implements storage.Storage. |
| 55 func (s *Storage) Config(cfg storage.Config) error { | 61 func (s *Storage) Config(cfg storage.Config) error { |
| 56 return s.run(func() error { | 62 return s.run(func() error { |
| 57 s.MaxLogAge = cfg.MaxLogAge | 63 s.MaxLogAge = cfg.MaxLogAge |
| 58 return nil | 64 return nil |
| 59 }) | 65 }) |
| 60 } | 66 } |
| 61 | 67 |
| 62 // Put implements storage.Storage. | 68 // Put implements storage.Storage. |
| 63 func (s *Storage) Put(req storage.PutRequest) error { | 69 func (s *Storage) Put(req storage.PutRequest) error { |
| 64 return s.run(func() error { | 70 return s.run(func() error { |
| 65 » » ls := s.getLogStreamLocked(req.Path, true) | 71 » » ls := s.getLogStreamLocked(req.Project, req.Path, true) |
| 66 | 72 |
| 67 for i, v := range req.Values { | 73 for i, v := range req.Values { |
| 68 index := req.Index + types.MessageIndex(i) | 74 index := req.Index + types.MessageIndex(i) |
| 69 if _, ok := ls.logs[index]; ok { | 75 if _, ok := ls.logs[index]; ok { |
| 70 return storage.ErrExists | 76 return storage.ErrExists |
| 71 } | 77 } |
| 72 | 78 |
| 73 clone := make([]byte, len(v)) | 79 clone := make([]byte, len(v)) |
| 74 copy(clone, v) | 80 copy(clone, v) |
| 75 ls.logs[index] = clone | 81 ls.logs[index] = clone |
| 76 if index > ls.latestIndex { | 82 if index > ls.latestIndex { |
| 77 ls.latestIndex = index | 83 ls.latestIndex = index |
| 78 } | 84 } |
| 79 } | 85 } |
| 80 return nil | 86 return nil |
| 81 }) | 87 }) |
| 82 } | 88 } |
| 83 | 89 |
| 84 // Get implements storage.Storage. | 90 // Get implements storage.Storage. |
| 85 func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error { | 91 func (s *Storage) Get(req storage.GetRequest, cb storage.GetCallback) error { |
| 86 recs := []*rec(nil) | 92 recs := []*rec(nil) |
| 87 err := s.run(func() error { | 93 err := s.run(func() error { |
| 88 » » ls := s.getLogStreamLocked(req.Path, false) | 94 » » ls := s.getLogStreamLocked(req.Project, req.Path, false) |
| 89 if ls == nil { | 95 if ls == nil { |
| 90 return storage.ErrDoesNotExist | 96 return storage.ErrDoesNotExist |
| 91 } | 97 } |
| 92 | 98 |
| 93 limit := len(ls.logs) | 99 limit := len(ls.logs) |
| 94 if req.Limit > 0 && req.Limit < limit { | 100 if req.Limit > 0 && req.Limit < limit { |
| 95 limit = req.Limit | 101 limit = req.Limit |
| 96 } | 102 } |
| 97 if s.MaxGetCount > 0 && s.MaxGetCount < limit { | 103 if s.MaxGetCount > 0 && s.MaxGetCount < limit { |
| 98 limit = s.MaxGetCount | 104 limit = s.MaxGetCount |
| (...skipping 29 matching lines...) Expand all Loading... |
| 128 } | 134 } |
| 129 if !cb(r.index, dataCopy) { | 135 if !cb(r.index, dataCopy) { |
| 130 break | 136 break |
| 131 } | 137 } |
| 132 } | 138 } |
| 133 | 139 |
| 134 return nil | 140 return nil |
| 135 } | 141 } |
| 136 | 142 |
| 137 // Tail implements storage.Storage. | 143 // Tail implements storage.Storage. |
| 138 func (s *Storage) Tail(p types.StreamPath) ([]byte, types.MessageIndex, error) { | 144 func (s *Storage) Tail(project config.ProjectName, path types.StreamPath) ([]byt
e, types.MessageIndex, error) { |
| 139 var r *rec | 145 var r *rec |
| 140 | 146 |
| 141 // Find the latest log, then return it. | 147 // Find the latest log, then return it. |
| 142 err := s.run(func() error { | 148 err := s.run(func() error { |
| 143 » » ls := s.getLogStreamLocked(p, false) | 149 » » ls := s.getLogStreamLocked(project, path, false) |
| 144 if ls == nil { | 150 if ls == nil { |
| 145 return storage.ErrDoesNotExist | 151 return storage.ErrDoesNotExist |
| 146 } | 152 } |
| 147 | 153 |
| 148 r = &rec{ | 154 r = &rec{ |
| 149 index: ls.latestIndex, | 155 index: ls.latestIndex, |
| 150 data: ls.logs[ls.latestIndex], | 156 data: ls.logs[ls.latestIndex], |
| 151 } | 157 } |
| 152 return nil | 158 return nil |
| 153 }) | 159 }) |
| 154 if err != nil { | 160 if err != nil { |
| 155 return nil, 0, err | 161 return nil, 0, err |
| 156 } | 162 } |
| 157 return r.data, r.index, nil | 163 return r.data, r.index, nil |
| 158 } | 164 } |
| 159 | 165 |
| 160 // Count returns the number of log records for the given stream. | 166 // Count returns the number of log records for the given stream. |
| 161 func (s *Storage) Count(p types.StreamPath) (c int) { | 167 func (s *Storage) Count(project config.ProjectName, path types.StreamPath) (c in
t) { |
| 162 s.run(func() error { | 168 s.run(func() error { |
| 163 » » st := s.streams[p] | 169 » » if st := s.getLogStreamLocked(project, path, false); st != nil { |
| 164 » » if st != nil { | |
| 165 c = len(st.logs) | 170 c = len(st.logs) |
| 166 } | 171 } |
| 167 return nil | 172 return nil |
| 168 }) | 173 }) |
| 169 return | 174 return |
| 170 } | 175 } |
| 171 | 176 |
| 172 // SetErr sets the storage's error value. If not nil, all operations will fail | 177 // SetErr sets the storage's error value. If not nil, all operations will fail |
| 173 // with this error. | 178 // with this error. |
| 174 func (s *Storage) SetErr(err error) { | 179 func (s *Storage) SetErr(err error) { |
| 175 s.stateMu.Lock() | 180 s.stateMu.Lock() |
| 176 defer s.stateMu.Unlock() | 181 defer s.stateMu.Unlock() |
| 177 s.err = err | 182 s.err = err |
| 178 } | 183 } |
| 179 | 184 |
| 180 func (s *Storage) run(f func() error) error { | 185 func (s *Storage) run(f func() error) error { |
| 181 s.stateMu.Lock() | 186 s.stateMu.Lock() |
| 182 defer s.stateMu.Unlock() | 187 defer s.stateMu.Unlock() |
| 183 | 188 |
| 184 if s.err != nil { | 189 if s.err != nil { |
| 185 return s.err | 190 return s.err |
| 186 } | 191 } |
| 187 if s.closed { | 192 if s.closed { |
| 188 return errors.New("storage is closed") | 193 return errors.New("storage is closed") |
| 189 } | 194 } |
| 190 return f() | 195 return f() |
| 191 } | 196 } |
| 192 | 197 |
| 193 func (s *Storage) getLogStreamLocked(p types.StreamPath, create bool) *logStream
{ | 198 func (s *Storage) getLogStreamLocked(project config.ProjectName, path types.Stre
amPath, create bool) *logStream { |
| 194 » ls := s.streams[p] | 199 » key := streamKey{ |
| 200 » » project: project, |
| 201 » » path: path, |
| 202 » } |
| 203 |
| 204 » ls := s.streams[key] |
| 195 if ls == nil && create { | 205 if ls == nil && create { |
| 196 ls = &logStream{ | 206 ls = &logStream{ |
| 197 logs: map[types.MessageIndex][]byte{}, | 207 logs: map[types.MessageIndex][]byte{}, |
| 198 latestIndex: -1, | 208 latestIndex: -1, |
| 199 } | 209 } |
| 200 | 210 |
| 201 if s.streams == nil { | 211 if s.streams == nil { |
| 202 » » » s.streams = map[types.StreamPath]*logStream{} | 212 » » » s.streams = map[streamKey]*logStream{} |
| 203 } | 213 } |
| 204 » » s.streams[p] = ls | 214 » » s.streams[key] = ls |
| 205 } | 215 } |
| 206 | 216 |
| 207 return ls | 217 return ls |
| 208 } | 218 } |
| OLD | NEW |