Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(531)

Side by Side Diff: logdog/client/coordinator/stream.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/client/coordinator/query_test.go ('k') | logdog/client/coordinator/stream_params.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "time" 10 "time"
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
52 } 52 }
53 53
54 // LogStream is returned metadata about a log stream. 54 // LogStream is returned metadata about a log stream.
55 type LogStream struct { 55 type LogStream struct {
56 // Project is the log stream's project. 56 // Project is the log stream's project.
57 Project config.ProjectName 57 Project config.ProjectName
58 // Path is the path of the log stream. 58 // Path is the path of the log stream.
59 Path types.StreamPath 59 Path types.StreamPath
60 60
61 // Desc is the log stream's descriptor. 61 // Desc is the log stream's descriptor.
62 » Desc *logpb.LogStreamDescriptor 62 » Desc logpb.LogStreamDescriptor
63 63
64 // State is the stream's current state. 64 // State is the stream's current state.
65 » State *StreamState 65 » State StreamState
66 } 66 }
67 67
68 func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState, d *logpb.LogStreamDescriptor) *LogStream { 68 func loadLogStream(proj string, path types.StreamPath, s *logdog.LogStreamState, d *logpb.LogStreamDescriptor) (
69 » *LogStream, error) {
70 » switch {
71 » case s == nil:
72 » » return nil, errors.New("missing required log state")
73 » case d == nil:
74 » » return nil, errors.New("missing required descriptor")
75 » }
76
69 ls := LogStream{ 77 ls := LogStream{
70 Project: config.ProjectName(proj), 78 Project: config.ProjectName(proj),
71 Path: path, 79 Path: path,
72 » » Desc: d, 80 » » Desc: *d,
73 » } 81 » » State: StreamState{
74 » if s != nil {
75 » » st := StreamState{
76 Created: s.Created.Time(), 82 Created: s.Created.Time(),
77 TerminalIndex: types.MessageIndex(s.TerminalIndex), 83 TerminalIndex: types.MessageIndex(s.TerminalIndex),
78 Purged: s.Purged, 84 Purged: s.Purged,
79 » » } 85 » » },
80 » » if a := s.Archive; a != nil {
81 » » » st.Archived = true
82 » » » st.ArchiveIndexURL = a.IndexUrl
83 » » » st.ArchiveStreamURL = a.StreamUrl
84 » » » st.ArchiveDataURL = a.DataUrl
85 » » }
86
87 » » ls.State = &st
88 } 86 }
89 » return &ls 87 » if a := s.Archive; a != nil {
88 » » ls.State.Archived = true
89 » » ls.State.ArchiveIndexURL = a.IndexUrl
90 » » ls.State.ArchiveStreamURL = a.StreamUrl
91 » » ls.State.ArchiveDataURL = a.DataUrl
92 » }
93 » return &ls, nil
90 } 94 }
91 95
92 // Stream is an interface to Coordinator stream-level commands. It is bound to 96 // Stream is an interface to Coordinator stream-level commands. It is bound to
93 // and operates on a single log stream path. 97 // and operates on a single log stream path.
94 type Stream struct { 98 type Stream struct {
95 // c is the Coordinator instance that this Stream is bound to. 99 // c is the Coordinator instance that this Stream is bound to.
96 c *Client 100 c *Client
97 101
98 // project is this stream's project. 102 // project is this stream's project.
99 project config.ProjectName 103 project config.ProjectName
(...skipping 12 matching lines...) Expand all
112 116
113 resp, err := s.c.C.Get(ctx, &req) 117 resp, err := s.c.C.Get(ctx, &req)
114 if err != nil { 118 if err != nil {
115 return nil, normalizeError(err) 119 return nil, normalizeError(err)
116 } 120 }
117 121
118 path := types.StreamPath(req.Path) 122 path := types.StreamPath(req.Path)
119 if desc := resp.Desc; desc != nil { 123 if desc := resp.Desc; desc != nil {
120 path = desc.Path() 124 path = desc.Path()
121 } 125 }
122 » return loadLogStream(resp.Project, path, resp.State, resp.Desc), nil 126
127 » st, err := loadLogStream(resp.Project, path, resp.State, resp.Desc)
128 » if err != nil {
129 » » return nil, fmt.Errorf("failed to load stream state: %v", err)
130 » }
131 » return st, nil
123 } 132 }
124 133
125 // Get retrieves log stream entries from the Coordinator. The supplied 134 // Get retrieves log stream entries from the Coordinator. The supplied
126 // parameters shape which entries are requested and what information is 135 // parameters shape which entries are requested and what information is
127 // returned. 136 // returned.
128 func (s *Stream) Get(ctx context.Context, p *StreamGetParams) ([]*logpb.LogEntry , error) { 137 func (s *Stream) Get(ctx context.Context, params ...GetParam) ([]*logpb.LogEntry , error) {
129 » if p == nil { 138 » p := getParamsInst{
130 » » p = &StreamGetParams{} 139 » » r: logdog.GetRequest{
140 » » » Project: string(s.project),
141 » » » Path: string(s.path),
142 » » },
143 » }
144 » for _, param := range params {
145 » » param.applyGet(&p)
131 } 146 }
132 147
133 req := p.r
134 req.Project = string(s.project)
135 req.Path = string(s.path)
136 if p.stateP != nil { 148 if p.stateP != nil {
137 » » req.State = true 149 » » p.r.State = true
138 } 150 }
139 151
140 » resp, err := s.c.C.Get(ctx, &req) 152 » resp, err := s.c.C.Get(ctx, &p.r)
141 if err != nil { 153 if err != nil {
142 return nil, normalizeError(err) 154 return nil, normalizeError(err)
143 } 155 }
144 if err := loadStatePointer(p.stateP, resp); err != nil { 156 if err := loadStatePointer(p.stateP, resp); err != nil {
145 return nil, err 157 return nil, err
146 } 158 }
147 return resp.Logs, nil 159 return resp.Logs, nil
148 } 160 }
149 161
150 // Tail performs a tail call, returning the last log entry in the stream. If 162 // Tail performs a tail call, returning the last log entry in the stream. If
151 // stateP is not nil, the stream's state will be requested and loaded into the 163 // stateP is not nil, the stream's state will be requested and loaded into the
152 // variable. 164 // variable.
153 func (s *Stream) Tail(ctx context.Context, stateP *LogStream) (*logpb.LogEntry, error) { 165 func (s *Stream) Tail(ctx context.Context, params ...TailParam) (*logpb.LogEntry , error) {
154 » req := logdog.TailRequest{ 166 » p := tailParamsInst{
155 » » Project: string(s.project), 167 » » r: logdog.TailRequest{
156 » » Path: string(s.path), 168 » » » Project: string(s.project),
169 » » » Path: string(s.path),
170 » » },
157 } 171 }
158 » if stateP != nil { 172 » for _, param := range params {
159 » » req.State = true 173 » » param.applyTail(&p)
160 } 174 }
161 175
162 » resp, err := s.c.C.Tail(ctx, &req) 176 » resp, err := s.c.C.Tail(ctx, &p.r)
163 if err != nil { 177 if err != nil {
164 return nil, normalizeError(err) 178 return nil, normalizeError(err)
165 } 179 }
166 » if err := loadStatePointer(stateP, resp); err != nil { 180 » if err := loadStatePointer(p.stateP, resp); err != nil {
167 return nil, err 181 return nil, err
168 } 182 }
169 183
170 switch len(resp.Logs) { 184 switch len(resp.Logs) {
171 case 0: 185 case 0:
172 return nil, nil 186 return nil, nil
173 187
174 case 1: 188 case 1:
175 » » return resp.Logs[0], nil 189 » » le := resp.Logs[0]
190 » » if p.complete {
191 » » » if dg := le.GetDatagram(); dg != nil && dg.Partial != ni l {
192 » » » » // This is a partial; datagram. Fetch and assemb le the full datagram.
193 » » » » return s.fetchFullDatagram(ctx, le, true)
194 » » » }
195 » » }
196 » » return le, nil
176 197
177 default: 198 default:
178 return nil, fmt.Errorf("tail call returned %d logs", len(resp.Lo gs)) 199 return nil, fmt.Errorf("tail call returned %d logs", len(resp.Lo gs))
179 } 200 }
180 } 201 }
181 202
203 func (s *Stream) fetchFullDatagram(ctx context.Context, le *logpb.LogEntry, fetc hIfMid bool) (*logpb.LogEntry, error) {
204 // Re-evaluate our partial state.
205 dg := le.GetDatagram()
206 if dg == nil {
207 return nil, fmt.Errorf("entry is not a datagram")
208 }
209
210 p := dg.Partial
211 if p == nil {
212 // Not partial, return the full message.
213 return le, nil
214 }
215
216 if uint64(p.Index) > le.StreamIndex {
217 // Something is wrong. The datagram identifies itself as an inde x in the
218 // stream that exceeds the actual number of entries in the strea m.
219 return nil, fmt.Errorf("malformed partial datagram; index (%d) > stream index (%d)",
220 p.Index, le.StreamIndex)
221 }
222
223 if !p.Last {
224 // This is the last log entry (b/c we Tail'd), but it is part of a larger
225 // datagram. We can't fetch the full datagram since presumably t he remainder
226 // doesn't exist. Therefore, fetch the previous datagram.
227 switch {
228 case !fetchIfMid:
229 return nil, fmt.Errorf("mid-fragment partial datagram no t allowed")
230
231 case uint64(p.Index) == le.StreamIndex:
232 // If we equal the stream index, then we are the first d atagram in the
233 // stream, so return nil.
234 return nil, nil
235
236 default:
237 // Perform a Get on the previous entry in the stream.
238 prevIdx := le.StreamIndex - uint64(p.Index) - 1
239 logs, err := s.Get(ctx, Index(types.MessageIndex(prevIdx )), LimitCount(1))
240 if err != nil {
241 return nil, fmt.Errorf("failed to get previous d atagram (%d): %s", prevIdx, err)
242 }
243
244 if len(logs) != 1 || logs[0].StreamIndex != prevIdx {
245 return nil, fmt.Errorf("previous datagram (%d) n ot returned", prevIdx)
246 }
247 if le, err = s.fetchFullDatagram(ctx, logs[0], false); e rr != nil {
248 return nil, fmt.Errorf("failed to recurse to pre vious datagram (%d): %s", prevIdx, err)
249 }
250 return le, nil
251 }
252 }
253
254 // If this is "Last", but it's also index 0, then it is a partial datagr am
255 // with one entry. Weird ... but whatever.
256 if p.Index == 0 {
257 dg.Partial = nil
258 return le, nil
259 }
260
261 // Get the intermediate logs.
262 startIdx := types.MessageIndex(le.StreamIndex - uint64(p.Index))
263 count := int(p.Index)
264 logs, err := s.Get(ctx, Index(startIdx), LimitCount(count))
265 if err != nil {
266 return nil, fmt.Errorf("failed to get intermediate logs [%d .. % d]: %s",
267 startIdx, startIdx+types.MessageIndex(count)-1, err)
268 }
269
270 if len(logs) < count {
271 return nil, fmt.Errorf("incomplete intermediate logs results (%d < %d)", len(logs), count)
272 }
273 logs = append(logs[:count], le)
274
275 // Construct the full datagram.
276 aggregate := make([]byte, 0, int(p.Size))
277 for i, ple := range logs {
278 chunkDg := ple.GetDatagram()
279 if chunkDg == nil {
280 return nil, fmt.Errorf("intermediate datagram #%d is not a datagram", i)
281 }
282 chunkP := chunkDg.Partial
283 if chunkP == nil {
284 return nil, fmt.Errorf("intermediate datagram #%d is not partial", i)
285 }
286 if int(chunkP.Index) != i {
287 return nil, fmt.Errorf("intermediate datagram #%d does n ot have a contiguous index (%d)", i, chunkP.Index)
288 }
289 if chunkP.Size != p.Size {
290 return nil, fmt.Errorf("inconsistent datagram size (%d ! = %d)", chunkP.Size, p.Size)
291 }
292 if uint64(len(aggregate))+uint64(len(chunkDg.Data)) > p.Size {
293 return nil, fmt.Errorf("appending chunk data would excee d the declared size (%d > %d)",
294 len(aggregate)+len(chunkDg.Data), p.Size)
295 }
296 aggregate = append(aggregate, chunkDg.Data...)
297 }
298
299 if uint64(len(aggregate)) != p.Size {
300 return nil, fmt.Errorf("reassembled datagram length (%d) differs from declared length (%d)", len(aggregate), p.Size)
301 }
302
303 le = logs[0]
304 dg = le.GetDatagram()
305 dg.Data = aggregate
306 dg.Partial = nil
307 return le, nil
308 }
309
182 func loadStatePointer(stateP *LogStream, resp *logdog.GetResponse) error { 310 func loadStatePointer(stateP *LogStream, resp *logdog.GetResponse) error {
183 if stateP == nil { 311 if stateP == nil {
184 return nil 312 return nil
185 } 313 }
186 314
187 » if resp.Desc == nil { 315 » ls, err := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, res p.Desc)
188 » » return errors.New("Requested descriptor was not returned") 316 » if err != nil {
317 » » return fmt.Errorf("failde to load stream state: %v", err)
189 } 318 }
190 » if resp.State == nil { 319
191 » » return errors.New("Requested state was not returned")
192 » }
193 » ls := loadLogStream(resp.Project, resp.Desc.Path(), resp.State, resp.Des c)
194 *stateP = *ls 320 *stateP = *ls
195 return nil 321 return nil
196 } 322 }
OLDNEW
« no previous file with comments | « logdog/client/coordinator/query_test.go ('k') | logdog/client/coordinator/stream_params.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698