| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/subtle" | 8 "crypto/subtle" |
| 9 "errors" | 9 "errors" |
| 10 | 10 |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 55 Archived: ls.Archived(), | 55 Archived: ls.Archived(), |
| 56 Purged: ls.Purged, | 56 Purged: ls.Purged, |
| 57 } | 57 } |
| 58 if !ls.Terminated() { | 58 if !ls.Terminated() { |
| 59 st.TerminalIndex = -1 | 59 st.TerminalIndex = -1 |
| 60 } | 60 } |
| 61 return &st | 61 return &st |
| 62 } | 62 } |
| 63 | 63 |
| 64 // RegisterStream is an idempotent stream state register operation. | 64 // RegisterStream is an idempotent stream state register operation. |
| 65 func (b *Server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
uest) ( | 65 func (s *Server) RegisterStream(c context.Context, req *logdog.RegisterStreamReq
uest) (*logdog.RegisterStreamResponse, error) { |
| 66 » *logdog.RegisterStreamResponse, error) { | 66 » svc := s.GetServices() |
| 67 » if err := Auth(c); err != nil { | 67 » if err := Auth(c, svc); err != nil { |
| 68 return nil, err | 68 return nil, err |
| 69 } | 69 } |
| 70 | 70 |
| 71 log.Fields{ | 71 log.Fields{ |
| 72 "path": req.Path, | 72 "path": req.Path, |
| 73 }.Infof(c, "Registration request for log stream.") | 73 }.Infof(c, "Registration request for log stream.") |
| 74 | 74 |
| 75 path := types.StreamPath(req.Path) | 75 path := types.StreamPath(req.Path) |
| 76 if err := path.Validate(); err != nil { | 76 if err := path.Validate(); err != nil { |
| 77 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (
%s): %s", path, err) | 77 return nil, grpcutil.Errf(codes.InvalidArgument, "Invalid path (
%s): %s", path, err) |
| (...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 174 log.Infof(c, "Registering new log stream'") | 174 log.Infof(c, "Registering new log stream'") |
| 175 | 175 |
| 176 // The stream is not yet registered. | 176 // The stream is not yet registered. |
| 177 if err := m.LoadDescriptor(m.req.Desc); err != nil { | 177 if err := m.LoadDescriptor(m.req.Desc); err != nil { |
| 178 log.Fields{ | 178 log.Fields{ |
| 179 log.ErrorKey: err, | 179 log.ErrorKey: err, |
| 180 }.Errorf(c, "Failed to load descriptor into LogStream.") | 180 }.Errorf(c, "Failed to load descriptor into LogStream.") |
| 181 return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load
descriptor.") | 181 return nil, grpcutil.Errf(codes.InvalidArgument, "Failed to load
descriptor.") |
| 182 } | 182 } |
| 183 | 183 |
| 184 now := ds.RoundTime(clock.Now(c).UTC()) | |
| 185 m.Secret = m.req.Secret | 184 m.Secret = m.req.Secret |
| 186 m.ProtoVersion = m.req.ProtoVersion | 185 m.ProtoVersion = m.req.ProtoVersion |
| 187 » m.State = coordinator.LSPending | 186 » m.State = coordinator.LSStreaming |
| 188 » m.Created = now | 187 » m.Created = ds.RoundTime(clock.Now(c).UTC()) |
| 189 » m.Updated = now | |
| 190 m.TerminalIndex = -1 | 188 m.TerminalIndex = -1 |
| 191 | 189 |
| 192 » if err := m.Put(di); err != nil { | 190 » if err := di.Put(m.LogStream); err != nil { |
| 193 log.Fields{ | 191 log.Fields{ |
| 194 log.ErrorKey: err, | 192 log.ErrorKey: err, |
| 195 }.Errorf(c, "Failed to Put() LogStream.") | 193 }.Errorf(c, "Failed to Put() LogStream.") |
| 196 return nil, grpcutil.Internal | 194 return nil, grpcutil.Internal |
| 197 } | 195 } |
| 198 | 196 |
| 199 return []tumble.Mutation{ | 197 return []tumble.Mutation{ |
| 200 &mutations.PutHierarchyMutation{ | 198 &mutations.PutHierarchyMutation{ |
| 201 Path: m.Path(), | 199 Path: m.Path(), |
| 202 }, | 200 }, |
| 203 }, nil | 201 }, nil |
| 204 } | 202 } |
| 205 | 203 |
| 206 func (m registerStreamMutation) Root(c context.Context) *ds.Key { | 204 func (m registerStreamMutation) Root(c context.Context) *ds.Key { |
| 207 return ds.Get(c).KeyForObj(m.LogStream) | 205 return ds.Get(c).KeyForObj(m.LogStream) |
| 208 } | 206 } |
| OLD | NEW |