Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(112)

Side by Side Diff: server/logdog/archive/archive.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package archive constructs a LogDog archive out of log stream components. 5 // Package archive constructs a LogDog archive out of log stream components.
6 // Records are read from the stream and emitted as an archive. 6 // Records are read from the stream and emitted as an archive.
7 package archive 7 package archive
8 8
9 import ( 9 import (
10 "io" 10 "io"
11 11
12 "github.com/golang/protobuf/proto" 12 "github.com/golang/protobuf/proto"
13 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/logging" 14 "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/parallel" 15 "github.com/luci/luci-go/common/parallel"
15 "github.com/luci/luci-go/common/proto/logdog/logpb" 16 "github.com/luci/luci-go/common/proto/logdog/logpb"
16 "github.com/luci/luci-go/common/recordio" 17 "github.com/luci/luci-go/common/recordio"
17 ) 18 )
18 19
19 // Manifest is a set of archival parameters. 20 // Manifest is a set of archival parameters.
20 type Manifest struct { 21 type Manifest struct {
21 // Desc is the logpb.LogStreamDescriptor for the stream. 22 // Desc is the logpb.LogStreamDescriptor for the stream.
22 Desc *logpb.LogStreamDescriptor 23 Desc *logpb.LogStreamDescriptor
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
78 if m.IndexWriter != nil { 79 if m.IndexWriter != nil {
79 idx = &indexBuilder{ 80 idx = &indexBuilder{
80 Manifest: &m, 81 Manifest: &m,
81 index: logpb.LogIndex{ 82 index: logpb.LogIndex{
82 Desc: m.Desc, 83 Desc: m.Desc,
83 }, 84 },
84 sizeFunc: m.sizeFunc, 85 sizeFunc: m.sizeFunc,
85 } 86 }
86 } 87 }
87 88
88 » return parallel.FanOutIn(func(taskC chan<- func() error) { 89 » err := parallel.FanOutIn(func(taskC chan<- func() error) {
89 var logC chan *logpb.LogEntry 90 var logC chan *logpb.LogEntry
90 if m.LogWriter != nil { 91 if m.LogWriter != nil {
91 logC = make(chan *logpb.LogEntry) 92 logC = make(chan *logpb.LogEntry)
92 93
93 taskC <- func() error { 94 taskC <- func() error {
94 if err := archiveLogs(m.LogWriter, m.Desc, logC, idx); err != nil { 95 if err := archiveLogs(m.LogWriter, m.Desc, logC, idx); err != nil {
95 return err 96 return err
96 } 97 }
97 98
98 // If we're building an index, emit it now that the log stream has 99 // If we're building an index, emit it now that the log stream has
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 return nil 139 return nil
139 } 140 }
140 141
141 return err 142 return err
142 } 143 }
143 144
144 sendLog(le) 145 sendLog(le)
145 } 146 }
146 } 147 }
147 }) 148 })
149
150 // If any of the returned errors was transient, return a transient error .
151 if errors.Any(err, errors.IsTransient) {
dnj 2016/04/11 17:20:04 This causes the archival process to forward transi
152 err = errors.WrapTransient(err)
153 }
154 return err
148 } 155 }
149 156
150 func archiveLogs(w io.Writer, d *logpb.LogStreamDescriptor, logC <-chan *logpb.L ogEntry, idx *indexBuilder) error { 157 func archiveLogs(w io.Writer, d *logpb.LogStreamDescriptor, logC <-chan *logpb.L ogEntry, idx *indexBuilder) error {
151 offset := int64(0) 158 offset := int64(0)
152 out := func(pb proto.Message) error { 159 out := func(pb proto.Message) error {
153 d, err := proto.Marshal(pb) 160 d, err := proto.Marshal(pb)
154 if err != nil { 161 if err != nil {
155 return err 162 return err
156 } 163 }
157 164
(...skipping 11 matching lines...) Expand all
169 } 176 }
170 177
171 // Add this LogEntry to our index, noting the current offset. 178 // Add this LogEntry to our index, noting the current offset.
172 if idx != nil { 179 if idx != nil {
173 idx.addLogEntry(le, offset) 180 idx.addLogEntry(le, offset)
174 } 181 }
175 err = out(le) 182 err = out(le)
176 } 183 }
177 return err 184 return err
178 } 185 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698