| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive implements a storage.Storage instance that retrieves logs | 5 // Package archive implements a storage.Storage instance that retrieves logs |
| 6 // from a Google Storage archive. | 6 // from a Google Storage archive. |
| 7 // | 7 // |
| 8 // This is a special implementation of storage.Storage, and does not fully | 8 // This is a special implementation of storage.Storage, and does not fully |
| 9 // conform to the API expecations. Namely: | 9 // conform to the API expecations. Namely: |
| 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. | 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. |
| 11 // - Storage methods ignore the supplied Path argument, instead opting for | 11 // - Storage methods ignore the supplied Path argument, instead opting for |
| 12 // the archive configured in its Options. | 12 // the archive configured in its Options. |
| 13 package archive | 13 package archive |
| 14 | 14 |
| 15 import ( | 15 import ( |
| 16 "bytes" | 16 "bytes" |
| 17 "encoding/hex" |
| 17 "errors" | 18 "errors" |
| 19 "fmt" |
| 18 "io" | 20 "io" |
| 19 "io/ioutil" | 21 "io/ioutil" |
| 20 "sort" | 22 "sort" |
| 21 "strings" | 23 "strings" |
| 22 "sync" | 24 "sync" |
| 23 | 25 |
| 24 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 25 | 27 |
| 26 "github.com/golang/protobuf/proto" | 28 "github.com/golang/protobuf/proto" |
| 27 "github.com/luci/luci-go/common/gcloud/gs" | 29 "github.com/luci/luci-go/common/gcloud/gs" |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 98 | 100 |
| 99 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly
} | 101 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly
} |
| 100 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly
} | 102 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly
} |
| 101 | 103 |
| 102 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
{ | 104 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error
{ |
| 103 idx, err := s.getIndex() | 105 idx, err := s.getIndex() |
| 104 if err != nil { | 106 if err != nil { |
| 105 return err | 107 return err |
| 106 } | 108 } |
| 107 | 109 |
| 110 fmt.Println("GOT INDEX:", proto.MarshalTextString(idx)) |
| 111 |
| 108 // Identify the byte offsets that we want to fetch from the entries stre
am. | 112 // Identify the byte offsets that we want to fetch from the entries stre
am. |
| 109 st := s.buildGetStrategy(&req, idx) | 113 st := s.buildGetStrategy(&req, idx) |
| 110 if st.lastIndex == -1 || req.Index > st.lastIndex { | 114 if st.lastIndex == -1 || req.Index > st.lastIndex { |
| 111 // No records to read. | 115 // No records to read. |
| 112 return nil | 116 return nil |
| 113 } | 117 } |
| 114 | 118 |
| 115 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ | 119 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ |
| 116 From: st.from, | 120 From: st.from, |
| 117 To: st.to, | 121 To: st.to, |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 154 buf.Reset() | 158 buf.Reset() |
| 155 buf.Grow(int(sz)) | 159 buf.Grow(int(sz)) |
| 156 if _, err := buf.ReadFrom(r); err != nil { | 160 if _, err := buf.ReadFrom(r); err != nil { |
| 157 log.Fields{ | 161 log.Fields{ |
| 158 log.ErrorKey: err, | 162 log.ErrorKey: err, |
| 159 "offset": offset, | 163 "offset": offset, |
| 160 "frameSize": sz, | 164 "frameSize": sz, |
| 161 }.Errorf(s, "Failed to read frame data.") | 165 }.Errorf(s, "Failed to read frame data.") |
| 162 return err | 166 return err |
| 163 } | 167 } |
| 168 fmt.Println(hex.EncodeToString(buf.Bytes())) |
| 164 if err := proto.Unmarshal(buf.Bytes(), &le); err != nil { | 169 if err := proto.Unmarshal(buf.Bytes(), &le); err != nil { |
| 165 log.Fields{ | 170 log.Fields{ |
| 166 log.ErrorKey: err, | 171 log.ErrorKey: err, |
| 167 "offset": offset, | 172 "offset": offset, |
| 168 "frameSize": sz, | 173 "frameSize": sz, |
| 169 }.Errorf(s, "Failed to unmarshal log data.") | 174 }.Errorf(s, "Failed to unmarshal log data.") |
| 170 return err | 175 return err |
| 171 } | 176 } |
| 172 | 177 |
| 173 idx := types.MessageIndex(le.StreamIndex) | 178 idx := types.MessageIndex(le.StreamIndex) |
| (...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 326 ui := uint64(i) | 331 ui := uint64(i) |
| 327 s := sort.Search(len(entries), func(i int) bool { | 332 s := sort.Search(len(entries), func(i int) bool { |
| 328 return entries[i].StreamIndex > ui | 333 return entries[i].StreamIndex > ui |
| 329 }) | 334 }) |
| 330 | 335 |
| 331 // The returned index is the one immediately after the index that we wan
t. If | 336 // The returned index is the one immediately after the index that we wan
t. If |
| 332 // our search returned 0, the first index entry is > our search entry, a
nd we | 337 // our search returned 0, the first index entry is > our search entry, a
nd we |
| 333 // will return nil. | 338 // will return nil. |
| 334 return s - 1 | 339 return s - 1 |
| 335 } | 340 } |
| OLD | NEW |