OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package archivist | 5 package archivist |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "encoding/hex" | 9 "encoding/hex" |
10 "fmt" | 10 "fmt" |
11 "io" | 11 "io" |
12 | 12 |
13 "github.com/golang/protobuf/proto" | 13 "github.com/golang/protobuf/proto" |
14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 15 "github.com/luci/luci-go/common/config" |
15 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
16 "github.com/luci/luci-go/common/gcloud/gs" | 17 "github.com/luci/luci-go/common/gcloud/gs" |
17 "github.com/luci/luci-go/common/logdog/types" | 18 "github.com/luci/luci-go/common/logdog/types" |
18 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
19 "github.com/luci/luci-go/common/parallel" | 20 "github.com/luci/luci-go/common/parallel" |
20 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
21 "github.com/luci/luci-go/server/logdog/archive" | 22 "github.com/luci/luci-go/server/logdog/archive" |
22 "github.com/luci/luci-go/server/logdog/storage" | 23 "github.com/luci/luci-go/server/logdog/storage" |
23 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
24 ) | 25 ) |
(...skipping 54 matching lines...)
79 // delete the task). The return value of true should only be used if the task | 80 // delete the task). The return value of true should only be used if the task |
80 // is truly complete and acknowledged by the Coordinator. | 81 // is truly complete and acknowledged by the Coordinator. |
81 // | 82 // |
82 // If the supplied Context is Done, operation may terminate before completion, | 83 // If the supplied Context is Done, operation may terminate before completion, |
83 // returning the Context's error. | 84 // returning the Context's error. |
84 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { | 85 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
85 delete, err := a.archiveTaskImpl(c, task) | 86 delete, err := a.archiveTaskImpl(c, task) |
86 log.Fields{ | 87 log.Fields{ |
87 log.ErrorKey: err, | 88 log.ErrorKey: err, |
88 "delete": delete, | 89 "delete": delete, |
| 90 "project": task.Task().Project, |
89 "path": task.Task().Path, | 91 "path": task.Task().Path, |
90 }.Infof(c, "Finished archive task.") | 92 }.Infof(c, "Finished archive task.") |
91 return delete | 93 return delete |
92 } | 94 } |
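
To make the boolean contract above concrete, here is a minimal, self-contained consumer-loop sketch. The task struct, processTask, and the slice-based queue are hypothetical stand-ins for the real Task interface, ArchiveTask, and the actual task queue:

package main

import (
	"fmt"

	"golang.org/x/net/context"
)

// task is a hypothetical stand-in for the archivist's Task interface.
type task struct{ project, path string }

// processTask mirrors the ArchiveTask contract: return true only when the
// work is truly complete and acknowledged, so the caller may delete the task.
func processTask(c context.Context, t task) bool {
	select {
	case <-c.Done():
		return false // Context is Done: leave the task queued for redelivery.
	default:
	}
	fmt.Printf("archived %s/%s\n", t.project, t.path)
	return true
}

func main() {
	c := context.Background()
	queue := []task{{"proj", "foo/+/bar"}, {"proj", "foo/+/baz"}}

	var retry []task
	for _, t := range queue {
		if !processTask(c, t) {
			retry = append(retry, t) // Not acknowledged; keep for a later lease.
		}
	}
	fmt.Printf("%d tasks left for retry\n", len(retry))
}
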
93 | 95 |
94 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes | 96 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes |
95 // an error. The error is useful for testing to assert that certain conditions | 97 // an error. The error is useful for testing to assert that certain conditions |
96 // were hit. | 98 // were hit. |
97 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { | 99 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { |
98 at := task.Task() | 100 at := task.Task() |
99 log.Fields{ | 101 log.Fields{ |
100 » » "path": at.Path, | 102 » » "project": at.Project, |
| 103 » » "path": at.Path, |
101 }.Debugf(c, "Received archival task.") | 104 }.Debugf(c, "Received archival task.") |
102 | 105 |
| 106 if err := types.StreamPath(at.Path).Validate(); err != nil { |
| 107 return true, fmt.Errorf("invalid path %q: %s", at.Path, err) |
| 108 } |
| 109 |
 | 110 	// TODO(dnj): Remove empty project exemption, make empty project invalid. |
| 111 if at.Project != "" { |
 | 112 		if err := config.ProjectName(at.Project).Validate(); err != nil { |
 | 113 			return true, fmt.Errorf("invalid project name %q: %s", at.Project, err) |
| 114 } |
| 115 } |
| 116 |
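
Note the asymmetry these checks introduce: a malformed path or project can never succeed, so the error is reported alongside a true return and the poisoned task is consumed instead of being redelivered forever, while transient failures later in the function return false to keep the task queued. A sketch of that triage rule (shouldDelete is illustrative, not part of the real code):

// shouldDelete encodes the triage used throughout archiveTaskImpl:
// permanent failures (like validation errors) consume the task,
// while transient failures leave it queued for retry.
func shouldDelete(err error, transient bool) bool {
	switch {
	case err == nil:
		return true // Success: the task is done.
	case transient:
		return false // Retry later; the next attempt may succeed.
	default:
		return true // Permanent failure: retrying cannot help.
	}
}
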
103 	// Load the log stream's current state. If it is already archived, we will | 117 	// Load the log stream's current state. If it is already archived, we will |
104 // return an immediate success. | 118 // return an immediate success. |
105 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ | 119 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
106 » » Path: at.Path, | 120 » » Project: at.Project, |
107 » » Desc: true, | 121 » » Path: at.Path, |
| 122 » » Desc: true, |
108 }) | 123 }) |
109 switch { | 124 switch { |
110 case err != nil: | 125 case err != nil: |
111 log.WithError(err).Errorf(c, "Failed to load log stream.") | 126 log.WithError(err).Errorf(c, "Failed to load log stream.") |
112 return false, err | 127 return false, err |
113 | 128 |
114 case ls.State == nil: | 129 case ls.State == nil: |
115 log.Errorf(c, "Log stream did not include state.") | 130 log.Errorf(c, "Log stream did not include state.") |
116 return false, errors.New("log stream did not include state") | 131 return false, errors.New("log stream did not include state") |
117 | 132 |
(...skipping 58 matching lines...)
176 tidx := ls.State.TerminalIndex | 191 tidx := ls.State.TerminalIndex |
177 | 192 |
178 if tidx < 0 { | 193 if tidx < 0 { |
179 			log.Warningf(c, "Cannot archive complete stream with no terminal index.") | 194 			log.Warningf(c, "Cannot archive complete stream with no terminal index.") |
180 			return false, errors.New("completeness required, but stream has no terminal index") | 195 			return false, errors.New("completeness required, but stream has no terminal index") |
181 } | 196 } |
182 | 197 |
183 		// If we're requiring completeness, perform a keys-only scan of intermediate | 198 		// If we're requiring completeness, perform a keys-only scan of intermediate |
184 		// storage to ensure that we have all of the records before we bother | 199 		// storage to ensure that we have all of the records before we bother |
185 // streaming to storage only to find that we are missing data. | 200 // streaming to storage only to find that we are missing data. |
186 » » if err := a.checkComplete(types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil { | 201 » » if err := a.checkComplete(config.ProjectName(at.Project), types.StreamPath(at.Path), types.MessageIndex(tidx)); err != nil { |
187 return false, err | 202 return false, err |
188 } | 203 } |
189 } | 204 } |
190 | 205 |
191 ar := logdog.ArchiveStreamRequest{ | 206 ar := logdog.ArchiveStreamRequest{ |
192 » » Path: at.Path, | 207 » » Project: at.Project, |
| 208 » » Path: at.Path, |
193 } | 209 } |
194 | 210 |
195 // Archive to staging. | 211 // Archive to staging. |
196 // | 212 // |
197 	// If a non-transient failure occurs here, we will report it to the Archivist | 213 	// If a non-transient failure occurs here, we will report it to the Archivist |
198 // under the assumption that it will continue occurring. | 214 // under the assumption that it will continue occurring. |
199 // | 215 // |
200 	// We will handle error creating the plan and executing the plan in the same | 216 	// We will handle error creating the plan and executing the plan in the same |
201 // switch statement below. | 217 // switch statement below. |
202 » staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, task.UniqueID()) | 218 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), types.StreamPath(at.Path), ls, task.UniqueID()) |
203 if err != nil { | 219 if err != nil { |
204 		log.WithError(err).Errorf(c, "Failed to create staged archival plan.") | 220 		log.WithError(err).Errorf(c, "Failed to create staged archival plan.") |
205 } else { | 221 } else { |
206 err = staged.stage(c) | 222 err = staged.stage(c) |
207 } | 223 } |
208 | 224 |
209 switch { | 225 switch { |
210 case errors.IsTransient(err): | 226 case errors.IsTransient(err): |
211 		// If this is a transient error, exit immediately and do not delete the | 227 		// If this is a transient error, exit immediately and do not delete the |
212 // archival task. | 228 // archival task. |
(...skipping 51 matching lines...)
264 return false, err | 280 return false, err |
265 } | 281 } |
266 | 282 |
267 	// Archival is complete and acknowledged by Coordinator. Consume the archival | 283 	// Archival is complete and acknowledged by Coordinator. Consume the archival |
268 // task. | 284 // task. |
269 return true, nil | 285 return true, nil |
270 } | 286 } |
271 | 287 |
272 // checkComplete performs a quick scan of intermediate storage to ensure that | 288 // checkComplete performs a quick scan of intermediate storage to ensure that |
273 // all of the log stream's records are available. | 289 // all of the log stream's records are available. |
274 func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex) error { | 290 func (a *Archivist) checkComplete(project config.ProjectName, path types.StreamPath, tidx types.MessageIndex) error { |
275 sreq := storage.GetRequest{ | 291 sreq := storage.GetRequest{ |
| 292 Project: project, |
276 Path: path, | 293 Path: path, |
277 KeysOnly: true, | 294 KeysOnly: true, |
278 } | 295 } |
279 | 296 |
280 nextIndex := types.MessageIndex(0) | 297 nextIndex := types.MessageIndex(0) |
281 var ierr error | 298 var ierr error |
282 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool { | 299 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool { |
283 switch { | 300 switch { |
284 case idx != nextIndex: | 301 case idx != nextIndex: |
285 			ierr = fmt.Errorf("missing log entry index %d (next %d)", nextIndex, idx) | 302 			ierr = fmt.Errorf("missing log entry index %d (next %d)", nextIndex, idx) |
(...skipping 10 matching lines...)
296 }) | 313 }) |
297 if ierr != nil { | 314 if ierr != nil { |
298 return ierr | 315 return ierr |
299 } | 316 } |
300 if err != nil { | 317 if err != nil { |
301 return err | 318 return err |
302 } | 319 } |
303 return nil | 320 return nil |
304 } | 321 } |
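
The contiguity check is easier to see with the storage layer abstracted away. A storage-free sketch of the same idea over a plain slice of indices (checkContiguous and its exact terminal-index handling are assumptions; the real function's remaining checks live in the elided lines above):

package main

import "fmt"

// checkContiguous is a hypothetical analogue of checkComplete: the scanned
// indices must be exactly 0, 1, ..., tidx with no gaps.
func checkContiguous(indices []int64, tidx int64) error {
	next := int64(0)
	for _, idx := range indices {
		if idx != next {
			return fmt.Errorf("missing log entry index %d (next %d)", next, idx)
		}
		next++
	}
	if next != tidx+1 {
		return fmt.Errorf("stream incomplete: have %d entries, want %d", next, tidx+1)
	}
	return nil
}

func main() {
	fmt.Println(checkContiguous([]int64{0, 1, 2}, 2)) // <nil>
	fmt.Println(checkContiguous([]int64{0, 2}, 2))    // missing log entry index 1 (next 2)
}
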
305 | 322 |
306 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( | 323 func (a *Archivist) makeStagedArchival(c context.Context, project config.ProjectName, path types.StreamPath, |
307 	*stagedArchival, error) { | 324 	ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { |
308 sa := stagedArchival{ | 325 sa := stagedArchival{ |
309 Archivist: a, | 326 Archivist: a, |
| 327 project: project, |
310 path: path, | 328 path: path, |
311 | 329 |
312 terminalIndex: ls.State.TerminalIndex, | 330 terminalIndex: ls.State.TerminalIndex, |
313 } | 331 } |
314 | 332 |
315 	// Deserialize and validate the descriptor protobuf. If this fails, it is a | 333 	// Deserialize and validate the descriptor protobuf. If this fails, it is a |
316 // non-transient error. | 334 // non-transient error. |
317 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { | 335 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
318 log.Fields{ | 336 log.Fields{ |
319 log.ErrorKey: err, | 337 log.ErrorKey: err, |
320 "protoVersion": ls.State.ProtoVersion, | 338 "protoVersion": ls.State.ProtoVersion, |
321 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") | 339 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
322 return nil, err | 340 return nil, err |
323 } | 341 } |
324 | 342 |
325 bext := sa.desc.BinaryFileExt | 343 bext := sa.desc.BinaryFileExt |
326 if bext == "" { | 344 if bext == "" { |
327 bext = "bin" | 345 bext = "bin" |
328 } | 346 } |
329 | 347 |
330 // Construct our staged archival paths. | 348 // Construct our staged archival paths. |
331 » sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) | 349 » sa.stream = a.makeStagingPaths(project, path, "logstream.entries", uid) |
332 » sa.index = a.makeStagingPaths(path, "logstream.index", uid) | 350 » sa.index = a.makeStagingPaths(project, path, "logstream.index", uid) |
333 » sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) | 351 » sa.data = a.makeStagingPaths(project, path, fmt.Sprintf("data.%s", bext), uid) |
334 return &sa, nil | 352 return &sa, nil |
335 } | 353 } |
336 | 354 |
337 // makeStagingPaths returns a stagingPaths instance for the given path and | 355 // makeStagingPaths returns a stagingPaths instance for the given path and |
338 // file name. It incorporates a unique ID into the staging name to differentiate | 356 // file name. It incorporates a unique ID into the staging name to differentiate |
339 // it from other staging paths for the same path/name. | 357 // it from other staging paths for the same path/name. |
340 func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) stagingPaths { | 358 func (a *Archivist) makeStagingPaths(project config.ProjectName, path types.StreamPath, name, uid string) stagingPaths { |
| 359 » // TODO(dnj): This won't be necessary when empty project is invalid. |
| 360 » if project == "" { |
| 361 » » project = "_" |
| 362 » } |
| 363 |
341 return stagingPaths{ | 364 return stagingPaths{ |
342 » » staged: a.GSStagingBase.Concat(string(path), uid, name), | 365 » » staged: a.GSStagingBase.Concat(string(project), string(path), uid, name), |
343 » » final: a.GSBase.Concat(string(path), name), | 366 » » final: a.GSBase.Concat(string(project), string(path), name), |
344 } | 367 } |
345 } | 368 } |
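
The effect of threading the project through is easiest to see in the resulting object layout. A tiny worked example, assuming gs.Path.Concat joins its components with "/" (the bucket names and values here are made up):

package main

import "fmt"

func main() {
	const (
		stagingBase = "gs://example-staging" // stands in for a.GSStagingBase
		finalBase   = "gs://example-archive" // stands in for a.GSBase
		project     = "myproject"            // becomes "_" when the project is empty
		path        = "prefix/+/name"
		uid         = "0123abcd"
		name        = "logstream.entries"
	)
	// Staged objects carry the unique ID so concurrent archival attempts for
	// the same stream cannot collide; final objects do not.
	fmt.Println("staged:", stagingBase+"/"+project+"/"+path+"/"+uid+"/"+name)
	fmt.Println("final: ", finalBase+"/"+project+"/"+path+"/"+name)
}
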
346 | 369 |
347 type stagedArchival struct { | 370 type stagedArchival struct { |
348 *Archivist | 371 *Archivist |
349 | 372 |
350 » path types.StreamPath | 373 » project config.ProjectName |
351 » desc logpb.LogStreamDescriptor | 374 » path types.StreamPath |
| 375 » desc logpb.LogStreamDescriptor |
352 | 376 |
353 stream stagingPaths | 377 stream stagingPaths |
354 index stagingPaths | 378 index stagingPaths |
355 data stagingPaths | 379 data stagingPaths |
356 | 380 |
357 finalized bool | 381 finalized bool |
358 terminalIndex int64 | 382 terminalIndex int64 |
359 logEntryCount int64 | 383 logEntryCount int64 |
360 } | 384 } |
361 | 385 |
(...skipping 65 matching lines...)
427 | 451 |
428 if dataWriter, err = createWriter(sa.data.staged); err != nil { | 452 if dataWriter, err = createWriter(sa.data.staged); err != nil { |
429 return err | 453 return err |
430 } | 454 } |
431 defer closeWriter(dataWriter, sa.data.staged) | 455 defer closeWriter(dataWriter, sa.data.staged) |
432 | 456 |
433 // Read our log entries from intermediate storage. | 457 // Read our log entries from intermediate storage. |
434 ss := storageSource{ | 458 ss := storageSource{ |
435 Context: c, | 459 Context: c, |
436 st: sa.Storage, | 460 st: sa.Storage, |
| 461 project: sa.project, |
437 path: sa.path, | 462 path: sa.path, |
438 terminalIndex: types.MessageIndex(sa.terminalIndex), | 463 terminalIndex: types.MessageIndex(sa.terminalIndex), |
439 lastIndex: -1, | 464 lastIndex: -1, |
440 } | 465 } |
441 | 466 |
442 m := archive.Manifest{ | 467 m := archive.Manifest{ |
443 Desc: &sa.desc, | 468 Desc: &sa.desc, |
444 Source: &ss, | 469 Source: &ss, |
445 LogWriter: streamWriter, | 470 LogWriter: streamWriter, |
446 IndexWriter: indexWriter, | 471 IndexWriter: indexWriter, |
(...skipping 99 matching lines...)
546 } | 571 } |
547 } | 572 } |
548 | 573 |
549 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { | 574 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { |
550 return []*stagingPaths{ | 575 return []*stagingPaths{ |
551 &sa.stream, | 576 &sa.stream, |
552 &sa.index, | 577 &sa.index, |
553 &sa.data, | 578 &sa.data, |
554 } | 579 } |
555 } | 580 } |