Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| 11 // - Storage methods ignore the supplied Path argument, instead opting for | 11 // - Storage methods ignore the supplied Path argument, instead opting for |
| 12 // the archive configured in its Options. | 12 // the archive configured in its Options. |
| 13 package archive | 13 package archive |
| 14 | 14 |
| 15 import ( | 15 import ( |
| 16 "bytes" | 16 "bytes" |
| 17 » "errors" | 17 » "fmt" |
| 18 "io" | 18 "io" |
| 19 "io/ioutil" | 19 "io/ioutil" |
| 20 "sort" | 20 "sort" |
| 21 "strings" | |
| 22 "sync" | 21 "sync" |
| 23 | 22 |
| 24 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 25 | 24 |
| 26 "github.com/golang/protobuf/proto" | 25 "github.com/golang/protobuf/proto" |
| 27 "github.com/luci/luci-go/common/gcloud/gs" | 26 "github.com/luci/luci-go/common/gcloud/gs" |
| 28 "github.com/luci/luci-go/common/iotools" | 27 "github.com/luci/luci-go/common/iotools" |
| 29 "github.com/luci/luci-go/common/logdog/types" | 28 "github.com/luci/luci-go/common/logdog/types" |
| 30 log "github.com/luci/luci-go/common/logging" | 29 log "github.com/luci/luci-go/common/logging" |
| 31 "github.com/luci/luci-go/common/proto/logdog/logpb" | 30 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 54 // Client is the HTTP client to use for authentication. | 53 // Client is the HTTP client to use for authentication. |
| 55 // | 54 // |
| 56 // Closing this Storage instance does not close the underlying Client. | 55 // Closing this Storage instance does not close the underlying Client. |
| 57 Client gs.Client | 56 Client gs.Client |
| 58 } | 57 } |
| 59 | 58 |
| 60 type storageImpl struct { | 59 type storageImpl struct { |
| 61 *Options | 60 *Options |
| 62 context.Context | 61 context.Context |
| 63 | 62 |
| 64 » streamBucket string | 63 » streamPath gs.Path |
|
dnj
2016/04/11 17:20:05
This was just updated to use "gs.Path" instead of
| |
| 65 » streamPath string | 64 » indexPath gs.Path |
| 66 » indexBucket string | |
| 67 » indexPath string | |
| 68 | 65 |
| 69 indexMu sync.Mutex | 66 indexMu sync.Mutex |
| 70 index *logpb.LogIndex | 67 index *logpb.LogIndex |
| 71 closeClient bool | 68 closeClient bool |
| 72 } | 69 } |
| 73 | 70 |
| 74 // New instantiates a new Storage instance, bound to the supplied Options. | 71 // New instantiates a new Storage instance, bound to the supplied Options. |
| 75 func New(ctx context.Context, o Options) (storage.Storage, error) { | 72 func New(ctx context.Context, o Options) (storage.Storage, error) { |
| 76 s := storageImpl{ | 73 s := storageImpl{ |
| 77 Options: &o, | 74 Options: &o, |
| 78 Context: ctx, | 75 Context: ctx, |
| 76 | |
| 77 streamPath: gs.Path(o.StreamURL), | |
| 78 indexPath: gs.Path(o.IndexURL), | |
| 79 } | 79 } |
| 80 | 80 |
| 81 » s.indexBucket, s.indexPath = splitGSURL(o.IndexURL) | 81 » if !s.streamPath.IsFullPath() { |
| 82 » if s.indexBucket == "" || s.indexPath == "" { | 82 » » return nil, fmt.Errorf("invalid stream URL: %q", s.streamPath) |
| 83 » » return nil, errors.New("invalid index URL") | 83 » } |
| 84 » if !s.indexPath.IsFullPath() { | |
| 85 » » return nil, fmt.Errorf("invalid index URL: %v", s.indexPath) | |
| 84 } | 86 } |
| 85 | 87 |
| 86 s.streamBucket, s.streamPath = splitGSURL(o.StreamURL) | |
| 87 if s.streamBucket == "" || s.streamPath == "" { | |
| 88 return nil, errors.New("invalid stream URL") | |
| 89 } | |
| 90 return &s, nil | 88 return &s, nil |
| 91 } | 89 } |
| 92 | 90 |
| 93 func (s *storageImpl) Close() { | 91 func (s *storageImpl) Close() { |
| 94 if s.closeClient { | 92 if s.closeClient { |
| 95 _ = s.Client.Close() | 93 _ = s.Client.Close() |
| 96 } | 94 } |
| 97 } | 95 } |
| 98 | 96 |
| 99 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } | 97 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } |
| 100 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } | 98 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } |
| 101 | 99 |
| 102 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { | 100 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { |
| 103 idx, err := s.getIndex() | 101 idx, err := s.getIndex() |
| 104 if err != nil { | 102 if err != nil { |
| 105 return err | 103 return err |
| 106 } | 104 } |
| 107 | 105 |
| 108 // Identify the byte offsets that we want to fetch from the entries stre am. | 106 // Identify the byte offsets that we want to fetch from the entries stre am. |
| 109 st := s.buildGetStrategy(&req, idx) | 107 st := s.buildGetStrategy(&req, idx) |
| 110 if st.lastIndex == -1 || req.Index > st.lastIndex { | 108 if st.lastIndex == -1 || req.Index > st.lastIndex { |
| 111 // No records to read. | 109 // No records to read. |
| 112 return nil | 110 return nil |
| 113 } | 111 } |
| 114 | 112 |
| 115 » r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ | 113 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ |
| 116 From: st.from, | 114 From: st.from, |
| 117 To: st.to, | 115 To: st.to, |
| 118 }) | 116 }) |
| 119 if err != nil { | 117 if err != nil { |
| 120 log.WithError(err).Errorf(s, "Failed to create stream Reader.") | 118 log.WithError(err).Errorf(s, "Failed to create stream Reader.") |
| 121 return err | 119 return err |
| 122 } | 120 } |
| 123 defer func() { | 121 defer func() { |
| 124 if err := r.Close(); err != nil { | 122 if err := r.Close(); err != nil { |
| 125 log.WithError(err).Warningf(s, "Error closing stream Rea der.") | 123 log.WithError(err).Warningf(s, "Error closing stream Rea der.") |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 200 return nil, 0, err | 198 return nil, 0, err |
| 201 } | 199 } |
| 202 | 200 |
| 203 // Get the offset of the last record. | 201 // Get the offset of the last record. |
| 204 if len(idx.Entries) == 0 { | 202 if len(idx.Entries) == 0 { |
| 205 return nil, 0, nil | 203 return nil, 0, nil |
| 206 } | 204 } |
| 207 lle := idx.Entries[len(idx.Entries)-1] | 205 lle := idx.Entries[len(idx.Entries)-1] |
| 208 | 206 |
| 209 // Get a Reader for the Tail entry. | 207 // Get a Reader for the Tail entry. |
| 210 » r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ | 208 » r, err := s.Client.NewReader(s.streamPath, gs.Options{ |
| 211 From: int64(lle.Offset), | 209 From: int64(lle.Offset), |
| 212 }) | 210 }) |
| 213 if err != nil { | 211 if err != nil { |
| 214 log.Fields{ | 212 log.Fields{ |
| 215 log.ErrorKey: err, | 213 log.ErrorKey: err, |
| 216 "offset": lle.Offset, | 214 "offset": lle.Offset, |
| 217 }.Errorf(s, "Failed to create reader.") | 215 }.Errorf(s, "Failed to create reader.") |
| 218 return nil, 0, err | 216 return nil, 0, err |
| 219 } | 217 } |
| 220 defer func() { | 218 defer func() { |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 232 | 230 |
| 233 return d, types.MessageIndex(lle.StreamIndex), nil | 231 return d, types.MessageIndex(lle.StreamIndex), nil |
| 234 } | 232 } |
| 235 | 233 |
| 236 // getIndex returns the cached log stream index, fetching it if necessary. | 234 // getIndex returns the cached log stream index, fetching it if necessary. |
| 237 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { | 235 func (s *storageImpl) getIndex() (*logpb.LogIndex, error) { |
| 238 s.indexMu.Lock() | 236 s.indexMu.Lock() |
| 239 defer s.indexMu.Unlock() | 237 defer s.indexMu.Unlock() |
| 240 | 238 |
| 241 if s.index == nil { | 239 if s.index == nil { |
| 242 » » r, err := s.Client.NewReader(s.indexBucket, s.indexPath, gs.Opti ons{}) | 240 » » r, err := s.Client.NewReader(s.indexPath, gs.Options{}) |
| 243 if err != nil { | 241 if err != nil { |
| 244 log.WithError(err).Errorf(s, "Failed to create index Rea der.") | 242 log.WithError(err).Errorf(s, "Failed to create index Rea der.") |
| 245 return nil, err | 243 return nil, err |
| 246 } | 244 } |
| 247 defer func() { | 245 defer func() { |
| 248 if err := r.Close(); err != nil { | 246 if err := r.Close(); err != nil { |
| 249 log.WithError(err).Warningf(s, "Error closing in dex Reader.") | 247 log.WithError(err).Warningf(s, "Error closing in dex Reader.") |
| 250 } | 248 } |
| 251 }() | 249 }() |
| 252 indexData, err := ioutil.ReadAll(r) | 250 indexData, err := ioutil.ReadAll(r) |
| 253 if err != nil { | 251 if err != nil { |
| 254 log.WithError(err).Errorf(s, "Failed to read index.") | 252 log.WithError(err).Errorf(s, "Failed to read index.") |
| 255 return nil, err | 253 return nil, err |
| 256 } | 254 } |
| 257 | 255 |
| 258 index := logpb.LogIndex{} | 256 index := logpb.LogIndex{} |
| 259 if err := proto.Unmarshal(indexData, &index); err != nil { | 257 if err := proto.Unmarshal(indexData, &index); err != nil { |
| 260 log.WithError(err).Errorf(s, "Failed to unmarshal index. ") | 258 log.WithError(err).Errorf(s, "Failed to unmarshal index. ") |
| 261 return nil, err | 259 return nil, err |
| 262 } | 260 } |
| 263 | 261 |
| 264 s.index = &index | 262 s.index = &index |
| 265 } | 263 } |
| 266 return s.index, nil | 264 return s.index, nil |
| 267 } | 265 } |
| 268 | 266 |
| 269 func splitGSURL(u string) (string, string) { | |
| 270 parts := strings.SplitN(strings.TrimPrefix(u, "gs://"), "/", 2) | |
| 271 if len(parts) == 1 { | |
| 272 return parts[0], "" | |
| 273 } | |
| 274 return parts[0], parts[1] | |
| 275 } | |
| 276 | |
| 277 type getStrategy struct { | 267 type getStrategy struct { |
| 278 // from is the beginning byte offset of the log entry stream. | 268 // from is the beginning byte offset of the log entry stream. |
| 279 from int64 | 269 from int64 |
| 280 // to is the ending byte offset of the log entry stream. | 270 // to is the ending byte offset of the log entry stream. |
| 281 to int64 | 271 to int64 |
| 282 | 272 |
| 283 // lastIndex is the last log entry index in the stream. This will be -1 if | 273 // lastIndex is the last log entry index in the stream. This will be -1 if |
| 284 // there are no entries in the stream. | 274 // there are no entries in the stream. |
| 285 lastIndex types.MessageIndex | 275 lastIndex types.MessageIndex |
| 286 } | 276 } |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 327 ui := uint64(i) | 317 ui := uint64(i) |
| 328 s := sort.Search(len(entries), func(i int) bool { | 318 s := sort.Search(len(entries), func(i int) bool { |
| 329 return entries[i].StreamIndex > ui | 319 return entries[i].StreamIndex > ui |
| 330 }) | 320 }) |
| 331 | 321 |
| 332 // The returned index is the one immediately after the index that we wan t. If | 322 // The returned index is the one immediately after the index that we wan t. If |
| 333 // our search returned 0, the first index entry is > our search entry, a nd we | 323 // our search returned 0, the first index entry is > our search entry, a nd we |
| 334 // will return nil. | 324 // will return nil. |
| 335 return s - 1 | 325 return s - 1 |
| 336 } | 326 } |
| OLD | NEW |