| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "sync" | 9 "sync" |
| 10 "time" | 10 "time" |
| (...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 125 | 125 |
| 126 // Punt all of the records upstream. We copy the data to prevent the | 126 // Punt all of the records upstream. We copy the data to prevent the |
| 127 // callback from accidentally mutating it. We reuse the data buffer to t
ry | 127 // callback from accidentally mutating it. We reuse the data buffer to t
ry |
| 128 // and catch errors when the callback retains the data. | 128 // and catch errors when the callback retains the data. |
| 129 for _, r := range recs { | 129 for _, r := range recs { |
| 130 var dataCopy []byte | 130 var dataCopy []byte |
| 131 if !req.KeysOnly { | 131 if !req.KeysOnly { |
| 132 dataCopy = make([]byte, len(r.data)) | 132 dataCopy = make([]byte, len(r.data)) |
| 133 copy(dataCopy, r.data) | 133 copy(dataCopy, r.data) |
| 134 } | 134 } |
| 135 » » if !cb(r.index, dataCopy) { | 135 » » if !cb(storage.MakeEntry(dataCopy, r.index)) { |
| 136 break | 136 break |
| 137 } | 137 } |
| 138 } | 138 } |
| 139 | 139 |
| 140 return nil | 140 return nil |
| 141 } | 141 } |
| 142 | 142 |
| 143 // Tail implements storage.Storage. | 143 // Tail implements storage.Storage. |
| 144 func (s *Storage) Tail(project config.ProjectName, path types.StreamPath) ([]byt
e, types.MessageIndex, error) { | 144 func (s *Storage) Tail(project config.ProjectName, path types.StreamPath) (*stor
age.Entry, error) { |
| 145 var r *rec | 145 var r *rec |
| 146 | 146 |
| 147 // Find the latest log, then return it. | 147 // Find the latest log, then return it. |
| 148 err := s.run(func() error { | 148 err := s.run(func() error { |
| 149 ls := s.getLogStreamLocked(project, path, false) | 149 ls := s.getLogStreamLocked(project, path, false) |
| 150 if ls == nil { | 150 if ls == nil { |
| 151 return storage.ErrDoesNotExist | 151 return storage.ErrDoesNotExist |
| 152 } | 152 } |
| 153 | 153 |
| 154 r = &rec{ | 154 r = &rec{ |
| 155 index: ls.latestIndex, | 155 index: ls.latestIndex, |
| 156 data: ls.logs[ls.latestIndex], | 156 data: ls.logs[ls.latestIndex], |
| 157 } | 157 } |
| 158 return nil | 158 return nil |
| 159 }) | 159 }) |
| 160 if err != nil { | 160 if err != nil { |
| 161 » » return nil, 0, err | 161 » » return nil, err |
| 162 } | 162 } |
| 163 » return r.data, r.index, nil | 163 » return storage.MakeEntry(r.data, r.index), nil |
| 164 } | 164 } |
| 165 | 165 |
| 166 // Count returns the number of log records for the given stream. | 166 // Count returns the number of log records for the given stream. |
| 167 func (s *Storage) Count(project config.ProjectName, path types.StreamPath) (c in
t) { | 167 func (s *Storage) Count(project config.ProjectName, path types.StreamPath) (c in
t) { |
| 168 s.run(func() error { | 168 s.run(func() error { |
| 169 if st := s.getLogStreamLocked(project, path, false); st != nil { | 169 if st := s.getLogStreamLocked(project, path, false); st != nil { |
| 170 c = len(st.logs) | 170 c = len(st.logs) |
| 171 } | 171 } |
| 172 return nil | 172 return nil |
| 173 }) | 173 }) |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 209 } | 209 } |
| 210 | 210 |
| 211 if s.streams == nil { | 211 if s.streams == nil { |
| 212 s.streams = map[streamKey]*logStream{} | 212 s.streams = map[streamKey]*logStream{} |
| 213 } | 213 } |
| 214 s.streams[key] = ls | 214 s.streams[key] = ls |
| 215 } | 215 } |
| 216 | 216 |
| 217 return ls | 217 return ls |
| 218 } | 218 } |
| OLD | NEW |