| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 51 // StreamURL is the Google Storage URL for the stream's entries. | 51 // StreamURL is the Google Storage URL for the stream's entries. |
| 52 StreamURL string | 52 StreamURL string |
| 53 | 53 |
| 54 // Client is the HTTP client to use for authentication. | 54 // Client is the HTTP client to use for authentication. |
| 55 // | 55 // |
| 56 // Closing this Storage instance does not close the underlying Client. | 56 // Closing this Storage instance does not close the underlying Client. |
| 57 Client gs.Client | 57 Client gs.Client |
| 58 } | 58 } |
| 59 | 59 |
| 60 type storageImpl struct { | 60 type storageImpl struct { |
| 61 *Options |
| 61 context.Context | 62 context.Context |
| 62 *Options | |
| 63 | 63 |
| 64 streamBucket string | 64 streamBucket string |
| 65 streamPath string | 65 streamPath string |
| 66 indexBucket string | 66 indexBucket string |
| 67 indexPath string | 67 indexPath string |
| 68 | 68 |
| 69 indexMu sync.Mutex | 69 indexMu sync.Mutex |
| 70 index *logpb.LogIndex | 70 index *logpb.LogIndex |
| 71 closeClient bool | 71 closeClient bool |
| 72 } | 72 } |
| 73 | 73 |
| 74 // New instantiates a new Storage instance, bound to the supplied Options. | 74 // New instantiates a new Storage instance, bound to the supplied Options. |
| 75 func New(c context.Context, o Options) (storage.Storage, error) { | 75 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 76 s := storageImpl{ | 76 s := storageImpl{ |
| 77 Context: c, | |
| 78 Options: &o, | 77 Options: &o, |
| 78 Context: ctx, |
| 79 } | 79 } |
| 80 | 80 |
| 81 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) | 81 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) |
| 82 if s.indexBucket == "" || s.indexPath == "" { | 82 if s.indexBucket == "" || s.indexPath == "" { |
| 83 return nil, errors.New("invalid index URL") | 83 return nil, errors.New("invalid index URL") |
| 84 } | 84 } |
| 85 | 85 |
| 86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) | 86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) |
| 87 if s.streamBucket == "" || s.streamPath == "" { | 87 if s.streamBucket == "" || s.streamPath == "" { |
| 88 return nil, errors.New("invalid stream URL") | 88 return nil, errors.New("invalid stream URL") |
| 89 } | 89 } |
| 90 return &s, nil | 90 return &s, nil |
| 91 } | 91 } |
| 92 | 92 |
| 93 func (s *storageImpl) Close() { | 93 func (s *storageImpl) Close() { |
| 94 if s.closeClient { | 94 if s.closeClient { |
| 95 » » if err := s.Client.Close(); err != nil { | 95 » » _ = s.Client.Close() |
| 96 » » » log.WithError(err).Errorf(s, "Failed to close client.") | |
| 97 » » } | |
| 98 } | 96 } |
| 99 } | 97 } |
| 100 | 98 |
| 101 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnl
y } | 99 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly
} |
| 102 func (s *storageImpl) Put(*storage.PutRequest) error { return storage.ErrReadOnl
y } | 100 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly
} |
| 103 | 101 |
| 104 func (s *storageImpl) Get(req *storage.GetRequest, cb storage.GetCallback) error
{ | 102 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
{ |
| 105 idx, err := s.getIndex() | 103 idx, err := s.getIndex() |
| 106 if err != nil { | 104 if err != nil { |
| 107 return err | 105 return err |
| 108 } | 106 } |
| 109 | 107 |
| 110 // Identify the byte offsets that we want to fetch from the entries stre
am. | 108 // Identify the byte offsets that we want to fetch from the entries stre
am. |
| 111 » st := s.buildGetStrategy(req, idx) | 109 » st := s.buildGetStrategy(&req, idx) |
| 112 if st.lastIndex == -1 || req.Index > st.lastIndex { | 110 if st.lastIndex == -1 || req.Index > st.lastIndex { |
| 113 // No records to read. | 111 // No records to read. |
| 114 return nil | 112 return nil |
| 115 } | 113 } |
| 116 | 114 |
| 117 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ | 115 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ |
| 118 From: st.from, | 116 From: st.from, |
| 119 To: st.to, | 117 To: st.to, |
| 120 }) | 118 }) |
| 121 if err != nil { | 119 if err != nil { |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 188 if max > 0 { | 186 if max > 0 { |
| 189 max-- | 187 max-- |
| 190 if max == 0 { | 188 if max == 0 { |
| 191 break | 189 break |
| 192 } | 190 } |
| 193 } | 191 } |
| 194 } | 192 } |
| 195 return nil | 193 return nil |
| 196 } | 194 } |
| 197 | 195 |
| 198 func (s *storageImpl) Tail(types.StreamPath) ([]byte, types.MessageIndex, error)
{ | 196 func (s *storageImpl) Tail(path types.StreamPath) ([]byte, types.MessageIndex, e
rror) { |
| 199 idx, err := s.getIndex() | 197 idx, err := s.getIndex() |
| 200 if err != nil { | 198 if err != nil { |
| 201 return nil, 0, err | 199 return nil, 0, err |
| 202 } | 200 } |
| 203 | 201 |
| 204 // Get the offset of the last record. | 202 // Get the offset of the last record. |
| 205 if len(idx.Entries) == 0 { | 203 if len(idx.Entries) == 0 { |
| 206 return nil, 0, nil | 204 return nil, 0, nil |
| 207 } | 205 } |
| 208 lle := idx.Entries[len(idx.Entries)-1] | 206 lle := idx.Entries[len(idx.Entries)-1] |
| (...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 328 ui := uint64(i) | 326 ui := uint64(i) |
| 329 s := sort.Search(len(entries), func(i int) bool { | 327 s := sort.Search(len(entries), func(i int) bool { |
| 330 return entries[i].StreamIndex > ui | 328 return entries[i].StreamIndex > ui |
| 331 }) | 329 }) |
| 332 | 330 |
| 333 // The returned index is the one immediately after the index that we wan
t. If | 331 // The returned index is the one immediately after the index that we wan
t. If |
| 334 // our search returned 0, the first index entry is > our search entry, a
nd we | 332 // our search returned 0, the first index entry is > our search entry, a
nd we |
| 335 // will return nil. | 333 // will return nil. |
| 336 return s - 1 | 334 return s - 1 |
| 337 } | 335 } |
| OLD | NEW |