Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 | 9 |
| 10 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 109 } | 109 } |
| 110 | 110 |
| 111 task := &archiveTask{ | 111 task := &archiveTask{ |
| 112 Archivist: a, | 112 Archivist: a, |
| 113 ArchiveTask: t, | 113 ArchiveTask: t, |
| 114 ls: ls, | 114 ls: ls, |
| 115 desc: &desc, | 115 desc: &desc, |
| 116 } | 116 } |
| 117 if err := task.archive(c); err != nil { | 117 if err := task.archive(c); err != nil { |
| 118 log.WithError(err).Errorf(c, "Failed to perform archival operati on.") | 118 log.WithError(err).Errorf(c, "Failed to perform archival operati on.") |
| 119 » » return err | 119 |
| 120 » » // Fail only if completeness is a requirement. Allow the Coordin ator to | |
| 121 » » // handle errors on !Complete streams. | |
| 122 » » if t.Complete { | |
| 123 » » » return err | |
| 124 » » } | |
| 125 | |
| 126 » » // Report that there was an archival error to the Coordinator's | |
| 127 » » // ArchiveStream endpoint. It may accept the archival or respond with a | |
| 128 » » // FailedPrecondition error code indicating that this archival r un should | |
| 129 » » // be considered a failure. | |
| 130 » » task.ar.Error = true | |
|
Vadim Sh.
2016/03/31 22:29:59
are transient errors possible in "task.archive"? I
dnj
2016/04/01 22:57:04
They are retried in task.archive already as part o
| |
| 120 } | 131 } |
| 121 log.Fields{ | 132 log.Fields{ |
| 122 "streamURL": task.ar.StreamUrl, | 133 "streamURL": task.ar.StreamUrl, |
| 123 "indexURL": task.ar.IndexUrl, | 134 "indexURL": task.ar.IndexUrl, |
| 124 "dataURL": task.ar.DataUrl, | 135 "dataURL": task.ar.DataUrl, |
| 125 "terminalIndex": task.ar.TerminalIndex, | 136 "terminalIndex": task.ar.TerminalIndex, |
| 126 "complete": task.ar.Complete, | 137 "complete": task.ar.Complete, |
| 138 "hadError": task.ar.Error, | |
| 127 }.Debugf(c, "Finished archive construction.") | 139 }.Debugf(c, "Finished archive construction.") |
| 128 | 140 |
| 129 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { | 141 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { |
| 130 log.WithError(err).Errorf(c, "Failed to mark log stream as archi ved.") | 142 log.WithError(err).Errorf(c, "Failed to mark log stream as archi ved.") |
| 131 return err | 143 return err |
| 132 } | 144 } |
| 133 return nil | 145 return nil |
| 134 } | 146 } |
| 135 | 147 |
| 136 // archiveTask is the set of parameters for a single archival. | 148 // archiveTask is the set of parameters for a single archival. |
| 137 type archiveTask struct { | 149 type archiveTask struct { |
| 138 *Archivist | 150 *Archivist |
| 139 *logdog.ArchiveTask | 151 *logdog.ArchiveTask |
| 140 | 152 |
| 141 // ls is the log stream state. | 153 // ls is the log stream state. |
| 142 ls *logdog.LoadStreamResponse | 154 ls *logdog.LoadStreamResponse |
| 143 // desc is the unmarshaled log stream descriptor. | 155 // desc is the unmarshaled log stream descriptor. |
| 144 desc *logpb.LogStreamDescriptor | 156 desc *logpb.LogStreamDescriptor |
| 145 | 157 |
| 146 // ar will be populated during archive construction. | 158 // ar will be populated during archive construction. |
| 147 ar logdog.ArchiveStreamRequest | 159 ar logdog.ArchiveStreamRequest |
| 148 } | 160 } |
| 149 | 161 |
| 150 // archiveState performs the archival operation on a stream described by a | 162 // archiveState performs the archival operation on a stream described by a |
| 151 // Coordinator State. Upon success, the State will be updated with the result | 163 // Coordinator State. Upon success, the State will be updated with the result |
| 152 // of the archival operation. | 164 // of the archival operation. |
| 153 func (t *archiveTask) archive(c context.Context) (err error) { | 165 func (t *archiveTask) archive(c context.Context) (err error) { |
| 166 // Minimal ArchiveRequest parameters. | |
| 167 t.ar.Path = t.Path | |
| 168 t.ar.TerminalIndex = -1 | |
| 169 | |
| 154 // Generate our archival object managers. | 170 // Generate our archival object managers. |
| 155 bext := t.desc.BinaryFileExt | 171 bext := t.desc.BinaryFileExt |
| 156 if bext == "" { | 172 if bext == "" { |
| 157 bext = "bin" | 173 bext = "bin" |
| 158 } | 174 } |
| 159 | 175 |
| 160 path := t.Path | 176 path := t.Path |
| 161 var streamO, indexO, dataO *gsObject | 177 var streamO, indexO, dataO *gsObject |
| 162 streamO, err = t.newGSObject(c, path, "logstream.entries") | 178 streamO, err = t.newGSObject(c, path, "logstream.entries") |
| 163 if err != nil { | 179 if err != nil { |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 264 } else { | 280 } else { |
| 265 // Update our terminal index. | 281 // Update our terminal index. |
| 266 log.Fields{ | 282 log.Fields{ |
| 267 "from": tidx, | 283 "from": tidx, |
| 268 "to": t.ar.TerminalIndex, | 284 "to": t.ar.TerminalIndex, |
| 269 }.Infof(c, "Updated log stream terminal index.") | 285 }.Infof(c, "Updated log stream terminal index.") |
| 270 } | 286 } |
| 271 } | 287 } |
| 272 | 288 |
| 273 // Update our state with archival results. | 289 // Update our state with archival results. |
| 274 t.ar.Path = t.Path | |
| 275 t.ar.StreamSize = streamO.Count() | 290 t.ar.StreamSize = streamO.Count() |
| 276 t.ar.IndexSize = indexO.Count() | 291 t.ar.IndexSize = indexO.Count() |
| 277 t.ar.DataSize = dataO.Count() | 292 t.ar.DataSize = dataO.Count() |
| 278 t.ar.Complete = !ss.hasMissingEntries | 293 t.ar.Complete = !ss.hasMissingEntries |
| 279 return | 294 return |
| 280 } | 295 } |
| 281 | 296 |
| 282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) ( *gsObject, error) { | 297 func (t *archiveTask) newGSObject(c context.Context, path string, name string) ( *gsObject, error) { |
| 283 p := t.GSBase.Concat(path, name) | 298 p := t.GSBase.Concat(path, name) |
| 284 o := gsObject{ | 299 o := gsObject{ |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 418 log.Fields{ | 433 log.Fields{ |
| 419 "index": sidx, | 434 "index": sidx, |
| 420 "terminalIndex": s.terminalIndex, | 435 "terminalIndex": s.terminalIndex, |
| 421 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") | 436 }.Warningf(s, "Discarding log entries beyond expected terminal i ndex.") |
| 422 return nil, archive.ErrEndOfStream | 437 return nil, archive.ErrEndOfStream |
| 423 } | 438 } |
| 424 | 439 |
| 425 s.lastIndex = sidx | 440 s.lastIndex = sidx |
| 426 return le, nil | 441 return le, nil |
| 427 } | 442 } |
| OLD | NEW |