| OLD | NEW | 
|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be | 
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. | 
| 4 | 4 | 
| 5 package logs | 5 package logs | 
| 6 | 6 | 
| 7 import ( | 7 import ( | 
| 8         "net/url" |  | 
| 9         "time" | 8         "time" | 
| 10 | 9 | 
| 11         "github.com/golang/protobuf/proto" | 10         "github.com/golang/protobuf/proto" | 
| 12         ds "github.com/luci/gae/service/datastore" | 11         ds "github.com/luci/gae/service/datastore" | 
| 13         "github.com/luci/luci-go/appengine/logdog/coordinator" | 12         "github.com/luci/luci-go/appengine/logdog/coordinator" | 
| 14         "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 13         "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 
|  | 14         "github.com/luci/luci-go/common/config" | 
| 15         "github.com/luci/luci-go/common/grpcutil" | 15         "github.com/luci/luci-go/common/grpcutil" | 
| 16         "github.com/luci/luci-go/common/logdog/types" | 16         "github.com/luci/luci-go/common/logdog/types" | 
| 17         log "github.com/luci/luci-go/common/logging" | 17         log "github.com/luci/luci-go/common/logging" | 
| 18         "github.com/luci/luci-go/common/proto/logdog/logpb" | 18         "github.com/luci/luci-go/common/proto/logdog/logpb" | 
| 19         "github.com/luci/luci-go/common/retry" | 19         "github.com/luci/luci-go/common/retry" | 
| 20         "github.com/luci/luci-go/server/logdog/storage" | 20         "github.com/luci/luci-go/server/logdog/storage" | 
| 21         "github.com/luci/luci-go/server/logdog/storage/archive" | 21         "github.com/luci/luci-go/server/logdog/storage/archive" | 
| 22         "golang.org/x/net/context" | 22         "golang.org/x/net/context" | 
| 23         "google.golang.org/grpc/codes" | 23         "google.golang.org/grpc/codes" | 
| 24 ) | 24 ) | 
| (...skipping 14 matching lines...) Expand all  Loading... | 
| 39 ) | 39 ) | 
| 40 | 40 | 
| 41 // Get returns state and log data for a single log stream. | 41 // Get returns state and log data for a single log stream. | 
| 42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
     onse, error) { | 42 func (s *server) Get(c context.Context, req *logdog.GetRequest) (*logdog.GetResp
     onse, error) { | 
| 43         return s.getImpl(c, req, false) | 43         return s.getImpl(c, req, false) | 
| 44 } | 44 } | 
| 45 | 45 | 
| 46 // Tail returns the last log entry for a given log stream. | 46 // Tail returns the last log entry for a given log stream. | 
| 47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
     sponse, error) { | 47 func (s *server) Tail(c context.Context, req *logdog.TailRequest) (*logdog.GetRe
     sponse, error) { | 
| 48         r := logdog.GetRequest{ | 48         r := logdog.GetRequest{ | 
| 49 »       »       Path:  req.Path, | 49 »       »       Project: req.Project, | 
| 50 »       »       State: req.State, | 50 »       »       Path:    req.Path, | 
|  | 51 »       »       State:   req.State, | 
| 51         } | 52         } | 
| 52         return s.getImpl(c, &r, true) | 53         return s.getImpl(c, &r, true) | 
| 53 } | 54 } | 
| 54 | 55 | 
| 55 // getImpl is common code shared between Get and Tail endpoints. | 56 // getImpl is common code shared between Get and Tail endpoints. | 
| 56 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
     *logdog.GetResponse, error) { | 57 func (s *server) getImpl(c context.Context, req *logdog.GetRequest, tail bool) (
     *logdog.GetResponse, error) { | 
| 57         log.Fields{ | 58         log.Fields{ | 
| 58 »       »       "path":  req.Path, | 59 »       »       "project": req.Project, | 
| 59 »       »       "index": req.Index, | 60 »       »       "path":    req.Path, | 
| 60 »       »       "tail":  tail, | 61 »       »       "index":   req.Index, | 
|  | 62 »       »       "tail":    tail, | 
| 61         }.Debugf(c, "Received get request.") | 63         }.Debugf(c, "Received get request.") | 
| 62 | 64 | 
| 63 »       // Fetch the log stream state for this log stream. | 65 »       ls, err := coordinator.NewLogStream(req.Path) | 
| 64 »       u, err := url.Parse(req.Path) |  | 
| 65         if err != nil { | 66         if err != nil { | 
| 66 »       »       log.Fields{ | 67 »       »       log.WithError(err).Errorf(c, "Invalid path supplied.") | 
| 67 »       »       »       log.ErrorKey: err, |  | 
| 68 »       »       »       "path":       req.Path, |  | 
| 69 »       »       }.Errorf(c, "Could not parse path URL.") |  | 
| 70 »       »       return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path e
     ncoding") |  | 
| 71 »       } |  | 
| 72 »       ls, err := coordinator.NewLogStream(u.Path) |  | 
| 73 »       if err != nil { |  | 
| 74 »       »       log.Fields{ |  | 
| 75 »       »       »       log.ErrorKey: err, |  | 
| 76 »       »       »       "path":       u.Path, |  | 
| 77 »       »       }.Errorf(c, "Invalid path supplied.") |  | 
| 78                 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v
     alue") | 68                 return nil, grpcutil.Errf(codes.InvalidArgument, "invalid path v
     alue") | 
| 79         } | 69         } | 
| 80 | 70 | 
|  | 71         // The user may supply a hash instead of a full path. Once resolved, log | 
|  | 72         // the original log stream. | 
|  | 73         path := ls.Path() | 
|  | 74         if req.Path != string(path) { | 
|  | 75                 log.Fields{ | 
|  | 76                         "hashPath":   req.Path, | 
|  | 77                         "streamPath": path, | 
|  | 78                 }.Debugf(c, "Resolved hash path.") | 
|  | 79         } | 
|  | 80 | 
| 81         // If this log entry is Purged and we're not admin, pretend it doesn't e
     xist. | 81         // If this log entry is Purged and we're not admin, pretend it doesn't e
     xist. | 
| 82         err = ds.Get(c).Get(ls) | 82         err = ds.Get(c).Get(ls) | 
| 83         switch err { | 83         switch err { | 
| 84         case nil: | 84         case nil: | 
| 85                 if ls.Purged { | 85                 if ls.Purged { | 
| 86                         if authErr := coordinator.IsAdminUser(c); authErr != nil
      { | 86                         if authErr := coordinator.IsAdminUser(c); authErr != nil
      { | 
| 87                                 log.Fields{ | 87                                 log.Fields{ | 
| 88                                         log.ErrorKey: authErr, | 88                                         log.ErrorKey: authErr, | 
| 89                                 }.Warningf(c, "Non-superuser requested purged lo
     g.") | 89                                 }.Warningf(c, "Non-superuser requested purged lo
     g.") | 
| 90                                 return nil, grpcutil.Errf(codes.NotFound, "path 
     not found") | 90                                 return nil, grpcutil.Errf(codes.NotFound, "path 
     not found") | 
| 91                         } | 91                         } | 
| 92                 } | 92                 } | 
| 93 | 93 | 
| 94         case ds.ErrNoSuchEntity: | 94         case ds.ErrNoSuchEntity: | 
| 95 »       »       log.Fields{ | 95 »       »       log.Errorf(c, "Log stream does not exist.") | 
| 96 »       »       »       "path": u.Path, |  | 
| 97 »       »       }.Errorf(c, "Log stream does not exist.") |  | 
| 98                 return nil, grpcutil.Errf(codes.NotFound, "path not found") | 96                 return nil, grpcutil.Errf(codes.NotFound, "path not found") | 
| 99 | 97 | 
| 100         default: | 98         default: | 
| 101 »       »       log.Fields{ | 99 »       »       log.WithError(err).Errorf(c, "Failed to look up log stream.") | 
| 102 »       »       »       log.ErrorKey: err, |  | 
| 103 »       »       »       "path":       u.Path, |  | 
| 104 »       »       }.Errorf(c, "Failed to look up log stream.") |  | 
| 105                 return nil, grpcutil.Internal | 100                 return nil, grpcutil.Internal | 
| 106         } | 101         } | 
| 107         path := ls.Path() |  | 
| 108 | 102 | 
| 109         // If nothing was requested, return nothing. | 103         // If nothing was requested, return nothing. | 
| 110         resp := logdog.GetResponse{} | 104         resp := logdog.GetResponse{} | 
| 111         if !(req.State || tail) && req.LogCount < 0 { | 105         if !(req.State || tail) && req.LogCount < 0 { | 
| 112                 return &resp, nil | 106                 return &resp, nil | 
| 113         } | 107         } | 
| 114 | 108 | 
| 115         if req.State { | 109         if req.State { | 
| 116                 resp.State = loadLogStreamState(ls) | 110                 resp.State = loadLogStreamState(ls) | 
| 117 | 111 | 
| 118                 var err error | 112                 var err error | 
| 119                 resp.Desc, err = ls.DescriptorValue() | 113                 resp.Desc, err = ls.DescriptorValue() | 
| 120                 if err != nil { | 114                 if err != nil { | 
| 121                         log.WithError(err).Errorf(c, "Failed to deserialize desc
     riptor protobuf.") | 115                         log.WithError(err).Errorf(c, "Failed to deserialize desc
     riptor protobuf.") | 
| 122                         return nil, grpcutil.Internal | 116                         return nil, grpcutil.Internal | 
| 123                 } | 117                 } | 
| 124         } | 118         } | 
| 125 | 119 | 
| 126         // Retrieve requested logs from storage, if requested. | 120         // Retrieve requested logs from storage, if requested. | 
| 127         if tail || req.LogCount >= 0 { | 121         if tail || req.LogCount >= 0 { | 
| 128                 resp.Logs, err = s.getLogs(c, req, tail, ls) | 122                 resp.Logs, err = s.getLogs(c, req, tail, ls) | 
| 129                 if err != nil { | 123                 if err != nil { | 
| 130 »       »       »       log.Fields{ | 124 »       »       »       log.WithError(err).Errorf(c, "Failed to get logs.") | 
| 131 »       »       »       »       log.ErrorKey: err, |  | 
| 132 »       »       »       »       "path":       path, |  | 
| 133 »       »       »       }.Errorf(c, "Failed to get logs.") |  | 
| 134                         return nil, grpcutil.Internal | 125                         return nil, grpcutil.Internal | 
| 135                 } | 126                 } | 
| 136         } | 127         } | 
| 137 | 128 | 
| 138         log.Fields{ | 129         log.Fields{ | 
| 139                 "logCount": len(resp.Logs), | 130                 "logCount": len(resp.Logs), | 
| 140         }.Debugf(c, "Get request completed successfully.") | 131         }.Debugf(c, "Get request completed successfully.") | 
| 141         return &resp, nil | 132         return &resp, nil | 
| 142 } | 133 } | 
| 143 | 134 | 
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 184                         Client:    gs, | 175                         Client:    gs, | 
| 185                         MaxBytes:  byteLimit, | 176                         MaxBytes:  byteLimit, | 
| 186                 }) | 177                 }) | 
| 187                 if err != nil { | 178                 if err != nil { | 
| 188                         log.WithError(err).Errorf(c, "Failed to create Google St
     orage storage instance.") | 179                         log.WithError(err).Errorf(c, "Failed to create Google St
     orage storage instance.") | 
| 189                         return nil, err | 180                         return nil, err | 
| 190                 } | 181                 } | 
| 191         } | 182         } | 
| 192         defer st.Close() | 183         defer st.Close() | 
| 193 | 184 | 
| 194 »       path := ls.Path() | 185 »       project, path := coordinator.Project(c), ls.Path() | 
| 195 | 186 | 
| 196         var fetchedLogs [][]byte | 187         var fetchedLogs [][]byte | 
| 197         var err error | 188         var err error | 
| 198         if tail { | 189         if tail { | 
| 199 »       »       fetchedLogs, err = getTail(c, st, path) | 190 »       »       fetchedLogs, err = getTail(c, st, project, path) | 
| 200         } else { | 191         } else { | 
| 201 »       »       fetchedLogs, err = getHead(c, req, st, path, byteLimit) | 192 »       »       fetchedLogs, err = getHead(c, req, st, project, path, byteLimit) | 
| 202         } | 193         } | 
| 203         if err != nil { | 194         if err != nil { | 
| 204                 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 195                 log.WithError(err).Errorf(c, "Failed to fetch log records.") | 
| 205                 return nil, err | 196                 return nil, err | 
| 206         } | 197         } | 
| 207 | 198 | 
| 208         logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) | 199         logEntries := make([]*logpb.LogEntry, len(fetchedLogs)) | 
| 209         for idx, ld := range fetchedLogs { | 200         for idx, ld := range fetchedLogs { | 
| 210                 // Deserialize the log entry, then convert it to output value. | 201                 // Deserialize the log entry, then convert it to output value. | 
| 211                 le := logpb.LogEntry{} | 202                 le := logpb.LogEntry{} | 
| 212                 if err := proto.Unmarshal(ld, &le); err != nil { | 203                 if err := proto.Unmarshal(ld, &le); err != nil { | 
| 213                         log.Fields{ | 204                         log.Fields{ | 
| 214                                 log.ErrorKey: err, | 205                                 log.ErrorKey: err, | 
| 215                                 "index":      idx, | 206                                 "index":      idx, | 
| 216                         }.Errorf(c, "Failed to generate response log entry.") | 207                         }.Errorf(c, "Failed to generate response log entry.") | 
| 217                         return nil, err | 208                         return nil, err | 
| 218                 } | 209                 } | 
| 219                 logEntries[idx] = &le | 210                 logEntries[idx] = &le | 
| 220         } | 211         } | 
| 221         return logEntries, nil | 212         return logEntries, nil | 
| 222 } | 213 } | 
| 223 | 214 | 
| 224 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, p ty
     pes.StreamPath, byteLimit int) ( | 215 func getHead(c context.Context, req *logdog.GetRequest, st storage.Storage, proj
     ect config.ProjectName, | 
| 225 »       [][]byte, error) { | 216 »       path types.StreamPath, byteLimit int) ([][]byte, error) { | 
| 226 »       c = log.SetFields(c, log.Fields{ | 217 »       log.Fields{ | 
| 227 »       »       "path":          p, | 218 »       »       "project":       project, | 
|  | 219 »       »       "path":          path, | 
| 228                 "index":         req.Index, | 220                 "index":         req.Index, | 
| 229                 "count":         req.LogCount, | 221                 "count":         req.LogCount, | 
| 230                 "bytes":         req.ByteCount, | 222                 "bytes":         req.ByteCount, | 
| 231                 "noncontiguous": req.NonContiguous, | 223                 "noncontiguous": req.NonContiguous, | 
| 232 »       }) | 224 »       }.Debugf(c, "Issuing Get request.") | 
| 233 | 225 | 
| 234         // Allocate result logs array. | 226         // Allocate result logs array. | 
| 235         logCount := int(req.LogCount) | 227         logCount := int(req.LogCount) | 
| 236         asz := getInitialArraySize | 228         asz := getInitialArraySize | 
| 237         if logCount > 0 && logCount < asz { | 229         if logCount > 0 && logCount < asz { | 
| 238                 asz = logCount | 230                 asz = logCount | 
| 239         } | 231         } | 
| 240         logs := make([][]byte, 0, asz) | 232         logs := make([][]byte, 0, asz) | 
| 241 | 233 | 
| 242         sreq := storage.GetRequest{ | 234         sreq := storage.GetRequest{ | 
| 243 »       »       Path:  p, | 235 »       »       Project: project, | 
| 244 »       »       Index: types.MessageIndex(req.Index), | 236 »       »       Path:    path, | 
| 245 »       »       Limit: logCount, | 237 »       »       Index:   types.MessageIndex(req.Index), | 
|  | 238 »       »       Limit:   logCount, | 
| 246         } | 239         } | 
| 247 | 240 | 
| 248         count := 0 | 241         count := 0 | 
| 249         err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 242         err := retry.Retry(c, retry.TransientOnly(retry.Default), func() error { | 
| 250                 // Issue the Get request. This may return a transient error, in 
     which case | 243                 // Issue the Get request. This may return a transient error, in 
     which case | 
| 251                 // we will retry. | 244                 // we will retry. | 
| 252                 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool
      { | 245                 return st.Get(sreq, func(idx types.MessageIndex, ld []byte) bool
      { | 
| 253                         if count > 0 && byteLimit-len(ld) < 0 { | 246                         if count > 0 && byteLimit-len(ld) < 0 { | 
| 254                                 // Not the first log, and we've exceeded our byt
     e limit. | 247                                 // Not the first log, and we've exceeded our byt
     e limit. | 
| 255                                 return false | 248                                 return false | 
| (...skipping 10 matching lines...) Expand all  Loading... | 
| 266                 }) | 259                 }) | 
| 267         }, func(err error, delay time.Duration) { | 260         }, func(err error, delay time.Duration) { | 
| 268                 log.Fields{ | 261                 log.Fields{ | 
| 269                         log.ErrorKey:   err, | 262                         log.ErrorKey:   err, | 
| 270                         "delay":        delay, | 263                         "delay":        delay, | 
| 271                         "initialIndex": req.Index, | 264                         "initialIndex": req.Index, | 
| 272                         "nextIndex":    sreq.Index, | 265                         "nextIndex":    sreq.Index, | 
| 273                         "count":        len(logs), | 266                         "count":        len(logs), | 
| 274                 }.Warningf(c, "Transient error while loading logs; retrying.") | 267                 }.Warningf(c, "Transient error while loading logs; retrying.") | 
| 275         }) | 268         }) | 
| 276 »       if err != nil { | 269 »       switch err { | 
|  | 270 »       case nil: | 
|  | 271 »       »       return logs, nil | 
|  | 272 | 
|  | 273 »       case storage.ErrDoesNotExist: | 
|  | 274 »       »       return nil, nil | 
|  | 275 | 
|  | 276 »       default: | 
| 277                 log.Fields{ | 277                 log.Fields{ | 
| 278                         log.ErrorKey:   err, | 278                         log.ErrorKey:   err, | 
| 279                         "initialIndex": req.Index, | 279                         "initialIndex": req.Index, | 
| 280                         "nextIndex":    sreq.Index, | 280                         "nextIndex":    sreq.Index, | 
| 281                         "count":        len(logs), | 281                         "count":        len(logs), | 
| 282                 }.Errorf(c, "Failed to execute range request.") | 282                 }.Errorf(c, "Failed to execute range request.") | 
| 283                 return nil, err | 283                 return nil, err | 
| 284         } | 284         } | 
| 285 |  | 
| 286         return logs, nil |  | 
| 287 } | 285 } | 
| 288 | 286 | 
| 289 func getTail(c context.Context, st storage.Storage, p types.StreamPath) ([][]byt
     e, error) { | 287 func getTail(c context.Context, st storage.Storage, project config.ProjectName, 
     path types.StreamPath) ([][]byte, error) { | 
|  | 288 »       log.Fields{ | 
|  | 289 »       »       "project": project, | 
|  | 290 »       »       "path":    path, | 
|  | 291 »       }.Debugf(c, "Issuing Tail request.") | 
|  | 292 | 
| 290         var data []byte | 293         var data []byte | 
| 291         err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
     ror) { | 294         err := retry.Retry(c, retry.TransientOnly(retry.Default), func() (err er
     ror) { | 
| 292 »       »       data, _, err = st.Tail("", p) | 295 »       »       data, _, err = st.Tail(project, path) | 
| 293                 return | 296                 return | 
| 294         }, func(err error, delay time.Duration) { | 297         }, func(err error, delay time.Duration) { | 
| 295                 log.Fields{ | 298                 log.Fields{ | 
| 296                         log.ErrorKey: err, | 299                         log.ErrorKey: err, | 
| 297                         "delay":      delay, | 300                         "delay":      delay, | 
| 298                 }.Warningf(c, "Transient error while fetching tail log; retrying
     .") | 301                 }.Warningf(c, "Transient error while fetching tail log; retrying
     .") | 
| 299         }) | 302         }) | 
| 300 »       if err != nil { | 303 »       switch err { | 
|  | 304 »       case nil: | 
|  | 305 »       »       return [][]byte{data}, nil | 
|  | 306 | 
|  | 307 »       case storage.ErrDoesNotExist: | 
|  | 308 »       »       return nil, nil | 
|  | 309 | 
|  | 310 »       default: | 
| 301                 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 311                 log.WithError(err).Errorf(c, "Failed to fetch tail log.") | 
| 302                 return nil, err | 312                 return nil, err | 
| 303         } | 313         } | 
| 304         return [][]byte{data}, err |  | 
| 305 } | 314 } | 
| OLD | NEW | 
|---|