| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expectations. Namely: | 9 // conform to the API expectations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 47 type Options struct { | 47 type Options struct { |
| 48 // IndexURL is the Google Storage URL for the stream's index. | 48 // IndexURL is the Google Storage URL for the stream's index. |
| 49 IndexURL string | 49 IndexURL string |
| 50 // StreamURL is the Google Storage URL for the stream's entries. | 50 // StreamURL is the Google Storage URL for the stream's entries. |
| 51 StreamURL string | 51 StreamURL string |
| 52 | 52 |
| 53 // Client is the HTTP client to use for authentication. | 53 // Client is the HTTP client to use for authentication. |
| 54 // | 54 // |
| 55 // Closing this Storage instance does not close the underlying Client. | 55 // Closing this Storage instance does not close the underlying Client. |
| 56 Client gs.Client | 56 Client gs.Client |
| 57 |
| 58 // MaxBytes, if >0, is the maximum number of bytes to fetch in any given |
| 59 // request. This should be set for GAE fetches, as large log streams may |
| 60 // exceed the urlfetch system's maximum response size otherwise. |
| 61 // |
| 62 // This is the number of bytes to request, not the number of bytes of lo
g data |
| 63 // to return. The difference is that the former includes the RecordIO fr
ame |
| 64 // headers. |
| 65 MaxBytes int |
| 57 } | 66 } |
| 58 | 67 |
| 59 type storageImpl struct { | 68 type storageImpl struct { |
| 60 *Options | 69 *Options |
| 61 context.Context | 70 context.Context |
| 62 | 71 |
| 63 streamPath gs.Path | 72 streamPath gs.Path |
| 64 indexPath gs.Path | 73 indexPath gs.Path |
| 65 | 74 |
| 66 indexMu sync.Mutex | 75 indexMu sync.Mutex |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 103 return err | 112 return err |
| 104 } | 113 } |
| 105 | 114 |
| 106 // Identify the byte offsets that we want to fetch from the entries stre
am. | 115 // Identify the byte offsets that we want to fetch from the entries stre
am. |
| 107 st := s.buildGetStrategy(&req, idx) | 116 st := s.buildGetStrategy(&req, idx) |
| 108 if st.lastIndex == -1 || req.Index > st.lastIndex { | 117 if st.lastIndex == -1 || req.Index > st.lastIndex { |
| 109 // No records to read. | 118 // No records to read. |
| 110 return nil | 119 return nil |
| 111 } | 120 } |
| 112 | 121 |
| 113 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ | 122 » offset := int64(st.startOffset) |
| 114 » » From: st.from, | 123 » log.Fields{ |
| 115 » » To: st.to, | 124 » » "offset": offset, |
| 116 » }) | 125 » » "length": st.length(), |
| 126 » » "path": s.streamPath, |
| 127 » }.Debugf(s, "Creating stream reader for range.") |
| 128 » r, err := s.Client.NewReader(s.streamPath, offset, st.length()) |
| 117 if err != nil { | 129 if err != nil { |
| 118 log.WithError(err).Errorf(s, "Failed to create stream Reader.") | 130 log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
| 119 return err | 131 return err |
| 120 } | 132 } |
| 121 defer func() { | 133 defer func() { |
| 122 if err := r.Close(); err != nil { | 134 if err := r.Close(); err != nil { |
| 123 log.WithError(err).Warningf(s, "Error closing stream Rea
der.") | 135 log.WithError(err).Warningf(s, "Error closing stream Rea
der.") |
| 124 } | 136 } |
| 125 }() | 137 }() |
| 126 cr := iotools.CountingReader{Reader: r} | 138 cr := iotools.CountingReader{Reader: r} |
| 127 rio := recordio.NewReader(&cr, maxStreamRecordSize) | 139 rio := recordio.NewReader(&cr, maxStreamRecordSize) |
| 128 | 140 |
| 129 buf := bytes.Buffer{} | 141 buf := bytes.Buffer{} |
| 130 le := logpb.LogEntry{} | 142 le := logpb.LogEntry{} |
| 131 » max := req.Limit | 143 » max := st.count |
| 132 for { | 144 for { |
| 133 » » offset := st.from + cr.Count() | 145 » » offset += cr.Count() |
| 134 | 146 |
| 135 sz, r, err := rio.ReadFrame() | 147 sz, r, err := rio.ReadFrame() |
| 136 switch err { | 148 switch err { |
| 137 case nil: | 149 case nil: |
| 138 break | 150 break |
| 139 | 151 |
| 140 case io.EOF: | 152 case io.EOF: |
| 141 return nil | 153 return nil |
| 142 | 154 |
| 143 default: | 155 default: |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 198 return nil, 0, err | 210 return nil, 0, err |
| 199 } | 211 } |
| 200 | 212 |
| 201 // Get the offset of the last record. | 213 // Get the offset of the last record. |
| 202 if len(idx.Entries) == 0 { | 214 if len(idx.Entries) == 0 { |
| 203 return nil, 0, nil | 215 return nil, 0, nil |
| 204 } | 216 } |
| 205 lle := idx.Entries[len(idx.Entries)-1] | 217 lle := idx.Entries[len(idx.Entries)-1] |
| 206 | 218 |
| 207 // Get a Reader for the Tail entry. | 219 // Get a Reader for the Tail entry. |
| 208 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ | 220 » r, err := s.Client.NewReader(s.streamPath, int64(lle.Offset), -1) |
| 209 » » From: int64(lle.Offset), | |
| 210 » }) | |
| 211 if err != nil { | 221 if err != nil { |
| 212 log.Fields{ | 222 log.Fields{ |
| 213 log.ErrorKey: err, | 223 log.ErrorKey: err, |
| 214 "offset": lle.Offset, | 224 "offset": lle.Offset, |
| 215 }.Errorf(s, "Failed to create reader.") | 225 }.Errorf(s, "Failed to create reader.") |
| 216 return nil, 0, err | 226 return nil, 0, err |
| 217 } | 227 } |
| 218 defer func() { | 228 defer func() { |
| 219 if err := r.Close(); err != nil { | 229 if err := r.Close(); err != nil { |
| 220 log.WithError(err).Warningf(s, "Failed to close Reader."
) | 230 log.WithError(err).Warningf(s, "Failed to close Reader."
) |
| 221 } | 231 } |
| 222 }() | 232 }() |
| 223 | 233 |
| 224 rio := recordio.NewReader(r, maxStreamRecordSize) | 234 rio := recordio.NewReader(r, maxStreamRecordSize) |
| 225 d, err := rio.ReadFrameAll() | 235 d, err := rio.ReadFrameAll() |
| 226 if err != nil { | 236 if err != nil { |
| 227 log.WithError(err).Errorf(s, "Failed to read log frame.") | 237 log.WithError(err).Errorf(s, "Failed to read log frame.") |
| 228 return nil, 0, err | 238 return nil, 0, err |
| 229 } | 239 } |
| 230 | 240 |
| 231 return d, types.MessageIndex(lle.StreamIndex), nil | 241 return d, types.MessageIndex(lle.StreamIndex), nil |
| 232 } | 242 } |
| 233 | 243 |
| 234 // getIndex returns the cached log stream index, fetching it if necessary. | 244 // getIndex returns the cached log stream index, fetching it if necessary. |
| 235 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { | 245 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| 236 s.indexMu.Lock() | 246 s.indexMu.Lock() |
| 237 defer s.indexMu.Unlock() | 247 defer s.indexMu.Unlock() |
| 238 | 248 |
| 239 if s.index == nil { | 249 if s.index == nil { |
| 240 » » r, err := s.Client.NewReader(s.indexPath, gs.Options{}) | 250 » » r, err := s.Client.NewReader(s.indexPath, 0, -1) |
| 241 if err != nil { | 251 if err != nil { |
| 242 log.WithError(err).Errorf(s, "Failed to create index Rea
der.") | 252 log.WithError(err).Errorf(s, "Failed to create index Rea
der.") |
| 243 return nil, err | 253 return nil, err |
| 244 } | 254 } |
| 245 defer func() { | 255 defer func() { |
| 246 if err := r.Close(); err != nil { | 256 if err := r.Close(); err != nil { |
| 247 log.WithError(err).Warningf(s, "Error closing in
dex Reader.") | 257 log.WithError(err).Warningf(s, "Error closing in
dex Reader.") |
| 248 } | 258 } |
| 249 }() | 259 }() |
| 250 indexData, err := ioutil.ReadAll(r) | 260 indexData, err := ioutil.ReadAll(r) |
| 251 if err != nil { | 261 if err != nil { |
| 252 log.WithError(err).Errorf(s, "Failed to read index.") | 262 log.WithError(err).Errorf(s, "Failed to read index.") |
| 253 return nil, err | 263 return nil, err |
| 254 } | 264 } |
| 255 | 265 |
| 256 index := logpb.LogIndex{} | 266 index := logpb.LogIndex{} |
| 257 if err := proto.Unmarshal(indexData, &index); err != nil { | 267 if err := proto.Unmarshal(indexData, &index); err != nil { |
| 258 log.WithError(err).Errorf(s, "Failed to unmarshal index.
") | 268 log.WithError(err).Errorf(s, "Failed to unmarshal index.
") |
| 259 return nil, err | 269 return nil, err |
| 260 } | 270 } |
| 261 | 271 |
| 262 s.index = &index | 272 s.index = &index |
| 263 } | 273 } |
| 264 return s.index, nil | 274 return s.index, nil |
| 265 } | 275 } |
| 266 | 276 |
| 267 type getStrategy struct { | 277 type getStrategy struct { |
| 268 » // from is the beginning byte offset of the log entry stream. | 278 » // startOffset is the beginning byte offset of the log entry stream. |
| 269 » from int64 | 279 » startOffset uint64 |
| 270 » // to is the ending byte offset of the log entry stream. | 280 » // endOffset is the ending byte offset of the log entry stream. |
| 271 » to int64 | 281 » endOffset uint64 |
| 272 | 282 |
| 283 // count is the number of log entries that will be fetched. |
| 284 count int |
| 273 // lastIndex is the last log entry index in the stream. This will be -1
if | 285 // lastIndex is the last log entry index in the stream. This will be -1
if |
| 274 // there are no entries in the stream. | 286 // there are no entries in the stream. |
| 275 lastIndex types.MessageIndex | 287 lastIndex types.MessageIndex |
| 276 } | 288 } |
| 277 | 289 |
| 290 func (gs *getStrategy) length() int64 { |
| 291 if gs.startOffset < gs.endOffset { |
| 292 return int64(gs.endOffset - gs.startOffset) |
| 293 } |
| 294 return -1 |
| 295 } |
| 296 |
| 297 // setCount sets the `count` field. If called multiple times, the smallest |
| 298 // assigned value will be retained. |
| 299 func (gs *getStrategy) setCount(v int) { |
| 300 if gs.count <= 0 || gs.count > v { |
| 301 gs.count = v |
| 302 } |
| 303 } |
| 304 |
| 305 // setEndOffset sets the `endOffset` field. If called multiple times, the smallest |
| 306 // assigned value will be retained. |
| 307 func (gs *getStrategy) setEndOffset(v uint64) { |
| 308 if gs.endOffset == 0 || gs.endOffset > v { |
| 309 gs.endOffset = v |
| 310 } |
| 311 } |
| 312 |
| 278 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn
dex) *getStrategy { | 313 func (s *storageImpl) buildGetStrategy(req *storage.GetRequest, idx *logpb.LogIn
dex) *getStrategy { |
| 279 st := getStrategy{} | 314 st := getStrategy{} |
| 280 | 315 |
| 281 if len(idx.Entries) == 0 { | 316 if len(idx.Entries) == 0 { |
| 282 st.lastIndex = -1 | 317 st.lastIndex = -1 |
| 283 return &st | 318 return &st |
| 284 } | 319 } |
| 285 | 320 |
| 286 st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].Stream
Index) | 321 st.lastIndex = types.MessageIndex(idx.Entries[len(idx.Entries)-1].Stream
Index) |
| 287 startIdx := indexEntryFor(idx.Entries, req.Index) | 322 startIdx := indexEntryFor(idx.Entries, req.Index) |
| 288 if startIdx < 0 { | 323 if startIdx < 0 { |
| 289 startIdx = 0 | 324 startIdx = 0 |
| 290 } | 325 } |
| 291 le := idx.Entries[startIdx] | 326 le := idx.Entries[startIdx] |
| 292 » st.from = int64(le.Offset) | 327 » st.startOffset = le.Offset |
| 293 | 328 |
| 294 » // If we have a limit, and we have enough index entries to upper-bound o
ur | 329 » // Determine an upper bound based on our limits. |
| 295 » // stream based on that limit, use that. | |
| 296 // | 330 // |
| 297 » // Note that this may overshoot if the index and/or stream is sparse. We
know | 331 » // If we have a count limit, and we have enough index entries to upper-b
ound |
| 298 » // for sure that we have one LogEntry per index entry, so that's the bes
t we | 332 » // our stream based on that limit, use that. Note that this may overshoo
t if |
| 299 » // can do. | 333 » // the index and/or stream is sparse. We know for sure that we have one |
| 334 » // LogEntry per index entry, so that's the best we can do. |
| 300 if req.Limit > 0 { | 335 if req.Limit > 0 { |
| 301 if ub := startIdx + req.Limit; ub < len(idx.Entries) { | 336 if ub := startIdx + req.Limit; ub < len(idx.Entries) { |
| 302 » » » st.to = int64(idx.Entries[ub].Offset) | 337 » » » st.setEndOffset(idx.Entries[ub].Offset) |
| 338 » » } |
| 339 » » st.setCount(req.Limit) |
| 340 » } |
| 341 |
| 342 » // If we have a byte limit, count the entry sizes until we reach that li
mit. |
| 343 » if mb := int64(s.MaxBytes); mb > 0 { |
| 344 » » mb := uint64(mb) |
| 345 |
| 346 » » for i, e := range idx.Entries[startIdx:] { |
| 347 » » » if e.Offset < st.startOffset { |
| 348 » » » » // This shouldn't really happen, but it could ha
ppen if there is a |
| 349 » » » » // corrupt index. |
| 350 » » » » continue |
| 351 » » » } |
| 352 |
| 353 » » » // Calculate the request offset and truncate if we've ex
ceeded our maximum |
| 354 » » » // request bytes. |
| 355 » » » if size := (e.Offset - st.startOffset); size > mb { |
| 356 » » » » st.setEndOffset(e.Offset) |
| 357 » » » » st.setCount(i) |
| 358 » » » » break |
| 359 » » » } |
| 303 } | 360 } |
| 304 } | 361 } |
| 305 return &st | 362 return &st |
| 306 } | 363 } |
| 307 | 364 |
| 308 // indexEntryFor identifies the log index entry closest (<=) to the specified | 365 // indexEntryFor identifies the log index entry closest (<=) to the specified |
| 309 // index. | 366 // index. |
| 310 // | 367 // |
| 311 // If the first index entry is greater than our search index, -1 will be | 368 // If the first index entry is greater than our search index, -1 will be |
| 312 // returned. This should never happen in practice, though, since our index | 369 // returned. This should never happen in practice, though, since our index |
| 313 // construction always indexes log entry #0. | 370 // construction always indexes log entry #0. |
| 314 // | 371 // |
| 315 // It does this by performing a binary search over the index entries. | 372 // It does this by performing a binary search over the index entries. |
| 316 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { | 373 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { |
| 317 ui := uint64(i) | 374 ui := uint64(i) |
| 318 s := sort.Search(len(entries), func(i int) bool { | 375 s := sort.Search(len(entries), func(i int) bool { |
| 319 return entries[i].StreamIndex > ui | 376 return entries[i].StreamIndex > ui |
| 320 }) | 377 }) |
| 321 | 378 |
| 322 // The returned index is the one immediately after the index that we wan
t. If | 379 // The returned index is the one immediately after the index that we wan
t. If |
| 323 // our search returned 0, the first index entry is > our search entry, a
nd we | 380 // our search returned 0, the first index entry is > our search entry, a
nd we |
| 324 // will return nil. | 381 // will return nil. |
| 325 return s - 1 | 382 return s - 1 |
| 326 } | 383 } |
| OLD | NEW |