OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
7 // | 7 // |
8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
46 // Path parameters in requests will be ignored in favor of the Google Storage | 46 // Path parameters in requests will be ignored in favor of the Google Storage |
47 // URLs. | 47 // URLs. |
48 type Options struct { | 48 type Options struct { |
49 // IndexURL is the Google Storage URL for the stream's index. | 49 // IndexURL is the Google Storage URL for the stream's index. |
50 IndexURL string | 50 IndexURL string |
51 // StreamURL is the Google Storage URL for the stream's entries. | 51 // StreamURL is the Google Storage URL for the stream's entries. |
52 StreamURL string | 52 StreamURL string |
53 | 53 |
54 // Client is the HTTP client to use for authentication. | 54 // Client is the HTTP client to use for authentication. |
55 // | 55 // |
56 » // Closing this Storage instance does not close the underlying Client. | 56 » // Closing this Storage instance will close the underlying Client. |
57 Client gs.Client | 57 Client gs.Client |
58 } | 58 } |
59 | 59 |
60 type storageImpl struct { | 60 type storageImpl struct { |
61 context.Context | 61 context.Context |
62 *Options | 62 *Options |
63 | 63 |
64 streamBucket string | 64 streamBucket string |
65 streamPath string | 65 streamPath string |
66 indexBucket string | 66 indexBucket string |
67 indexPath string | 67 indexPath string |
68 | 68 |
69 » indexMu sync.Mutex | 69 » indexMu sync.Mutex |
70 » index *logpb.LogIndex | 70 » index *logpb.LogIndex |
71 » closeClient bool | |
72 } | 71 } |
73 | 72 |
74 // New instantiates a new Storage instance, bound to the supplied Options. | 73 // New instantiates a new Storage instance, bound to the supplied Options. |
75 func New(c context.Context, o Options) (storage.Storage, error) { | 74 func New(c context.Context, o Options) (storage.Storage, error) { |
76 s := storageImpl{ | 75 s := storageImpl{ |
77 Context: c, | 76 Context: c, |
78 Options: &o, | 77 Options: &o, |
79 } | 78 } |
80 | 79 |
81 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) | 80 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) |
82 if s.indexBucket == "" || s.indexPath == "" { | 81 if s.indexBucket == "" || s.indexPath == "" { |
83 return nil, errors.New("invalid index URL") | 82 return nil, errors.New("invalid index URL") |
84 } | 83 } |
85 | 84 |
86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) | 85 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) |
87 if s.streamBucket == "" || s.streamPath == "" { | 86 if s.streamBucket == "" || s.streamPath == "" { |
88 return nil, errors.New("invalid stream URL") | 87 return nil, errors.New("invalid stream URL") |
89 } | 88 } |
90 | 89 |
91 if s.Client == nil { | 90 if s.Client == nil { |
92 var err error | 91 var err error |
93 s.Client, err = gs.NewProdClient(c) | 92 s.Client, err = gs.NewProdClient(c) |
94 if err != nil { | 93 if err != nil { |
95 return nil, err | 94 return nil, err |
96 } | 95 } |
97 | |
98 s.closeClient = true | |
99 } | 96 } |
100 | 97 |
101 return &s, nil | 98 return &s, nil |
102 } | 99 } |
103 | 100 |
104 func (s *storageImpl) Close() { | 101 func (s *storageImpl) Close() { |
105 » if s.closeClient { | 102 » if err := s.Client.Close(); err != nil { |
106 » » if err := s.Client.Close(); err != nil { | 103 » » log.WithError(err).Errorf(s, "Failed to close client.") |
107 » » » log.WithError(err).Errorf(s, "Failed to close client.") | |
108 » » } | |
109 } | 104 } |
110 } | 105 } |
111 | 106 |
112 func (s *storageImpl) Put(*storage.PutRequest) error { return storage.ErrReadOnl
y } | 107 func (s *storageImpl) Put(*storage.PutRequest) error { return storage.ErrReadOnl
y } |
113 func (s *storageImpl) Purge(types.StreamPath) error { return storage.ErrReadOnl
y } | 108 func (s *storageImpl) Purge(types.StreamPath) error { return storage.ErrReadOnl
y } |
114 | 109 |
115 func (s *storageImpl) Get(req *storage.GetRequest, cb storage.GetCallback) error
{ | 110 func (s *storageImpl) Get(req *storage.GetRequest, cb storage.GetCallback) error
{ |
116 idx, err := s.getIndex() | 111 idx, err := s.getIndex() |
117 if err != nil { | 112 if err != nil { |
118 return err | 113 return err |
(...skipping 194 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
313 le := idx.Entries[startIdx] | 308 le := idx.Entries[startIdx] |
314 st.from = int64(le.Offset) | 309 st.from = int64(le.Offset) |
315 | 310 |
316 // If we have a limit, and we have enough index entries to upper-bound o
ur | 311 // If we have a limit, and we have enough index entries to upper-bound o
ur |
317 // stream based on that limit, use that. | 312 // stream based on that limit, use that. |
318 // | 313 // |
319 // Note that this may overshoot if the index and/or stream is sparse. We
know | 314 // Note that this may overshoot if the index and/or stream is sparse. We
know |
320 // for sure that we have one LogEntry per index entry, so that's the bes
t we | 315 // for sure that we have one LogEntry per index entry, so that's the bes
t we |
321 // can do. | 316 // can do. |
322 if req.Limit > 0 { | 317 if req.Limit > 0 { |
323 » » if ub := startIdx + req.Limit; ub < len(idx.Entries) { | 318 » » if ub := int64(startIdx) + req.Limit; ub < int64(len(idx.Entries
)) { |
324 st.to = int64(idx.Entries[ub].Offset) | 319 st.to = int64(idx.Entries[ub].Offset) |
325 } | 320 } |
326 } | 321 } |
327 return &st | 322 return &st |
328 } | 323 } |
329 | 324 |
330 // indexEntryFor identifies the log index entry closest (<=) to the specified | 325 // indexEntryFor identifies the log index entry closest (<=) to the specified |
331 // index. | 326 // index. |
332 // | 327 // |
333 // If the first index entry is greater than our search index, -1 will be | 328 // If the first index entry is greater than our search index, -1 will be |
334 // returned. This should never happen in practice, though, since our index | 329 // returned. This should never happen in practice, though, since our index |
335 // construction always indexes log entry #0. | 330 // construction always indexes log entry #0. |
336 // | 331 // |
337 // It does this by performing a binary search over the index entries. | 332 // It does this by performing a binary search over the index entries. |
338 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { | 333 func indexEntryFor(entries []*logpb.LogIndex_Entry, i types.MessageIndex) int { |
339 ui := uint64(i) | 334 ui := uint64(i) |
340 s := sort.Search(len(entries), func(i int) bool { | 335 s := sort.Search(len(entries), func(i int) bool { |
341 return entries[i].StreamIndex > ui | 336 return entries[i].StreamIndex > ui |
342 }) | 337 }) |
343 | 338 |
344 // The returned index is the one immediately after the index that we wan
t. If | 339 // The returned index is the one immediately after the index that we wan
t. If |
345 // our search returned 0, the first index entry is > our search entry, a
nd we | 340 // our search returned 0, the first index entry is > our search entry, a
nd we |
346 // will return nil. | 341 // will return nil. |
347 return s - 1 | 342 return s - 1 |
348 } | 343 } |
OLD | NEW |