Chromium Code Reviews
Unified Diff: logdog/common/storage/archive/storage.go

Issue 2963503003: [errors] Greatly simplify common/errors package. (Closed)
Patch Set: fix nits Created 3 years, 5 months ago
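This CL's core change is visible in every hunk below: the LUCI common/errors annotation API collapses the builder-style errors.Annotate(err).Reason(...).Err() chain into a single errors.Annotate(err, reason).Err() call. A minimal before/after fragment (not a complete file; doWork is a hypothetical helper, and errors refers to LUCI's common/errors package):

// Before this CL: annotation was a builder chain.
if err := doWork(); err != nil {
	return errors.Annotate(err).Reason("failed to do work").Err()
}

// After this CL: the reason string is passed directly to Annotate.
if err := doWork(); err != nil {
	return errors.Annotate(err, "failed to do work").Err()
}

Annotated errors do not compare equal to sentinel values directly, which is why the Get implementation below switches on errors.Unwrap(err) before comparing against nil, io.EOF, and the cloud storage sentinels.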
 // Copyright 2015 The LUCI Authors. All rights reserved.
 // Use of this source code is governed under the Apache License, Version 2.0
 // that can be found in the LICENSE file.
 
 // Package archive implements a storage.Storage instance that retrieves logs
 // from a Google Storage archive.
 //
 // This is a special implementation of storage.Storage, and does not fully
 // conform to the API expectations. Namely:
 //   - It is read-only. Mutation methods will return storage.ErrReadOnly.
(...skipping 98 matching lines...)
 	switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) {
 	case nil, io.EOF:
 		// We hit the end of our log stream.
 		return nil
 
 	case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist:
 		return storage.ErrDoesNotExist
 
 	default:
-		return errors.Annotate(err).Reason("failed to read log stream").Err()
+		return errors.Annotate(err, "failed to read log stream").Err()
 	}
 }
 
 // getLogEntriesIter retrieves log entries from archive until complete.
 func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback) error {
 	// Get our maximum byte limit. If we are externally constrained via MaxBytes,
 	// apply that limit too.
 	// Get an archive reader.
 	var (
 		offset = st.startOffset
 		length = st.length()
 	)
 
 	log.Fields{
 		"offset": offset,
 		"length": length,
 		"path":   s.Stream,
 	}.Debugf(s, "Creating stream reader for range.")
 	storageReader, err := s.Client.NewReader(s.Stream, int64(offset), length)
 	if err != nil {
 		log.WithError(err).Errorf(s, "Failed to create stream Reader.")
-		return errors.Annotate(err).Reason("failed to create stream Reader").Err()
+		return errors.Annotate(err, "failed to create stream Reader").Err()
 	}
 	defer func() {
 		if tmpErr := storageReader.Close(); tmpErr != nil {
 			// (Non-fatal)
 			log.WithError(tmpErr).Warningf(s, "Error closing stream Reader.")
 		}
 	}()
 
 	// Count how many bytes we've read.
 	cr := iotools.CountingReader{Reader: storageReader}
 
 	// Iteratively update our strategy's start offset each time we read a complete
 	// frame.
 	var (
 		rio       = recordio.NewReader(&cr, maxStreamRecordSize)
 		buf       bytes.Buffer
 		remaining = st.count
 	)
 	for {
 		// Reset the count so we know how much we read for this frame.
 		cr.Count = 0
 
 		sz, r, err := rio.ReadFrame()
 		if err != nil {
-			return errors.Annotate(err).Reason("failed to read frame").Err()
+			return errors.Annotate(err, "failed to read frame").Err()
 		}
 
 		buf.Reset()
 		buf.Grow(int(sz))
 
 		switch amt, err := buf.ReadFrom(r); {
 		case err != nil:
 			log.Fields{
 				log.ErrorKey:  err,
 				"frameOffset": offset,
 				"frameSize":   sz,
 			}.Errorf(s, "Failed to read frame data.")
-			return errors.Annotate(err).Reason("failed to read frame data").Err()
+			return errors.Annotate(err, "failed to read frame data").Err()
 
 		case amt != sz:
 			// If we didn't buffer the complete frame, we hit a premature EOF.
-			return errors.Annotate(io.EOF).Reason("incomplete frame read").Err()
+			return errors.Annotate(io.EOF, "incomplete frame read").Err()
 		}
 
 		// If we read from offset 0, the first frame will be the log stream's
 		// descriptor, which we can discard.
 		discardFrame := (offset == 0)
 		offset += uint64(cr.Count)
 		if discardFrame {
 			continue
 		}
 
 		// Punt this log entry to our callback, if appropriate.
 		entry := storage.MakeEntry(buf.Bytes(), -1)
 		switch idx, err := entry.GetStreamIndex(); {
 		case err != nil:
 			log.Fields{
 				log.ErrorKey:  err,
 				"frameOffset": offset,
 				"frameSize":   sz,
 			}.Errorf(s, "Failed to get log entry index.")
-			return errors.Annotate(err).Reason("failed to get log entry index").Err()
+			return errors.Annotate(err, "failed to get log entry index").Err()
 
 		case idx < st.startIndex:
 			// Skip this entry, as it's before the first requested entry.
 			continue
 		}
 
 		// We want to punt this entry, but we also want to re-use our Buffer. Clone
 		// its data so it is independent.
 		entry.D = make([]byte, len(entry.D))
 		copy(entry.D, buf.Bytes())
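The clone at the end of the loop matters because buf is reused across frames: buf.Bytes() aliases the buffer's backing array, so an entry handed to the callback without copying would be silently overwritten by the next Reset/ReadFrom. A standalone stdlib illustration of that hazard (names here are illustrative, not from this CL):

package main

import (
	"bytes"
	"fmt"
	"strings"
)

func main() {
	var buf bytes.Buffer

	// Read the first "frame" and keep an aliased view of its bytes.
	buf.ReadFrom(strings.NewReader("frame-one"))
	aliased := buf.Bytes() // shares buf's backing array

	// Keep an independent copy, mirroring entry.D = make(...); copy(...).
	cloned := make([]byte, len(aliased))
	copy(cloned, aliased)

	// Reuse the buffer for the next frame, as the archive reader loop does.
	buf.Reset()
	buf.ReadFrom(strings.NewReader("frame-two!"))

	fmt.Printf("aliased: %q\n", aliased) // may now show bytes from the second frame
	fmt.Printf("cloned:  %q\n", cloned)  // still "frame-one"
}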
(...skipping 107 matching lines...)
 		if indexData != nil {
 			cached = true
 		}
 	}
 
 	if indexData == nil {
 		// No cache, or no cached entry. Load from storage.
 		r, err := client.NewReader(path, 0, -1)
 		if err != nil {
 			log.WithError(err).Errorf(c, "Failed to create index Reader.")
-			return nil, errors.Annotate(err).Reason("failed to create index Reader").Err()
+			return nil, errors.Annotate(err, "failed to create index Reader").Err()
 		}
 		defer func() {
 			if err := r.Close(); err != nil {
 				log.WithError(err).Warningf(c, "Error closing index Reader.")
 			}
 		}()
 
 		if indexData, err = ioutil.ReadAll(r); err != nil {
 			log.WithError(err).Errorf(c, "Failed to read index.")
-			return nil, errors.Annotate(err).Reason("failed to read index").Err()
+			return nil, errors.Annotate(err, "failed to read index").Err()
 		}
 	}
 
 	index := logpb.LogIndex{}
 	if err := proto.Unmarshal(indexData, &index); err != nil {
 		log.WithError(err).Errorf(c, "Failed to unmarshal index.")
-		return nil, errors.Annotate(err).Reason("failed to unmarshal index").Err()
+		return nil, errors.Annotate(err, "failed to unmarshal index").Err()
 	}
 
 	// If the index is valid, but wasn't cached previously, then cache it.
 	if cache != nil && !cached {
 		putCachedLogIndexData(c, cache, path, indexData)
 	}
 
 	return &index, nil
 }
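The surrounding function follows a read-through cache shape: consult the cache, fall back to storage on a miss, and populate the cache only after the data has been validated by unmarshaling. A generic sketch of that shape under assumed types (the Cache interface, fetch, and validate here are hypothetical, not the CL's actual API):

package main

import "fmt"

// Cache is a hypothetical byte-blob cache keyed by path.
type Cache interface {
	Get(key string) ([]byte, bool)
	Put(key string, data []byte)
}

// loadData mirrors the read-through shape used above: try the cache,
// fall back to fetch, and cache only data that passes validation.
func loadData(cache Cache, key string, fetch func(string) ([]byte, error),
	validate func([]byte) error) ([]byte, error) {

	var data []byte
	cached := false
	if cache != nil {
		if d, ok := cache.Get(key); ok {
			data, cached = d, true
		}
	}

	if data == nil {
		d, err := fetch(key)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch %q: %v", key, err)
		}
		data = d
	}

	if err := validate(data); err != nil {
		return nil, fmt.Errorf("invalid data for %q: %v", key, err)
	}

	// Only populate the cache once the data has proven valid.
	if cache != nil && !cached {
		cache.Put(key, data)
	}
	return data, nil
}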
(...skipping 109 matching lines...)
 	ui := uint64(i)
 	s := sort.Search(len(entries), func(i int) bool {
 		return entries[i].StreamIndex > ui
 	})
 
 	// The returned index is the one immediately after the index that we want. If
 	// our search returned 0, the first index entry is > our search entry, and we
 	// will return nil.
 	return s - 1
 }
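sort.Search returns the smallest index for which the predicate is true, so searching for StreamIndex > ui and subtracting one yields the last entry at or before the target index, and -1 when the target precedes the first entry. A standalone stdlib sketch of the same trick (the indexEntry type is illustrative, not the CL's type):

package main

import (
	"fmt"
	"sort"
)

// indexEntry is an illustrative stand-in for the archive's index entries,
// assumed sorted by ascending StreamIndex.
type indexEntry struct {
	StreamIndex uint64
}

// lastEntryAtOrBefore returns the position of the last entry whose
// StreamIndex is <= target, or -1 if every entry is past the target.
func lastEntryAtOrBefore(entries []indexEntry, target uint64) int {
	s := sort.Search(len(entries), func(i int) bool {
		return entries[i].StreamIndex > target
	})
	// sort.Search found the first entry past the target; the one we want
	// is immediately before it. If s is 0, there is no such entry.
	return s - 1
}

func main() {
	entries := []indexEntry{{0}, {10}, {20}, {30}}
	fmt.Println(lastEntryAtOrBefore(entries, 25)) // 2 (StreamIndex 20)
	fmt.Println(lastEntryAtOrBefore(entries, 10)) // 1 (StreamIndex 10)
}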