Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(831)

Side by Side Diff: server/internal/logdog/archivist/archivist.go

Issue 1909053003: LogDog: Add project namespacing to Archivist. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-logs
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package archivist 5 package archivist
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "encoding/hex" 9 "encoding/hex"
10 "fmt" 10 "fmt"
11 "io" 11 "io"
12 12
13 "github.com/golang/protobuf/proto" 13 "github.com/golang/protobuf/proto"
14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 14 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
15 "github.com/luci/luci-go/common/config"
15 "github.com/luci/luci-go/common/errors" 16 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/gcloud/gs" 17 "github.com/luci/luci-go/common/gcloud/gs"
17 "github.com/luci/luci-go/common/logdog/types" 18 "github.com/luci/luci-go/common/logdog/types"
18 log "github.com/luci/luci-go/common/logging" 19 log "github.com/luci/luci-go/common/logging"
19 "github.com/luci/luci-go/common/parallel" 20 "github.com/luci/luci-go/common/parallel"
20 "github.com/luci/luci-go/common/proto/logdog/logpb" 21 "github.com/luci/luci-go/common/proto/logdog/logpb"
21 "github.com/luci/luci-go/server/logdog/archive" 22 "github.com/luci/luci-go/server/logdog/archive"
22 "github.com/luci/luci-go/server/logdog/storage" 23 "github.com/luci/luci-go/server/logdog/storage"
23 "golang.org/x/net/context" 24 "golang.org/x/net/context"
24 ) 25 )
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 // delete the task). The return value of true should only be used if the task 80 // delete the task). The return value of true should only be used if the task
80 // is truly complete and acknowledged by the Coordinator. 81 // is truly complete and acknowledged by the Coordinator.
81 // 82 //
82 // If the supplied Context is Done, operation may terminate before completion, 83 // If the supplied Context is Done, operation may terminate before completion,
83 // returning the Context's error. 84 // returning the Context's error.
84 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool { 85 func (a *Archivist) ArchiveTask(c context.Context, task Task) bool {
85 delete, err := a.archiveTaskImpl(c, task) 86 delete, err := a.archiveTaskImpl(c, task)
86 log.Fields{ 87 log.Fields{
87 log.ErrorKey: err, 88 log.ErrorKey: err,
88 "delete": delete, 89 "delete": delete,
90 "project": task.Task().Project,
89 "path": task.Task().Path, 91 "path": task.Task().Path,
90 }.Infof(c, "Finished archive task.") 92 }.Infof(c, "Finished archive task.")
91 return delete 93 return delete
92 } 94 }
93 95
94 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes 96 // archiveTaskImpl returns the same boolean value as ArchiveTask, but includes
95 // an error. The error is useful for testing to assert that certain conditions 97 // an error. The error is useful for testing to assert that certain conditions
96 // were hit. 98 // were hit.
97 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) { 99 func (a *Archivist) archiveTaskImpl(c context.Context, task Task) (bool, error) {
98 at := task.Task() 100 at := task.Task()
99 log.Fields{ 101 log.Fields{
100 » » "path": at.Path, 102 » » "project": at.Project,
103 » » "path": at.Path,
101 }.Debugf(c, "Received archival task.") 104 }.Debugf(c, "Received archival task.")
102 105
106 if err := types.StreamPath(at.Path).Validate(); err != nil {
107 return true, fmt.Errorf("invalid path %q: %s", at.Path, err)
108 }
109
110 // TODO(dnj): Remove empty project exemption, make empty project invalid .
111 if at.Project != "" {
112 if err := config.ProjectName(at.Project).Validate(); err != nil {
113 return true, fmt.Errorf("invalid project name %q: %s", a t.Project, err)
114 }
115 }
116
103 // Load the log stream's current state. If it is already archived, we wi ll 117 // Load the log stream's current state. If it is already archived, we wi ll
104 // return an immediate success. 118 // return an immediate success.
105 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{ 119 ls, err := a.Service.LoadStream(c, &logdog.LoadStreamRequest{
106 » » Path: at.Path, 120 » » Project: at.Project,
107 » » Desc: true, 121 » » Path: at.Path,
122 » » Desc: true,
108 }) 123 })
109 switch { 124 switch {
110 case err != nil: 125 case err != nil:
111 log.WithError(err).Errorf(c, "Failed to load log stream.") 126 log.WithError(err).Errorf(c, "Failed to load log stream.")
112 return false, err 127 return false, err
113 128
114 case ls.State == nil: 129 case ls.State == nil:
115 log.Errorf(c, "Log stream did not include state.") 130 log.Errorf(c, "Log stream did not include state.")
116 return false, errors.New("log stream did not include state") 131 return false, errors.New("log stream did not include state")
117 132
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 tidx := ls.State.TerminalIndex 191 tidx := ls.State.TerminalIndex
177 192
178 if tidx < 0 { 193 if tidx < 0 {
179 log.Warningf(c, "Cannot archive complete stream with no terminal index.") 194 log.Warningf(c, "Cannot archive complete stream with no terminal index.")
180 return false, errors.New("completeness required, but str eam has no terminal index") 195 return false, errors.New("completeness required, but str eam has no terminal index")
181 } 196 }
182 197
183 // If we're requiring completeness, perform a keys-only scan of intermediate 198 // If we're requiring completeness, perform a keys-only scan of intermediate
184 // storage to ensure that we have all of the records before we b other 199 // storage to ensure that we have all of the records before we b other
185 // streaming to storage only to find that we are missing data. 200 // streaming to storage only to find that we are missing data.
186 » » if err := a.checkComplete(types.StreamPath(at.Path), types.Messa geIndex(tidx)); err != nil { 201 » » if err := a.checkComplete(config.ProjectName(at.Project), types. StreamPath(at.Path), types.MessageIndex(tidx)); err != nil {
187 return false, err 202 return false, err
188 } 203 }
189 } 204 }
190 205
191 ar := logdog.ArchiveStreamRequest{ 206 ar := logdog.ArchiveStreamRequest{
192 » » Path: at.Path, 207 » » Project: at.Project,
208 » » Path: at.Path,
193 } 209 }
194 210
195 // Archive to staging. 211 // Archive to staging.
196 // 212 //
197 // If a non-transient failure occurs here, we will report it to the Arch ivist 213 // If a non-transient failure occurs here, we will report it to the Arch ivist
198 // under the assumption that it will continue occurring. 214 // under the assumption that it will continue occurring.
199 // 215 //
200 // We will handle error creating the plan and executing the plan in the same 216 // We will handle error creating the plan and executing the plan in the same
201 // switch statement below. 217 // switch statement below.
202 » staged, err := a.makeStagedArchival(c, types.StreamPath(at.Path), ls, ta sk.UniqueID()) 218 » staged, err := a.makeStagedArchival(c, config.ProjectName(at.Project), t ypes.StreamPath(at.Path), ls, task.UniqueID())
203 if err != nil { 219 if err != nil {
204 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.") 220 log.WithError(err).Errorf(c, "Failed to create staged archival p lan.")
205 } else { 221 } else {
206 err = staged.stage(c) 222 err = staged.stage(c)
207 } 223 }
208 224
209 switch { 225 switch {
210 case errors.IsTransient(err): 226 case errors.IsTransient(err):
211 // If this is a transient error, exit immediately and do not del ete the 227 // If this is a transient error, exit immediately and do not del ete the
212 // archival task. 228 // archival task.
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
264 return false, err 280 return false, err
265 } 281 }
266 282
267 // Archival is complete and acknowledged by Coordinator. Consume the arc hival 283 // Archival is complete and acknowledged by Coordinator. Consume the arc hival
268 // task. 284 // task.
269 return true, nil 285 return true, nil
270 } 286 }
271 287
272 // checkComplete performs a quick scan of intermediate storage to ensure that 288 // checkComplete performs a quick scan of intermediate storage to ensure that
273 // all of the log stream's records are available. 289 // all of the log stream's records are available.
274 func (a *Archivist) checkComplete(path types.StreamPath, tidx types.MessageIndex ) error { 290 func (a *Archivist) checkComplete(project config.ProjectName, path types.StreamP ath, tidx types.MessageIndex) error {
275 sreq := storage.GetRequest{ 291 sreq := storage.GetRequest{
292 Project: project,
276 Path: path, 293 Path: path,
277 KeysOnly: true, 294 KeysOnly: true,
278 } 295 }
279 296
280 nextIndex := types.MessageIndex(0) 297 nextIndex := types.MessageIndex(0)
281 var ierr error 298 var ierr error
282 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool { 299 err := a.Storage.Get(sreq, func(idx types.MessageIndex, d []byte) bool {
283 switch { 300 switch {
284 case idx != nextIndex: 301 case idx != nextIndex:
285 ierr = fmt.Errorf("missing log entry index %d (next %d)" , nextIndex, idx) 302 ierr = fmt.Errorf("missing log entry index %d (next %d)" , nextIndex, idx)
(...skipping 10 matching lines...) Expand all
296 }) 313 })
297 if ierr != nil { 314 if ierr != nil {
298 return ierr 315 return ierr
299 } 316 }
300 if err != nil { 317 if err != nil {
301 return err 318 return err
302 } 319 }
303 return nil 320 return nil
304 } 321 }
305 322
306 func (a *Archivist) makeStagedArchival(c context.Context, path types.StreamPath, ls *logdog.LoadStreamResponse, uid string) ( 323 func (a *Archivist) makeStagedArchival(c context.Context, project config.Project Name, path types.StreamPath,
307 » *stagedArchival, error) { 324 » ls *logdog.LoadStreamResponse, uid string) (*stagedArchival, error) {
308 sa := stagedArchival{ 325 sa := stagedArchival{
309 Archivist: a, 326 Archivist: a,
327 project: project,
310 path: path, 328 path: path,
311 329
312 terminalIndex: ls.State.TerminalIndex, 330 terminalIndex: ls.State.TerminalIndex,
313 } 331 }
314 332
315 // Deserialize and validate the descriptor protobuf. If this fails, it i s a 333 // Deserialize and validate the descriptor protobuf. If this fails, it i s a
316 // non-transient error. 334 // non-transient error.
317 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil { 335 if err := proto.Unmarshal(ls.Desc, &sa.desc); err != nil {
318 log.Fields{ 336 log.Fields{
319 log.ErrorKey: err, 337 log.ErrorKey: err,
320 "protoVersion": ls.State.ProtoVersion, 338 "protoVersion": ls.State.ProtoVersion,
321 }.Errorf(c, "Failed to unmarshal descriptor protobuf.") 339 }.Errorf(c, "Failed to unmarshal descriptor protobuf.")
322 return nil, err 340 return nil, err
323 } 341 }
324 342
325 bext := sa.desc.BinaryFileExt 343 bext := sa.desc.BinaryFileExt
326 if bext == "" { 344 if bext == "" {
327 bext = "bin" 345 bext = "bin"
328 } 346 }
329 347
330 // Construct our staged archival paths. 348 // Construct our staged archival paths.
331 » sa.stream = a.makeStagingPaths(path, "logstream.entries", uid) 349 » sa.stream = a.makeStagingPaths(project, path, "logstream.entries", uid)
332 » sa.index = a.makeStagingPaths(path, "logstream.index", uid) 350 » sa.index = a.makeStagingPaths(project, path, "logstream.index", uid)
333 » sa.data = a.makeStagingPaths(path, fmt.Sprintf("data.%s", bext), uid) 351 » sa.data = a.makeStagingPaths(project, path, fmt.Sprintf("data.%s", bext) , uid)
334 return &sa, nil 352 return &sa, nil
335 } 353 }
336 354
337 // makeStagingPaths returns a stagingPaths instance for the given path and 355 // makeStagingPaths returns a stagingPaths instance for the given path and
338 // file name. It incorporates a unique ID into the staging name to differentiate 356 // file name. It incorporates a unique ID into the staging name to differentiate
339 // it from other staging paths for the same path/name. 357 // it from other staging paths for the same path/name.
340 func (a *Archivist) makeStagingPaths(path types.StreamPath, name, uid string) st agingPaths { 358 func (a *Archivist) makeStagingPaths(project config.ProjectName, path types.Stre amPath, name, uid string) stagingPaths {
359 » // TODO(dnj): This won't be necessary when empty project is invalid.
360 » if project == "" {
361 » » project = "_"
362 » }
363
341 return stagingPaths{ 364 return stagingPaths{
342 » » staged: a.GSStagingBase.Concat(string(path), uid, name), 365 » » staged: a.GSStagingBase.Concat(string(project), string(path), ui d, name),
343 » » final: a.GSBase.Concat(string(path), name), 366 » » final: a.GSBase.Concat(string(project), string(path), name),
344 } 367 }
345 } 368 }
346 369
347 type stagedArchival struct { 370 type stagedArchival struct {
348 *Archivist 371 *Archivist
349 372
350 » path types.StreamPath 373 » project config.ProjectName
351 » desc logpb.LogStreamDescriptor 374 » path types.StreamPath
375 » desc logpb.LogStreamDescriptor
352 376
353 stream stagingPaths 377 stream stagingPaths
354 index stagingPaths 378 index stagingPaths
355 data stagingPaths 379 data stagingPaths
356 380
357 finalized bool 381 finalized bool
358 terminalIndex int64 382 terminalIndex int64
359 logEntryCount int64 383 logEntryCount int64
360 } 384 }
361 385
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
427 451
428 if dataWriter, err = createWriter(sa.data.staged); err != nil { 452 if dataWriter, err = createWriter(sa.data.staged); err != nil {
429 return err 453 return err
430 } 454 }
431 defer closeWriter(dataWriter, sa.data.staged) 455 defer closeWriter(dataWriter, sa.data.staged)
432 456
433 // Read our log entries from intermediate storage. 457 // Read our log entries from intermediate storage.
434 ss := storageSource{ 458 ss := storageSource{
435 Context: c, 459 Context: c,
436 st: sa.Storage, 460 st: sa.Storage,
461 project: sa.project,
437 path: sa.path, 462 path: sa.path,
438 terminalIndex: types.MessageIndex(sa.terminalIndex), 463 terminalIndex: types.MessageIndex(sa.terminalIndex),
439 lastIndex: -1, 464 lastIndex: -1,
440 } 465 }
441 466
442 m := archive.Manifest{ 467 m := archive.Manifest{
443 Desc: &sa.desc, 468 Desc: &sa.desc,
444 Source: &ss, 469 Source: &ss,
445 LogWriter: streamWriter, 470 LogWriter: streamWriter,
446 IndexWriter: indexWriter, 471 IndexWriter: indexWriter,
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
546 } 571 }
547 } 572 }
548 573
549 func (sa *stagedArchival) getStagingPaths() []*stagingPaths { 574 func (sa *stagedArchival) getStagingPaths() []*stagingPaths {
550 return []*stagingPaths{ 575 return []*stagingPaths{
551 &sa.stream, 576 &sa.stream,
552 &sa.index, 577 &sa.index,
553 &sa.data, 578 &sa.data,
554 } 579 }
555 } 580 }
OLDNEW
« no previous file with comments | « no previous file | server/internal/logdog/archivist/archivist_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698