OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
7 // | 7 // |
8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
109 | 109 |
110 switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) { | 110 switch err := s.getLogEntriesIter(st, cb); errors.Unwrap(err) { |
111 case nil, io.EOF: | 111 case nil, io.EOF: |
112 // We hit the end of our log stream. | 112 // We hit the end of our log stream. |
113 return nil | 113 return nil |
114 | 114 |
115 case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist: | 115 case cloudStorage.ErrObjectNotExist, cloudStorage.ErrBucketNotExist: |
116 return storage.ErrDoesNotExist | 116 return storage.ErrDoesNotExist |
117 | 117 |
118 default: | 118 default: |
119 » » return errors.Annotate(err).Reason("failed to read log stream").
Err() | 119 » » return errors.Annotate(err, "failed to read log stream").Err() |
120 } | 120 } |
121 } | 121 } |
122 | 122 |
123 // getLogEntriesImpl retrieves log entries from archive until complete. | 123 // getLogEntriesImpl retrieves log entries from archive until complete. |
124 func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback)
error { | 124 func (s *storageImpl) getLogEntriesIter(st *getStrategy, cb storage.GetCallback)
error { |
125 // Get our maximum byte limit. If we are externally constrained via MaxB
ytes, | 125 // Get our maximum byte limit. If we are externally constrained via MaxB
ytes, |
126 // apply that limit too. | 126 // apply that limit too. |
127 // Get an archive reader. | 127 // Get an archive reader. |
128 var ( | 128 var ( |
129 offset = st.startOffset | 129 offset = st.startOffset |
130 length = st.length() | 130 length = st.length() |
131 ) | 131 ) |
132 | 132 |
133 log.Fields{ | 133 log.Fields{ |
134 "offset": offset, | 134 "offset": offset, |
135 "length": length, | 135 "length": length, |
136 "path": s.Stream, | 136 "path": s.Stream, |
137 }.Debugf(s, "Creating stream reader for range.") | 137 }.Debugf(s, "Creating stream reader for range.") |
138 storageReader, err := s.Client.NewReader(s.Stream, int64(offset), length
) | 138 storageReader, err := s.Client.NewReader(s.Stream, int64(offset), length
) |
139 if err != nil { | 139 if err != nil { |
140 log.WithError(err).Errorf(s, "Failed to create stream Reader.") | 140 log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
141 » » return errors.Annotate(err).Reason("failed to create stream Read
er").Err() | 141 » » return errors.Annotate(err, "failed to create stream Reader").Er
r() |
142 } | 142 } |
143 defer func() { | 143 defer func() { |
144 if tmpErr := storageReader.Close(); tmpErr != nil { | 144 if tmpErr := storageReader.Close(); tmpErr != nil { |
145 // (Non-fatal) | 145 // (Non-fatal) |
146 log.WithError(tmpErr).Warningf(s, "Error closing stream
Reader.") | 146 log.WithError(tmpErr).Warningf(s, "Error closing stream
Reader.") |
147 } | 147 } |
148 }() | 148 }() |
149 | 149 |
150 // Count how many bytes we've read. | 150 // Count how many bytes we've read. |
151 cr := iotools.CountingReader{Reader: storageReader} | 151 cr := iotools.CountingReader{Reader: storageReader} |
152 | 152 |
153 // Iteratively update our strategy's start offset each time we read a co
mplete | 153 // Iteratively update our strategy's start offset each time we read a co
mplete |
154 // frame. | 154 // frame. |
155 var ( | 155 var ( |
156 rio = recordio.NewReader(&cr, maxStreamRecordSize) | 156 rio = recordio.NewReader(&cr, maxStreamRecordSize) |
157 buf bytes.Buffer | 157 buf bytes.Buffer |
158 remaining = st.count | 158 remaining = st.count |
159 ) | 159 ) |
160 for { | 160 for { |
161 // Reset the count so we know how much we read for this frame. | 161 // Reset the count so we know how much we read for this frame. |
162 cr.Count = 0 | 162 cr.Count = 0 |
163 | 163 |
164 sz, r, err := rio.ReadFrame() | 164 sz, r, err := rio.ReadFrame() |
165 if err != nil { | 165 if err != nil { |
166 » » » return errors.Annotate(err).Reason("failed to read frame
").Err() | 166 » » » return errors.Annotate(err, "failed to read frame").Err(
) |
167 } | 167 } |
168 | 168 |
169 buf.Reset() | 169 buf.Reset() |
170 buf.Grow(int(sz)) | 170 buf.Grow(int(sz)) |
171 | 171 |
172 switch amt, err := buf.ReadFrom(r); { | 172 switch amt, err := buf.ReadFrom(r); { |
173 case err != nil: | 173 case err != nil: |
174 log.Fields{ | 174 log.Fields{ |
175 log.ErrorKey: err, | 175 log.ErrorKey: err, |
176 "frameOffset": offset, | 176 "frameOffset": offset, |
177 "frameSize": sz, | 177 "frameSize": sz, |
178 }.Errorf(s, "Failed to read frame data.") | 178 }.Errorf(s, "Failed to read frame data.") |
179 » » » return errors.Annotate(err).Reason("failed to read frame
data").Err() | 179 » » » return errors.Annotate(err, "failed to read frame data")
.Err() |
180 | 180 |
181 case amt != sz: | 181 case amt != sz: |
182 // If we didn't buffer the complete frame, we hit a prem
ature EOF. | 182 // If we didn't buffer the complete frame, we hit a prem
ature EOF. |
183 » » » return errors.Annotate(io.EOF).Reason("incomplete frame
read").Err() | 183 » » » return errors.Annotate(io.EOF, "incomplete frame read").
Err() |
184 } | 184 } |
185 | 185 |
186 // If we read from offset 0, the first frame will be the log str
eam's | 186 // If we read from offset 0, the first frame will be the log str
eam's |
187 // descriptor, which we can discard. | 187 // descriptor, which we can discard. |
188 discardFrame := (offset == 0) | 188 discardFrame := (offset == 0) |
189 offset += uint64(cr.Count) | 189 offset += uint64(cr.Count) |
190 if discardFrame { | 190 if discardFrame { |
191 continue | 191 continue |
192 } | 192 } |
193 | 193 |
194 // Punt this log entry to our callback, if appropriate. | 194 // Punt this log entry to our callback, if appropriate. |
195 entry := storage.MakeEntry(buf.Bytes(), -1) | 195 entry := storage.MakeEntry(buf.Bytes(), -1) |
196 switch idx, err := entry.GetStreamIndex(); { | 196 switch idx, err := entry.GetStreamIndex(); { |
197 case err != nil: | 197 case err != nil: |
198 log.Fields{ | 198 log.Fields{ |
199 log.ErrorKey: err, | 199 log.ErrorKey: err, |
200 "frameOffset": offset, | 200 "frameOffset": offset, |
201 "frameSize": sz, | 201 "frameSize": sz, |
202 }.Errorf(s, "Failed to get log entry index.") | 202 }.Errorf(s, "Failed to get log entry index.") |
203 » » » return errors.Annotate(err).Reason("failed to get log en
try index").Err() | 203 » » » return errors.Annotate(err, "failed to get log entry ind
ex").Err() |
204 | 204 |
205 case idx < st.startIndex: | 205 case idx < st.startIndex: |
206 // Skip this entry, as it's before the first requested e
ntry. | 206 // Skip this entry, as it's before the first requested e
ntry. |
207 continue | 207 continue |
208 } | 208 } |
209 | 209 |
210 // We want to punt this entry, but we also want to re-use our Bu
ffer. Clone | 210 // We want to punt this entry, but we also want to re-use our Bu
ffer. Clone |
211 // its data so it is independent. | 211 // its data so it is independent. |
212 entry.D = make([]byte, len(entry.D)) | 212 entry.D = make([]byte, len(entry.D)) |
213 copy(entry.D, buf.Bytes()) | 213 copy(entry.D, buf.Bytes()) |
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
321 if indexData != nil { | 321 if indexData != nil { |
322 cached = true | 322 cached = true |
323 } | 323 } |
324 } | 324 } |
325 | 325 |
326 if indexData == nil { | 326 if indexData == nil { |
327 // No cache, or no cached entry. Load from storage. | 327 // No cache, or no cached entry. Load from storage. |
328 r, err := client.NewReader(path, 0, -1) | 328 r, err := client.NewReader(path, 0, -1) |
329 if err != nil { | 329 if err != nil { |
330 log.WithError(err).Errorf(c, "Failed to create index Rea
der.") | 330 log.WithError(err).Errorf(c, "Failed to create index Rea
der.") |
331 » » » return nil, errors.Annotate(err).Reason("failed to creat
e index Reader").Err() | 331 » » » return nil, errors.Annotate(err, "failed to create index
Reader").Err() |
332 } | 332 } |
333 defer func() { | 333 defer func() { |
334 if err := r.Close(); err != nil { | 334 if err := r.Close(); err != nil { |
335 log.WithError(err).Warningf(c, "Error closing in
dex Reader.") | 335 log.WithError(err).Warningf(c, "Error closing in
dex Reader.") |
336 } | 336 } |
337 }() | 337 }() |
338 | 338 |
339 if indexData, err = ioutil.ReadAll(r); err != nil { | 339 if indexData, err = ioutil.ReadAll(r); err != nil { |
340 log.WithError(err).Errorf(c, "Failed to read index.") | 340 log.WithError(err).Errorf(c, "Failed to read index.") |
341 » » » return nil, errors.Annotate(err).Reason("failed to read
index").Err() | 341 » » » return nil, errors.Annotate(err, "failed to read index")
.Err() |
342 } | 342 } |
343 } | 343 } |
344 | 344 |
345 index := logpb.LogIndex{} | 345 index := logpb.LogIndex{} |
346 if err := proto.Unmarshal(indexData, &index); err != nil { | 346 if err := proto.Unmarshal(indexData, &index); err != nil { |
347 log.WithError(err).Errorf(c, "Failed to unmarshal index.") | 347 log.WithError(err).Errorf(c, "Failed to unmarshal index.") |
348 » » return nil, errors.Annotate(err).Reason("failed to unmarshal ind
ex").Err() | 348 » » return nil, errors.Annotate(err, "failed to unmarshal index").Er
r() |
349 } | 349 } |
350 | 350 |
351 // If the index is valid, but wasn't cached previously, then cache it. | 351 // If the index is valid, but wasn't cached previously, then cache it. |
352 if cache != nil && !cached { | 352 if cache != nil && !cached { |
353 putCachedLogIndexData(c, cache, path, indexData) | 353 putCachedLogIndexData(c, cache, path, indexData) |
354 } | 354 } |
355 | 355 |
356 return &index, nil | 356 return &index, nil |
357 } | 357 } |
358 | 358 |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
468 ui := uint64(i) | 468 ui := uint64(i) |
469 s := sort.Search(len(entries), func(i int) bool { | 469 s := sort.Search(len(entries), func(i int) bool { |
470 return entries[i].StreamIndex > ui | 470 return entries[i].StreamIndex > ui |
471 }) | 471 }) |
472 | 472 |
473 // The returned index is the one immediately after the index that we wan
t. If | 473 // The returned index is the one immediately after the index that we wan
t. If |
474 // our search returned 0, the first index entry is > our search entry, a
nd we | 474 // our search returned 0, the first index entry is > our search entry, a
nd we |
475 // will return nil. | 475 // will return nil. |
476 return s - 1 | 476 return s - 1 |
477 } | 477 } |
OLD | NEW |