Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1853433002: LogDog: Handle archive failures. (Closed) Base URL: https://github.com/luci/luci-go@logdog-gs-update
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 9
10 "github.com/golang/protobuf/proto" 10 "github.com/golang/protobuf/proto"
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 } 109 }
110 110
111 task := &archiveTask{ 111 task := &archiveTask{
112 Archivist: a, 112 Archivist: a,
113 ArchiveTask: t, 113 ArchiveTask: t,
114 ls: ls, 114 ls: ls,
115 desc: &desc, 115 desc: &desc,
116 } 116 }
117 if err := task.archive(c); err != nil { 117 if err := task.archive(c); err != nil {
118 log.WithError(err).Errorf(c, "Failed to perform archival operation.") 118
119 » » return err 119
120 » » // Fail only if completeness is a requirement. Allow the Coordinator to
121 » » // handle errors on !Complete streams.
122 » » if t.Complete {
123 » » » return err
124 » » }
125
126 » » // Report that there was an archival error to the Coordinator's
127 » » // ArchiveStream endpoint. It may accept the archival or respond with a
128 » » // FailedPrecondition error code indicating that this archival run should
129 » » // be considered a failure.
130 » » task.ar.Error = true
Vadim Sh. 2016/03/31 22:29:59 are transient errors possible in "task.archive"? I
dnj 2016/04/01 22:57:04 They are retried in task.archive already as part o
120 } 131 }
121 log.Fields{ 132 log.Fields{
122 "streamURL": task.ar.StreamUrl, 133 "streamURL": task.ar.StreamUrl,
123 "indexURL": task.ar.IndexUrl, 134 "indexURL": task.ar.IndexUrl,
124 "dataURL": task.ar.DataUrl, 135 "dataURL": task.ar.DataUrl,
125 "terminalIndex": task.ar.TerminalIndex, 136 "terminalIndex": task.ar.TerminalIndex,
126 "complete": task.ar.Complete, 137 "complete": task.ar.Complete,
138 "hadError": task.ar.Error,
127 }.Debugf(c, "Finished archive construction.") 139 }.Debugf(c, "Finished archive construction.")
128 140
129 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { 141 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
130 log.WithError(err).Errorf(c, "Failed to mark log stream as archived.") 142
131 return err 143 return err
132 } 144 }
133 return nil 145 return nil
134 } 146 }
135 147
136 // archiveTask is the set of parameters for a single archival. 148 // archiveTask is the set of parameters for a single archival.
137 type archiveTask struct { 149 type archiveTask struct {
138 *Archivist 150 *Archivist
139 *logdog.ArchiveTask 151 *logdog.ArchiveTask
140 152
141 // ls is the log stream state. 153 // ls is the log stream state.
142 ls *logdog.LoadStreamResponse 154 ls *logdog.LoadStreamResponse
143 // desc is the unmarshaled log stream descriptor. 155 // desc is the unmarshaled log stream descriptor.
144 desc *logpb.LogStreamDescriptor 156 desc *logpb.LogStreamDescriptor
145 157
146 // ar will be populated during archive construction. 158 // ar will be populated during archive construction.
147 ar logdog.ArchiveStreamRequest 159 ar logdog.ArchiveStreamRequest
148 } 160 }
149 161
150 // archiveState performs the archival operation on a stream described by a 162 // archiveState performs the archival operation on a stream described by a
151 // Coordinator State. Upon success, the State will be updated with the result 163 // Coordinator State. Upon success, the State will be updated with the result
152 // of the archival operation. 164 // of the archival operation.
153 func (t *archiveTask) archive(c context.Context) (err error) { 165 func (t *archiveTask) archive(c context.Context) (err error) {
166 // Minimal ArchiveRequest parameters.
167 t.ar.Path = t.Path
168 t.ar.TerminalIndex = -1
169
154 // Generate our archival object managers. 170 // Generate our archival object managers.
155 bext := t.desc.BinaryFileExt 171 bext := t.desc.BinaryFileExt
156 if bext == "" { 172 if bext == "" {
157 bext = "bin" 173 bext = "bin"
158 } 174 }
159 175
160 path := t.Path 176 path := t.Path
161 var streamO, indexO, dataO *gsObject 177 var streamO, indexO, dataO *gsObject
162 streamO, err = t.newGSObject(c, path, "logstream.entries") 178 streamO, err = t.newGSObject(c, path, "logstream.entries")
163 if err != nil { 179 if err != nil {
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
264 } else { 280 } else {
265 // Update our terminal index. 281 // Update our terminal index.
266 log.Fields{ 282 log.Fields{
267 "from": tidx, 283 "from": tidx,
268 "to": t.ar.TerminalIndex, 284 "to": t.ar.TerminalIndex,
269 }.Infof(c, "Updated log stream terminal index.") 285 }.Infof(c, "Updated log stream terminal index.")
270 } 286 }
271 } 287 }
272 288
273 // Update our state with archival results. 289 // Update our state with archival results.
274 t.ar.Path = t.Path
275 t.ar.StreamSize = streamO.Count() 290 t.ar.StreamSize = streamO.Count()
276 t.ar.IndexSize = indexO.Count() 291 t.ar.IndexSize = indexO.Count()
277 t.ar.DataSize = dataO.Count() 292 t.ar.DataSize = dataO.Count()
278 t.ar.Complete = !ss.hasMissingEntries 293 t.ar.Complete = !ss.hasMissingEntries
279 return 294 return
280 } 295 }
281 296
282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) { 297
283 p := t.GSBase.Concat(path, name) 298 p := t.GSBase.Concat(path, name)
284 o := gsObject{ 299 o := gsObject{
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
418 log.Fields{ 433 log.Fields{
419 "index": sidx, 434 "index": sidx,
420 "terminalIndex": s.terminalIndex, 435 "terminalIndex": s.terminalIndex,
421 }.Warningf(s, "Discarding log entries beyond expected terminal index.") 436
422 return nil, archive.ErrEndOfStream 437 return nil, archive.ErrEndOfStream
423 } 438 }
424 439
425 s.lastIndex = sidx 440 s.lastIndex = sidx
426 return le, nil 441 return le, nil
427 } 442 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698