OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package collector | 5 package collector |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "time" | 9 "time" |
10 | 10 |
11 "github.com/golang/protobuf/proto" | 11 "github.com/golang/protobuf/proto" |
| 12 "github.com/luci/luci-go/common/config" |
12 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
13 "github.com/luci/luci-go/common/logdog/butlerproto" | 14 "github.com/luci/luci-go/common/logdog/butlerproto" |
14 "github.com/luci/luci-go/common/logdog/types" | 15 "github.com/luci/luci-go/common/logdog/types" |
15 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
16 "github.com/luci/luci-go/common/parallel" | 17 "github.com/luci/luci-go/common/parallel" |
17 "github.com/luci/luci-go/common/proto/logdog/logpb" | 18 "github.com/luci/luci-go/common/proto/logdog/logpb" |
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" | 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" |
19 "github.com/luci/luci-go/server/logdog/storage" | 20 "github.com/luci/luci-go/server/logdog/storage" |
20 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
21 ) | 22 ) |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
74 if pr.Bundle == nil { | 75 if pr.Bundle == nil { |
75 log.Errorf(ctx, "Protocol message did not contain a Butler bundl
e.") | 76 log.Errorf(ctx, "Protocol message did not contain a Butler bundl
e.") |
76 return nil | 77 return nil |
77 } | 78 } |
78 | 79 |
79 // If we're logging INFO or higher, log the ranges that this bundle | 80 // If we're logging INFO or higher, log the ranges that this bundle |
80 // represents. | 81 // represents. |
81 if log.IsLogging(ctx, log.Info) { | 82 if log.IsLogging(ctx, log.Info) { |
82 for i, entry := range pr.Bundle.Entries { | 83 for i, entry := range pr.Bundle.Entries { |
83 fields := log.Fields{ | 84 fields := log.Fields{ |
84 » » » » "index": i, | 85 » » » » "index": i, |
85 » » » » "path": entry.GetDesc().Path(), | 86 » » » » "project": pr.Bundle.Project, |
| 87 » » » » "path": entry.GetDesc().Path(), |
86 } | 88 } |
87 if entry.Terminal { | 89 if entry.Terminal { |
88 fields["terminalIndex"] = entry.TerminalIndex | 90 fields["terminalIndex"] = entry.TerminalIndex |
89 } | 91 } |
90 if logs := entry.GetLogs(); len(logs) > 0 { | 92 if logs := entry.GetLogs(); len(logs) > 0 { |
91 fields["logStart"] = logs[0].StreamIndex | 93 fields["logStart"] = logs[0].StreamIndex |
92 fields["logEnd"] = logs[len(logs)-1].StreamIndex | 94 fields["logEnd"] = logs[len(logs)-1].StreamIndex |
93 } | 95 } |
94 | 96 |
95 fields.Infof(ctx, "Processing log bundle entry.") | 97 fields.Infof(ctx, "Processing log bundle entry.") |
96 } | 98 } |
97 } | 99 } |
98 | 100 |
99 // If there are no entries, there is nothing to do. | |
100 if len(pr.Bundle.Entries) == 0 { | |
101 return nil | |
102 } | |
103 | |
104 lw := bundleHandler{ | 101 lw := bundleHandler{ |
105 msg: msg, | 102 msg: msg, |
106 md: pr.Metadata, | 103 md: pr.Metadata, |
107 b: pr.Bundle, | 104 b: pr.Bundle, |
108 } | 105 } |
109 | 106 |
| 107 if err := types.PrefixSecret(lw.b.Secret).Validate(); err != nil { |
| 108 log.Fields{ |
| 109 log.ErrorKey: err, |
| 110 "secretLength": len(lw.b.Secret), |
| 111 }.Errorf(ctx, "Failed to validate prefix secret.") |
| 112 return errors.New("invalid prefix secret") |
| 113 } |
| 114 |
| 115 // TODO(dnj): Make this actually an fatal error, once project becomes |
| 116 // required. |
| 117 if lw.b.Project != "" { |
| 118 lw.project = config.ProjectName(lw.b.Project) |
| 119 if err := lw.project.Validate(); err != nil { |
| 120 log.Fields{ |
| 121 log.ErrorKey: err, |
| 122 "project": lw.b.Project, |
| 123 }.Errorf(ctx, "Failed to validate bundle project name.") |
| 124 return errors.New("invalid bundle project name") |
| 125 } |
| 126 } |
| 127 |
| 128 if err := types.StreamName(lw.b.Prefix).Validate(); err != nil { |
| 129 log.Fields{ |
| 130 log.ErrorKey: err, |
| 131 "prefix": lw.b.Prefix, |
| 132 }.Errorf(ctx, "Failed to validate bundle prefix.") |
| 133 return errors.New("invalid bundle prefix") |
| 134 } |
| 135 |
| 136 // If there are no entries, there is nothing to do. |
| 137 if len(pr.Bundle.Entries) == 0 { |
| 138 return nil |
| 139 } |
| 140 |
110 // Handle each bundle entry in parallel. We will use a separate work poo
l | 141 // Handle each bundle entry in parallel. We will use a separate work poo
l |
111 // here so that top-level bundle dispatch can't deadlock the processing
tasks. | 142 // here so that top-level bundle dispatch can't deadlock the processing
tasks. |
112 workers := c.MaxMessageWorkers | 143 workers := c.MaxMessageWorkers |
113 if workers <= 0 { | 144 if workers <= 0 { |
114 workers = DefaultMaxMessageWorkers | 145 workers = DefaultMaxMessageWorkers |
115 } | 146 } |
116 err := parallel.WorkPool(workers, func(taskC chan<- func() error) { | 147 err := parallel.WorkPool(workers, func(taskC chan<- func() error) { |
117 for _, be := range pr.Bundle.Entries { | 148 for _, be := range pr.Bundle.Entries { |
118 be := be | 149 be := be |
119 | 150 |
(...skipping 22 matching lines...) Expand all Loading... |
142 | 173 |
143 // bundleHandler is a cumulative set of read-only state passed around by | 174 // bundleHandler is a cumulative set of read-only state passed around by |
144 // value for log processing. | 175 // value for log processing. |
145 type bundleHandler struct { | 176 type bundleHandler struct { |
146 // msg is the original message bytes. | 177 // msg is the original message bytes. |
147 msg []byte | 178 msg []byte |
148 // md is the metadata associated with the overall message. | 179 // md is the metadata associated with the overall message. |
149 md *logpb.ButlerMetadata | 180 md *logpb.ButlerMetadata |
150 // b is the Butler bundle. | 181 // b is the Butler bundle. |
151 b *logpb.ButlerLogBundle | 182 b *logpb.ButlerLogBundle |
| 183 |
| 184 // project is the validated project name. |
| 185 project config.ProjectName |
152 } | 186 } |
153 | 187 |
154 type bundleEntryHandler struct { | 188 type bundleEntryHandler struct { |
155 *bundleHandler | 189 *bundleHandler |
156 | 190 |
157 // be is the Bundle entry. | 191 // be is the Bundle entry. |
158 be *logpb.ButlerLogBundle_Entry | 192 be *logpb.ButlerLogBundle_Entry |
159 // path is the constructed path of the stream being processed. | 193 // path is the constructed path of the stream being processed. |
160 path types.StreamPath | 194 path types.StreamPath |
161 } | 195 } |
162 | 196 |
163 // processLogStream processes an individual set of log messages belonging to the | 197 // processLogStream processes an individual set of log messages belonging to the |
164 // same log stream. | 198 // same log stream. |
165 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
error { | 199 func (c *Collector) processLogStream(ctx context.Context, h *bundleEntryHandler)
error { |
166 // If this bundle has neither log entries nor a terminal index, it is ju
nk and | 200 // If this bundle has neither log entries nor a terminal index, it is ju
nk and |
167 // must be discarded. | 201 // must be discarded. |
168 // | 202 // |
169 // This is more important than a basic optimization, as it enforces that
no | 203 // This is more important than a basic optimization, as it enforces that
no |
170 // zero-entry log streams can be ingested. Either some entries exist, or
there | 204 // zero-entry log streams can be ingested. Either some entries exist, or
there |
171 // is a promise of a terminal entry. | 205 // is a promise of a terminal entry. |
172 if len(h.be.Logs) == 0 && !h.be.Terminal { | 206 if len(h.be.Logs) == 0 && !h.be.Terminal { |
173 log.Warningf(ctx, "Bundle entry is non-terminal and contains no
logs; discarding.") | 207 log.Warningf(ctx, "Bundle entry is non-terminal and contains no
logs; discarding.") |
174 return nil | 208 return nil |
175 } | 209 } |
176 | 210 |
| 211 // If the descriptor has a Prefix, it must match the bundle's Prefix. |
| 212 if p := h.be.Desc.Prefix; p != "" { |
| 213 if p != h.b.Prefix { |
| 214 log.Fields{ |
| 215 "bundlePrefix": h.b.Prefix, |
| 216 "bundleEntryPrefix": p, |
| 217 }.Errorf(ctx, "Bundle prefix does not match entry prefix
.") |
| 218 return errors.New("mismatched bundle and entry prefixes"
) |
| 219 } |
| 220 } else { |
| 221 // Fill in the bundle's Prefix. |
| 222 h.be.Desc.Prefix = h.b.Prefix |
| 223 } |
| 224 |
177 if err := h.be.Desc.Validate(true); err != nil { | 225 if err := h.be.Desc.Validate(true); err != nil { |
178 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") | 226 log.Errorf(log.SetError(ctx, err), "Invalid log stream descripto
r.") |
179 return err | 227 return err |
180 } | 228 } |
181 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D
esc.Name)) | 229 h.path = types.StreamName(h.be.Desc.Prefix).Join(types.StreamName(h.be.D
esc.Name)) |
182 » ctx = log.SetField(ctx, "path", h.path) | 230 » ctx = log.SetFields(ctx, log.Fields{ |
183 | 231 » » "project": h.project, |
184 » if len(h.be.Secret) == 0 { | 232 » » "path": h.path, |
185 » » log.Errorf(ctx, "Missing secret.") | 233 » }) |
186 » » return errors.New("missing stream secret") | |
187 » } | |
188 | 234 |
189 // Confirm that the log entries are valid and contiguous. Serialize the
log | 235 // Confirm that the log entries are valid and contiguous. Serialize the
log |
190 // entries for ingest as we validate them. | 236 // entries for ingest as we validate them. |
191 var logData [][]byte | 237 var logData [][]byte |
192 var blockIndex uint64 | 238 var blockIndex uint64 |
193 if logs := h.be.Logs; len(logs) > 0 { | 239 if logs := h.be.Logs; len(logs) > 0 { |
194 logData = make([][]byte, len(logs)) | 240 logData = make([][]byte, len(logs)) |
195 blockIndex = logs[0].StreamIndex | 241 blockIndex = logs[0].StreamIndex |
196 | 242 |
197 for i, le := range logs { | 243 for i, le := range logs { |
(...skipping 25 matching lines...) Expand all Loading... |
223 }.Errorf(ctx, "Failed to marshal log entry.") | 269 }.Errorf(ctx, "Failed to marshal log entry.") |
224 return errors.New("failed to marshal log entries
") | 270 return errors.New("failed to marshal log entries
") |
225 } | 271 } |
226 } | 272 } |
227 } | 273 } |
228 | 274 |
229 // Fetch our cached/remote state. This will replace our state object wit
h the | 275 // Fetch our cached/remote state. This will replace our state object wit
h the |
230 // fetched state, so any future calls will need to re-set the Secret val
ue. | 276 // fetched state, so any future calls will need to re-set the Secret val
ue. |
231 // TODO: Use timeout? | 277 // TODO: Use timeout? |
232 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt
ate{ | 278 state, err := c.Coordinator.RegisterStream(ctx, &coordinator.LogStreamSt
ate{ |
| 279 Project: h.project, |
233 Path: h.path, | 280 Path: h.path, |
234 » » Secret: types.PrefixSecret(h.be.Secret), | 281 » » Secret: types.PrefixSecret(h.b.Secret), |
235 ProtoVersion: h.md.ProtoVersion, | 282 ProtoVersion: h.md.ProtoVersion, |
236 }, h.be.Desc) | 283 }, h.be.Desc) |
237 if err != nil { | 284 if err != nil { |
238 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") | 285 log.WithError(err).Errorf(ctx, "Failed to get/register current s
tream state.") |
239 return err | 286 return err |
240 } | 287 } |
241 | 288 |
242 // Does the log stream's secret match the expected secret? | 289 // Does the log stream's secret match the expected secret? |
243 » if !bytes.Equal(h.be.Secret, []byte(state.Secret)) { | 290 » if !bytes.Equal(h.b.Secret, []byte(state.Secret)) { |
244 log.Errorf(log.SetFields(ctx, log.Fields{ | 291 log.Errorf(log.SetFields(ctx, log.Fields{ |
245 » » » "secret": h.be.Secret, | 292 » » » "secret": h.b.Secret, |
246 "expectedSecret": state.Secret, | 293 "expectedSecret": state.Secret, |
247 }), "Log entry has incorrect secret.") | 294 }), "Log entry has incorrect secret.") |
248 return nil | 295 return nil |
249 } | 296 } |
250 | 297 |
251 if state.Archived { | 298 if state.Archived { |
252 log.Infof(ctx, "Skipping message bundle for archived stream.") | 299 log.Infof(ctx, "Skipping message bundle for archived stream.") |
253 return nil | 300 return nil |
254 } | 301 } |
255 if state.Purged { | 302 if state.Purged { |
(...skipping 23 matching lines...) Expand all Loading... |
279 } | 326 } |
280 | 327 |
281 // Perform stream processing operations. We can do these operations in | 328 // Perform stream processing operations. We can do these operations in |
282 // parallel. | 329 // parallel. |
283 return parallel.FanOutIn(func(taskC chan<- func() error) { | 330 return parallel.FanOutIn(func(taskC chan<- func() error) { |
284 // Store log data, if any was provided. It has already been vali
dated. | 331 // Store log data, if any was provided. It has already been vali
dated. |
285 if len(logData) > 0 { | 332 if len(logData) > 0 { |
286 taskC <- func() error { | 333 taskC <- func() error { |
287 // Post the log to storage. | 334 // Post the log to storage. |
288 err = c.Storage.Put(storage.PutRequest{ | 335 err = c.Storage.Put(storage.PutRequest{ |
289 » » » » » Path: h.path, | 336 » » » » » Project: h.project, |
290 » » » » » Index: types.MessageIndex(blockIndex), | 337 » » » » » Path: h.path, |
291 » » » » » Values: logData, | 338 » » » » » Index: types.MessageIndex(blockIndex), |
| 339 » » » » » Values: logData, |
292 }) | 340 }) |
293 | 341 |
294 // If the log entry already exists, consider the
"put" successful. | 342 // If the log entry already exists, consider the
"put" successful. |
295 // Storage will return a transient error if one
occurred. | 343 // Storage will return a transient error if one
occurred. |
296 if err != nil && err != storage.ErrExists { | 344 if err != nil && err != storage.ErrExists { |
297 log.Fields{ | 345 log.Fields{ |
298 log.ErrorKey: err, | 346 log.ErrorKey: err, |
299 "blockIndex": blockIndex, | 347 "blockIndex": blockIndex, |
300 }.Errorf(ctx, "Failed to load log entry
into Storage.") | 348 }.Errorf(ctx, "Failed to load log entry
into Storage.") |
301 return err | 349 return err |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
337 for _, e := range merr { | 385 for _, e := range merr { |
338 if hasTransientError(e) { | 386 if hasTransientError(e) { |
339 return true | 387 return true |
340 } | 388 } |
341 } | 389 } |
342 return false | 390 return false |
343 } | 391 } |
344 | 392 |
345 return errors.IsTransient(err) | 393 return errors.IsTransient(err) |
346 } | 394 } |
OLD | NEW |