Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(253)

Side by Side Diff: server/logdog/storage/archive/storage.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/internal/logdog/service/service.go ('k') | server/logdog/storage/bigtable/bigtable.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expecations. Namely: 9 // conform to the API expecations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
51 // StreamURL is the Google Storage URL for the stream's entries. 51 // StreamURL is the Google Storage URL for the stream's entries.
52 StreamURL string 52 StreamURL string
53 53
54 // Client is the HTTP client to use for authentication. 54 // Client is the HTTP client to use for authentication.
55 // 55 //
56 // Closing this Storage instance does not close the underlying Client. 56 // Closing this Storage instance does not close the underlying Client.
57 Client gs.Client 57 Client gs.Client
58 } 58 }
59 59
60 type storageImpl struct { 60 type storageImpl struct {
61 *Options
61 context.Context 62 context.Context
62 *Options
63 63
64 streamBucket string 64 streamBucket string
65 streamPath string 65 streamPath string
66 indexBucket string 66 indexBucket string
67 indexPath string 67 indexPath string
68 68
69 indexMu sync.Mutex 69 indexMu sync.Mutex
70 index *logpb.LogIndex 70 index *logpb.LogIndex
71 closeClient bool 71 closeClient bool
72 } 72 }
73 73
74 // New instantiates a new Storage instance, bound to the supplied Options. 74 // New instantiates a new Storage instance, bound to the supplied Options.
75 func New(c context.Context, o Options) (storage.Storage, error) { 75 func New(ctx context.Context, o Options) (storage.Storage, error) {
76 s := storageImpl{ 76 s := storageImpl{
77 Context: c,
78 Options: &o, 77 Options: &o,
78 Context: ctx,
79 } 79 }
80 80
81 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) 81 s.indexBucket, s.indexPath = splitGSURL(o.IndexURL)
82 if s.indexBucket == "" || s.indexPath == "" { 82 if s.indexBucket == "" || s.indexPath == "" {
83 return nil, errors.New("invalid index URL") 83 return nil, errors.New("invalid index URL")
84 } 84 }
85 85
86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) 86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL)
87 if s.streamBucket == "" || s.streamPath == "" { 87 if s.streamBucket == "" || s.streamPath == "" {
88 return nil, errors.New("invalid stream URL") 88 return nil, errors.New("invalid stream URL")
89 } 89 }
90 return &s, nil 90 return &s, nil
91 } 91 }
92 92
93 func (s *storageImpl) Close() { 93 func (s *storageImpl) Close() {
94 if s.closeClient { 94 if s.closeClient {
95 » » if err := s.Client.Close(); err != nil { 95 » » _ = s.Client.Close()
96 » » » log.WithError(err).Errorf(s, "Failed to close client.")
97 » » }
98 } 96 }
99 } 97 }
100 98
101 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnl y } 99 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly }
102 func (s *storageImpl) Put(*storage.PutRequest) error { return storage.ErrReadOnl y } 100 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
103 101
104 func (s *storageImpl) Get(req *storage.GetRequest, cb storage.GetCallback) error { 102 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error {
105 idx, err := s.getIndex() 103 idx, err := s.getIndex()
106 if err != nil { 104 if err != nil {
107 return err 105 return err
108 } 106 }
109 107
110 // Identify the byte offsets that we want to fetch from the entries stre am. 108 // Identify the byte offsets that we want to fetch from the entries stre am.
111 » st := s.buildGetStrategy(req, idx) 109 » st := s.buildGetStrategy(&req, idx)
112 if st.lastIndex == -1 || req.Index > st.lastIndex { 110 if st.lastIndex == -1 || req.Index > st.lastIndex {
113 // No records to read. 111 // No records to read.
114 return nil 112 return nil
115 } 113 }
116 114
117 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ 115 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{
118 From: st.from, 116 From: st.from,
119 To: st.to, 117 To: st.to,
120 }) 118 })
121 if err != nil { 119 if err != nil {
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
188 if max > 0 { 186 if max > 0 {
189 max-- 187 max--
190 if max == 0 { 188 if max == 0 {
191 break 189 break
192 } 190 }
193 } 191 }
194 } 192 }
195 return nil 193 return nil
196 } 194 }
197 195
198 func (s *storageImpl) Tail(types.StreamPath) ([]byte, types.MessageIndex, error) { 196 func (s *storageImpl) Tail(path types.StreamPath) ([]byte, types.MessageIndex, e rror) {
199 idx, err := s.getIndex() 197 idx, err := s.getIndex()
200 if err != nil { 198 if err != nil {
201 return nil, 0, err 199 return nil, 0, err
202 } 200 }
203 201
204 // Get the offset of the last record. 202 // Get the offset of the last record.
205 if len(idx.Entries) == 0 { 203 if len(idx.Entries) == 0 {
206 return nil, 0, nil 204 return nil, 0, nil
207 } 205 }
208 lle := idx.Entries[len(idx.Entries)-1] 206 lle := idx.Entries[len(idx.Entries)-1]
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
328 ui := uint64(i) 326 ui := uint64(i)
329 s := sort.Search(len(entries), func(i int) bool { 327 s := sort.Search(len(entries), func(i int) bool {
330 return entries[i].StreamIndex > ui 328 return entries[i].StreamIndex > ui
331 }) 329 })
332 330
333 // The returned index is the one immediately after the index that we wan t. If 331 // The returned index is the one immediately after the index that we wan t. If
334 // our search returned 0, the first index entry is > our search entry, a nd we 332 // our search returned 0, the first index entry is > our search entry, a nd we
335 // will return nil. 333 // will return nil.
336 return s - 1 334 return s - 1
337 } 335 }
OLDNEW
« no previous file with comments | « server/internal/logdog/service/service.go ('k') | server/logdog/storage/bigtable/bigtable.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698