OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package logs | 5 package logs |
6 | 6 |
7 import ( | 7 import ( |
8 "net/url" | |
9 "time" | 8 "time" |
10 | 9 |
11 "github.com/golang/protobuf/proto" | 10 "github.com/golang/protobuf/proto" |
12 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
13 "github.com/luci/luci-go/appengine/logdog/coordinator" | 12 "github.com/luci/luci-go/appengine/logdog/coordinator" |
14 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 13 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
| 14 "github.com/luci/luci-go/common/config" |
15 "github.com/luci/luci-go/common/grpcutil" | 15 "github.com/luci/luci-go/common/grpcutil" |
16 "github.com/luci/luci-go/common/logdog/types" | 16 "github.com/luci/luci-go/common/logdog/types" |
17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
18 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
19 "github.com/luci/luci-go/common/retry" | 19 "github.com/luci/luci-go/common/retry" |
20 "github.com/luci/luci-go/server/logdog/storage" | 20 "github.com/luci/luci-go/server/logdog/storage" |
21 "github.com/luci/luci-go/server/logdog/storage/archive" | 21 "github.com/luci/luci-go/server/logdog/storage/archive" |
22 "golang.org/x/net/context" | 22 "golang.org/x/net/context" |
23 "google.golang.org/grpc/codes" | 23 "google.golang.org/grpc/codes" |
24 ) | 24 ) |
(...skipping 14 matching lines...) Expand all Loading... |
39 ) | 39 ) |
40 | 40 |
41 // Get returns state and log data for a single log stream. | 41 // Get returns state and log data for a single log stream. |
42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
onse, error) { | 42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
onse, error) { |
43 return s.getImpl(c, req, false) | 43 return s.getImpl(c, req, false) |
44 } | 44 } |
45 | 45 |
46 // Tail returns the last log entry for a given log stream. | 46 // Tail returns the last log entry for a given log stream. |
47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
sponse, error) { | 47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
sponse, error) { |
48 r := logdog.GetRequest{ | 48 r := logdog.GetRequest{ |
49 » » Path: req.Path, | 49 » » Project: req.Project, |
50 » » State: req.State, | 50 » » Path: req.Path, |
| 51 » » State: req.State, |
51 } | 52 } |
52 return s.getImpl(c, &r, true) | 53 return s.getImpl(c, &r, true) |
53 } | 54 } |
54 | 55 |
55 // getImpl is common code shared between Get and Tail endpoints. | 56 // getImpl is common code shared between Get and Tail endpoints. |
56 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
*logdog.GetResponse, error) { | 57 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
*logdog.GetResponse, error) { |
57 log.Fields{ | 58 log.Fields{ |
58 » » "path": req.Path, | 59 » » "project": req.Project, |
59 » » "index": req.Index, | 60 » » "path": req.Path, |
60 » » "tail": tail, | 61 » » "index": req.Index, |
| 62 » » "tail": tail, |
61 }.Debugf(c, "Received get request.") | 63 }.Debugf(c, "Received get request.") |
62 | 64 |
63 » // Fetch the log stream state for this log stream. | 65 » ls, err := coordinator.NewLogStream(req.Path) |
64 » u, err := url.Parse(req.Path) | |
65 if err != nil { | 66 if err != nil { |
66 » » log.Fields{ | 67 » » log.WithError(err).Errorf(c, "Invalid path supplied.") |
67 » » » log.ErrorKey: err, | |
68 » » » "path": req.Path, | |
69 » » }.Errorf(c, "Could not parse path URL.") | |
70 » » return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path e
ncoding") | |
71 » } | |
72 » ls, err := coordinator.NewLogStream(u.Path) | |
73 » if err != nil { | |
74 » » log.Fields{ | |
75 » » » log.ErrorKey: err, | |
76 » » » "path": u.Path, | |
77 » » }.Errorf(c, "Invalid path supplied.") | |
78 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v
alue") | 68 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v
alue") |
79 } | 69 } |
80 | 70 |
| 71 // The user may supply a hash instead of a full path. Once resolved, log |
| 72 // the original log stream. |
| 73 path := ls.Path() |
| 74 if req.Path != string(path) { |
| 75 log.Fields{ |
| 76 "hashPath": req.Path, |
| 77 "streamPath": path, |
| 78 }.Debugf(c, "Resolved hash path.") |
| 79 } |
| 80 |
81 // If this log entry is Purged and we're not admin, pretend it doesn't e
xist. | 81 // If this log entry is Purged and we're not admin, pretend it doesn't e
xist. |
82 err = ds.Get(c).Get(ls) | 82 err = ds.Get(c).Get(ls) |
83 switch err { | 83 switch err { |
84 case nil: | 84 case nil: |
85 if ls.Purged { | 85 if ls.Purged { |
86 if authErr := coordinator.IsAdminUser(c); authErr != nil
{ | 86 if authErr := coordinator.IsAdminUser(c); authErr != nil
{ |
87 log.Fields{ | 87 log.Fields{ |
88 log.ErrorKey: authErr, | 88 log.ErrorKey: authErr, |
89 }.Warningf(c, "Non-superuser requested purged lo
g.") | 89 }.Warningf(c, "Non-superuser requested purged lo
g.") |
90 return nil, grpcutil.Errf(codes.NotFound, "path
not found") | 90 return nil, grpcutil.Errf(codes.NotFound, "path
not found") |
91 } | 91 } |
92 } | 92 } |
93 | 93 |
94 case ds.ErrNoSuchEntity: | 94 case ds.ErrNoSuchEntity: |
95 » » log.Fields{ | 95 » » log.Errorf(c, "Log stream does not exist.") |
96 » » » "path": u.Path, | |
97 » » }.Errorf(c, "Log stream does not exist.") | |
98 return nil, grpcutil.Errf(codes.NotFound, "path not found") | 96 return nil, grpcutil.Errf(codes.NotFound, "path not found") |
99 | 97 |
100 default: | 98 default: |
101 » » log.Fields{ | 99 » » log.WithError(err).Errorf(c, "Failed to look up log stream.") |
102 » » » log.ErrorKey: err, | |
103 » » » "path": u.Path, | |
104 » » }.Errorf(c, "Failed to look up log stream.") | |
105 return nil, grpcutil.Internal | 100 return nil, grpcutil.Internal |
106 } | 101 } |
107 path := ls.Path() | |
108 | 102 |
109 // If nothing was requested, return nothing. | 103 // If nothing was requested, return nothing. |
110 resp := logdog.GetResponse{} | 104 resp := logdog.GetResponse{} |
111 if !(req.State || tail) && req.LogCount < 0 { | 105 if !(req.State || tail) && req.LogCount < 0 { |
112 return &resp, nil | 106 return &resp, nil |
113 } | 107 } |
114 | 108 |
115 if req.State { | 109 if req.State { |
116 resp.State = loadLogStreamState(ls) | 110 resp.State = loadLogStreamState(ls) |
117 | 111 |
118 var err error | 112 var err error |
119 resp.Desc, err = ls.DescriptorValue() | 113 resp.Desc, err = ls.DescriptorValue() |
120 if err != nil { | 114 if err != nil { |
121 log.WithError(err).Errorf(c, "Failed to deserialize desc
riptor protobuf.") | 115 log.WithError(err).Errorf(c, "Failed to deserialize desc
riptor protobuf.") |
122 return nil, grpcutil.Internal | 116 return nil, grpcutil.Internal |
123 } | 117 } |
124 } | 118 } |
125 | 119 |
126 // Retrieve requested logs from storage, if requested. | 120 // Retrieve requested logs from storage, if requested. |
127 if tail || req.LogCount >= 0 { | 121 if tail || req.LogCount >= 0 { |
128 resp.Logs, err = s.getLogs(c, req, tail, ls) | 122 resp.Logs, err = s.getLogs(c, req, tail, ls) |
129 if err != nil { | 123 if err != nil { |
130 » » » log.Fields{ | 124 » » » log.WithError(err).Errorf(c, "Failed to get logs.") |
131 » » » » log.ErrorKey: err, | |
132 » » » » "path": path, | |
133 » » » }.Errorf(c, "Failed to get logs.") | |
134 return nil, grpcutil.Internal | 125 return nil, grpcutil.Internal |
135 } | 126 } |
136 } | 127 } |
137 | 128 |
138 log.Fields{ | 129 log.Fields{ |
139 "logCount": len(resp.Logs), | 130 "logCount": len(resp.Logs), |
140 }.Debugf(c, "Get request completed successfully.") | 131 }.Debugf(c, "Get request completed successfully.") |
141 return &resp, nil | 132 return &resp, nil |
142 } | 133 } |
143 | 134 |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
184 Client: gs, | 175 Client: gs, |
185 MaxBytes: byteLimit, | 176 MaxBytes: byteLimit, |
186 }) | 177 }) |
187 if err != nil { | 178 if err != nil { |
188 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") | 179 log.WithError(err).Errorf(c, "Failed to create Google St
orage storage instance.") |
189 return nil, err | 180 return nil, err |
190 } | 181 } |
191 } | 182 } |
192 defer st.Close() | 183 defer st.Close() |
193 | 184 |
194 » path := ls.Path() | 185 » project, path := coordinator.Project(c), ls.Path() |
195 | 186 |
196 var fetchedLogs [][]byte | 187 var fetchedLogs [][]byte |
197 var err error | 188 var err error |
198 if tail { | 189 if tail { |
199 » » fetchedLogs, err = getTail(c, st, path) | 190 » » fetchedLogs, err = getTail(c, st, project, path) |
200 } else { | 191 } else { |
201 » » fetchedLogs, err = getHead(c, req, st, path, byteLimit) | 192 » » fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) |
202 } | 193 } |
203 if err != nil { | 194 if err != nil { |
204 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 195 log.WithError(err).Errorf(c, "Failed to fetch log records.") |
205 return nil, err | 196 return nil, err |
206 } | 197 } |
207 | 198 |
208 logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) | 199 logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) |
209 for idx, ld := range fetchedLogs { | 200 for idx, ld := range fetchedLogs { |
210 // Deserialize the log entry, then convert it to output value. | 201 // Deserialize the log entry, then convert it to output value. |
211 le := logpb.LogEntry{} | 202 le := logpb.LogEntry{} |
212 if err := proto.Unmarshal(ld, &le); err != nil { | 203 if err := proto.Unmarshal(ld, &le); err != nil { |
213 log.Fields{ | 204 log.Fields{ |
214 log.ErrorKey: err, | 205 log.ErrorKey: err, |
215 "index": idx, | 206 "index": idx, |
216 }.Errorf(c, "Failed to generate response log entry.") | 207 }.Errorf(c, "Failed to generate response log entry.") |
217 return nil, err | 208 return nil, err |
218 } | 209 } |
219 logEntries[idx] = &le | 210 logEntries[idx] = &le |
220 } | 211 } |
221 return logEntries, nil | 212 return logEntries, nil |
222 } | 213 } |
223 | 214 |
224 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
pes.StreamPath, byteLimit int) ( | 215 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj
ect config.ProjectName, |
225 » [][]byte, error) { | 216 » path types.StreamPath, byteLimit int) ([][]byte, error) { |
226 » c = log.SetFields(c, log.Fields{ | 217 » log.Fields{ |
227 » » "path": p, | 218 » » "project": project, |
| 219 » » "path": path, |
228 "index": req.Index, | 220 "index": req.Index, |
229 "count": req.LogCount, | 221 "count": req.LogCount, |
230 "bytes": req.ByteCount, | 222 "bytes": req.ByteCount, |
231 "noncontiguous": req.NonContiguous, | 223 "noncontiguous": req.NonContiguous, |
232 » }) | 224 » }.Debugf(c, "Issuing Get request.") |
233 | 225 |
234 // Allocate result logs array. | 226 // Allocate result logs array. |
235 logCount := int(req.LogCount) | 227 logCount := int(req.LogCount) |
236 asz := getInitialArraySize | 228 asz := getInitialArraySize |
237 if logCount > 0 && logCount < asz { | 229 if logCount > 0 && logCount < asz { |
238 asz = logCount | 230 asz = logCount |
239 } | 231 } |
240 logs := make([][]byte, 0, asz) | 232 logs := make([][]byte, 0, asz) |
241 | 233 |
242 sreq := storage.GetRequest{ | 234 sreq := storage.GetRequest{ |
243 » » Path: p, | 235 » » Project: project, |
244 » » Index: types.MessageIndex(req.Index), | 236 » » Path: path, |
245 » » Limit: logCount, | 237 » » Index: types.MessageIndex(req.Index), |
| 238 » » Limit: logCount, |
246 } | 239 } |
247 | 240 |
248 count := 0 | 241 count := 0 |
249 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 242 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { |
250 // Issue the Get request. This may return a transient error, in
which case | 243 // Issue the Get request. This may return a transient error, in
which case |
251 // we will retry. | 244 // we will retry. |
252 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool
{ | 245 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool
{ |
253 if count > 0 && byteLimit-len(ld) < 0 { | 246 if count > 0 && byteLimit-len(ld) < 0 { |
254 // Not the first log, and we've exceeded our byt
e limit. | 247 // Not the first log, and we've exceeded our byt
e limit. |
255 return false | 248 return false |
(...skipping 10 matching lines...) Expand all Loading... |
266 }) | 259 }) |
267 }, func(err error, delay time.Duration) { | 260 }, func(err error, delay time.Duration) { |
268 log.Fields{ | 261 log.Fields{ |
269 log.ErrorKey: err, | 262 log.ErrorKey: err, |
270 "delay": delay, | 263 "delay": delay, |
271 "initialIndex": req.Index, | 264 "initialIndex": req.Index, |
272 "nextIndex": sreq.Index, | 265 "nextIndex": sreq.Index, |
273 "count": len(logs), | 266 "count": len(logs), |
274 }.Warningf(c, "Transient error while loading logs; retrying.") | 267 }.Warningf(c, "Transient error while loading logs; retrying.") |
275 }) | 268 }) |
276 » if err != nil { | 269 » switch err { |
| 270 » case nil: |
| 271 » » return logs, nil |
| 272 |
| 273 » case storage.ErrDoesNotExist: |
| 274 » » return nil, nil |
| 275 |
| 276 » default: |
277 log.Fields{ | 277 log.Fields{ |
278 log.ErrorKey: err, | 278 log.ErrorKey: err, |
279 "initialIndex": req.Index, | 279 "initialIndex": req.Index, |
280 "nextIndex": sreq.Index, | 280 "nextIndex": sreq.Index, |
281 "count": len(logs), | 281 "count": len(logs), |
282 }.Errorf(c, "Failed to execute range request.") | 282 }.Errorf(c, "Failed to execute range request.") |
283 return nil, err | 283 return nil, err |
284 } | 284 } |
285 | |
286 return logs, nil | |
287 } | 285 } |
288 | 286 |
289 func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byt
e, error) { | 287 func getTail(c context.Context, st storage.Storage, project config.ProjectName,
path types.StreamPath) ([][]byte, error) { |
| 288 » log.Fields{ |
| 289 » » "project": project, |
| 290 » » "path": path, |
| 291 » }.Debugf(c, "Issuing Tail request.") |
| 292 |
290 var data []byte | 293 var data []byte |
291 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { | 294 err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
ror) { |
292 » » data, _, err = st.Tail("", p) | 295 » » data, _, err = st.Tail(project, path) |
293 return | 296 return |
294 }, func(err error, delay time.Duration) { | 297 }, func(err error, delay time.Duration) { |
295 log.Fields{ | 298 log.Fields{ |
296 log.ErrorKey: err, | 299 log.ErrorKey: err, |
297 "delay": delay, | 300 "delay": delay, |
298 }.Warningf(c, "Transient error while fetching tail log; retrying
.") | 301 }.Warningf(c, "Transient error while fetching tail log; retrying
.") |
299 }) | 302 }) |
300 » if err != nil { | 303 » switch err { |
| 304 » case nil: |
| 305 » » return [][]byte{data}, nil |
| 306 |
| 307 » case storage.ErrDoesNotExist: |
| 308 » » return nil, nil |
| 309 |
| 310 » default: |
301 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 311 log.WithError(err).Errorf(c, "Failed to fetch tail log.") |
302 return nil, err | 312 return nil, err |
303 } | 313 } |
304 return [][]byte{data}, err | |
305 } | 314 } |
OLD | NEW |