| OLD | NEW | 
|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 package archivist | 5 package archivist | 
| 6 | 6 | 
| 7 import ( | 7 import ( | 
| 8         "github.com/golang/protobuf/proto" | 8         "github.com/golang/protobuf/proto" | 
|  | 9         "github.com/luci/luci-go/common/config" | 
| 9         "github.com/luci/luci-go/common/logdog/types" | 10         "github.com/luci/luci-go/common/logdog/types" | 
| 10         log "github.com/luci/luci-go/common/logging" | 11         log "github.com/luci/luci-go/common/logging" | 
| 11         "github.com/luci/luci-go/common/proto/logdog/logpb" | 12         "github.com/luci/luci-go/common/proto/logdog/logpb" | 
| 12         "github.com/luci/luci-go/server/logdog/archive" | 13         "github.com/luci/luci-go/server/logdog/archive" | 
| 13         "github.com/luci/luci-go/server/logdog/storage" | 14         "github.com/luci/luci-go/server/logdog/storage" | 
| 14         "golang.org/x/net/context" | 15         "golang.org/x/net/context" | 
| 15 ) | 16 ) | 
| 16 | 17 | 
| 17 // storageSource is an archive.LogEntrySource that pulls log entries from | 18 // storageSource is an archive.LogEntrySource that pulls log entries from | 
| 18 // intermediate storage via its storage.Storage instance. | 19 // intermediate storage via its storage.Storage instance. | 
| 19 type storageSource struct { | 20 type storageSource struct { | 
| 20         context.Context | 21         context.Context | 
| 21 | 22 | 
| 22         st            storage.Storage    // the storage instance to read from | 23         st            storage.Storage    // the storage instance to read from | 
|  | 24         project       config.ProjectName // the path of the log stream | 
| 23         path          types.StreamPath   // the path of the log stream | 25         path          types.StreamPath   // the path of the log stream | 
| 24         terminalIndex types.MessageIndex // if >= 0, discard logs beyond this | 26         terminalIndex types.MessageIndex // if >= 0, discard logs beyond this | 
| 25 | 27 | 
| 26         buf           []*logpb.LogEntry | 28         buf           []*logpb.LogEntry | 
| 27         lastIndex     types.MessageIndex | 29         lastIndex     types.MessageIndex | 
| 28         logEntryCount int64 | 30         logEntryCount int64 | 
| 29 } | 31 } | 
| 30 | 32 | 
| 31 func (s *storageSource) bufferEntries(start types.MessageIndex) error { | 33 func (s *storageSource) bufferEntries(start types.MessageIndex) error { | 
| 32         bytes := 0 | 34         bytes := 0 | 
| 33 | 35 | 
| 34         req := storage.GetRequest{ | 36         req := storage.GetRequest{ | 
| 35 »       »       Path:  s.path, | 37 »       »       Project: s.project, | 
| 36 »       »       Index: start, | 38 »       »       Path:    s.path, | 
|  | 39 »       »       Index:   start, | 
| 37         } | 40         } | 
| 38         return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { | 41         return s.st.Get(req, func(idx types.MessageIndex, d []byte) bool { | 
| 39                 le := logpb.LogEntry{} | 42                 le := logpb.LogEntry{} | 
| 40                 if err := proto.Unmarshal(d, &le); err != nil { | 43                 if err := proto.Unmarshal(d, &le); err != nil { | 
| 41                         log.Fields{ | 44                         log.Fields{ | 
| 42                                 log.ErrorKey:  err, | 45                                 log.ErrorKey:  err, | 
| 43                                 "streamIndex": idx, | 46                                 "streamIndex": idx, | 
| 44                         }.Errorf(s, "Failed to unmarshal LogEntry.") | 47                         }.Errorf(s, "Failed to unmarshal LogEntry.") | 
| 45                         return false | 48                         return false | 
| 46                 } | 49                 } | 
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 96                         "index":         sidx, | 99                         "index":         sidx, | 
| 97                         "terminalIndex": s.terminalIndex, | 100                         "terminalIndex": s.terminalIndex, | 
| 98                 }.Warningf(s, "Discarding log entries beyond expected terminal i
     ndex.") | 101                 }.Warningf(s, "Discarding log entries beyond expected terminal i
     ndex.") | 
| 99                 return nil, archive.ErrEndOfStream | 102                 return nil, archive.ErrEndOfStream | 
| 100         } | 103         } | 
| 101 | 104 | 
| 102         s.lastIndex = sidx | 105         s.lastIndex = sidx | 
| 103         s.logEntryCount++ | 106         s.logEntryCount++ | 
| 104         return le, nil | 107         return le, nil | 
| 105 } | 108 } | 
| OLD | NEW | 
|---|