| OLD | NEW |
| (Empty) | |
| 1 package main |
| 2 |
| 3 import ( |
| 4 "errors" |
| 5 "fmt" |
| 6 "io" |
| 7 "log" |
| 8 "net/http" |
| 9 "strings" |
| 10 "sync" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/luci-go/common/clock" |
| 14 "github.com/luci/luci-go/grpc/prpc" |
| 15 "github.com/luci/luci-go/logdog/api/logpb" |
| 16 "github.com/luci/luci-go/logdog/client/coordinator" |
| 17 "github.com/luci/luci-go/logdog/common/fetcher" |
| 18 "github.com/luci/luci-go/logdog/common/types" |
| 19 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 20 "github.com/luci/luci-go/server/auth" |
| 21 |
| 22 "golang.org/x/net/context" |
| 23 ) |
| 24 |
| 25 var errNoAuth = errors.New("no access") |
| 26 |
| 27 type streamInfo struct { |
| 28 Project cfgtypes.ProjectName |
| 29 Path types.StreamPath |
| 30 |
| 31 // Client is the HTTP client to use for LogDog communication. |
| 32 Client *coordinator.Client |
| 33 } |
| 34 |
| 35 const noStreamDelay = 5 * time.Second |
| 36 |
| 37 // coordinatorSource is a fetcher.Source implementation that uses the |
| 38 // Coordiantor API. |
| 39 type coordinatorSource struct { |
| 40 sync.Mutex |
| 41 |
| 42 stream *coordinator.Stream |
| 43 tidx types.MessageIndex |
| 44 tailFirst bool |
| 45 |
| 46 streamState *coordinator.LogStream |
| 47 } |
| 48 |
| 49 func (s *coordinatorSource) LogEntries(c context.Context, req *fetcher.LogReques
t) ( |
| 50 []*logpb.LogEntry, types.MessageIndex, error) { |
| 51 s.Lock() |
| 52 defer s.Unlock() |
| 53 |
| 54 params := append(make([]coordinator.GetParam, 0, 4), |
| 55 coordinator.LimitBytes(int(req.Bytes)), |
| 56 coordinator.LimitCount(req.Count), |
| 57 coordinator.Index(req.Index), |
| 58 ) |
| 59 |
| 60 // If we haven't terminated, use this opportunity to fetch/update our st
ream |
| 61 // state. |
| 62 var streamState coordinator.LogStream |
| 63 reqStream := (s.streamState == nil || s.streamState.State.TerminalIndex
< 0) |
| 64 if reqStream { |
| 65 params = append(params, coordinator.WithState(&streamState)) |
| 66 } |
| 67 |
| 68 for { |
| 69 logs, err := s.stream.Get(c, params...) |
| 70 switch err { |
| 71 case nil: |
| 72 if reqStream { |
| 73 s.streamState = &streamState |
| 74 s.tidx = streamState.State.TerminalIndex |
| 75 } |
| 76 return logs, s.tidx, nil |
| 77 |
| 78 case coordinator.ErrNoSuchStream: |
| 79 log.Print("Stream does not exist. Sleeping pending regis
tration.") |
| 80 |
| 81 // Delay, interrupting if our Context is interrupted. |
| 82 if tr := <-clock.After(c, noStreamDelay); tr.Incomplete(
) { |
| 83 return nil, 0, tr.Err |
| 84 } |
| 85 |
| 86 default: |
| 87 if err != nil { |
| 88 return nil, 0, err |
| 89 } |
| 90 } |
| 91 } |
| 92 } |
| 93 |
| 94 func logHandler(c context.Context, w http.ResponseWriter, host, path string) err
or { |
| 95 // TODO(hinoka): Move this to luci-config. |
| 96 if !(host == "luci-logdog.appspot.com" || host == "luci-logdog-dev.appsp
ot.com") { |
| 97 return fmt.Errorf("unknown host %s", host) |
| 98 } |
| 99 spath := strings.SplitN(path, "/", 2) |
| 100 project := cfgtypes.ProjectName(spath[0]) |
| 101 streamPath := types.StreamPath(spath[1]) |
| 102 |
| 103 t, err := auth.GetRPCTransport(c, auth.AsUser) |
| 104 if err != nil { |
| 105 return err |
| 106 } |
| 107 client := coordinator.NewClient(&prpc.Client{ |
| 108 C: &http.Client{ |
| 109 Transport: t, |
| 110 }, |
| 111 Host: host, |
| 112 }) |
| 113 stream := client.Stream(project, streamPath) |
| 114 |
| 115 // Pull stream information. |
| 116 src := coordinatorSource{ |
| 117 stream: stream, |
| 118 tidx: -1, // Must be set to probe for state. |
| 119 } |
| 120 f := fetcher.New(c, fetcher.Options{ |
| 121 Source: &src, |
| 122 Index: types.MessageIndex(0), |
| 123 Count: 0, |
| 124 // Try to buffer as much as possible, with a large window, since
this is |
| 125 // basically a cloud-to-cloud connection. |
| 126 BufferCount: 200, |
| 127 BufferBytes: int64(4 * 1024 * 1024), |
| 128 PrefetchFactor: 10, |
| 129 }) |
| 130 |
| 131 for { |
| 132 // Read out of the buffer. This _should_ be bottlenecked on the
network |
| 133 // connection between the Flex instance and the client, via Fpri
ntf(). |
| 134 entry, err := f.NextLogEntry() |
| 135 switch err { |
| 136 case io.EOF: |
| 137 break // We're done. |
| 138 case nil: |
| 139 // Nothing |
| 140 case coordinator.ErrNoAccess: |
| 141 return errNoAuth // This will force a redirect |
| 142 default: |
| 143 return err |
| 144 } |
| 145 content := entry.GetText() |
| 146 if content == nil { |
| 147 break |
| 148 } |
| 149 for _, line := range content.Lines { |
| 150 fmt.Fprint(w, line.Value) |
| 151 fmt.Fprint(w, line.Delimiter) |
| 152 } |
| 153 } |
| 154 return nil |
| 155 } |
| OLD | NEW |