Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(202)

Side by Side Diff: milo/appengine/logs/logs.go

Issue 2796743004: Milo flex raw log viewer endpoint (Closed)
Patch Set: Remove more debug comments Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package main
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "log"
8 "net/http"
9 "strings"
10 "sync"
11 "time"
12
13 "github.com/luci/luci-go/common/clock"
14 "github.com/luci/luci-go/grpc/prpc"
15 "github.com/luci/luci-go/logdog/api/logpb"
16 "github.com/luci/luci-go/logdog/client/coordinator"
17 "github.com/luci/luci-go/logdog/common/fetcher"
18 "github.com/luci/luci-go/logdog/common/types"
19 "github.com/luci/luci-go/luci_config/common/cfgtypes"
20 "github.com/luci/luci-go/server/auth"
21
22 "golang.org/x/net/context"
23 )
24
25 var errNoAuth = errors.New("no access")
26
27 type streamInfo struct {
28 Project cfgtypes.ProjectName
29 Path types.StreamPath
30
31 // Client is the HTTP client to use for LogDog communication.
32 Client *coordinator.Client
33 }
34
35 const noStreamDelay = 5 * time.Second
36
37 // coordinatorSource is a fetcher.Source implementation that uses the
38 // Coordiantor API.
39 type coordinatorSource struct {
40 sync.Mutex
41
42 stream *coordinator.Stream
43 tidx types.MessageIndex
44 tailFirst bool
45
46 streamState *coordinator.LogStream
47 }
48
49 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques t) (
50 []*logpb.LogEntry, types.MessageIndex, error) {
51 s.Lock()
52 defer s.Unlock()
53
54 params := append(make([]coordinator.GetParam, 0, 4),
55 coordinator.LimitBytes(int(req.Bytes)),
56 coordinator.LimitCount(req.Count),
57 coordinator.Index(req.Index),
58 )
59
60 // If we haven't terminated, use this opportunity to fetch/update our st ream
61 // state.
62 var streamState coordinator.LogStream
63 reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex < 0)
64 if reqStream {
65 params = append(params, coordinator.WithState(&streamState))
66 }
67
68 for {
69 logs, err := s.stream.Get(c, params...)
70 switch err {
71 case nil:
72 if reqStream {
73 s.streamState = &streamState
74 s.tidx = streamState.State.TerminalIndex
75 }
76 return logs, s.tidx, nil
77
78 case coordinator.ErrNoSuchStream:
79 log.Print("Stream does not exist. Sleeping pending regis tration.")
80
81 // Delay, interrupting if our Context is interrupted.
82 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete( ) {
83 return nil, 0, tr.Err
84 }
85
86 default:
87 if err != nil {
88 return nil, 0, err
89 }
90 }
91 }
92 }
93
94 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err or {
95 // TODO(hinoka): Move this to luci-config.
96 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp ot.com") {
97 return fmt.Errorf("unknown host %s", host)
98 }
99 spath := strings.SplitN(path, "/", 2)
100 project := cfgtypes.ProjectName(spath[0])
101 streamPath := types.StreamPath(spath[1])
102
103 t, err := auth.GetRPCTransport(c, auth.AsUser)
104 if err != nil {
105 return err
106 }
107 client := coordinator.NewClient(&prpc.Client{
108 C: &http.Client{
109 Transport: t,
110 },
111 Host: host,
112 })
113 stream := client.Stream(project, streamPath)
114
115 // Pull stream information.
116 src := coordinatorSource{
117 stream: stream,
118 tidx: -1, // Must be set to probe for state.
119 }
120 f := fetcher.New(c, fetcher.Options{
121 Source: &src,
122 Index: types.MessageIndex(0),
123 Count: 0,
124 // Try to buffer as much as possible, with a large window, since this is
125 // basically a cloud-to-cloud connection.
126 BufferCount: 200,
127 BufferBytes: int64(4 * 1024 * 1024),
128 PrefetchFactor: 10,
129 })
130
131 for {
132 // Read out of the buffer. This _should_ be bottlenecked on the network
133 // connection between the Flex instance and the client, via Fpri ntf().
134 entry, err := f.NextLogEntry()
135 switch err {
136 case io.EOF:
137 break // We're done.
138 case nil:
139 // Nothing
140 case coordinator.ErrNoAccess:
141 return errNoAuth // This will force a redirect
142 default:
143 return err
144 }
145 content := entry.GetText()
146 if content == nil {
147 break
148 }
149 for _, line := range content.Lines {
150 fmt.Fprint(w, line.Value)
151 fmt.Fprint(w, line.Delimiter)
152 }
153 }
154 return nil
155 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698