| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 44 // 16MB is larger than the maximum log entry size | 44 // 16MB is larger than the maximum log entry size |
| 45 maxStreamRecordSize = 2 * types.MaxLogEntryDataSize | 45 maxStreamRecordSize = 2 * types.MaxLogEntryDataSize |
| 46 ) | 46 ) |
| 47 | 47 |
| 48 // Options is the set of configuration options for this Storage instance. | 48 // Options is the set of configuration options for this Storage instance. |
| 49 // | 49 // |
| 50 // Unlike other Storage instances, this is bound to a single archived stream. | 50 // Unlike other Storage instances, this is bound to a single archived stream. |
| 51 // Project and Path parameters in requests will be ignored in favor of the | 51 // Project and Path parameters in requests will be ignored in favor of the |
| 52 // Google Storage URLs. | 52 // Google Storage URLs. |
| 53 type Options struct { | 53 type Options struct { |
| 54 » // IndexURL is the Google Storage URL for the stream's index. | 54 » // Index is the Google Storage URL for the stream's index. |
| 55 » IndexURL string | 55 » Index gs.Path |
| 56 » // StreamURL is the Google Storage URL for the stream's entries. | 56 » // Stream is the Google Storage URL for the stream's entries. |
| 57 » StreamURL string | 57 » Stream gs.Path |
| 58 | 58 |
| 59 // Client is the HTTP client to use for authentication. | 59 // Client is the HTTP client to use for authentication. |
| 60 // | 60 // |
| 61 // Closing this Storage instance does not close the underlying Client. | 61 // Closing this Storage instance does not close the underlying Client. |
| 62 Client gs.Client | 62 Client gs.Client |
| 63 | 63 |
| 64 // Cache, if not nil, will be used to cache data. | 64 // Cache, if not nil, will be used to cache data. |
| 65 Cache caching.Cache | 65 Cache caching.Cache |
| 66 } | 66 } |
| 67 | 67 |
| 68 type storageImpl struct { | 68 type storageImpl struct { |
| 69 *Options | 69 *Options |
| 70 context.Context | 70 context.Context |
| 71 | 71 |
| 72 streamPath gs.Path | |
| 73 indexPath gs.Path | |
| 74 | |
| 75 index atomic.Value | 72 index atomic.Value |
| 76 } | 73 } |
| 77 | 74 |
| 78 // New instantiates a new Storage instance, bound to the supplied Options. | 75 // New instantiates a new Storage instance, bound to the supplied Options. |
| 79 func New(ctx context.Context, o Options) (storage.Storage, error) { | 76 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 80 s := storageImpl{ | 77 s := storageImpl{ |
| 81 Options: &o, | 78 Options: &o, |
| 82 Context: ctx, | 79 Context: ctx, |
| 83 | |
| 84 streamPath: gs.Path(o.StreamURL), | |
| 85 indexPath: gs.Path(o.IndexURL), | |
| 86 } | 80 } |
| 87 | 81 |
| 88 » if !s.streamPath.IsFullPath() { | 82 » if !s.Stream.IsFullPath() { |
| 89 » » return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath) | 83 » » return nil, fmt.Errorf("invalid stream URL: %q", s.Stream) |
| 90 } | 84 } |
| 91 » if s.indexPath != "" && !s.indexPath.IsFullPath() { | 85 » if s.Index != "" && !s.Index.IsFullPath() { |
| 92 » » return nil, fmt.Errorf("invalid index URL: %v", s.indexPath) | 86 » » return nil, fmt.Errorf("invalid index URL: %v", s.Index) |
| 93 } | 87 } |
| 94 | 88 |
| 95 return &s, nil | 89 return &s, nil |
| 96 } | 90 } |
| 97 | 91 |
| 98 func (s *storageImpl) Close() {} | 92 func (s *storageImpl) Close() {} |
| 99 | 93 |
| 100 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly
} | 94 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly
} |
| 101 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly
} | 95 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly
} |
| 102 | 96 |
| (...skipping 29 matching lines...) Expand all Loading... |
| 132 // apply that limit too. | 126 // apply that limit too. |
| 133 // Get an archive reader. | 127 // Get an archive reader. |
| 134 var ( | 128 var ( |
| 135 offset = st.startOffset | 129 offset = st.startOffset |
| 136 length = st.length() | 130 length = st.length() |
| 137 ) | 131 ) |
| 138 | 132 |
| 139 log.Fields{ | 133 log.Fields{ |
| 140 "offset": offset, | 134 "offset": offset, |
| 141 "length": length, | 135 "length": length, |
| 142 » » "path": s.streamPath, | 136 » » "path": s.Stream, |
| 143 }.Debugf(s, "Creating stream reader for range.") | 137 }.Debugf(s, "Creating stream reader for range.") |
| 144 » storageReader, err := s.Client.NewReader(s.streamPath, int64(offset), le
ngth) | 138 » storageReader, err := s.Client.NewReader(s.Stream, int64(offset), length
) |
| 145 if err != nil { | 139 if err != nil { |
| 146 log.WithError(err).Errorf(s, "Failed to create stream Reader.") | 140 log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
| 147 return errors.Annotate(err).Reason("failed to create stream Read
er").Err() | 141 return errors.Annotate(err).Reason("failed to create stream Read
er").Err() |
| 148 } | 142 } |
| 149 defer func() { | 143 defer func() { |
| 150 if tmpErr := storageReader.Close(); tmpErr != nil { | 144 if tmpErr := storageReader.Close(); tmpErr != nil { |
| 151 // (Non-fatal) | 145 // (Non-fatal) |
| 152 log.WithError(tmpErr).Warningf(s, "Error closing stream
Reader.") | 146 log.WithError(tmpErr).Warningf(s, "Error closing stream
Reader.") |
| 153 } | 147 } |
| 154 }() | 148 }() |
| (...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 285 } | 279 } |
| 286 } | 280 } |
| 287 | 281 |
| 288 // getIndex returns the cached log stream index, fetching it if necessary. | 282 // getIndex returns the cached log stream index, fetching it if necessary. |
| 289 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { | 283 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| 290 idx := s.index.Load() | 284 idx := s.index.Load() |
| 291 if idx != nil { | 285 if idx != nil { |
| 292 return idx.(*logpb.LogIndex), nil | 286 return idx.(*logpb.LogIndex), nil |
| 293 } | 287 } |
| 294 | 288 |
| 295 » index, err := loadIndex(s, s.Client, s.indexPath, s.Cache) | 289 » index, err := loadIndex(s, s.Client, s.Index, s.Cache) |
| 296 switch errors.Unwrap(err) { | 290 switch errors.Unwrap(err) { |
| 297 case nil: | 291 case nil: |
| 298 break | 292 break |
| 299 | 293 |
| 300 case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist: | 294 case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist: |
| 301 // Treat a missing index the same as an empty index. | 295 // Treat a missing index the same as an empty index. |
| 302 log.WithError(err).Warningf(s, "Index is invalid, using empty in
dex.") | 296 log.WithError(err).Warningf(s, "Index is invalid, using empty in
dex.") |
| 303 index = &logpb.LogIndex{} | 297 index = &logpb.LogIndex{} |
| 304 | 298 |
| 305 default: | 299 default: |
| (...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 474 ui := uint64(i) | 468 ui := uint64(i) |
| 475 s := sort.Search(len(entries), func(i int) bool { | 469 s := sort.Search(len(entries), func(i int) bool { |
| 476 return entries[i].StreamIndex > ui | 470 return entries[i].StreamIndex > ui |
| 477 }) | 471 }) |
| 478 | 472 |
| 479 // The returned index is the one immediately after the index that we wan
t. If | 473 // The returned index is the one immediately after the index that we wan
t. If |
| 480 // our search returned 0, the first index entry is > our search entry, a
nd we | 474 // our search returned 0, the first index entry is > our search entry, a
nd we |
| 481 // will return nil. | 475 // will return nil. |
| 482 return s - 1 | 476 return s - 1 |
| 483 } | 477 } |
| OLD | NEW |