| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package collector | 5 package collector |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "sync" | |
| 10 "time" | 9 "time" |
| 11 | 10 |
| 12 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 13 "github.com/luci/luci-go/common/errors" | 12 "github.com/luci/luci-go/common/errors" |
| 14 "github.com/luci/luci-go/common/logdog/butlerproto" | 13 "github.com/luci/luci-go/common/logdog/butlerproto" |
| 15 "github.com/luci/luci-go/common/logdog/types" | 14 "github.com/luci/luci-go/common/logdog/types" |
| 16 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/parallel" | 16 "github.com/luci/luci-go/common/parallel" |
| 18 "github.com/luci/luci-go/common/proto/logdog/logpb" | 17 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
| 20 "github.com/luci/luci-go/server/logdog/storage" | 19 "github.com/luci/luci-go/server/logdog/storage" |
| 21 "golang.org/x/net/context" | 20 "golang.org/x/net/context" |
| 22 ) | 21 ) |
| 23 | 22 |
| 23 const ( |
| 24 // DefaultMaxMessageWorkers is the default number of concurrent worker |
| 25 // goroutines to employ for a single message. |
| 26 DefaultMaxMessageWorkers = 4 |
| 27 ) |
| 28 |
| 24 // Collector is a stateful object responsible for ingesting LogDog logs, | 29 // Collector is a stateful object responsible for ingesting LogDog logs, |
| 25 // registering them with a Coordinator, and stowing them in short-term storage | 30 // registering them with a Coordinator, and stowing them in short-term storage |
| 26 // for streaming and processing. | 31 // for streaming and processing. |
| 27 // | 32 // |
| 28 // A Collector's Close should be called when finished to release any internal | 33 // A Collector's Close should be called when finished to release any internal |
| 29 // resources. | 34 // resources. |
| 30 type Collector struct { | 35 type Collector struct { |
| 31 // Coordinator is used to interface with the Coordinator client. | 36 // Coordinator is used to interface with the Coordinator client. |
| 32 // | 37 // |
| 33 // On production systems, this should be wrapped with a caching client (see | 38 // On production systems, this should be wrapped with a caching client (see |
| 34 // the stateCache sub-package) to avoid overwhelming the server. | 39 // the stateCache sub-package) to avoid overwhelming the server. |
| 35 Coordinator coordinator.Coordinator | 40 Coordinator coordinator.Coordinator |
| 36 | 41 |
| 37 » // Storage is the backing store to use. | 42 » // Storage is the intermediate storage instance to use. |
| 38 Storage storage.Storage | 43 Storage storage.Storage |
| 39 | 44 |
| 40 // StreamStateCacheExpire is the maximum amount of time that a cached st
ream | 45 // StreamStateCacheExpire is the maximum amount of time that a cached st
ream |
| 41 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be
used. | 46 // state entry is valid. If zero, DefaultStreamStateCacheExpire will be
used. |
| 42 StreamStateCacheExpire time.Duration | 47 StreamStateCacheExpire time.Duration |
| 43 | 48 |
| 44 » // MaxParallelBundles is the maximum number of log entry bundles per mes
sage | 49 » // MaxMessageWorkers is the maximum number of concurrent workers to empl
oy |
| 45 » // to handle in parallel. If <= 0, no maximum will be applied. | 50 » // for any given message. If <= 0, DefaultMaxMessageWorkers will be appl
ied. |
| 46 » MaxParallelBundles int | 51 » MaxMessageWorkers int |
| 47 » // MaxIngestWorkers is the maximum number of ingest worker goroutines th
at | |
| 48 » // will operate at a time. If <= 0, no maximum will be applied. | |
| 49 » MaxIngestWorkers int | |
| 50 | |
| 51 » // initOnce is used to ensure that the Collector's internal state is | |
| 52 » // initialized at most once. | |
| 53 » initOnce sync.Once | |
| 54 » // runner is the Runner that will be used for ingest. It will be configu
red | |
| 55 » // based on the supplied MaxIngestWorkers parameter. | |
| 56 » // | |
| 57 » // Internally, runner must not be used by tasks that themselves use the | |
| 58 » // runner, else deadlock could occur. | |
| 59 » runner *parallel.Runner | |
| 60 } | |
| 61 | |
| 62 // init initializes the operational state of the Collector. It must be called | |
| 63 // internally at the beginning of any exported method that uses that state. | |
| 64 func (c *Collector) init() { | |
| 65 » c.initOnce.Do(func() { | |
| 66 » » c.runner = ¶llel.Runner{ | |
| 67 » » » Sustained: c.MaxIngestWorkers, | |
| 68 » » » Maximum: c.MaxIngestWorkers, | |
| 69 » » } | |
| 70 » }) | |
| 71 } | 52 } |
| 72 | 53 |
| 73 // Process ingests an encoded ButlerLogBundle message, registering it with | 54 // Process ingests an encoded ButlerLogBundle message, registering it with |
| 74 // the LogDog Coordinator and stowing it in a temporary Storage for streaming | 55 // the LogDog Coordinator and stowing it in a temporary Storage for streaming |
| 75 // retrieval. | 56 // retrieval. |
| 76 // | 57 // |
| 77 // If a transient error occurs during ingest, Process will return an error. | 58 // If a transient error occurs during ingest, Process will return an error. |
| 78 // If no error occurred, or if there was an error with the input data, no error | 59 // If no error occurred, or if there was an error with the input data, no error |
| 79 // will be returned. | 60 // will be returned. |
| 80 func (c *Collector) Process(ctx context.Context, msg []byte) error { | 61 func (c *Collector) Process(ctx context.Context, msg []byte) error { |
| 81 c.init() | |
| 82 | |
| 83 pr := butlerproto.Reader{} | 62 pr := butlerproto.Reader{} |
| 84 if err := pr.Read(bytes.NewReader(msg)); err != nil { | 63 if err := pr.Read(bytes.NewReader(msg)); err != nil { |
| 85 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.") | 64 log.Errorf(log.SetError(ctx, err), "Failed to unpack message.") |
| 86 return nil | 65 return nil |
| 87 } | 66 } |
| 88 if pr.Metadata.ProtoVersion != logpb.Version { | 67 if pr.Metadata.ProtoVersion != logpb.Version { |
| 89 log.Fields{ | 68 log.Fields{ |
| 90 "messageProtoVersion": pr.Metadata.ProtoVersion, | 69 "messageProtoVersion": pr.Metadata.ProtoVersion, |
| 91 "currentProtoVersion": logpb.Version, | 70 "currentProtoVersion": logpb.Version, |
| 92 }.Errorf(ctx, "Unknown protobuf version.") | 71 }.Errorf(ctx, "Unknown protobuf version.") |
| (...skipping 22 matching lines...) Expand all Loading... |
| 115 | 94 |
| 116 fields.Infof(ctx, "Processing log bundle entry.") | 95 fields.Infof(ctx, "Processing log bundle entry.") |
| 117 } | 96 } |
| 118 } | 97 } |
| 119 | 98 |
| 120 // If there are no entries, there is nothing to do. | 99 // If there are no entries, there is nothing to do. |
| 121 if len(pr.Bundle.Entries) == 0 { | 100 if len(pr.Bundle.Entries) == 0 { |
| 122 return nil | 101 return nil |
| 123 } | 102 } |
| 124 | 103 |
| 125 » // Define our logWork template. This will be cloned for each ingested lo
g | 104 » lw := bundleHandler{ |
| 126 » // stream. | 105 » » msg: msg, |
| 127 » lw := logWork{ | 106 » » md: pr.Metadata, |
| 128 » » md: pr.Metadata, | 107 » » b: pr.Bundle, |
| 129 » » b: pr.Bundle, | |
| 130 } | 108 } |
| 131 | 109 |
| 132 // Handle each bundle entry in parallel. We will use a separate work poo
l | 110 // Handle each bundle entry in parallel. We will use a separate work poo
l |
| 133 // here so that top-level bundle dispatch can't deadlock the processing
tasks. | 111 // here so that top-level bundle dispatch can't deadlock the processing
tasks. |
| 134 » err := parallel.WorkPool(c.MaxParallelBundles, func(taskC chan<- func()
error) { | 112 » workers := c.MaxMessageWorkers |
| 113 » if workers <= 0 { |
| 114 » » workers = DefaultMaxMessageWorkers |
| 115 » } |
| 116 » err := parallel.WorkPool(workers, func(taskC chan<- func() error) { |
| 135 for _, be := range pr.Bundle.Entries { | 117 for _, be := range pr.Bundle.Entries { |
| 136 » » » lw := lw | 118 » » » be := be |
| 137 » » » lw.be = be | 119 |
| 138 taskC <- func() error { | 120 taskC <- func() error { |
| 139 » » » » return c.processLogStream(ctx, &lw) | 121 » » » » return c.processLogStream(ctx, &bundleEntryHandl
er{ |
| 122 » » » » » bundleHandler: &lw, |
| 123 » » » » » be: be, |
| 124 » » » » }) |
| 140 } | 125 } |
| 141 } | 126 } |
| 142 }) | 127 }) |
| 143 if err != nil { | 128 if err != nil { |
| 144 » » if hasTransientError(err) && !errors.IsTransient(err) { | 129 » » if !errors.IsTransient(err) && hasTransientError(err) { |
| 145 // err has a nested transient error; propagate that to t
op. | 130 // err has a nested transient error; propagate that to t
op. |
| 146 err = errors.WrapTransient(err) | 131 err = errors.WrapTransient(err) |
| 147 } | 132 } |
| 148 return err | 133 return err |
| 149 } | 134 } |
| 150 return nil | 135 return nil |
| 151 } | 136 } |
| 152 | 137 |
| 153 // Close releases any internal resources and blocks pending the completion of | 138 // Close releases any internal resources and blocks pending the completion of |
| 154 // any outstanding operations. After Close, no new Process calls may be made. | 139 // any outstanding operations. After Close, no new Process calls may be made. |
| 155 func (c *Collector) Close() { | 140 func (c *Collector) Close() { |
| 156 c.init() | |
| 157 | |
| 158 c.runner.Close() | |
| 159 } | 141 } |
| 160 | 142 |
| 161 // logWork is a cumulative set of read-only state passed around by value for log | 143 // bundleHandler is a cumulative set of read-only state passed around by |
| 162 // processing. | 144 // value for log processing. |
| 163 type logWork struct { | 145 type bundleHandler struct { |
| 146 » // msg is the original message bytes. |
| 147 » msg []byte |
| 164 // md is the metadata associated with the overall message. | 148 // md is the metadata associated with the overall message. |
| 165 md *logpb.ButlerMetadata | 149 md *logpb.ButlerMetadata |
| 166 // b is the Butler bundle. | 150 // b is the Butler bundle. |
| 167 b *logpb.ButlerLogBundle | 151 b *logpb.ButlerLogBundle |
| 152 } |
| 153 |
| 154 type bundleEntryHandler struct { |
| 155 *bundleHandler |
| 156 |
| 168 // be is the Bundle entry. | 157 // be is the Bundle entry. |
| 169 be *logpb.ButlerLogBundle_Entry | 158 be *logpb.ButlerLogBundle_Entry |
| 170 // path is the constructed path of the stream being processed. | 159 // path is the constructed path of the stream being processed. |
| 171 path types.StreamPath | 160 path types.StreamPath |
| 172 // le is the LogEntry in the bundle entry. | |
| 173 le *logpb.LogEntry | |
| 174 } | 161 } |
| 175 | 162 |
| 176 // processLogStream processes an individual set of log messages belonging to the | 163 // processLogStream processes an individual set of log messages belonging to the |
| 177 // same log stream. | 164 // same log stream. |
| 178 func (c *Collector) processLogStream(ctx context.Context, lw *logWork) error { | 165 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
error { |
| 179 » if err := lw.be.Desc.Validate(true); err != nil { | 166 » if err := h.be.Desc.Validate(true); err != nil { |
| 180 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") | 167 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") |
| 181 » » return nil | 168 » » return err |
| 182 } | 169 } |
| 183 » lw.path = types.StreamName(lw.be.Desc.Prefix).Join(types.StreamName(lw.b
e.Desc.Name)) | 170 » h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D
esc.Name)) |
| 184 » ctx = log.SetField(ctx, "path", lw.path) | 171 » ctx = log.SetField(ctx, "path", h.path) |
| 185 | 172 |
| 186 » if len(lw.be.Secret) == 0 { | 173 » if len(h.be.Secret) == 0 { |
| 187 log.Errorf(ctx, "Missing secret.") | 174 log.Errorf(ctx, "Missing secret.") |
| 188 » » return nil | 175 » » return errors.New("missing stream secret") |
| 176 » } |
| 177 |
| 178 » // Confirm that the log entries are valid and contiguous. Serialize the
log |
| 179 » // entries for ingest as we validate them. |
| 180 » var logData [][]byte |
| 181 » var blockIndex uint64 |
| 182 » if logs := h.be.Logs; len(logs) > 0 { |
| 183 » » logData = make([][]byte, len(logs)) |
| 184 » » blockIndex = logs[0].StreamIndex |
| 185 |
| 186 » » for i, le := range logs { |
| 187 » » » // Validate this log entry. |
| 188 » » » if err := le.Validate(h.be.Desc); err != nil { |
| 189 » » » » log.Fields{ |
| 190 » » » » » log.ErrorKey: err, |
| 191 » » » » » "index": le.StreamIndex, |
| 192 » » » » }.Warningf(ctx, "Discarding invalid log entry.") |
| 193 » » » » return errors.New("invalid log entry") |
| 194 » » » } |
| 195 |
| 196 » » » // Validate that this entry is contiguous. |
| 197 » » » if le.StreamIndex != blockIndex+uint64(i) { |
| 198 » » » » log.Fields{ |
| 199 » » » » » "index": i, |
| 200 » » » » » "expected": (blockIndex + uint64(i)), |
| 201 » » » » » "actual": le.StreamIndex, |
| 202 » » » » }.Errorf(ctx, "Non-contiguous log entry block in
stream.") |
| 203 » » » » return errors.New("non-contiguous log entry bloc
k") |
| 204 » » » } |
| 205 |
| 206 » » » var err error |
| 207 » » » logData[i], err = proto.Marshal(le) |
| 208 » » » if err != nil { |
| 209 » » » » log.Fields{ |
| 210 » » » » » log.ErrorKey: err, |
| 211 » » » » » "index": le.StreamIndex, |
| 212 » » » » }.Errorf(ctx, "Failed to marshal log entry.") |
| 213 » » » » return errors.New("failed to marshal log entries
") |
| 214 » » » } |
| 215 » » } |
| 189 } | 216 } |
| 190 | 217 |
| 191 // Fetch our cached/remote state. This will replace our state object wit
h the | 218 // Fetch our cached/remote state. This will replace our state object wit
h the |
| 192 // fetched state, so any future calls will need to re-set the Secret val
ue. | 219 // fetched state, so any future calls will need to re-set the Secret val
ue. |
| 193 // TODO: Use timeout? | 220 // TODO: Use timeout? |
| 194 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt
ate{ | 221 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt
ate{ |
| 195 » » Path: lw.path, | 222 » » Path: h.path, |
| 196 » » Secret: types.StreamSecret(lw.be.Secret), | 223 » » Secret: types.StreamSecret(h.be.Secret), |
| 197 » » ProtoVersion: lw.md.ProtoVersion, | 224 » » ProtoVersion: h.md.ProtoVersion, |
| 198 » }, lw.be.Desc) | 225 » }, h.be.Desc) |
| 199 if err != nil { | 226 if err != nil { |
| 200 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") | 227 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") |
| 201 return err | 228 return err |
| 202 } | 229 } |
| 203 | 230 |
| 204 // Does the log stream's secret match the expected secret? | 231 // Does the log stream's secret match the expected secret? |
| 205 » if !bytes.Equal(lw.be.Secret, []byte(state.Secret)) { | 232 » if !bytes.Equal(h.be.Secret, []byte(state.Secret)) { |
| 206 log.Errorf(log.SetFields(ctx, log.Fields{ | 233 log.Errorf(log.SetFields(ctx, log.Fields{ |
| 207 » » » "secret": lw.be.Secret, | 234 » » » "secret": h.be.Secret, |
| 208 "expectedSecret": state.Secret, | 235 "expectedSecret": state.Secret, |
| 209 }), "Log entry has incorrect secret.") | 236 }), "Log entry has incorrect secret.") |
| 210 return nil | 237 return nil |
| 211 } | 238 } |
| 212 | 239 |
| 213 if state.Archived { | 240 if state.Archived { |
| 214 log.Infof(ctx, "Skipping message bundle for archived stream.") | 241 log.Infof(ctx, "Skipping message bundle for archived stream.") |
| 215 return nil | 242 return nil |
| 216 } | 243 } |
| 217 if state.Purged { | 244 if state.Purged { |
| 218 log.Infof(ctx, "Skipping message bundle for purged stream.") | 245 log.Infof(ctx, "Skipping message bundle for purged stream.") |
| 219 return nil | 246 return nil |
| 220 } | 247 } |
| 221 | 248 |
| 222 // Update our terminal index if we have one. | 249 // Update our terminal index if we have one. |
| 223 // | 250 // |
| 224 // Note that even if our cached value is marked terminal, we could have
failed | 251 // Note that even if our cached value is marked terminal, we could have
failed |
| 225 // to push the terminal index to the Coordinator, so we will not refrain
from | 252 // to push the terminal index to the Coordinator, so we will not refrain
from |
| 226 // pushing every terminal index encountered regardless of cache state. | 253 // pushing every terminal index encountered regardless of cache state. |
| 227 » if lw.be.Terminal { | 254 » if h.be.Terminal { |
| 228 » » tidx := types.MessageIndex(lw.be.TerminalIndex) | 255 » » tidx := types.MessageIndex(h.be.TerminalIndex) |
| 229 log.Fields{ | 256 log.Fields{ |
| 230 "value": tidx, | 257 "value": tidx, |
| 231 }.Debugf(ctx, "Bundle includes a terminal index.") | 258 }.Debugf(ctx, "Bundle includes a terminal index.") |
| 232 | 259 |
| 233 if state.TerminalIndex < 0 { | 260 if state.TerminalIndex < 0 { |
| 234 state.TerminalIndex = tidx | 261 state.TerminalIndex = tidx |
| 235 } else if state.TerminalIndex != tidx { | 262 } else if state.TerminalIndex != tidx { |
| 236 log.Fields{ | 263 log.Fields{ |
| 237 "cachedIndex": state.TerminalIndex, | 264 "cachedIndex": state.TerminalIndex, |
| 238 "bundleIndex": tidx, | 265 "bundleIndex": tidx, |
| 239 }.Warningf(ctx, "Cached terminal index disagrees with st
ate.") | 266 }.Warningf(ctx, "Cached terminal index disagrees with st
ate.") |
| 240 } | 267 } |
| 241 } | 268 } |
| 242 | 269 |
| 243 » // In parallel, load the log entries into Storage. Throttle this with ou
r | 270 » // Perform stream processing operations. We can do these operations in |
| 244 » // ingest semaphore. | 271 » // parallel. |
| 245 » return errors.MultiErrorFromErrors(c.runner.Run(func(taskC chan<- func()
error) { | 272 » return parallel.FanOutIn(func(taskC chan<- func() error) { |
| 246 » » for i, le := range lw.be.Logs { | 273 » » // Store log data, if any was provided. It has already been vali
dated. |
| 247 » » » i, le := i, le | 274 » » if len(logData) > 0 { |
| 275 » » » taskC <- func() error { |
| 276 » » » » // Post the log to storage. |
| 277 » » » » err = c.Storage.Put(storage.PutRequest{ |
| 278 » » » » » Path: h.path, |
| 279 » » » » » Index: types.MessageIndex(blockIndex), |
| 280 » » » » » Values: logData, |
| 281 » » » » }) |
| 248 | 282 |
| 249 » » » // Store this LogEntry | 283 » » » » // If the log entry already exists, consider the
"put" successful. |
| 250 » » » taskC <- func() error { | 284 » » » » // Storage will return a transient error if one
occurred. |
| 251 » » » » if err := le.Validate(lw.be.Desc); err != nil { | 285 » » » » if err != nil && err != storage.ErrExists { |
| 252 log.Fields{ | 286 log.Fields{ |
| 253 log.ErrorKey: err, | 287 log.ErrorKey: err, |
| 254 » » » » » » "index": i, | 288 » » » » » » "blockIndex": blockIndex, |
| 255 » » » » » }.Warningf(ctx, "Discarding invalid log
entry.") | 289 » » » » » }.Errorf(ctx, "Failed to load log entry
into Storage.") |
| 256 » » » » » return nil | 290 » » » » » return err |
| 257 } | 291 } |
| 258 | 292 » » » » return nil |
| 259 » » » » if state.TerminalIndex >= 0 && types.MessageInde
x(le.StreamIndex) > state.TerminalIndex { | |
| 260 » » » » » log.Fields{ | |
| 261 » » » » » » "index": le.StreamIndex, | |
| 262 » » » » » » "terminalIndex": state.TerminalI
ndex, | |
| 263 » » » » » }.Warningf(ctx, "Stream is terminated be
fore log entry; discarding.") | |
| 264 » » » » » return nil | |
| 265 » » » » } | |
| 266 | |
| 267 » » » » lw := *lw | |
| 268 » » » » lw.le = le | |
| 269 » » » » return c.processLogEntry(ctx, &lw) | |
| 270 } | 293 } |
| 271 } | 294 } |
| 272 | 295 |
| 273 // If our bundle entry is terminal, we have an additional task o
f reporting | 296 // If our bundle entry is terminal, we have an additional task o
f reporting |
| 274 // this to the Coordinator. | 297 // this to the Coordinator. |
| 275 » » if lw.be.Terminal { | 298 » » if h.be.Terminal { |
| 276 taskC <- func() error { | 299 taskC <- func() error { |
| 277 // Sentinel task: Update the terminal bundle sta
te. | 300 // Sentinel task: Update the terminal bundle sta
te. |
| 278 state := *state | 301 state := *state |
| 279 » » » » state.TerminalIndex = types.MessageIndex(lw.be.T
erminalIndex) | 302 » » » » state.TerminalIndex = types.MessageIndex(h.be.Te
rminalIndex) |
| 280 | 303 |
| 281 log.Fields{ | 304 log.Fields{ |
| 282 "terminalIndex": state.TerminalIndex, | 305 "terminalIndex": state.TerminalIndex, |
| 283 }.Infof(ctx, "Received terminal log; updating Co
ordinator state.") | 306 }.Infof(ctx, "Received terminal log; updating Co
ordinator state.") |
| 284 | 307 |
| 285 if err := c.Coordinator.TerminateStream(ctx, &st
ate); err != nil { | 308 if err := c.Coordinator.TerminateStream(ctx, &st
ate); err != nil { |
| 286 log.WithError(err).Errorf(ctx, "Failed t
o set stream terminal index.") | 309 log.WithError(err).Errorf(ctx, "Failed t
o set stream terminal index.") |
| 287 return err | 310 return err |
| 288 } | 311 } |
| 289 return nil | 312 return nil |
| 290 } | 313 } |
| 291 } | 314 } |
| 292 })) | |
| 293 } | |
| 294 | |
| 295 func (c *Collector) processLogEntry(ctx context.Context, lw *logWork) error { | |
| 296 data, err := proto.Marshal(lw.le) | |
| 297 if err != nil { | |
| 298 log.WithError(err).Errorf(ctx, "Failed to marshal log entry.") | |
| 299 return err | |
| 300 } | |
| 301 | |
| 302 // Post the log to storage. | |
| 303 err = c.Storage.Put(&storage.PutRequest{ | |
| 304 Path: lw.path, | |
| 305 Index: types.MessageIndex(lw.le.StreamIndex), | |
| 306 Value: data, | |
| 307 }) | 315 }) |
| 308 | |
| 309 // If the log entry already exists, consider the "put" successful. | |
| 310 // | |
| 311 // All Storage errors are considered transient, as they are safe and | |
| 312 // data-agnostic. | |
| 313 if err != nil && err != storage.ErrExists { | |
| 314 log.WithError(err).Errorf(ctx, "Failed to load log entry into St
orage.") | |
| 315 return errors.WrapTransient(err) | |
| 316 } | |
| 317 return nil | |
| 318 } | 316 } |
| 319 | 317 |
| 320 // hasTransientError tests whether an error is, or contains, a transient | 318 // hasTransientError tests whether an error is, or contains, a transient |
| 321 // error. | 319 // error. |
| 322 // | 320 // |
| 323 // If the error is a MultiError, its sub-errors will be evaluated | 321 // If the error is a MultiError, its sub-errors will be evaluated |
| 324 // recursively, and true will be returned if any of them is transient. | 322 // recursively, and true will be returned if any of them is transient. |
| 325 // Otherwise, the error itself is tested for transience. | 323 // Otherwise, the error itself is tested for transience. |
| 326 func hasTransientError(err error) bool { | 324 func hasTransientError(err error) bool { |
| 327 if merr, ok := err.(errors.MultiError); ok { | 325 if merr, ok := err.(errors.MultiError); ok { |
| 328 for _, e := range merr { | 326 for _, e := range merr { |
| 329 if hasTransientError(e) { | 327 if hasTransientError(e) { |
| 330 return true | 328 return true |
| 331 } | 329 } |
| 332 } | 330 } |
| 333 return false | 331 return false |
| 334 } | 332 } |
| 335 | 333 |
| 336 return errors.IsTransient(err) | 334 return errors.IsTransient(err) |
| 337 } | 335 } |
| OLD | NEW |