Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(216)

Side by Side Diff: server/internal/logdog/collector/collector.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package collector 5 package collector
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "time" 9 "time"
10 10
11 "github.com/golang/protobuf/proto" 11 "github.com/golang/protobuf/proto"
12 "github.com/luci/luci-go/common/config"
12 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
13 "github.com/luci/luci-go/common/logdog/butlerproto" 14 "github.com/luci/luci-go/common/logdog/butlerproto"
14 "github.com/luci/luci-go/common/logdog/types" 15 "github.com/luci/luci-go/common/logdog/types"
15 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel" 17 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/common/proto/logdog/logpb" 18 "github.com/luci/luci-go/common/proto/logdog/logpb"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
19 "github.com/luci/luci-go/server/logdog/storage" 20 "github.com/luci/luci-go/server/logdog/storage"
20 "golang.org/x/net/context" 21 "golang.org/x/net/context"
21 ) 22 )
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
74 if pr.Bundle == nil { 75 if pr.Bundle == nil {
75 log.Errorf(ctx, "Protocol message did not contain a Butler bundl e.") 76 log.Errorf(ctx, "Protocol message did not contain a Butler bundl e.")
76 return nil 77 return nil
77 } 78 }
78 79
79 // If we're logging INFO or higher, log the ranges that this bundle 80 // If we're logging INFO or higher, log the ranges that this bundle
80 // represents. 81 // represents.
81 if log.IsLogging(ctx, log.Info) { 82 if log.IsLogging(ctx, log.Info) {
82 for i, entry := range pr.Bundle.Entries { 83 for i, entry := range pr.Bundle.Entries {
83 fields := log.Fields{ 84 fields := log.Fields{
84 » » » » "index": i, 85 » » » » "index": i,
85 » » » » "path": entry.GetDesc().Path(), 86 » » » » "project": pr.Bundle.Project,
87 » » » » "path": entry.GetDesc().Path(),
86 } 88 }
87 if entry.Terminal { 89 if entry.Terminal {
88 fields["terminalIndex"] = entry.TerminalIndex 90 fields["terminalIndex"] = entry.TerminalIndex
89 } 91 }
90 if logs := entry.GetLogs(); len(logs) > 0 { 92 if logs := entry.GetLogs(); len(logs) > 0 {
91 fields["logStart"] = logs[0].StreamIndex 93 fields["logStart"] = logs[0].StreamIndex
92 fields["logEnd"] = logs[len(logs)-1].StreamIndex 94 fields["logEnd"] = logs[len(logs)-1].StreamIndex
93 } 95 }
94 96
95 fields.Infof(ctx, "Processing log bundle entry.") 97 fields.Infof(ctx, "Processing log bundle entry.")
96 } 98 }
97 } 99 }
98 100
99 // If there are no entries, there is nothing to do.
100 if len(pr.Bundle.Entries) == 0 {
101 return nil
102 }
103
104 lw := bundleHandler{ 101 lw := bundleHandler{
105 msg: msg, 102 msg: msg,
106 md: pr.Metadata, 103 md: pr.Metadata,
107 b: pr.Bundle, 104 b: pr.Bundle,
108 } 105 }
109 106
107 if err := types.PrefixSecret(lw.b.Secret).Validate(); err != nil {
108 log.Fields{
109 log.ErrorKey: err,
110 "secretLength": len(lw.b.Secret),
111 }.Errorf(ctx, "Failed to validate prefix secret.")
112 return errors.New("invalid prefix secret")
113 }
114
115 // TODO(dnj): Make this actually an fatal error, once project becomes
116 // required.
117 if lw.b.Project != "" {
118 lw.project = config.ProjectName(lw.b.Project)
119 if err := lw.project.Validate(); err != nil {
120 log.Fields{
121 log.ErrorKey: err,
122 "project": lw.b.Project,
123 }.Errorf(ctx, "Failed to validate bundle project name.")
124 return errors.New("invalid bundle project name")
125 }
126 }
127
128 if err := types.StreamName(lw.b.Prefix).Validate(); err != nil {
129 log.Fields{
130 log.ErrorKey: err,
131 "prefix": lw.b.Prefix,
132 }.Errorf(ctx, "Failed to validate bundle prefix.")
133 return errors.New("invalid bundle prefix")
134 }
135
136 // If there are no entries, there is nothing to do.
137 if len(pr.Bundle.Entries) == 0 {
138 return nil
139 }
140
110 // Handle each bundle entry in parallel. We will use a separate work poo l 141 // Handle each bundle entry in parallel. We will use a separate work poo l
111 // here so that top-level bundle dispatch can't deadlock the processing tasks. 142 // here so that top-level bundle dispatch can't deadlock the processing tasks.
112 workers := c.MaxMessageWorkers 143 workers := c.MaxMessageWorkers
113 if workers <= 0 { 144 if workers <= 0 {
114 workers = DefaultMaxMessageWorkers 145 workers = DefaultMaxMessageWorkers
115 } 146 }
116 err := parallel.WorkPool(workers, func(taskC chan<- func() error) { 147 err := parallel.WorkPool(workers, func(taskC chan<- func() error) {
117 for _, be := range pr.Bundle.Entries { 148 for _, be := range pr.Bundle.Entries {
118 be := be 149 be := be
119 150
(...skipping 22 matching lines...) Expand all
142 173
143 // bundleHandler is a cumulative set of read-only state passed around by 174 // bundleHandler is a cumulative set of read-only state passed around by
144 // value for log processing. 175 // value for log processing.
145 type bundleHandler struct { 176 type bundleHandler struct {
146 // msg is the original message bytes. 177 // msg is the original message bytes.
147 msg []byte 178 msg []byte
148 // md is the metadata associated with the overall message. 179 // md is the metadata associated with the overall message.
149 md *logpb.ButlerMetadata 180 md *logpb.ButlerMetadata
150 // b is the Butler bundle. 181 // b is the Butler bundle.
151 b *logpb.ButlerLogBundle 182 b *logpb.ButlerLogBundle
183
184 // project is the validated project name.
185 project config.ProjectName
152 } 186 }
153 187
154 type bundleEntryHandler struct { 188 type bundleEntryHandler struct {
155 *bundleHandler 189 *bundleHandler
156 190
157 // be is the Bundle entry. 191 // be is the Bundle entry.
158 be *logpb.ButlerLogBundle_Entry 192 be *logpb.ButlerLogBundle_Entry
159 // path is the constructed path of the stream being processed. 193 // path is the constructed path of the stream being processed.
160 path types.StreamPath 194 path types.StreamPath
161 } 195 }
162 196
163 // processLogStream processes an individual set of log messages belonging to the 197 // processLogStream processes an individual set of log messages belonging to the
164 // same log stream. 198 // same log stream.
165 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error { 199 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler) error {
166 // If this bundle has neither log entries nor a terminal index, it is ju nk and 200 // If this bundle has neither log entries nor a terminal index, it is ju nk and
167 // must be discarded. 201 // must be discarded.
168 // 202 //
169 // This is more important than a basic optimization, as it enforces that no 203 // This is more important than a basic optimization, as it enforces that no
170 // zero-entry log streams can be ingested. Either some entries exist, or there 204 // zero-entry log streams can be ingested. Either some entries exist, or there
171 // is a promise of a terminal entry. 205 // is a promise of a terminal entry.
172 if len(h.be.Logs) == 0 && !h.be.Terminal { 206 if len(h.be.Logs) == 0 && !h.be.Terminal {
173 log.Warningf(ctx, "Bundle entry is non-terminal and contains no logs; discarding.") 207 log.Warningf(ctx, "Bundle entry is non-terminal and contains no logs; discarding.")
174 return nil 208 return nil
175 } 209 }
176 210
211 // If the descriptor has a Prefix, it must match the bundle's Prefix.
212 if p := h.be.Desc.Prefix; p != "" {
213 if p != h.b.Prefix {
214 log.Fields{
215 "bundlePrefix": h.b.Prefix,
216 "bundleEntryPrefix": p,
217 }.Errorf(ctx, "Bundle prefix does not match entry prefix .")
218 return errors.New("mismatched bundle and entry prefixes" )
219 }
220 } else {
221 // Fill in the bundle's Prefix.
222 h.be.Desc.Prefix = h.b.Prefix
223 }
224
177 if err := h.be.Desc.Validate(true); err != nil { 225 if err := h.be.Desc.Validate(true); err != nil {
178 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.") 226 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto r.")
179 return err 227 return err
180 } 228 }
181 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D esc.Name)) 229 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D esc.Name))
182 » ctx = log.SetField(ctx, "path", h.path) 230 » ctx = log.SetFields(ctx, log.Fields{
183 231 » » "project": h.project,
184 » if len(h.be.Secret) == 0 { 232 » » "path": h.path,
185 » » log.Errorf(ctx, "Missing secret.") 233 » })
186 » » return errors.New("missing stream secret")
187 » }
188 234
189 // Confirm that the log entries are valid and contiguous. Serialize the log 235 // Confirm that the log entries are valid and contiguous. Serialize the log
190 // entries for ingest as we validate them. 236 // entries for ingest as we validate them.
191 var logData [][]byte 237 var logData [][]byte
192 var blockIndex uint64 238 var blockIndex uint64
193 if logs := h.be.Logs; len(logs) > 0 { 239 if logs := h.be.Logs; len(logs) > 0 {
194 logData = make([][]byte, len(logs)) 240 logData = make([][]byte, len(logs))
195 blockIndex = logs[0].StreamIndex 241 blockIndex = logs[0].StreamIndex
196 242
197 for i, le := range logs { 243 for i, le := range logs {
(...skipping 25 matching lines...) Expand all
223 }.Errorf(ctx, "Failed to marshal log entry.") 269 }.Errorf(ctx, "Failed to marshal log entry.")
224 return errors.New("failed to marshal log entries ") 270 return errors.New("failed to marshal log entries ")
225 } 271 }
226 } 272 }
227 } 273 }
228 274
229 // Fetch our cached/remote state. This will replace our state object wit h the 275 // Fetch our cached/remote state. This will replace our state object wit h the
230 // fetched state, so any future calls will need to re-set the Secret val ue. 276 // fetched state, so any future calls will need to re-set the Secret val ue.
231 // TODO: Use timeout? 277 // TODO: Use timeout?
232 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt ate{ 278 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt ate{
279 Project: h.project,
233 Path: h.path, 280 Path: h.path,
234 » » Secret: types.PrefixSecret(h.be.Secret), 281 » » Secret: types.PrefixSecret(h.b.Secret),
235 ProtoVersion: h.md.ProtoVersion, 282 ProtoVersion: h.md.ProtoVersion,
236 }, h.be.Desc) 283 }, h.be.Desc)
237 if err != nil { 284 if err != nil {
238 log.WithError(err).Errorf(ctx, "Failed to get/register current s tream state.") 285 log.WithError(err).Errorf(ctx, "Failed to get/register current s tream state.")
239 return err 286 return err
240 } 287 }
241 288
242 // Does the log stream's secret match the expected secret? 289 // Does the log stream's secret match the expected secret?
243 » if !bytes.Equal(h.be.Secret, []byte(state.Secret)) { 290 » if !bytes.Equal(h.b.Secret, []byte(state.Secret)) {
244 log.Errorf(log.SetFields(ctx, log.Fields{ 291 log.Errorf(log.SetFields(ctx, log.Fields{
245 » » » "secret": h.be.Secret, 292 » » » "secret": h.b.Secret,
246 "expectedSecret": state.Secret, 293 "expectedSecret": state.Secret,
247 }), "Log entry has incorrect secret.") 294 }), "Log entry has incorrect secret.")
248 return nil 295 return nil
249 } 296 }
250 297
251 if state.Archived { 298 if state.Archived {
252 log.Infof(ctx, "Skipping message bundle for archived stream.") 299 log.Infof(ctx, "Skipping message bundle for archived stream.")
253 return nil 300 return nil
254 } 301 }
255 if state.Purged { 302 if state.Purged {
(...skipping 23 matching lines...) Expand all
279 } 326 }
280 327
281 // Perform stream processing operations. We can do these operations in 328 // Perform stream processing operations. We can do these operations in
282 // parallel. 329 // parallel.
283 return parallel.FanOutIn(func(taskC chan<- func() error) { 330 return parallel.FanOutIn(func(taskC chan<- func() error) {
284 // Store log data, if any was provided. It has already been vali dated. 331 // Store log data, if any was provided. It has already been vali dated.
285 if len(logData) > 0 { 332 if len(logData) > 0 {
286 taskC <- func() error { 333 taskC <- func() error {
287 // Post the log to storage. 334 // Post the log to storage.
288 err = c.Storage.Put(storage.PutRequest{ 335 err = c.Storage.Put(storage.PutRequest{
289 » » » » » Path: h.path, 336 » » » » » Project: h.project,
290 » » » » » Index: types.MessageIndex(blockIndex), 337 » » » » » Path: h.path,
291 » » » » » Values: logData, 338 » » » » » Index: types.MessageIndex(blockIndex),
339 » » » » » Values: logData,
292 }) 340 })
293 341
294 // If the log entry already exists, consider the "put" successful. 342 // If the log entry already exists, consider the "put" successful.
295 // Storage will return a transient error if one occurred. 343 // Storage will return a transient error if one occurred.
296 if err != nil && err != storage.ErrExists { 344 if err != nil && err != storage.ErrExists {
297 log.Fields{ 345 log.Fields{
298 log.ErrorKey: err, 346 log.ErrorKey: err,
299 "blockIndex": blockIndex, 347 "blockIndex": blockIndex,
300 }.Errorf(ctx, "Failed to load log entry into Storage.") 348 }.Errorf(ctx, "Failed to load log entry into Storage.")
301 return err 349 return err
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
337 for _, e := range merr { 385 for _, e := range merr {
338 if hasTransientError(e) { 386 if hasTransientError(e) {
339 return true 387 return true
340 } 388 }
341 } 389 }
342 return false 390 return false
343 } 391 }
344 392
345 return errors.IsTransient(err) 393 return errors.IsTransient(err)
346 } 394 }
OLDNEW
« no previous file with comments | « common/proto/logdog/logpb/butler.pb.go ('k') | server/internal/logdog/collector/collector_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698