Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package archive constructs a LogDog archive out of log stream components. | 5 // Package archive constructs a LogDog archive out of log stream components. |
| 6 // Records are read from the stream and emitted as an archive. | 6 // Records are read from the stream and emitted as an archive. |
| 7 package archive | 7 package archive |
| 8 | 8 |
| 9 import ( | 9 import ( |
| 10 "io" | 10 "io" |
| 11 | 11 |
| 12 "github.com/golang/protobuf/proto" | 12 "github.com/golang/protobuf/proto" |
| 13 "github.com/luci/luci-go/common/errors" | |
| 13 "github.com/luci/luci-go/common/logging" | 14 "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/parallel" | 15 "github.com/luci/luci-go/common/parallel" |
| 15 "github.com/luci/luci-go/common/proto/logdog/logpb" | 16 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 16 "github.com/luci/luci-go/common/recordio" | 17 "github.com/luci/luci-go/common/recordio" |
| 17 ) | 18 ) |
| 18 | 19 |
| 19 // Manifest is a set of archival parameters. | 20 // Manifest is a set of archival parameters. |
| 20 type Manifest struct { | 21 type Manifest struct { |
| 21 // Desc is the logpb.LogStreamDescriptor for the stream. | 22 // Desc is the logpb.LogStreamDescriptor for the stream. |
| 22 Desc *logpb.LogStreamDescriptor | 23 Desc *logpb.LogStreamDescriptor |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 78 if m.IndexWriter != nil { | 79 if m.IndexWriter != nil { |
| 79 idx = &indexBuilder{ | 80 idx = &indexBuilder{ |
| 80 Manifest: &m, | 81 Manifest: &m, |
| 81 index: logpb.LogIndex{ | 82 index: logpb.LogIndex{ |
| 82 Desc: m.Desc, | 83 Desc: m.Desc, |
| 83 }, | 84 }, |
| 84 sizeFunc: m.sizeFunc, | 85 sizeFunc: m.sizeFunc, |
| 85 } | 86 } |
| 86 } | 87 } |
| 87 | 88 |
| 88 » return parallel.FanOutIn(func(taskC chan<- func() error) { | 89 » err := parallel.FanOutIn(func(taskC chan<- func() error) { |
| 89 var logC chan *logpb.LogEntry | 90 var logC chan *logpb.LogEntry |
| 90 if m.LogWriter != nil { | 91 if m.LogWriter != nil { |
| 91 logC = make(chan *logpb.LogEntry) | 92 logC = make(chan *logpb.LogEntry) |
| 92 | 93 |
| 93 taskC <- func() error { | 94 taskC <- func() error { |
| 94 if err := archiveLogs(m.LogWriter, m.Desc, logC, idx); err != nil { | 95 if err := archiveLogs(m.LogWriter, m.Desc, logC, idx); err != nil { |
| 95 return err | 96 return err |
| 96 } | 97 } |
| 97 | 98 |
| 98 // If we're building an index, emit it now that the log stream has | 99 // If we're building an index, emit it now that the log stream has |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 138 return nil | 139 return nil |
| 139 } | 140 } |
| 140 | 141 |
| 141 return err | 142 return err |
| 142 } | 143 } |
| 143 | 144 |
| 144 sendLog(le) | 145 sendLog(le) |
| 145 } | 146 } |
| 146 } | 147 } |
| 147 }) | 148 }) |
| 149 | |
| 150 // If any of the returned errors was transient, return a transient error . | |
| 151 if errors.Any(err, errors.IsTransient) { | |
|
dnj
2016/04/11 17:20:04
This causes the archival process to forward transi
| |
| 152 err = errors.WrapTransient(err) | |
| 153 } | |
| 154 return err | |
| 148 } | 155 } |
| 149 | 156 |
| 150 func archiveLogs(w io.Writer, d *logpb.LogStreamDescriptor, logC <-chan *logpb.L ogEntry, idx *indexBuilder) error { | 157 func archiveLogs(w io.Writer, d *logpb.LogStreamDescriptor, logC <-chan *logpb.L ogEntry, idx *indexBuilder) error { |
| 151 offset := int64(0) | 158 offset := int64(0) |
| 152 out := func(pb proto.Message) error { | 159 out := func(pb proto.Message) error { |
| 153 d, err := proto.Marshal(pb) | 160 d, err := proto.Marshal(pb) |
| 154 if err != nil { | 161 if err != nil { |
| 155 return err | 162 return err |
| 156 } | 163 } |
| 157 | 164 |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 169 } | 176 } |
| 170 | 177 |
| 171 // Add this LogEntry to our index, noting the current offset. | 178 // Add this LogEntry to our index, noting the current offset. |
| 172 if idx != nil { | 179 if idx != nil { |
| 173 idx.addLogEntry(le, offset) | 180 idx.addLogEntry(le, offset) |
| 174 } | 181 } |
| 175 err = out(le) | 182 err = out(le) |
| 176 } | 183 } |
| 177 return err | 184 return err |
| 178 } | 185 } |
| OLD | NEW |