| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 109 | 109 |
| 110 switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) { | 110 switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) { |
| 111 case nil, io.EOF: | 111 case nil, io.EOF: |
| 112 // We hit the end of our log stream. | 112 // We hit the end of our log stream. |
| 113 return nil | 113 return nil |
| 114 | 114 |
| 115 case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist: | 115 case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist: |
| 116 return storage.ErrDoesNotExist | 116 return storage.ErrDoesNotExist |
| 117 | 117 |
| 118 default: | 118 default: |
| 119 » » return errors.Annotate(err).Reason("failed to read log stream").
Err() | 119 » » return errors.Annotate(err, "failed to read log stream").Err() |
| 120 } | 120 } |
| 121 } | 121 } |
| 122 | 122 |
| 123 // getLogEntriesImpl retrieves log entries from archive until complete. | 123 // getLogEntriesImpl retrieves log entries from archive until complete. |
| 124 func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback)
error { | 124 func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback)
error { |
| 125 // Get our maximum byte limit. If we are externally constrained via MaxB
ytes, | 125 // Get our maximum byte limit. If we are externally constrained via MaxB
ytes, |
| 126 // apply that limit too. | 126 // apply that limit too. |
| 127 // Get an archive reader. | 127 // Get an archive reader. |
| 128 var ( | 128 var ( |
| 129 offset = st.startOffset | 129 offset = st.startOffset |
| 130 length = st.length() | 130 length = st.length() |
| 131 ) | 131 ) |
| 132 | 132 |
| 133 log.Fields{ | 133 log.Fields{ |
| 134 "offset": offset, | 134 "offset": offset, |
| 135 "length": length, | 135 "length": length, |
| 136 "path": s.Stream, | 136 "path": s.Stream, |
| 137 }.Debugf(s, "Creating stream reader for range.") | 137 }.Debugf(s, "Creating stream reader for range.") |
| 138 storageReader, err := s.Client.NewReader(s.Stream, int64(offset), length
) | 138 storageReader, err := s.Client.NewReader(s.Stream, int64(offset), length
) |
| 139 if err != nil { | 139 if err != nil { |
| 140 log.WithError(err).Errorf(s, "Failed to create stream Reader.") | 140 log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
| 141 » » return errors.Annotate(err).Reason("failed to create stream Read
er").Err() | 141 » » return errors.Annotate(err, "failed to create stream Reader").Er
r() |
| 142 } | 142 } |
| 143 defer func() { | 143 defer func() { |
| 144 if tmpErr := storageReader.Close(); tmpErr != nil { | 144 if tmpErr := storageReader.Close(); tmpErr != nil { |
| 145 // (Non-fatal) | 145 // (Non-fatal) |
| 146 log.WithError(tmpErr).Warningf(s, "Error closing stream
Reader.") | 146 log.WithError(tmpErr).Warningf(s, "Error closing stream
Reader.") |
| 147 } | 147 } |
| 148 }() | 148 }() |
| 149 | 149 |
| 150 // Count how many bytes we've read. | 150 // Count how many bytes we've read. |
| 151 cr := iotools.CountingReader{Reader: storageReader} | 151 cr := iotools.CountingReader{Reader: storageReader} |
| 152 | 152 |
| 153 // Iteratively update our strategy's start offset each time we read a co
mplete | 153 // Iteratively update our strategy's start offset each time we read a co
mplete |
| 154 // frame. | 154 // frame. |
| 155 var ( | 155 var ( |
| 156 rio = recordio.NewReader(&cr, maxStreamRecordSize) | 156 rio = recordio.NewReader(&cr, maxStreamRecordSize) |
| 157 buf bytes.Buffer | 157 buf bytes.Buffer |
| 158 remaining = st.count | 158 remaining = st.count |
| 159 ) | 159 ) |
| 160 for { | 160 for { |
| 161 // Reset the count so we know how much we read for this frame. | 161 // Reset the count so we know how much we read for this frame. |
| 162 cr.Count = 0 | 162 cr.Count = 0 |
| 163 | 163 |
| 164 sz, r, err := rio.ReadFrame() | 164 sz, r, err := rio.ReadFrame() |
| 165 if err != nil { | 165 if err != nil { |
| 166 » » » return errors.Annotate(err).Reason("failed to read frame
").Err() | 166 » » » return errors.Annotate(err, "failed to read frame").Err(
) |
| 167 } | 167 } |
| 168 | 168 |
| 169 buf.Reset() | 169 buf.Reset() |
| 170 buf.Grow(int(sz)) | 170 buf.Grow(int(sz)) |
| 171 | 171 |
| 172 switch amt, err := buf.ReadFrom(r); { | 172 switch amt, err := buf.ReadFrom(r); { |
| 173 case err != nil: | 173 case err != nil: |
| 174 log.Fields{ | 174 log.Fields{ |
| 175 log.ErrorKey: err, | 175 log.ErrorKey: err, |
| 176 "frameOffset": offset, | 176 "frameOffset": offset, |
| 177 "frameSize": sz, | 177 "frameSize": sz, |
| 178 }.Errorf(s, "Failed to read frame data.") | 178 }.Errorf(s, "Failed to read frame data.") |
| 179 » » » return errors.Annotate(err).Reason("failed to read frame
data").Err() | 179 » » » return errors.Annotate(err, "failed to read frame data")
.Err() |
| 180 | 180 |
| 181 case amt != sz: | 181 case amt != sz: |
| 182 // If we didn't buffer the complete frame, we hit a prem
ature EOF. | 182 // If we didn't buffer the complete frame, we hit a prem
ature EOF. |
| 183 » » » return errors.Annotate(io.EOF).Reason("incomplete frame
read").Err() | 183 » » » return errors.Annotate(io.EOF, "incomplete frame read").
Err() |
| 184 } | 184 } |
| 185 | 185 |
| 186 // If we read from offset 0, the first frame will be the log str
eam's | 186 // If we read from offset 0, the first frame will be the log str
eam's |
| 187 // descriptor, which we can discard. | 187 // descriptor, which we can discard. |
| 188 discardFrame := (offset == 0) | 188 discardFrame := (offset == 0) |
| 189 offset += uint64(cr.Count) | 189 offset += uint64(cr.Count) |
| 190 if discardFrame { | 190 if discardFrame { |
| 191 continue | 191 continue |
| 192 } | 192 } |
| 193 | 193 |
| 194 // Punt this log entry to our callback, if appropriate. | 194 // Punt this log entry to our callback, if appropriate. |
| 195 entry := storage.MakeEntry(buf.Bytes(), -1) | 195 entry := storage.MakeEntry(buf.Bytes(), -1) |
| 196 switch idx, err := entry.GetStreamIndex(); { | 196 switch idx, err := entry.GetStreamIndex(); { |
| 197 case err != nil: | 197 case err != nil: |
| 198 log.Fields{ | 198 log.Fields{ |
| 199 log.ErrorKey: err, | 199 log.ErrorKey: err, |
| 200 "frameOffset": offset, | 200 "frameOffset": offset, |
| 201 "frameSize": sz, | 201 "frameSize": sz, |
| 202 }.Errorf(s, "Failed to get log entry index.") | 202 }.Errorf(s, "Failed to get log entry index.") |
| 203 » » » return errors.Annotate(err).Reason("failed to get log en
try index").Err() | 203 » » » return errors.Annotate(err, "failed to get log entry ind
ex").Err() |
| 204 | 204 |
| 205 case idx < st.startIndex: | 205 case idx < st.startIndex: |
| 206 // Skip this entry, as it's before the first requested e
ntry. | 206 // Skip this entry, as it's before the first requested e
ntry. |
| 207 continue | 207 continue |
| 208 } | 208 } |
| 209 | 209 |
| 210 // We want to punt this entry, but we also want to re-use our Bu
ffer. Clone | 210 // We want to punt this entry, but we also want to re-use our Bu
ffer. Clone |
| 211 // its data so it is independent. | 211 // its data so it is independent. |
| 212 entry.D = make([]byte, len(entry.D)) | 212 entry.D = make([]byte, len(entry.D)) |
| 213 copy(entry.D, buf.Bytes()) | 213 copy(entry.D, buf.Bytes()) |
| (...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 321 if indexData != nil { | 321 if indexData != nil { |
| 322 cached = true | 322 cached = true |
| 323 } | 323 } |
| 324 } | 324 } |
| 325 | 325 |
| 326 if indexData == nil { | 326 if indexData == nil { |
| 327 // No cache, or no cached entry. Load from storage. | 327 // No cache, or no cached entry. Load from storage. |
| 328 r, err := client.NewReader(path, 0, -1) | 328 r, err := client.NewReader(path, 0, -1) |
| 329 if err != nil { | 329 if err != nil { |
| 330 log.WithError(err).Errorf(c, "Failed to create index Rea
der.") | 330 log.WithError(err).Errorf(c, "Failed to create index Rea
der.") |
| 331 » » » return nil, errors.Annotate(err).Reason("failed to creat
e index Reader").Err() | 331 » » » return nil, errors.Annotate(err, "failed to create index
Reader").Err() |
| 332 } | 332 } |
| 333 defer func() { | 333 defer func() { |
| 334 if err := r.Close(); err != nil { | 334 if err := r.Close(); err != nil { |
| 335 log.WithError(err).Warningf(c, "Error closing in
dex Reader.") | 335 log.WithError(err).Warningf(c, "Error closing in
dex Reader.") |
| 336 } | 336 } |
| 337 }() | 337 }() |
| 338 | 338 |
| 339 if indexData, err = ioutil.ReadAll(r); err != nil { | 339 if indexData, err = ioutil.ReadAll(r); err != nil { |
| 340 log.WithError(err).Errorf(c, "Failed to read index.") | 340 log.WithError(err).Errorf(c, "Failed to read index.") |
| 341 » » » return nil, errors.Annotate(err).Reason("failed to read
index").Err() | 341 » » » return nil, errors.Annotate(err, "failed to read index")
.Err() |
| 342 } | 342 } |
| 343 } | 343 } |
| 344 | 344 |
| 345 index := logpb.LogIndex{} | 345 index := logpb.LogIndex{} |
| 346 if err := proto.Unmarshal(indexData, &index); err != nil { | 346 if err := proto.Unmarshal(indexData, &index); err != nil { |
| 347 log.WithError(err).Errorf(c, "Failed to unmarshal index.") | 347 log.WithError(err).Errorf(c, "Failed to unmarshal index.") |
| 348 » » return nil, errors.Annotate(err).Reason("failed to unmarshal ind
ex").Err() | 348 » » return nil, errors.Annotate(err, "failed to unmarshal index").Er
r() |
| 349 } | 349 } |
| 350 | 350 |
| 351 // If the index is valid, but wasn't cached previously, then cache it. | 351 // If the index is valid, but wasn't cached previously, then cache it. |
| 352 if cache != nil && !cached { | 352 if cache != nil && !cached { |
| 353 putCachedLogIndexData(c, cache, path, indexData) | 353 putCachedLogIndexData(c, cache, path, indexData) |
| 354 } | 354 } |
| 355 | 355 |
| 356 return &index, nil | 356 return &index, nil |
| 357 } | 357 } |
| 358 | 358 |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 468 ui := uint64(i) | 468 ui := uint64(i) |
| 469 s := sort.Search(len(entries), func(i int) bool { | 469 s := sort.Search(len(entries), func(i int) bool { |
| 470 return entries[i].StreamIndex > ui | 470 return entries[i].StreamIndex > ui |
| 471 }) | 471 }) |
| 472 | 472 |
| 473 // The returned index is the one immediately after the index that we wan
t. If | 473 // The returned index is the one immediately after the index that we wan
t. If |
| 474 // our search returned 0, the first index entry is > our search entry, a
nd we | 474 // our search returned 0, the first index entry is > our search entry, a
nd we |
| 475 // will return nil. | 475 // will return nil. |
| 476 return s - 1 | 476 return s - 1 |
| 477 } | 477 } |
| OLD | NEW |