Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: logdog/common/storage/archive/storage.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Allow index signing, use gaesigner. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expecations. Namely: 9 // conform to the API expecations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
44 // 16MB is larger than the maximum log entry size 44 // 16MB is larger than the maximum log entry size
45 maxStreamRecordSize = 2 * types.MaxLogEntryDataSize 45 maxStreamRecordSize = 2 * types.MaxLogEntryDataSize
46 ) 46 )
47 47
48 // Options is the set of configuration options for this Storage instance. 48 // Options is the set of configuration options for this Storage instance.
49 // 49 //
50 // Unlike other Storage instances, this is bound to a single archived stream. 50 // Unlike other Storage instances, this is bound to a single archived stream.
51 // Project and Path parameters in requests will be ignored in favor of the 51 // Project and Path parameters in requests will be ignored in favor of the
52 // Google Storage URLs. 52 // Google Storage URLs.
53 type Options struct { 53 type Options struct {
54 » // IndexURL is the Google Storage URL for the stream's index. 54 » // Index is the Google Storage URL for the stream's index.
55 » IndexURL string 55 » Index gs.Path
56 » // StreamURL is the Google Storage URL for the stream's entries. 56 » // Stream is the Google Storage URL for the stream's entries.
57 » StreamURL string 57 » Stream gs.Path
58 58
59 // Client is the HTTP client to use for authentication. 59 // Client is the HTTP client to use for authentication.
60 // 60 //
61 // Closing this Storage instance does not close the underlying Client. 61 // Closing this Storage instance does not close the underlying Client.
62 Client gs.Client 62 Client gs.Client
63 63
64 // Cache, if not nil, will be used to cache data. 64 // Cache, if not nil, will be used to cache data.
65 Cache caching.Cache 65 Cache caching.Cache
66 } 66 }
67 67
68 type storageImpl struct { 68 type storageImpl struct {
69 *Options 69 *Options
70 context.Context 70 context.Context
71 71
72 streamPath gs.Path
73 indexPath gs.Path
74
75 index atomic.Value 72 index atomic.Value
76 } 73 }
77 74
78 // New instantiates a new Storage instance, bound to the supplied Options. 75 // New instantiates a new Storage instance, bound to the supplied Options.
79 func New(ctx context.Context, o Options) (storage.Storage, error) { 76 func New(ctx context.Context, o Options) (storage.Storage, error) {
80 s := storageImpl{ 77 s := storageImpl{
81 Options: &o, 78 Options: &o,
82 Context: ctx, 79 Context: ctx,
83
84 streamPath: gs.Path(o.StreamURL),
85 indexPath: gs.Path(o.IndexURL),
86 } 80 }
87 81
88 » if !s.streamPath.IsFullPath() { 82 » if !s.Stream.IsFullPath() {
89 » » return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath) 83 » » return nil, fmt.Errorf("invalid stream URL: %q", s.Stream)
90 } 84 }
91 » if s.indexPath != "" && !s.indexPath.IsFullPath() { 85 » if s.Index != "" && !s.Index.IsFullPath() {
92 » » return nil, fmt.Errorf("invalid index URL: %v", s.indexPath) 86 » » return nil, fmt.Errorf("invalid index URL: %v", s.Index)
93 } 87 }
94 88
95 return &s, nil 89 return &s, nil
96 } 90 }
97 91
98 func (s *storageImpl) Close() {} 92 func (s *storageImpl) Close() {}
99 93
100 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } 94 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly }
101 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } 95 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
102 96
(...skipping 29 matching lines...) Expand all
132 // apply that limit too. 126 // apply that limit too.
133 // Get an archive reader. 127 // Get an archive reader.
134 var ( 128 var (
135 offset = st.startOffset 129 offset = st.startOffset
136 length = st.length() 130 length = st.length()
137 ) 131 )
138 132
139 log.Fields{ 133 log.Fields{
140 "offset": offset, 134 "offset": offset,
141 "length": length, 135 "length": length,
142 » » "path": s.streamPath, 136 » » "path": s.Stream,
143 }.Debugf(s, "Creating stream reader for range.") 137 }.Debugf(s, "Creating stream reader for range.")
144 » storageReader, err := s.Client.NewReader(s.streamPath, int64(offset), le ngth) 138 » storageReader, err := s.Client.NewReader(s.Stream, int64(offset), length )
145 if err != nil { 139 if err != nil {
146 log.WithError(err).Errorf(s, "Failed to create stream Reader.") 140 log.WithError(err).Errorf(s, "Failed to create stream Reader.")
147 return errors.Annotate(err).Reason("failed to create stream Read er").Err() 141 return errors.Annotate(err).Reason("failed to create stream Read er").Err()
148 } 142 }
149 defer func() { 143 defer func() {
150 if tmpErr := storageReader.Close(); tmpErr != nil { 144 if tmpErr := storageReader.Close(); tmpErr != nil {
151 // (Non-fatal) 145 // (Non-fatal)
152 log.WithError(tmpErr).Warningf(s, "Error closing stream Reader.") 146 log.WithError(tmpErr).Warningf(s, "Error closing stream Reader.")
153 } 147 }
154 }() 148 }()
(...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after
285 } 279 }
286 } 280 }
287 281
288 // getIndex returns the cached log stream index, fetching it if necessary. 282 // getIndex returns the cached log stream index, fetching it if necessary.
289 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { 283 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
290 idx := s.index.Load() 284 idx := s.index.Load()
291 if idx != nil { 285 if idx != nil {
292 return idx.(*logpb.LogIndex), nil 286 return idx.(*logpb.LogIndex), nil
293 } 287 }
294 288
295 » index, err := loadIndex(s, s.Client, s.indexPath, s.Cache) 289 » index, err := loadIndex(s, s.Client, s.Index, s.Cache)
296 switch errors.Unwrap(err) { 290 switch errors.Unwrap(err) {
297 case nil: 291 case nil:
298 break 292 break
299 293
300 case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist: 294 case cloudStorage.ErrBucketNotExist, cloudStorage.ErrObjectNotExist:
301 // Treat a missing index the same as an empty index. 295 // Treat a missing index the same as an empty index.
302 log.WithError(err).Warningf(s, "Index is invalid, using empty in dex.") 296 log.WithError(err).Warningf(s, "Index is invalid, using empty in dex.")
303 index = &logpb.LogIndex{} 297 index = &logpb.LogIndex{}
304 298
305 default: 299 default:
(...skipping 168 matching lines...) Expand 10 before | Expand all | Expand 10 after
474 ui := uint64(i) 468 ui := uint64(i)
475 s := sort.Search(len(entries), func(i int) bool { 469 s := sort.Search(len(entries), func(i int) bool {
476 return entries[i].StreamIndex > ui 470 return entries[i].StreamIndex > ui
477 }) 471 })
478 472
479 // The returned index is the one immediately after the index that we wan t. If 473 // The returned index is the one immediately after the index that we wan t. If
480 // our search returned 0, the first index entry is > our search entry, a nd we 474 // our search returned 0, the first index entry is > our search entry, a nd we
481 // will return nil. 475 // will return nil.
482 return s - 1 476 return s - 1
483 } 477 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698