| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "fmt" | 9 "fmt" |
| 10 "time" | 10 "time" |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 52 } | 52 } |
| 53 | 53 |
| 54 // LogStream is returned metadata about a log stream. | 54 // LogStream is returned metadata about a log stream. |
| 55 type LogStream struct { | 55 type LogStream struct { |
| 56 // Project is the log stream's project. | 56 // Project is the log stream's project. |
| 57 Project config.ProjectName | 57 Project config.ProjectName |
| 58 // Path is the path of the log stream. | 58 // Path is the path of the log stream. |
| 59 Path types.StreamPath | 59 Path types.StreamPath |
| 60 | 60 |
| 61 // Desc is the log stream's descriptor. | 61 // Desc is the log stream's descriptor. |
| 62 » Desc *logpb.LogStreamDescriptor | 62 » Desc logpb.LogStreamDescriptor |
| 63 | 63 |
| 64 // State is the stream's current state. | 64 // State is the stream's current state. |
| 65 » State *StreamState | 65 » State StreamState |
| 66 } | 66 } |
| 67 | 67 |
| 68 func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState,
d *logpb.LogStreamDescriptor) *LogStream { | 68 func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState,
d *logpb.LogStreamDescriptor) ( |
| 69 » *LogStream, error) { |
| 70 » switch { |
| 71 » case s == nil: |
| 72 » » return nil, errors.New("missing required log state") |
| 73 » case d == nil: |
| 74 » » return nil, errors.New("missing required descriptor") |
| 75 » } |
| 76 |
| 69 ls := LogStream{ | 77 ls := LogStream{ |
| 70 Project: config.ProjectName(proj), | 78 Project: config.ProjectName(proj), |
| 71 Path: path, | 79 Path: path, |
| 72 » » Desc: d, | 80 » » Desc: *d, |
| 73 » } | 81 » » State: StreamState{ |
| 74 » if s != nil { | |
| 75 » » st := StreamState{ | |
| 76 Created: s.Created.Time(), | 82 Created: s.Created.Time(), |
| 77 TerminalIndex: types.MessageIndex(s.TerminalIndex), | 83 TerminalIndex: types.MessageIndex(s.TerminalIndex), |
| 78 Purged: s.Purged, | 84 Purged: s.Purged, |
| 79 » » } | 85 » » }, |
| 80 » » if a := s.Archive; a != nil { | |
| 81 » » » st.Archived = true | |
| 82 » » » st.ArchiveIndexURL = a.IndexUrl | |
| 83 » » » st.ArchiveStreamURL = a.StreamUrl | |
| 84 » » » st.ArchiveDataURL = a.DataUrl | |
| 85 » » } | |
| 86 | |
| 87 » » ls.State = &st | |
| 88 } | 86 } |
| 89 » return &ls | 87 » if a := s.Archive; a != nil { |
| 88 » » ls.State.Archived = true |
| 89 » » ls.State.ArchiveIndexURL = a.IndexUrl |
| 90 » » ls.State.ArchiveStreamURL = a.StreamUrl |
| 91 » » ls.State.ArchiveDataURL = a.DataUrl |
| 92 » } |
| 93 » return &ls, nil |
| 90 } | 94 } |
| 91 | 95 |
| 92 // Stream is an interface to Coordinator stream-level commands. It is bound to | 96 // Stream is an interface to Coordinator stream-level commands. It is bound to |
| 93 // and operates on a single log stream path. | 97 // and operates on a single log stream path. |
| 94 type Stream struct { | 98 type Stream struct { |
| 95 // c is the Coordinator instance that this Stream is bound to. | 99 // c is the Coordinator instance that this Stream is bound to. |
| 96 c *Client | 100 c *Client |
| 97 | 101 |
| 98 // project is this stream's project. | 102 // project is this stream's project. |
| 99 project config.ProjectName | 103 project config.ProjectName |
| (...skipping 12 matching lines...) Expand all Loading... |
| 112 | 116 |
| 113 resp, err := s.c.C.Get(ctx, &req) | 117 resp, err := s.c.C.Get(ctx, &req) |
| 114 if err != nil { | 118 if err != nil { |
| 115 return nil, normalizeError(err) | 119 return nil, normalizeError(err) |
| 116 } | 120 } |
| 117 | 121 |
| 118 path := types.StreamPath(req.Path) | 122 path := types.StreamPath(req.Path) |
| 119 if desc := resp.Desc; desc != nil { | 123 if desc := resp.Desc; desc != nil { |
| 120 path = desc.Path() | 124 path = desc.Path() |
| 121 } | 125 } |
| 122 » return loadLogStream(resp.Project, path, resp.State, resp.Desc), nil | 126 |
| 127 » st, err := loadLogStream(resp.Project, path, resp.State, resp.Desc) |
| 128 » if err != nil { |
| 129 » » return nil, fmt.Errorf("failed to load stream state: %v", err) |
| 130 » } |
| 131 » return st, nil |
| 123 } | 132 } |
| 124 | 133 |
| 125 // Get retrieves log stream entries from the Coordinator. The supplied | 134 // Get retrieves log stream entries from the Coordinator. The supplied |
| 126 // parameters shape which entries are requested and what information is | 135 // parameters shape which entries are requested and what information is |
| 127 // returned. | 136 // returned. |
| 128 func (s *Stream) Get(ctx context.Context, p *StreamGetParams) ([]*logpb.LogEntry
, error) { | 137 func (s *Stream) Get(ctx context.Context, params ...GetParam) ([]*logpb.LogEntry
, error) { |
| 129 » if p == nil { | 138 » p := getParamsInst{ |
| 130 » » p = &StreamGetParams{} | 139 » » r: logdog.GetRequest{ |
| 140 » » » Project: string(s.project), |
| 141 » » » Path: string(s.path), |
| 142 » » }, |
| 143 » } |
| 144 » for _, param := range params { |
| 145 » » param.applyGet(&p) |
| 131 } | 146 } |
| 132 | 147 |
| 133 req := p.r | |
| 134 req.Project = string(s.project) | |
| 135 req.Path = string(s.path) | |
| 136 if p.stateP != nil { | 148 if p.stateP != nil { |
| 137 » » req.State = true | 149 » » p.r.State = true |
| 138 } | 150 } |
| 139 | 151 |
| 140 » resp, err := s.c.C.Get(ctx, &req) | 152 » resp, err := s.c.C.Get(ctx, &p.r) |
| 141 if err != nil { | 153 if err != nil { |
| 142 return nil, normalizeError(err) | 154 return nil, normalizeError(err) |
| 143 } | 155 } |
| 144 if err := loadStatePointer(p.stateP, resp); err != nil { | 156 if err := loadStatePointer(p.stateP, resp); err != nil { |
| 145 return nil, err | 157 return nil, err |
| 146 } | 158 } |
| 147 return resp.Logs, nil | 159 return resp.Logs, nil |
| 148 } | 160 } |
| 149 | 161 |
| 150 // Tail performs a tail call, returning the last log entry in the stream. If | 162 // Tail performs a tail call, returning the last log entry in the stream. If |
| 151 // stateP is not nil, the stream's state will be requested and loaded into the | 163 // stateP is not nil, the stream's state will be requested and loaded into the |
| 152 // variable. | 164 // variable. |
| 153 func (s *Stream) Tail(ctx context.Context, stateP *LogStream) (*logpb.LogEntry,
error) { | 165 func (s *Stream) Tail(ctx context.Context, params ...TailParam) (*logpb.LogEntry
, error) { |
| 154 » req := logdog.TailRequest{ | 166 » p := tailParamsInst{ |
| 155 » » Project: string(s.project), | 167 » » r: logdog.TailRequest{ |
| 156 » » Path: string(s.path), | 168 » » » Project: string(s.project), |
| 169 » » » Path: string(s.path), |
| 170 » » }, |
| 157 } | 171 } |
| 158 » if stateP != nil { | 172 » for _, param := range params { |
| 159 » » req.State = true | 173 » » param.applyTail(&p) |
| 160 } | 174 } |
| 161 | 175 |
| 162 » resp, err := s.c.C.Tail(ctx, &req) | 176 » resp, err := s.c.C.Tail(ctx, &p.r) |
| 163 if err != nil { | 177 if err != nil { |
| 164 return nil, normalizeError(err) | 178 return nil, normalizeError(err) |
| 165 } | 179 } |
| 166 » if err := loadStatePointer(stateP, resp); err != nil { | 180 » if err := loadStatePointer(p.stateP, resp); err != nil { |
| 167 return nil, err | 181 return nil, err |
| 168 } | 182 } |
| 169 | 183 |
| 170 switch len(resp.Logs) { | 184 switch len(resp.Logs) { |
| 171 case 0: | 185 case 0: |
| 172 return nil, nil | 186 return nil, nil |
| 173 | 187 |
| 174 case 1: | 188 case 1: |
| 175 » » return resp.Logs[0], nil | 189 » » le := resp.Logs[0] |
| 190 » » if p.complete { |
| 191 » » » if dg := le.GetDatagram(); dg != nil && dg.Partial != ni
l { |
| 192 » » » » // This is a partial; datagram. Fetch and assemb
le the full datagram. |
| 193 » » » » return s.fetchFullDatagram(ctx, le, true) |
| 194 » » » } |
| 195 » » } |
| 196 » » return le, nil |
| 176 | 197 |
| 177 default: | 198 default: |
| 178 return nil, fmt.Errorf("tail call returned %d logs", len(resp.Lo
gs)) | 199 return nil, fmt.Errorf("tail call returned %d logs", len(resp.Lo
gs)) |
| 179 } | 200 } |
| 180 } | 201 } |
| 181 | 202 |
| 203 func (s *Stream) fetchFullDatagram(ctx context.Context, le *logpb.LogEntry, fetc
hIfMid bool) (*logpb.LogEntry, error) { |
| 204 // Re-evaluate our partial state. |
| 205 dg := le.GetDatagram() |
| 206 if dg == nil { |
| 207 return nil, fmt.Errorf("entry is not a datagram") |
| 208 } |
| 209 |
| 210 p := dg.Partial |
| 211 if p == nil { |
| 212 // Not partial, return the full message. |
| 213 return le, nil |
| 214 } |
| 215 |
| 216 if uint64(p.Index) > le.StreamIndex { |
| 217 // Something is wrong. The datagram identifies itself as an inde
x in the |
| 218 // stream that exceeds the actual number of entries in the strea
m. |
| 219 return nil, fmt.Errorf("malformed partial datagram; index (%d) >
stream index (%d)", |
| 220 p.Index, le.StreamIndex) |
| 221 } |
| 222 |
| 223 if !p.Last { |
| 224 // This is the last log entry (b/c we Tail'd), but it is part of
a larger |
| 225 // datagram. We can't fetch the full datagram since presumably t
he remainder |
| 226 // doesn't exist. Therefore, fetch the previous datagram. |
| 227 switch { |
| 228 case !fetchIfMid: |
| 229 return nil, fmt.Errorf("mid-fragment partial datagram no
t allowed") |
| 230 |
| 231 case uint64(p.Index) == le.StreamIndex: |
| 232 // If we equal the stream index, then we are the first d
atagram in the |
| 233 // stream, so return nil. |
| 234 return nil, nil |
| 235 |
| 236 default: |
| 237 // Perform a Get on the previous entry in the stream. |
| 238 prevIdx := le.StreamIndex - uint64(p.Index) - 1 |
| 239 logs, err := s.Get(ctx, Index(types.MessageIndex(prevIdx
)), LimitCount(1)) |
| 240 if err != nil { |
| 241 return nil, fmt.Errorf("failed to get previous d
atagram (%d): %s", prevIdx, err) |
| 242 } |
| 243 |
| 244 if len(logs) != 1 || logs[0].StreamIndex != prevIdx { |
| 245 return nil, fmt.Errorf("previous datagram (%d) n
ot returned", prevIdx) |
| 246 } |
| 247 if le, err = s.fetchFullDatagram(ctx, logs[0], false); e
rr != nil { |
| 248 return nil, fmt.Errorf("failed to recurse to pre
vious datagram (%d): %s", prevIdx, err) |
| 249 } |
| 250 return le, nil |
| 251 } |
| 252 } |
| 253 |
| 254 // If this is "Last", but it's also index 0, then it is a partial datagr
am |
| 255 // with one entry. Weird ... but whatever. |
| 256 if p.Index == 0 { |
| 257 dg.Partial = nil |
| 258 return le, nil |
| 259 } |
| 260 |
| 261 // Get the intermediate logs. |
| 262 startIdx := types.MessageIndex(le.StreamIndex - uint64(p.Index)) |
| 263 count := int(p.Index) |
| 264 logs, err := s.Get(ctx, Index(startIdx), LimitCount(count)) |
| 265 if err != nil { |
| 266 return nil, fmt.Errorf("failed to get intermediate logs [%d .. %
d]: %s", |
| 267 startIdx, startIdx+types.MessageIndex(count)-1, err) |
| 268 } |
| 269 |
| 270 if len(logs) < count { |
| 271 return nil, fmt.Errorf("incomplete intermediate logs results (%d
< %d)", len(logs), count) |
| 272 } |
| 273 logs = append(logs[:count], le) |
| 274 |
| 275 // Construct the full datagram. |
| 276 aggregate := make([]byte, 0, int(p.Size)) |
| 277 for i, ple := range logs { |
| 278 chunkDg := ple.GetDatagram() |
| 279 if chunkDg == nil { |
| 280 return nil, fmt.Errorf("intermediate datagram #%d is not
a datagram", i) |
| 281 } |
| 282 chunkP := chunkDg.Partial |
| 283 if chunkP == nil { |
| 284 return nil, fmt.Errorf("intermediate datagram #%d is not
partial", i) |
| 285 } |
| 286 if int(chunkP.Index) != i { |
| 287 return nil, fmt.Errorf("intermediate datagram #%d does n
ot have a contiguous index (%d)", i, chunkP.Index) |
| 288 } |
| 289 if chunkP.Size != p.Size { |
| 290 return nil, fmt.Errorf("inconsistent datagram size (%d !
= %d)", chunkP.Size, p.Size) |
| 291 } |
| 292 if uint64(len(aggregate))+uint64(len(chunkDg.Data)) > p.Size { |
| 293 return nil, fmt.Errorf("appending chunk data would excee
d the declared size (%d > %d)", |
| 294 len(aggregate)+len(chunkDg.Data), p.Size) |
| 295 } |
| 296 aggregate = append(aggregate, chunkDg.Data...) |
| 297 } |
| 298 |
| 299 if uint64(len(aggregate)) != p.Size { |
| 300 return nil, fmt.Errorf("reassembled datagram length (%d) differs
from declared length (%d)", len(aggregate), p.Size) |
| 301 } |
| 302 |
| 303 le = logs[0] |
| 304 dg = le.GetDatagram() |
| 305 dg.Data = aggregate |
| 306 dg.Partial = nil |
| 307 return le, nil |
| 308 } |
| 309 |
| 182 func loadStatePointer(stateP *LogStream, resp *logdog.GetResponse) error { | 310 func loadStatePointer(stateP *LogStream, resp *logdog.GetResponse) error { |
| 183 if stateP == nil { | 311 if stateP == nil { |
| 184 return nil | 312 return nil |
| 185 } | 313 } |
| 186 | 314 |
| 187 » if resp.Desc == nil { | 315 » ls, err := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, res
p.Desc) |
| 188 » » return errors.New("Requested descriptor was not returned") | 316 » if err != nil { |
| 317 » » return fmt.Errorf("failde to load stream state: %v", err) |
| 189 } | 318 } |
| 190 » if resp.State == nil { | 319 |
| 191 » » return errors.New("Requested state was not returned") | |
| 192 » } | |
| 193 » ls := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, resp.Des
c) | |
| 194 *stateP = *ls | 320 *stateP = *ls |
| 195 return nil | 321 return nil |
| 196 } | 322 } |
| OLD | NEW |