| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 | 9 |
| 10 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 109 } | 109 } |
| 110 | 110 |
| 111 task := &archiveTask{ | 111 task := &archiveTask{ |
| 112 Archivist: a, | 112 Archivist: a, |
| 113 ArchiveTask: t, | 113 ArchiveTask: t, |
| 114 ls: ls, | 114 ls: ls, |
| 115 desc: &desc, | 115 desc: &desc, |
| 116 } | 116 } |
| 117 if err := task.archive(c); err != nil { | 117 if err := task.archive(c); err != nil { |
| 118 log.WithError(err).Errorf(c, "Failed to perform archival operati
on.") | 118 log.WithError(err).Errorf(c, "Failed to perform archival operati
on.") |
| 119 » » return err | 119 |
| 120 » » // Fail only if completeness is a requirement. Allow the Coordin
ator to |
| 121 » » // handle errors on !Complete streams. |
| 122 » » if t.Complete { |
| 123 » » » return err |
| 124 » » } |
| 125 |
| 126 » » // Report that there was an archival error to the Coordinator's |
| 127 » » // ArchiveStream endpoint. It may accept the archival or respond
with a |
| 128 » » // FailedPrecondition error code indicating that this archival r
un should |
| 129 » » // be considered a failure, in which case we will refrain from d
eleting our |
| 130 » » // task and try again. |
| 131 » » task.ar.Error = true |
| 120 } | 132 } |
| 121 log.Fields{ | 133 log.Fields{ |
| 122 "streamURL": task.ar.StreamUrl, | 134 "streamURL": task.ar.StreamUrl, |
| 123 "indexURL": task.ar.IndexUrl, | 135 "indexURL": task.ar.IndexUrl, |
| 124 "dataURL": task.ar.DataUrl, | 136 "dataURL": task.ar.DataUrl, |
| 125 "terminalIndex": task.ar.TerminalIndex, | 137 "terminalIndex": task.ar.TerminalIndex, |
| 126 "complete": task.ar.Complete, | 138 "complete": task.ar.Complete, |
| 139 "hadError": task.ar.Error, |
| 127 }.Debugf(c, "Finished archive construction.") | 140 }.Debugf(c, "Finished archive construction.") |
| 128 | 141 |
| 129 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { | 142 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { |
| 130 log.WithError(err).Errorf(c, "Failed to mark log stream as archi
ved.") | 143 log.WithError(err).Errorf(c, "Failed to mark log stream as archi
ved.") |
| 131 return err | 144 return err |
| 132 } | 145 } |
| 133 return nil | 146 return nil |
| 134 } | 147 } |
| 135 | 148 |
| 136 // archiveTask is the set of parameters for a single archival. | 149 // archiveTask is the set of parameters for a single archival. |
| 137 type archiveTask struct { | 150 type archiveTask struct { |
| 138 *Archivist | 151 *Archivist |
| 139 *logdog.ArchiveTask | 152 *logdog.ArchiveTask |
| 140 | 153 |
| 141 // ls is the log stream state. | 154 // ls is the log stream state. |
| 142 ls *logdog.LoadStreamResponse | 155 ls *logdog.LoadStreamResponse |
| 143 // desc is the unmarshaled log stream descriptor. | 156 // desc is the unmarshaled log stream descriptor. |
| 144 desc *logpb.LogStreamDescriptor | 157 desc *logpb.LogStreamDescriptor |
| 145 | 158 |
| 146 // ar will be populated during archive construction. | 159 // ar will be populated during archive construction. |
| 147 ar logdog.ArchiveStreamRequest | 160 ar logdog.ArchiveStreamRequest |
| 148 } | 161 } |
| 149 | 162 |
| 150 // archiveState performs the archival operation on a stream described by a | 163 // archiveState performs the archival operation on a stream described by a |
| 151 // Coordinator State. Upon success, the State will be updated with the result | 164 // Coordinator State. Upon success, the State will be updated with the result |
| 152 // of the archival operation. | 165 // of the archival operation. |
| 153 func (t *archiveTask) archive(c context.Context) (err error) { | 166 func (t *archiveTask) archive(c context.Context) (err error) { |
| 167 // Minimal ArchiveRequest parameters. |
| 168 t.ar.Path = t.Path |
| 169 t.ar.TerminalIndex = -1 |
| 170 |
| 154 // Generate our archival object managers. | 171 // Generate our archival object managers. |
| 155 bext := t.desc.BinaryFileExt | 172 bext := t.desc.BinaryFileExt |
| 156 if bext == "" { | 173 if bext == "" { |
| 157 bext = "bin" | 174 bext = "bin" |
| 158 } | 175 } |
| 159 | 176 |
| 160 path := t.Path | 177 path := t.Path |
| 161 var streamO, indexO, dataO *gsObject | 178 var streamO, indexO, dataO *gsObject |
| 162 streamO, err = t.newGSObject(c, path, "logstream.entries") | 179 streamO, err = t.newGSObject(c, path, "logstream.entries") |
| 163 if err != nil { | 180 if err != nil { |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 264 } else { | 281 } else { |
| 265 // Update our terminal index. | 282 // Update our terminal index. |
| 266 log.Fields{ | 283 log.Fields{ |
| 267 "from": tidx, | 284 "from": tidx, |
| 268 "to": t.ar.TerminalIndex, | 285 "to": t.ar.TerminalIndex, |
| 269 }.Infof(c, "Updated log stream terminal index.") | 286 }.Infof(c, "Updated log stream terminal index.") |
| 270 } | 287 } |
| 271 } | 288 } |
| 272 | 289 |
| 273 // Update our state with archival results. | 290 // Update our state with archival results. |
| 274 t.ar.Path = t.Path | |
| 275 t.ar.StreamSize = streamO.Count() | 291 t.ar.StreamSize = streamO.Count() |
| 276 t.ar.IndexSize = indexO.Count() | 292 t.ar.IndexSize = indexO.Count() |
| 277 t.ar.DataSize = dataO.Count() | 293 t.ar.DataSize = dataO.Count() |
| 278 t.ar.Complete = !ss.hasMissingEntries | 294 t.ar.Complete = !ss.hasMissingEntries |
| 279 return | 295 return |
| 280 } | 296 } |
| 281 | 297 |
| 282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (
*gsObject, error) { | 298 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (
*gsObject, error) { |
| 283 p := t.GSBase.Concat(path, name) | 299 p := t.GSBase.Concat(path, name) |
| 284 o := gsObject{ | 300 o := gsObject{ |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 418 log.Fields{ | 434 log.Fields{ |
| 419 "index": sidx, | 435 "index": sidx, |
| 420 "terminalIndex": s.terminalIndex, | 436 "terminalIndex": s.terminalIndex, |
| 421 }.Warningf(s, "Discarding log entries beyond expected terminal i
ndex.") | 437 }.Warningf(s, "Discarding log entries beyond expected terminal i
ndex.") |
| 422 return nil, archive.ErrEndOfStream | 438 return nil, archive.ErrEndOfStream |
| 423 } | 439 } |
| 424 | 440 |
| 425 s.lastIndex = sidx | 441 s.lastIndex = sidx |
| 426 return le, nil | 442 return le, nil |
| 427 } | 443 } |
| OLD | NEW |