Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(213)

Side by Side Diff: server/logdog/storage/archive/storage.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expectations. Namely: 9 // conform to the API expectations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
11 // - Storage methods ignore the supplied Path argument, instead opting for 11 // - Storage methods ignore the supplied Path argument, instead opting for
12 // the archive configured in its Options. 12 // the archive configured in its Options.
13 package archive 13 package archive
14 14
15 import ( 15 import (
16 "bytes" 16 "bytes"
17 » "errors" 17 » "fmt"
18 "io" 18 "io"
19 "io/ioutil" 19 "io/ioutil"
20 "sort" 20 "sort"
21 "strings"
22 "sync" 21 "sync"
23 22
24 "golang.org/x/net/context" 23 "golang.org/x/net/context"
25 24
26 "github.com/golang/protobuf/proto" 25 "github.com/golang/protobuf/proto"
27 "github.com/luci/luci-go/common/gcloud/gs" 26 "github.com/luci/luci-go/common/gcloud/gs"
28 "github.com/luci/luci-go/common/iotools" 27 "github.com/luci/luci-go/common/iotools"
29 "github.com/luci/luci-go/common/logdog/types" 28 "github.com/luci/luci-go/common/logdog/types"
30 log "github.com/luci/luci-go/common/logging" 29 log "github.com/luci/luci-go/common/logging"
31 "github.com/luci/luci-go/common/proto/logdog/logpb" 30 "github.com/luci/luci-go/common/proto/logdog/logpb"
(...skipping 22 matching lines...) Expand all
54 // Client is the HTTP client to use for authentication. 53 // Client is the HTTP client to use for authentication.
55 // 54 //
56 // Closing this Storage instance does not close the underlying Client. 55 // Closing this Storage instance does not close the underlying Client.
57 Client gs.Client 56 Client gs.Client
58 } 57 }
59 58
60 type storageImpl struct { 59 type storageImpl struct {
61 *Options 60 *Options
62 context.Context 61 context.Context
63 62
64 » streamBucket string 63 » streamPath gs.Path
dnj 2016/04/11 17:20:05 This was just updated to use "gs.Path" instead of
65 » streamPath string 64 » indexPath gs.Path
66 » indexBucket string
67 » indexPath string
68 65
69 indexMu sync.Mutex 66 indexMu sync.Mutex
70 index *logpb.LogIndex 67 index *logpb.LogIndex
71 closeClient bool 68 closeClient bool
72 } 69 }
73 70
74 // New instantiates a new Storage instance, bound to the supplied Options. 71 // New instantiates a new Storage instance, bound to the supplied Options.
75 func New(ctx context.Context, o Options) (storage.Storage, error) { 72 func New(ctx context.Context, o Options) (storage.Storage, error) {
76 s := storageImpl{ 73 s := storageImpl{
77 Options: &o, 74 Options: &o,
78 Context: ctx, 75 Context: ctx,
76
77 streamPath: gs.Path(o.StreamURL),
78 indexPath: gs.Path(o.IndexURL),
79 } 79 }
80 80
81 » s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) 81 » if !s.streamPath.IsFullPath() {
82 » if s.indexBucket == "" || s.indexPath == "" { 82 » » return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath)
83 » » return nil, errors.New("invalid index URL") 83 » }
84 » if !s.indexPath.IsFullPath() {
85 » » return nil, fmt.Errorf("invalid index URL: %v", s.indexPath)
84 } 86 }
85 87
86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL)
87 if s.streamBucket == "" || s.streamPath == "" {
88 return nil, errors.New("invalid stream URL")
89 }
90 return &s, nil 88 return &s, nil
91 } 89 }
92 90
93 func (s *storageImpl) Close() { 91 func (s *storageImpl) Close() {
94 if s.closeClient { 92 if s.closeClient {
95 _ = s.Client.Close() 93 _ = s.Client.Close()
96 } 94 }
97 } 95 }
98 96
99 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } 97 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly }
100 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } 98 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
101 99
102 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { 100 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error {
103 idx, err := s.getIndex() 101 idx, err := s.getIndex()
104 if err != nil { 102 if err != nil {
105 return err 103 return err
106 } 104 }
107 105
108 // Identify the byte offsets that we want to fetch from the entries stream. 106 // Identify the byte offsets that we want to fetch from the entries stream.
109 st := s.buildGetStrategy(&req, idx) 107 st := s.buildGetStrategy(&req, idx)
110 if st.lastIndex == -1 || req.Index > st.lastIndex { 108 if st.lastIndex == -1 || req.Index > st.lastIndex {
111 // No records to read. 109 // No records to read.
112 return nil 110 return nil
113 } 111 }
114 112
115 » r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ 113 » r, err := s.Client.NewReader(s.streamPath, gs.Options{
116 From: st.from, 114 From: st.from,
117 To: st.to, 115 To: st.to,
118 }) 116 })
119 if err != nil { 117 if err != nil {
120 log.WithError(err).Errorf(s, "Failed to create stream Reader.") 118 log.WithError(err).Errorf(s, "Failed to create stream Reader.")
121 return err 119 return err
122 } 120 }
123 defer func() { 121 defer func() {
124 if err := r.Close(); err != nil { 122 if err := r.Close(); err != nil {
125 log.WithError(err).Warningf(s, "Error closing stream Reader.") 123 log.WithError(err).Warningf(s, "Error closing stream Reader.")
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
200 return nil, 0, err 198 return nil, 0, err
201 } 199 }
202 200
203 // Get the offset of the last record. 201 // Get the offset of the last record.
204 if len(idx.Entries) == 0 { 202 if len(idx.Entries) == 0 {
205 return nil, 0, nil 203 return nil, 0, nil
206 } 204 }
207 lle := idx.Entries[len(idx.Entries)-1] 205 lle := idx.Entries[len(idx.Entries)-1]
208 206
209 // Get a Reader for the Tail entry. 207 // Get a Reader for the Tail entry.
210 » r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ 208 » r, err := s.Client.NewReader(s.streamPath, gs.Options{
211 From: int64(lle.Offset), 209 From: int64(lle.Offset),
212 }) 210 })
213 if err != nil { 211 if err != nil {
214 log.Fields{ 212 log.Fields{
215 log.ErrorKey: err, 213 log.ErrorKey: err,
216 "offset": lle.Offset, 214 "offset": lle.Offset,
217 }.Errorf(s, "Failed to create reader.") 215 }.Errorf(s, "Failed to create reader.")
218 return nil, 0, err 216 return nil, 0, err
219 } 217 }
220 defer func() { 218 defer func() {
(...skipping 11 matching lines...) Expand all
232 230
233 return d, types.MessageIndex(lle.StreamIndex), nil 231 return d, types.MessageIndex(lle.StreamIndex), nil
234 } 232 }
235 233
236 // getIndex returns the cached log stream index, fetching it if necessary. 234 // getIndex returns the cached log stream index, fetching it if necessary.
237 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { 235 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) {
238 s.indexMu.Lock() 236 s.indexMu.Lock()
239 defer s.indexMu.Unlock() 237 defer s.indexMu.Unlock()
240 238
241 if s.index == nil { 239 if s.index == nil {
242 » » r, err := s.Client.NewReader(s.indexBucket, s.indexPath, gs.Options{}) 240 » » r, err := s.Client.NewReader(s.indexPath, gs.Options{})
243 if err != nil { 241 if err != nil {
244 log.WithError(err).Errorf(s, "Failed to create index Reader.") 242 log.WithError(err).Errorf(s, "Failed to create index Reader.")
245 return nil, err 243 return nil, err
246 } 244 }
247 defer func() { 245 defer func() {
248 if err := r.Close(); err != nil { 246 if err := r.Close(); err != nil {
249 log.WithError(err).Warningf(s, "Error closing index Reader.") 247 log.WithError(err).Warningf(s, "Error closing index Reader.")
250 } 248 }
251 }() 249 }()
252 indexData, err := ioutil.ReadAll(r) 250 indexData, err := ioutil.ReadAll(r)
253 if err != nil { 251 if err != nil {
254 log.WithError(err).Errorf(s, "Failed to read index.") 252 log.WithError(err).Errorf(s, "Failed to read index.")
255 return nil, err 253 return nil, err
256 } 254 }
257 255
258 index := logpb.LogIndex{} 256 index := logpb.LogIndex{}
259 if err := proto.Unmarshal(indexData, &index); err != nil { 257 if err := proto.Unmarshal(indexData, &index); err != nil {
260 log.WithError(err).Errorf(s, "Failed to unmarshal index.") 258 log.WithError(err).Errorf(s, "Failed to unmarshal index.")
261 return nil, err 259 return nil, err
262 } 260 }
263 261
264 s.index = &index 262 s.index = &index
265 } 263 }
266 return s.index, nil 264 return s.index, nil
267 } 265 }
268 266
269 func splitGSURL(u string) (string, string) {
270 parts := strings.SplitN(strings.TrimPrefix(u, "gs://"), "/", 2)
271 if len(parts) == 1 {
272 return parts[0], ""
273 }
274 return parts[0], parts[1]
275 }
276
277 type getStrategy struct { 267 type getStrategy struct {
278 // from is the beginning byte offset of the log entry stream. 268 // from is the beginning byte offset of the log entry stream.
279 from int64 269 from int64
280 // to is the ending byte offset of the log entry stream. 270 // to is the ending byte offset of the log entry stream.
281 to int64 271 to int64
282 272
283 // lastIndex is the last log entry index in the stream. This will be -1 if 273 // lastIndex is the last log entry index in the stream. This will be -1 if
284 // there are no entries in the stream. 274 // there are no entries in the stream.
285 lastIndex types.MessageIndex 275 lastIndex types.MessageIndex
286 } 276 }
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
327 ui := uint64(i) 317 ui := uint64(i)
328 s := sort.Search(len(entries), func(i int) bool { 318 s := sort.Search(len(entries), func(i int) bool {
329 return entries[i].StreamIndex > ui 319 return entries[i].StreamIndex > ui
330 }) 320 })
331 321
332 // The returned index is the one immediately after the index that we want. If 322 // The returned index is the one immediately after the index that we want. If
333 // our search returned 0, the first index entry is > our search entry, and we 323 // our search returned 0, the first index entry is > our search entry, and we
334 // will return nil. 324 // will return nil.
335 return s - 1 325 return s - 1
336 } 326 }
OLDNEW
« server/logdog/archive/archive.go ('K') | « server/logdog/archive/archive.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698