| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 46 // Path parameters in requests will be ignored in favor of the Google Storage | 46 // Path parameters in requests will be ignored in favor of the Google Storage |
| 47 // URLs. | 47 // URLs. |
| 48 type Options struct { | 48 type Options struct { |
| 49 // IndexURL is the Google Storage URL for the stream's index. | 49 // IndexURL is the Google Storage URL for the stream's index. |
| 50 IndexURL string | 50 IndexURL string |
| 51 // StreamURL is the Google Storage URL for the stream's entries. | 51 // StreamURL is the Google Storage URL for the stream's entries. |
| 52 StreamURL string | 52 StreamURL string |
| 53 | 53 |
| 54 // Client is the HTTP client to use for authentication. | 54 // Client is the HTTP client to use for authentication. |
| 55 // | 55 // |
| 56 » // Closing this Storage instance does not close the underlying Client. | 56 » // Closing this Storage instance will close the underlying Client. |
| 57 Client gs.Client | 57 Client gs.Client |
| 58 } | 58 } |
| 59 | 59 |
| 60 type storageImpl struct { | 60 type storageImpl struct { |
| 61 context.Context | 61 context.Context |
| 62 *Options | 62 *Options |
| 63 | 63 |
| 64 streamBucket string | 64 streamBucket string |
| 65 streamPath string | 65 streamPath string |
| 66 indexBucket string | 66 indexBucket string |
| 67 indexPath string | 67 indexPath string |
| 68 | 68 |
| 69 » indexMu sync.Mutex | 69 » indexMu sync.Mutex |
| 70 » index *logpb.LogIndex | 70 » index *logpb.LogIndex |
| 71 » closeClient bool | |
| 72 } | 71 } |
| 73 | 72 |
| 74 // New instantiates a new Storage instance, bound to the supplied Options. | 73 // New instantiates a new Storage instance, bound to the supplied Options. |
| 75 func New(c context.Context, o Options) (storage.Storage, error) { | 74 func New(c context.Context, o Options) (storage.Storage, error) { |
| 76 s := storageImpl{ | 75 s := storageImpl{ |
| 77 Context: c, | 76 Context: c, |
| 78 Options: &o, | 77 Options: &o, |
| 79 } | 78 } |
| 80 | 79 |
| 81 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) | 80 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) |
| 82 if s.indexBucket == "" || s.indexPath == "" { | 81 if s.indexBucket == "" || s.indexPath == "" { |
| 83 return nil, errors.New("invalid index URL") | 82 return nil, errors.New("invalid index URL") |
| 84 } | 83 } |
| 85 | 84 |
| 86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) | 85 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) |
| 87 if s.streamBucket == "" || s.streamPath == "" { | 86 if s.streamBucket == "" || s.streamPath == "" { |
| 88 return nil, errors.New("invalid stream URL") | 87 return nil, errors.New("invalid stream URL") |
| 89 } | 88 } |
| 90 | 89 |
| 91 if s.Client == nil { | 90 if s.Client == nil { |
| 92 var err error | 91 var err error |
| 93 s.Client, err = gs.NewProdClient(c) | 92 s.Client, err = gs.NewProdClient(c) |
| 94 if err != nil { | 93 if err != nil { |
| 95 return nil, err | 94 return nil, err |
| 96 } | 95 } |
| 97 | |
| 98 s.closeClient = true | |
| 99 } | 96 } |
| 100 | 97 |
| 101 return &s, nil | 98 return &s, nil |
| 102 } | 99 } |
| 103 | 100 |
| 104 func (s *storageImpl) Close() { | 101 func (s *storageImpl) Close() { |
| 105 » if s.closeClient { | 102 » if err := s.Client.Close(); err != nil { |
| 106 » » if err := s.Client.Close(); err != nil { | 103 » » log.WithError(err).Errorf(s, "Failed to close client.") |
| 107 » » » log.WithError(err).Errorf(s, "Failed to close client.") | |
| 108 » » } | |
| 109 } | 104 } |
| 110 } | 105 } |
| 111 | 106 |
| 112 func (s *storageImpl) Put(*storage.PutRequest) error { return storage.ErrReadOnl
y } | 107 func (s *storageImpl) Put(*storage.PutRequest) error { return storage.ErrReadOnl
y } |
| 113 func (s *storageImpl) Purge(types.StreamPath) error { return storage.ErrReadOnl
y } | 108 func (s *storageImpl) Purge(types.StreamPath) error { return storage.ErrReadOnl
y } |
| 114 | 109 |
| 115 func (s *storageImpl) Get(req *storage.GetRequest, cb storage.GetCallback) error
{ | 110 func (s *storageImpl) Get(req *storage.GetRequest, cb storage.GetCallback) error
{ |
| 116 idx, err := s.getIndex() | 111 idx, err := s.getIndex() |
| 117 if err != nil { | 112 if err != nil { |
| 118 return err | 113 return err |
| (...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 313 le := idx.Entries[startIdx] | 308 le := idx.Entries[startIdx] |
| 314 st.from = int64(le.Offset) | 309 st.from = int64(le.Offset) |
| 315 | 310 |
| 316 // If we have a limit, and we have enough index entries to upper-bound o
ur | 311 // If we have a limit, and we have enough index entries to upper-bound o
ur |
| 317 // stream based on that limit, use that. | 312 // stream based on that limit, use that. |
| 318 // | 313 // |
| 319 // Note that this may overshoot if the index and/or stream is sparse. We
know | 314 // Note that this may overshoot if the index and/or stream is sparse. We
know |
| 320 // for sure that we have one LogEntry per index entry, so that's the bes
t we | 315 // for sure that we have one LogEntry per index entry, so that's the bes
t we |
| 321 // can do. | 316 // can do. |
| 322 if req.Limit > 0 { | 317 if req.Limit > 0 { |
| 323 » » if ub := startIdx + req.Limit; ub < len(idx.Entries) { | 318 » » if ub := int64(startIdx) + req.Limit; ub < int64(len(idx.Entries
)) { |
| 324 st.to = int64(idx.Entries[ub].Offset) | 319 st.to = int64(idx.Entries[ub].Offset) |
| 325 } | 320 } |
| 326 } | 321 } |
| 327 return &st | 322 return &st |
| 328 } | 323 } |
| 329 | 324 |
| 330 // indexEntryFor identifies the log index entry closest (<=) to the specified | 325 // indexEntryFor identifies the log index entry closest (<=) to the specified |
| 331 // index. | 326 // index. |
| 332 // | 327 // |
| 333 // If the first index entry is greater than our search index, -1 will be | 328 // If the first index entry is greater than our search index, -1 will be |
| 334 // returned. This should never happen in practice, though, since our index | 329 // returned. This should never happen in practice, though, since our index |
| 335 // construction always indexes log entry #0. | 330 // construction always indexes log entry #0. |
| 336 // | 331 // |
| 337 // It does this by performing a binary search over the index entries. | 332 // It does this by performing a binary search over the index entries. |
| 338 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { | 333 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { |
| 339 ui := uint64(i) | 334 ui := uint64(i) |
| 340 s := sort.Search(len(entries), func(i int) bool { | 335 s := sort.Search(len(entries), func(i int) bool { |
| 341 return entries[i].StreamIndex > ui | 336 return entries[i].StreamIndex > ui |
| 342 }) | 337 }) |
| 343 | 338 |
| 344 // The returned index is the one immediately after the index that we wan
t. If | 339 // The returned index is the one immediately after the index that we wan
t. If |
| 345 // our search returned 0, the first index entry is > our search entry, a
nd we | 340 // our search returned 0, the first index entry is > our search entry, a
nd we |
| 346 // will return nil. | 341 // will return nil. |
| 347 return s - 1 | 342 return s - 1 |
| 348 } | 343 } |
| OLD | NEW |