| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "time" | 10 "time" |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 60 // Path is the path of the log stream. | 60 // Path is the path of the log stream. |
| 61 Path types.StreamPath | 61 Path types.StreamPath |
| 62 | 62 |
| 63 // Desc is the log stream's descriptor. | 63 // Desc is the log stream's descriptor. |
| 64 Desc logpb.LogStreamDescriptor | 64 Desc logpb.LogStreamDescriptor |
| 65 | 65 |
| 66 // State is the stream's current state. | 66 // State is the stream's current state. |
| 67 State StreamState | 67 State StreamState |
| 68 } | 68 } |
| 69 | 69 |
| 70 func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState,
d *logpb.LogStreamDescriptor) ( | 70 func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState,
d *logpb.LogStreamDescriptor) *LogStream { |
| 71 » *LogStream, error) { | |
| 72 » switch { | |
| 73 » case s == nil: | |
| 74 » » return nil, errors.New("missing required log state") | |
| 75 » case d == nil: | |
| 76 » » return nil, errors.New("missing required descriptor") | |
| 77 » } | |
| 78 | |
| 79 ls := LogStream{ | 71 ls := LogStream{ |
| 80 Project: cfgtypes.ProjectName(proj), | 72 Project: cfgtypes.ProjectName(proj), |
| 81 Path: path, | 73 Path: path, |
| 82 » » Desc: *d, | 74 » } |
| 83 » » State: StreamState{ | 75 » if d != nil { |
| 76 » » ls.Desc = *d |
| 77 » } |
| 78 » if s != nil { |
| 79 » » ls.State = StreamState{ |
| 84 Created: google.TimeFromProto(s.Created), | 80 Created: google.TimeFromProto(s.Created), |
| 85 TerminalIndex: types.MessageIndex(s.TerminalIndex), | 81 TerminalIndex: types.MessageIndex(s.TerminalIndex), |
| 86 Purged: s.Purged, | 82 Purged: s.Purged, |
| 87 » » }, | 83 » » } |
| 84 |
| 85 » » if a := s.Archive; a != nil { |
| 86 » » » ls.State.Archived = true |
| 87 » » » ls.State.ArchiveIndexURL = a.IndexUrl |
| 88 » » » ls.State.ArchiveStreamURL = a.StreamUrl |
| 89 » » » ls.State.ArchiveDataURL = a.DataUrl |
| 90 » » } |
| 88 } | 91 } |
| 89 » if a := s.Archive; a != nil { | 92 » return &ls |
| 90 » » ls.State.Archived = true | |
| 91 » » ls.State.ArchiveIndexURL = a.IndexUrl | |
| 92 » » ls.State.ArchiveStreamURL = a.StreamUrl | |
| 93 » » ls.State.ArchiveDataURL = a.DataUrl | |
| 94 » } | |
| 95 » return &ls, nil | |
| 96 } | 93 } |
| 97 | 94 |
| 98 // Stream is an interface to Coordinator stream-level commands. It is bound to | 95 // Stream is an interface to Coordinator stream-level commands. It is bound to |
| 99 // and operates on a single log stream path. | 96 // and operates on a single log stream path. |
| 100 type Stream struct { | 97 type Stream struct { |
| 101 // c is the Coordinator instance that this Stream is bound to. | 98 // c is the Coordinator instance that this Stream is bound to. |
| 102 c *Client | 99 c *Client |
| 103 | 100 |
| 104 // project is this stream's project. | 101 // project is this stream's project. |
| 105 project cfgtypes.ProjectName | 102 project cfgtypes.ProjectName |
| (...skipping 13 matching lines...) Expand all Loading... |
| 119 resp, err := s.c.C.Get(ctx, &req) | 116 resp, err := s.c.C.Get(ctx, &req) |
| 120 if err != nil { | 117 if err != nil { |
| 121 return nil, normalizeError(err) | 118 return nil, normalizeError(err) |
| 122 } | 119 } |
| 123 | 120 |
| 124 path := types.StreamPath(req.Path) | 121 path := types.StreamPath(req.Path) |
| 125 if desc := resp.Desc; desc != nil { | 122 if desc := resp.Desc; desc != nil { |
| 126 path = desc.Path() | 123 path = desc.Path() |
| 127 } | 124 } |
| 128 | 125 |
| 129 » st, err := loadLogStream(resp.Project, path, resp.State, resp.Desc) | 126 » return loadLogStream(resp.Project, path, resp.State, resp.Desc), nil |
| 130 » if err != nil { | |
| 131 » » return nil, fmt.Errorf("failed to load stream state: %v", err) | |
| 132 » } | |
| 133 » return st, nil | |
| 134 } | 127 } |
| 135 | 128 |
| 136 // Get retrieves log stream entries from the Coordinator. The supplied | 129 // Get retrieves log stream entries from the Coordinator. The supplied |
| 137 // parameters shape which entries are requested and what information is | 130 // parameters shape which entries are requested and what information is |
| 138 // returned. | 131 // returned. |
| 139 func (s *Stream) Get(ctx context.Context, params ...GetParam) ([]*logpb.LogEntry
, error) { | 132 func (s *Stream) Get(ctx context.Context, params ...GetParam) ([]*logpb.LogEntry
, error) { |
| 140 p := getParamsInst{ | 133 p := getParamsInst{ |
| 141 r: logdog.GetRequest{ | 134 r: logdog.GetRequest{ |
| 142 Project: string(s.project), | 135 Project: string(s.project), |
| 143 Path: string(s.path), | 136 Path: string(s.path), |
| (...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 313 if stateP == nil { | 306 if stateP == nil { |
| 314 return nil | 307 return nil |
| 315 } | 308 } |
| 316 | 309 |
| 317 // The service should always return this when requested, but handle the
case | 310 // The service should always return this when requested, but handle the
case |
| 318 // where it doesn't for completeness. | 311 // where it doesn't for completeness. |
| 319 if resp.Desc == nil { | 312 if resp.Desc == nil { |
| 320 return errors.New("descriptor was not returned") | 313 return errors.New("descriptor was not returned") |
| 321 } | 314 } |
| 322 | 315 |
| 323 » ls, err := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, res
p.Desc) | 316 » ls := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, resp.Des
c) |
| 324 » if err != nil { | |
| 325 » » return fmt.Errorf("failed to load stream state: %v", err) | |
| 326 » } | |
| 327 | |
| 328 *stateP = *ls | 317 *stateP = *ls |
| 329 return nil | 318 return nil |
| 330 } | 319 } |
| OLD | NEW |