Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(62)

Side by Side Diff: milo/appengine/logs/logs.go

Issue 2949783002: [milo] appengine/* -> * (Closed)
Patch Set: rebase Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « milo/appengine/logs/README.md ('k') | milo/appengine/logs/main.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package main
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "log"
8 "net/http"
9 "strings"
10 "sync"
11 "time"
12
13 "golang.org/x/net/context"
14
15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/grpc/prpc"
17 "github.com/luci/luci-go/logdog/api/logpb"
18 "github.com/luci/luci-go/logdog/client/coordinator"
19 "github.com/luci/luci-go/logdog/common/fetcher"
20 "github.com/luci/luci-go/logdog/common/types"
21 "github.com/luci/luci-go/luci_config/common/cfgtypes"
22 )
23
24 var errNoAuth = errors.New("no access")
25
26 type streamInfo struct {
27 Project cfgtypes.ProjectName
28 Path types.StreamPath
29
30 // Client is the HTTP client to use for LogDog communication.
31 Client *coordinator.Client
32 }
33
34 const noStreamDelay = 5 * time.Second
35
36 // coordinatorSource is a fetcher.Source implementation that uses the
37 // Coordiantor API.
38 type coordinatorSource struct {
39 sync.Mutex
40
41 stream *coordinator.Stream
42 tidx types.MessageIndex
43 tailFirst bool
44
45 streamState *coordinator.LogStream
46 }
47
48 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques t) (
49 []*logpb.LogEntry, types.MessageIndex, error) {
50 s.Lock()
51 // TODO(hinoka): If fetching multiple streams, this would cause requests
52 // to be serialized. We may not want this.
53 defer s.Unlock()
54
55 params := append(make([]coordinator.GetParam, 0, 4),
56 coordinator.LimitBytes(int(req.Bytes)),
57 coordinator.LimitCount(req.Count),
58 coordinator.Index(req.Index),
59 )
60
61 // If we haven't terminated, use this opportunity to fetch/update our st ream
62 // state.
63 var streamState coordinator.LogStream
64 reqState := s.streamState == nil || s.streamState.State.TerminalIndex < 0
65 if reqState {
66 params = append(params, coordinator.WithState(&streamState))
67 }
68
69 for {
70 logs, err := s.stream.Get(c, params...)
71 switch err {
72 case nil:
73 if reqState {
74 s.streamState = &streamState
75 s.tidx = streamState.State.TerminalIndex
76 }
77 return logs, s.tidx, nil
78
79 case coordinator.ErrNoSuchStream:
80 log.Print("Stream does not exist. Sleeping pending regis tration.")
81
82 // Delay, interrupting if our Context is interrupted.
83 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete( ) {
84 return nil, 0, tr.Err
85 }
86
87 default:
88 return nil, 0, err
89 }
90 }
91 }
92
93 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err or {
94 // TODO(hinoka): Move this to luci-config.
95 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp ot.com") {
96 return fmt.Errorf("unknown host %s", host)
97 }
98 spath := strings.SplitN(path, "/", 2)
99 if len(spath) != 2 {
100 return fmt.Errorf("%s is not a valid path", path)
101 }
102 project := cfgtypes.ProjectName(spath[0])
103 streamPath := types.StreamPath(spath[1])
104
105 client := coordinator.NewClient(&prpc.Client{
106 C: &http.Client{
107 // TODO(hinoka): Once crbug.com/712506 is resolved, figu re out how to get auth.
108 Transport: http.DefaultTransport,
109 },
110 Host: host,
111 })
112 stream := client.Stream(project, streamPath)
113
114 // Pull stream information.
115 f := fetcher.New(c, fetcher.Options{
116 Source: &coordinatorSource{
117 stream: stream,
118 tidx: -1, // Must be set to probe for state.
119 },
120 Index: types.MessageIndex(0),
121 Count: 0,
122 // Try to buffer as much as possible, with a large window, since this is
123 // basically a cloud-to-cloud connection.
124 BufferCount: 200,
125 BufferBytes: int64(4 * 1024 * 1024),
126 PrefetchFactor: 10,
127 })
128
129 for {
130 // Read out of the buffer. This _should_ be bottlenecked on the network
131 // connection between the Flex instance and the client, via Fpri ntf().
132 entry, err := f.NextLogEntry()
133 switch err {
134 case io.EOF:
135 return nil // We're done.
136 case nil:
137 // Nothing
138 case coordinator.ErrNoAccess:
139 return errNoAuth // This will force a redirect
140 default:
141 return err
142 }
143 content := entry.GetText()
144 if content == nil {
145 break
146 }
147 for _, line := range content.Lines {
148 fmt.Fprint(w, line.Value)
149 fmt.Fprint(w, line.Delimiter)
150 }
151 }
152 return nil
153 }
OLDNEW
« no previous file with comments | « milo/appengine/logs/README.md ('k') | milo/appengine/logs/main.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698