Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package archivist | 5 package archivist |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "fmt" | 10 "fmt" |
| 11 "io" | 11 "io" |
| 12 | 12 |
| 13 "github.com/golang/protobuf/proto" | 13 "github.com/golang/protobuf/proto" |
| 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 15 "github.com/luci/luci-go/common/config" | |
| 15 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/gcloud/gs" | 17 "github.com/luci/luci-go/common/gcloud/gs" |
| 17 "github.com/luci/luci-go/common/logdog/types" | 18 "github.com/luci/luci-go/common/logdog/types" |
| 18 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/parallel" | 20 "github.com/luci/luci-go/common/parallel" |
| 20 "github.com/luci/luci-go/common/proto/logdog/logpb" | 21 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 21 "github.com/luci/luci-go/server/logdog/archive" | 22 "github.com/luci/luci-go/server/logdog/archive" |
| 22 "github.com/luci/luci-go/server/logdog/storage" | 23 "github.com/luci/luci-go/server/logdog/storage" |
| 23 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 24 ) | 25 ) |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 79 // delete the task). The return value of true should only be used if the task | 80 // delete the task). The return value of true should only be used if the task |
| 80 // is truly complete and acknowledged by the Coordinator. | 81 // is truly complete and acknowledged by the Coordinator. |
| 81 // | 82 // |
| 82 // If the supplied Context is Done, operation may terminate before completion, | 83 // If the supplied Context is Done, operation may terminate before completion, |
| 83 // returning the Context's error. | 84 // returning the Context's error. |
| 84 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { | 85 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { |
| 85 delete, err := a.archiveTaskImpl(c, task) | 86 delete, err := a.archiveTaskImpl(c, task) |
| 86 log.Fields{ | 87 log.Fields{ |
| 87 log.ErrorKey: err, | 88 log.ErrorKey: err, |
| 88 "delete": delete, | 89 "delete": delete, |
| 90 "project": task.Task().Project, | |
| 89 "path": task.Task().Path, | 91 "path": task.Task().Path, |
| 90 }.Infof(c, "Finished archive task.") | 92 }.Infof(c, "Finished archive task.") |
| 91 return delete | 93 return delete |
| 92 } | 94 } |
| 93 | 95 |
| 94 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes | 96 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes |
| 95 // an error. The error is useful for testing to assert that certain conditions | 97 // an error. The error is useful for testing to assert that certain conditions |
| 96 // were hit. | 98 // were hit. |
| 97 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { | 99 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { |
| 98 at := task.Task() | 100 at := task.Task() |
| 99 log.Fields{ | 101 log.Fields{ |
| 100 » » "path": at.Path, | 102 » » "project": at.Project, |
| 103 » » "path": at.Path, | |
| 101 }.Debugf(c, "Received archival task.") | 104 }.Debugf(c, "Received archival task.") |
| 102 | 105 |
| 106 if err := types.StreamPath(at.Path).Validate(); err != nil { | |
| 107 return true, fmt.Errorf("invalid path %q: %s", at.Path, err) | |
| 108 } | |
| 109 | |
| 110 // TODO(dnj): Remove empty project exemption, make empty project invalid . | |
| 111 if at.Project != "" { | |
| 112 if err := config.ProjectName(at.Project).Validate(); err != nil { | |
| 113 return true, fmt.Errorf("invalid project name %q: %s", a t.Project, err) | |
| 114 } | |
| 115 } | |
| 116 | |
| 103 // Load the log stream's current state. If it is already archived, we wi ll | 117 // Load the log stream's current state. If it is already archived, we wi ll |
| 104 // return an immediate success. | 118 // return an immediate success. |
| 105 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ | 119 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ |
| 106 » » Path: at.Path, | 120 » » Project: at.Project, |
| 107 » » Desc: true, | 121 » » Path: at.Path, |
| 122 » » Desc: true, | |
| 108 }) | 123 }) |
| 109 switch { | 124 switch { |
| 110 case err != nil: | 125 case err != nil: |
| 111 log.WithError(err).Errorf(c, "Failed to load log stream.") | 126 log.WithError(err).Errorf(c, "Failed to load log stream.") |
| 112 return false, err | 127 return false, err |
| 113 | 128 |
| 114 case ls.State == nil: | 129 case ls.State == nil: |
| 115 log.Errorf(c, "Log stream did not include state.") | 130 log.Errorf(c, "Log stream did not include state.") |
| 116 return false, errors.New("log stream did not include state") | 131 return false, errors.New("log stream did not include state") |
| 117 | 132 |
| (...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 176 tidx := ls.State.TerminalIndex | 191 tidx := ls.State.TerminalIndex |
| 177 | 192 |
| 178 if tidx < 0 { | 193 if tidx < 0 { |
| 179 log.Warningf(c, "Cannot archive complete stream with no terminal index.") | 194 log.Warningf(c, "Cannot archive complete stream with no terminal index.") |
| 180 return false, errors.New("completeness required, but str eam has no terminal index") | 195 return false, errors.New("completeness required, but str eam has no terminal index") |
| 181 } | 196 } |
| 182 | 197 |
| 183 // If we're requiring completeness, perform a keys-only scan of intermediate | 198 // If we're requiring completeness, perform a keys-only scan of intermediate |
| 184 // storage to ensure that we have all of the records before we b other | 199 // storage to ensure that we have all of the records before we b other |
| 185 // streaming to storage only to find that we are missing data. | 200 // streaming to storage only to find that we are missing data. |
| 186 » » if err := a.checkComplete(types.StreamPath(at.Path), types.Messa geIndex(tidx)); err != nil { | 201 » » if err := a.checkComplete(config.ProjectName(at.Project), types. StreamPath(at.Path), types.MessageIndex(tidx)); err != nil { |
| 187 return false, err | 202 return false, err |
| 188 } | 203 } |
| 189 } | 204 } |
| 190 | 205 |
| 191 ar := logdog.ArchiveStreamRequest{ | 206 ar := logdog.ArchiveStreamRequest{ |
| 192 » » Path: at.Path, | 207 » » Project: at.Project, |
| 208 » » Path: at.Path, | |
| 193 } | 209 } |
| 194 | 210 |
| 195 // Archive to staging. | 211 // Archive to staging. |
| 196 // | 212 // |
| 197 // If a non-transient failure occurs here, we will report it to the Arch ivist | 213 // If a non-transient failure occurs here, we will report it to the Arch ivist |
| 198 // under the assumption that it will continue occurring. | 214 // under the assumption that it will continue occurring. |
| 199 // | 215 // |
| 200 // We will handle error creating the plan and executing the plan in the same | 216 // We will handle error creating the plan and executing the plan in the same |
| 201 // switch statement below. | 217 // switch statement below. |
| 202 » staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta sk.UniqueID()) | 218 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), t ypes.StreamPath(at.Path), ls, task.UniqueID()) |
| 203 if err != nil { | 219 if err != nil { |
| 204 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.") | 220 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.") |
| 205 } else { | 221 } else { |
| 206 err = staged.stage(c) | 222 err = staged.stage(c) |
| 207 } | 223 } |
| 208 | 224 |
| 209 switch { | 225 switch { |
| 210 case errors.IsTransient(err): | 226 case errors.IsTransient(err): |
| 211 // If this is a transient error, exit immediately and do not del ete the | 227 // If this is a transient error, exit immediately and do not del ete the |
| 212 // archival task. | 228 // archival task. |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 264 return false, err | 280 return false, err |
| 265 } | 281 } |
| 266 | 282 |
| 267 // Archival is complete and acknowledged by Coordinator. Consume the arc hival | 283 // Archival is complete and acknowledged by Coordinator. Consume the arc hival |
| 268 // task. | 284 // task. |
| 269 return true, nil | 285 return true, nil |
| 270 } | 286 } |
| 271 | 287 |
| 272 // checkComplete performs a quick scan of intermediate storage to ensure that | 288 // checkComplete performs a quick scan of intermediate storage to ensure that |
| 273 // all of the log stream's records are available. | 289 // all of the log stream's records are available. |
| 274 func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex ) error { | 290 func (a *Archivist) checkComplete(project config.ProjectName, path types.StreamP ath, tidx types.MessageIndex) error { |
| 275 sreq := storage.GetRequest{ | 291 sreq := storage.GetRequest{ |
| 292 Project: project, | |
| 276 Path: path, | 293 Path: path, |
| 277 KeysOnly: true, | 294 KeysOnly: true, |
| 278 } | 295 } |
| 279 | 296 |
| 280 nextIndex := types.MessageIndex(0) | 297 nextIndex := types.MessageIndex(0) |
| 281 var ierr error | 298 var ierr error |
| 282 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool { | 299 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool { |
| 283 switch { | 300 switch { |
| 284 case idx != nextIndex: | 301 case idx != nextIndex: |
| 285 ierr = fmt.Errorf("missing log entry index %d (next %d)" , nextIndex, idx) | 302 ierr = fmt.Errorf("missing log entry index %d (next %d)" , nextIndex, idx) |
| (...skipping 10 matching lines...) Expand all Loading... | |
| 296 }) | 313 }) |
| 297 if ierr != nil { | 314 if ierr != nil { |
| 298 return ierr | 315 return ierr |
| 299 } | 316 } |
| 300 if err != nil { | 317 if err != nil { |
| 301 return err | 318 return err |
| 302 } | 319 } |
| 303 return nil | 320 return nil |
| 304 } | 321 } |
| 305 | 322 |
| 306 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( | 323 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project Name, path types.StreamPath, |
| 307 » *stagedArchival, error) { | 324 » ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) { |
| 308 sa := stagedArchival{ | 325 sa := stagedArchival{ |
| 309 Archivist: a, | 326 Archivist: a, |
| 327 project: project, | |
| 310 path: path, | 328 path: path, |
| 311 | 329 |
| 312 terminalIndex: ls.State.TerminalIndex, | 330 terminalIndex: ls.State.TerminalIndex, |
| 313 } | 331 } |
| 314 | 332 |
| 315 // Deserialize and validate the descriptor protobuf. If this fails, it i s a | 333 // Deserialize and validate the descriptor protobuf. If this fails, it i s a |
| 316 // non-transient error. | 334 // non-transient error. |
| 317 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { | 335 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { |
| 318 log.Fields{ | 336 log.Fields{ |
| 319 log.ErrorKey: err, | 337 log.ErrorKey: err, |
| 320 "protoVersion": ls.State.ProtoVersion, | 338 "protoVersion": ls.State.ProtoVersion, |
| 321 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") | 339 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") |
| 322 return nil, err | 340 return nil, err |
| 323 } | 341 } |
| 324 | 342 |
| 325 bext := sa.desc.BinaryFileExt | 343 bext := sa.desc.BinaryFileExt |
| 326 if bext == "" { | 344 if bext == "" { |
| 327 bext = "bin" | 345 bext = "bin" |
| 328 } | 346 } |
| 329 | 347 |
| 330 // Construct our staged archival paths. | 348 // Construct our staged archival paths. |
| 331 » sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) | 349 » sa.stream = a.makeStagingPaths(project, path, "logstream.entries", uid) |
| 332 » sa.index = a.makeStagingPaths(path, "logstream.index", uid) | 350 » sa.index = a.makeStagingPaths(project, path, "logstream.index", uid) |
| 333 » sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) | 351 » sa.data = a.makeStagingPaths(project, path, fmt.Sprintf("data.%s", bext) , uid) |
| 334 return &sa, nil | 352 return &sa, nil |
| 335 } | 353 } |
| 336 | 354 |
| 337 // makeStagingPaths returns a stagingPaths instance for the given path and | 355 // makeStagingPaths returns a stagingPaths instance for the given path and |
| 338 // file name. It incorporates a unique ID into the staging name to differentiate | 356 // file name. It incorporates a unique ID into the staging name to differentiate |
| 339 // it from other staging paths for the same path/name. | 357 // it from other staging paths for the same path/name. |
| 340 func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) st agingPaths { | 358 func (a *Archivist) makeStagingPaths(project config.ProjectName, path types.Stre amPath, name, uid string) stagingPaths { |
| 359 » // TODO(dnj): This won't be necessary when empty project is invalid. | |
| 360 » if project == "" { | |
| 361 » » project = "_" | |
| 362 » } | |
|
Ryan Tseng
2016/04/28 19:57:34
elif project == "_", freak out? Or is that allowe
dnj
2016/04/30 03:03:13
"_" is not a valid project name character :D
| |
| 363 | |
| 341 return stagingPaths{ | 364 return stagingPaths{ |
| 342 » » staged: a.GSStagingBase.Concat(string(path), uid, name), | 365 » » staged: a.GSStagingBase.Concat(string(project), string(path), ui d, name), |
| 343 » » final: a.GSBase.Concat(string(path), name), | 366 » » final: a.GSBase.Concat(string(project), string(path), name), |
| 344 } | 367 } |
| 345 } | 368 } |
| 346 | 369 |
| 347 type stagedArchival struct { | 370 type stagedArchival struct { |
| 348 *Archivist | 371 *Archivist |
| 349 | 372 |
| 350 » path types.StreamPath | 373 » project config.ProjectName |
| 351 » desc logpb.LogStreamDescriptor | 374 » path types.StreamPath |
| 375 » desc logpb.LogStreamDescriptor | |
| 352 | 376 |
| 353 stream stagingPaths | 377 stream stagingPaths |
| 354 index stagingPaths | 378 index stagingPaths |
| 355 data stagingPaths | 379 data stagingPaths |
| 356 | 380 |
| 357 finalized bool | 381 finalized bool |
| 358 terminalIndex int64 | 382 terminalIndex int64 |
| 359 logEntryCount int64 | 383 logEntryCount int64 |
| 360 } | 384 } |
| 361 | 385 |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 427 | 451 |
| 428 if dataWriter, err = createWriter(sa.data.staged); err != nil { | 452 if dataWriter, err = createWriter(sa.data.staged); err != nil { |
| 429 return err | 453 return err |
| 430 } | 454 } |
| 431 defer closeWriter(dataWriter, sa.data.staged) | 455 defer closeWriter(dataWriter, sa.data.staged) |
| 432 | 456 |
| 433 // Read our log entries from intermediate storage. | 457 // Read our log entries from intermediate storage. |
| 434 ss := storageSource{ | 458 ss := storageSource{ |
| 435 Context: c, | 459 Context: c, |
| 436 st: sa.Storage, | 460 st: sa.Storage, |
| 461 project: sa.project, | |
| 437 path: sa.path, | 462 path: sa.path, |
| 438 terminalIndex: types.MessageIndex(sa.terminalIndex), | 463 terminalIndex: types.MessageIndex(sa.terminalIndex), |
| 439 lastIndex: -1, | 464 lastIndex: -1, |
| 440 } | 465 } |
| 441 | 466 |
| 442 m := archive.Manifest{ | 467 m := archive.Manifest{ |
| 443 Desc: &sa.desc, | 468 Desc: &sa.desc, |
| 444 Source: &ss, | 469 Source: &ss, |
| 445 LogWriter: streamWriter, | 470 LogWriter: streamWriter, |
| 446 IndexWriter: indexWriter, | 471 IndexWriter: indexWriter, |
| (...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 546 } | 571 } |
| 547 } | 572 } |
| 548 | 573 |
| 549 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { | 574 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { |
| 550 return []*stagingPaths{ | 575 return []*stagingPaths{ |
| 551 &sa.stream, | 576 &sa.stream, |
| 552 &sa.index, | 577 &sa.index, |
| 553 &sa.data, | 578 &sa.data, |
| 554 } | 579 } |
| 555 } | 580 } |
| OLD | NEW |