Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Side by Side Diff: milo/appengine/logs/logs.go

Issue 2796743004: Milo flex raw log viewer endpoint (Closed)
Patch Set: Fix module path Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package main
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "log"
8 "net/http"
9 "strings"
10 "sync"
11 "time"
12
13 "github.com/luci/luci-go/common/clock"
14 "github.com/luci/luci-go/grpc/prpc"
15 "github.com/luci/luci-go/logdog/api/logpb"
16 "github.com/luci/luci-go/logdog/client/coordinator"
17 "github.com/luci/luci-go/logdog/common/fetcher"
18 "github.com/luci/luci-go/logdog/common/types"
19 "github.com/luci/luci-go/luci_config/common/cfgtypes"
20
21 "golang.org/x/net/context"
nodir 2017/04/29 00:59:45 3p imports should be before 1p
hinoka 2017/05/01 23:08:02 Done.
22 )
23
24 var errNoAuth = errors.New("no access")
25
26 type streamInfo struct {
27 Project cfgtypes.ProjectName
28 Path types.StreamPath
29
30 // Client is the HTTP client to use for LogDog communication.
31 Client *coordinator.Client
32 }
33
34 const noStreamDelay = 5 * time.Second
35
36 // coordinatorSource is a fetcher.Source implementation that uses the
37 // Coordiantor API.
38 type coordinatorSource struct {
39 sync.Mutex
40
41 stream *coordinator.Stream
42 tidx types.MessageIndex
43 tailFirst bool
44
45 streamState *coordinator.LogStream
46 }
47
48 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques t) (
49 []*logpb.LogEntry, types.MessageIndex, error) {
50 s.Lock()
nodir 2017/04/29 00:59:45 FWIU, this serializes all requests to logdog. It i
hinoka 2017/05/01 23:08:02 Done.
51 defer s.Unlock()
52
53 params := append(make([]coordinator.GetParam, 0, 4),
54 coordinator.LimitBytes(int(req.Bytes)),
55 coordinator.LimitCount(req.Count),
56 coordinator.Index(req.Index),
57 )
58
59 // If we haven't terminated, use this opportunity to fetch/update our st ream
60 // state.
61 var streamState coordinator.LogStream
62 reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0)
nodir 2017/04/29 00:59:45 nit: parens are unnecessary
hinoka 2017/05/01 23:08:02 Done.
63 if reqStream {
64 params = append(params, coordinator.WithState(&streamState))
nodir 2017/04/29 00:59:45 it seems that this variable controls whether or no
hinoka 2017/05/01 23:08:02 Done.
65 }
66
67 for {
68 logs, err := s.stream.Get(c, params...)
69 switch err {
70 case nil:
71 if reqStream {
72 s.streamState = &streamState
73 s.tidx = streamState.State.TerminalIndex
74 }
75 return logs, s.tidx, nil
76
77 case coordinator.ErrNoSuchStream:
78 log.Print("Stream does not exist. Sleeping pending regis tration.")
79
80 // Delay, interrupting if our Context is interrupted.
81 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete( ) {
82 return nil, 0, tr.Err
83 }
84
85 default:
86 if err != nil {
nodir 2017/04/29 00:59:45 this is constant true because of `case nil` above,
hinoka 2017/05/01 23:08:02 Done.
87 return nil, 0, err
88 }
89 }
90 }
91 }
92
93 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err or {
94 // TODO(hinoka): Move this to luci-config.
95 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp ot.com") {
96 return fmt.Errorf("unknown host %s", host)
97 }
98 spath := strings.SplitN(path, "/", 2)
99 project := cfgtypes.ProjectName(spath[0])
100 streamPath := types.StreamPath(spath[1])
nodir 2017/04/29 00:59:45 this will crash the server if someone requests /lo
hinoka 2017/05/01 23:08:03 Done.
101
102 client := coordinator.NewClient(&prpc.Client{
103 C: &http.Client{
104 // TODO(hinoka): Once 712506 is resolved, figure out how to get auth.
nodir 2017/04/29 00:59:45 please write crbug.com/712506
hinoka 2017/05/01 23:08:02 Done.
105 Transport: http.DefaultTransport,
106 },
107 Host: host,
108 })
109 stream := client.Stream(project, streamPath)
110
111 // Pull stream information.
112 src := coordinatorSource{
nodir 2017/04/29 00:59:45 nit: consider inlining
hinoka 2017/05/01 23:08:02 Done.
113 stream: stream,
114 tidx: -1, // Must be set to probe for state.
115 }
116 f := fetcher.New(c, fetcher.Options{
117 Source: &src,
118 Index: types.MessageIndex(0),
119 Count: 0,
120 // Try to buffer as much as possible, with a large window, since this is
121 // basically a cloud-to-cloud connection.
122 BufferCount: 200,
123 BufferBytes: int64(4 * 1024 * 1024),
124 PrefetchFactor: 10,
125 })
126
127 for {
128 // Read out of the buffer. This _should_ be bottlenecked on the network
129 // connection between the Flex instance and the client, via Fpri ntf().
130 entry, err := f.NextLogEntry()
131 switch err {
132 case io.EOF:
133 break // We're done.
nodir 2017/04/29 00:59:45 i think this breaks the switch, not the loop you c
hinoka 2017/05/01 23:08:02 Oh you're right... I think "return nil" will work
134 case nil:
135 // Nothing
136 case coordinator.ErrNoAccess:
137 return errNoAuth // This will force a redirect
138 default:
139 return err
140 }
141 content := entry.GetText()
142 if content == nil {
143 break
144 }
145 for _, line := range content.Lines {
146 fmt.Fprint(w, line.Value)
147 fmt.Fprint(w, line.Delimiter)
148 }
149 }
150 return nil
151 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698