Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(200)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1853433002: LogDog: Handle archive failures. (Closed) Base URL: https://github.com/luci/luci-go@logdog-gs-update
Patch Set: Regenerate protobufs. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 9
10 "github.com/golang/protobuf/proto" 10 "github.com/golang/protobuf/proto"
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 } 109 }
110 110
111 task := &archiveTask{ 111 task := &archiveTask{
112 Archivist: a, 112 Archivist: a,
113 ArchiveTask: t, 113 ArchiveTask: t,
114 ls: ls, 114 ls: ls,
115 desc: &desc, 115 desc: &desc,
116 } 116 }
117 if err := task.archive(c); err != nil { 117 if err := task.archive(c); err != nil {
118 log.WithError(err).Errorf(c, "Failed to perform archival operation.") 118 log.WithError(err).Errorf(c, "Failed to perform archival operation.")
119 » » return err 119
120 » » // Fail only if completeness is a requirement. Allow the Coordinator to
121 » » // handle errors on !Complete streams.
122 » » if t.Complete {
123 » » » return err
124 » » }
125
126 » » // Report that there was an archival error to the Coordinator's
127 » » // ArchiveStream endpoint. It may accept the archival or respond with a
128 » » // FailedPrecondition error code indicating that this archival run should
129 » » // be considered a failure, in which case we will refrain from deleting our
130 » » // task and try again.
131 » » task.ar.Error = true
120 } 132 }
121 log.Fields{ 133 log.Fields{
122 "streamURL": task.ar.StreamUrl, 134 "streamURL": task.ar.StreamUrl,
123 "indexURL": task.ar.IndexUrl, 135 "indexURL": task.ar.IndexUrl,
124 "dataURL": task.ar.DataUrl, 136 "dataURL": task.ar.DataUrl,
125 "terminalIndex": task.ar.TerminalIndex, 137 "terminalIndex": task.ar.TerminalIndex,
126 "complete": task.ar.Complete, 138 "complete": task.ar.Complete,
139 "hadError": task.ar.Error,
127 }.Debugf(c, "Finished archive construction.") 140 }.Debugf(c, "Finished archive construction.")
128 141
129 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil { 142 if _, err := a.Service.ArchiveStream(c, &task.ar); err != nil {
130 log.WithError(err).Errorf(c, "Failed to mark log stream as archived.") 143 log.WithError(err).Errorf(c, "Failed to mark log stream as archived.")
131 return err 144 return err
132 } 145 }
133 return nil 146 return nil
134 } 147 }
135 148
136 // archiveTask is the set of parameters for a single archival. 149 // archiveTask is the set of parameters for a single archival.
137 type archiveTask struct { 150 type archiveTask struct {
138 *Archivist 151 *Archivist
139 *logdog.ArchiveTask 152 *logdog.ArchiveTask
140 153
141 // ls is the log stream state. 154 // ls is the log stream state.
142 ls *logdog.LoadStreamResponse 155 ls *logdog.LoadStreamResponse
143 // desc is the unmarshaled log stream descriptor. 156 // desc is the unmarshaled log stream descriptor.
144 desc *logpb.LogStreamDescriptor 157 desc *logpb.LogStreamDescriptor
145 158
146 // ar will be populated during archive construction. 159 // ar will be populated during archive construction.
147 ar logdog.ArchiveStreamRequest 160 ar logdog.ArchiveStreamRequest
148 } 161 }
149 162
150 // archiveState performs the archival operation on a stream described by a 163 // archiveState performs the archival operation on a stream described by a
151 // Coordinator State. Upon success, the State will be updated with the result 164 // Coordinator State. Upon success, the State will be updated with the result
152 // of the archival operation. 165 // of the archival operation.
153 func (t *archiveTask) archive(c context.Context) (err error) { 166 func (t *archiveTask) archive(c context.Context) (err error) {
167 // Minimal ArchiveRequest parameters.
168 t.ar.Path = t.Path
169 t.ar.TerminalIndex = -1
170
154 // Generate our archival object managers. 171 // Generate our archival object managers.
155 bext := t.desc.BinaryFileExt 172 bext := t.desc.BinaryFileExt
156 if bext == "" { 173 if bext == "" {
157 bext = "bin" 174 bext = "bin"
158 } 175 }
159 176
160 path := t.Path 177 path := t.Path
161 var streamO, indexO, dataO *gsObject 178 var streamO, indexO, dataO *gsObject
162 streamO, err = t.newGSObject(c, path, "logstream.entries") 179 streamO, err = t.newGSObject(c, path, "logstream.entries")
163 if err != nil { 180 if err != nil {
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
264 } else { 281 } else {
265 // Update our terminal index. 282 // Update our terminal index.
266 log.Fields{ 283 log.Fields{
267 "from": tidx, 284 "from": tidx,
268 "to": t.ar.TerminalIndex, 285 "to": t.ar.TerminalIndex,
269 }.Infof(c, "Updated log stream terminal index.") 286 }.Infof(c, "Updated log stream terminal index.")
270 } 287 }
271 } 288 }
272 289
273 // Update our state with archival results. 290 // Update our state with archival results.
274 t.ar.Path = t.Path
275 t.ar.StreamSize = streamO.Count() 291 t.ar.StreamSize = streamO.Count()
276 t.ar.IndexSize = indexO.Count() 292 t.ar.IndexSize = indexO.Count()
277 t.ar.DataSize = dataO.Count() 293 t.ar.DataSize = dataO.Count()
278 t.ar.Complete = !ss.hasMissingEntries 294 t.ar.Complete = !ss.hasMissingEntries
279 return 295 return
280 } 296 }
281 297
282 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) { 298 func (t *archiveTask) newGSObject(c context.Context, path string, name string) (*gsObject, error) {
283 p := t.GSBase.Concat(path, name) 299 p := t.GSBase.Concat(path, name)
284 o := gsObject{ 300 o := gsObject{
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
418 log.Fields{ 434 log.Fields{
419 "index": sidx, 435 "index": sidx,
420 "terminalIndex": s.terminalIndex, 436 "terminalIndex": s.terminalIndex,
421 }.Warningf(s, "Discarding log entries beyond expected terminal index.") 437 }.Warningf(s, "Discarding log entries beyond expected terminal index.")
422 return nil, archive.ErrEndOfStream 438 return nil, archive.ErrEndOfStream
423 } 439 }
424 440
425 s.lastIndex = sidx 441 s.lastIndex = sidx
426 return le, nil 442 return le, nil
427 } 443 }
OLDNEW
« no previous file with comments | « common/api/logdog_coordinator/services/v1/service.pb.go ('k') | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698