Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(41)

Side by Side Diff: server/logdog/storage/archive/storage.go

Issue 1844253002: Fix GS client breakage, remove offset hacks. (Closed) Base URL: https://github.com/luci/luci-go@subscriber-update
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/gcloud/gs/opts.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package archive implements a storage.Storage instance that retrieves logs 5 // Package archive implements a storage.Storage instance that retrieves logs
6 // from a Google Storage archive. 6 // from a Google Storage archive.
7 // 7 //
8 // This is a special implementation of storage.Storage, and does not fully 8 // This is a special implementation of storage.Storage, and does not fully
9 // conform to the API expecations. Namely: 9 // conform to the API expecations. Namely:
10 // - It is read-only. Mutation methods will return storage.ErrReadOnly. 10 // - It is read-only. Mutation methods will return storage.ErrReadOnly.
11 // - Storage methods ignore the supplied Path argument, instead opting for 11 // - Storage methods ignore the supplied Path argument, instead opting for
12 // the archive configured in its Options. 12 // the archive configured in its Options.
13 package archive 13 package archive
14 14
15 import ( 15 import (
16 "bytes" 16 "bytes"
17 "encoding/hex"
17 "errors" 18 "errors"
19 "fmt"
18 "io" 20 "io"
19 "io/ioutil" 21 "io/ioutil"
20 "sort" 22 "sort"
21 "strings" 23 "strings"
22 "sync" 24 "sync"
23 25
24 "golang.org/x/net/context" 26 "golang.org/x/net/context"
25 27
26 "github.com/golang/protobuf/proto" 28 "github.com/golang/protobuf/proto"
27 "github.com/luci/luci-go/common/gcloud/gs" 29 "github.com/luci/luci-go/common/gcloud/gs"
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
98 100
99 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly } 101 func (s *storageImpl) Config(storage.Config) error { return storage.ErrReadOnly }
100 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly } 102 func (s *storageImpl) Put(storage.PutRequest) error { return storage.ErrReadOnly }
101 103
102 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error { 104 func (s *storageImpl) Get(req storage.GetRequest, cb storage.GetCallback) error {
103 idx, err := s.getIndex() 105 idx, err := s.getIndex()
104 if err != nil { 106 if err != nil {
105 return err 107 return err
106 } 108 }
107 109
110 fmt.Println("GOT INDEX:", proto.MarshalTextString(idx))
111
108 // Identify the byte offsets that we want to fetch from the entries stre am. 112 // Identify the byte offsets that we want to fetch from the entries stre am.
109 st := s.buildGetStrategy(&req, idx) 113 st := s.buildGetStrategy(&req, idx)
110 if st.lastIndex == -1 || req.Index > st.lastIndex { 114 if st.lastIndex == -1 || req.Index > st.lastIndex {
111 // No records to read. 115 // No records to read.
112 return nil 116 return nil
113 } 117 }
114 118
115 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{ 119 r, err := s.Client.NewReader(s.streamBucket, s.streamPath, gs.Options{
116 From: st.from, 120 From: st.from,
117 To: st.to, 121 To: st.to,
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
154 buf.Reset() 158 buf.Reset()
155 buf.Grow(int(sz)) 159 buf.Grow(int(sz))
156 if _, err := buf.ReadFrom(r); err != nil { 160 if _, err := buf.ReadFrom(r); err != nil {
157 log.Fields{ 161 log.Fields{
158 log.ErrorKey: err, 162 log.ErrorKey: err,
159 "offset": offset, 163 "offset": offset,
160 "frameSize": sz, 164 "frameSize": sz,
161 }.Errorf(s, "Failed to read frame data.") 165 }.Errorf(s, "Failed to read frame data.")
162 return err 166 return err
163 } 167 }
168 fmt.Println(hex.EncodeToString(buf.Bytes()))
164 if err := proto.Unmarshal(buf.Bytes(), &le); err != nil { 169 if err := proto.Unmarshal(buf.Bytes(), &le); err != nil {
165 log.Fields{ 170 log.Fields{
166 log.ErrorKey: err, 171 log.ErrorKey: err,
167 "offset": offset, 172 "offset": offset,
168 "frameSize": sz, 173 "frameSize": sz,
169 }.Errorf(s, "Failed to unmarshal log data.") 174 }.Errorf(s, "Failed to unmarshal log data.")
170 return err 175 return err
171 } 176 }
172 177
173 idx := types.MessageIndex(le.StreamIndex) 178 idx := types.MessageIndex(le.StreamIndex)
(...skipping 152 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 ui := uint64(i) 331 ui := uint64(i)
327 s := sort.Search(len(entries), func(i int) bool { 332 s := sort.Search(len(entries), func(i int) bool {
328 return entries[i].StreamIndex > ui 333 return entries[i].StreamIndex > ui
329 }) 334 })
330 335
331 // The returned index is the one immediately after the index that we wan t. If 336 // The returned index is the one immediately after the index that we wan t. If
332 // our search returned 0, the first index entry is > our search entry, a nd we 337 // our search returned 0, the first index entry is > our search entry, a nd we
333 // will return nil. 338 // will return nil.
334 return s - 1 339 return s - 1
335 } 340 }
OLDNEW
« no previous file with comments | « common/gcloud/gs/opts.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698