| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 package services | 15 package services |
| 16 | 16 |
| 17 import ( | 17 import ( |
| 18 "crypto/subtle" | 18 "crypto/subtle" |
| 19 | 19 |
| 20 "github.com/golang/protobuf/proto" | 20 "github.com/golang/protobuf/proto" |
| 21 ds "github.com/luci/gae/service/datastore" | 21 ds "github.com/luci/gae/service/datastore" |
| 22 "github.com/luci/luci-go/common/clock" | 22 "github.com/luci/luci-go/common/clock" |
| 23 log "github.com/luci/luci-go/common/logging" | 23 log "github.com/luci/luci-go/common/logging" |
| 24 "github.com/luci/luci-go/grpc/grpcutil" | 24 "github.com/luci/luci-go/grpc/grpcutil" |
| 25 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" | 25 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/services/v1" |
| 26 "github.com/luci/luci-go/logdog/api/logpb" | 26 "github.com/luci/luci-go/logdog/api/logpb" |
| 27 "github.com/luci/luci-go/logdog/appengine/coordinator" | 27 "github.com/luci/luci-go/logdog/appengine/coordinator" |
| 28 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" | 28 "github.com/luci/luci-go/logdog/appengine/coordinator/endpoints" |
| 29 "github.com/luci/luci-go/logdog/appengine/coordinator/hierarchy" | |
| 30 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" | 29 "github.com/luci/luci-go/logdog/appengine/coordinator/mutations" |
| 31 "github.com/luci/luci-go/logdog/common/types" | 30 "github.com/luci/luci-go/logdog/common/types" |
| 32 "github.com/luci/luci-go/tumble" | 31 "github.com/luci/luci-go/tumble" |
| 33 "golang.org/x/net/context" | 32 "golang.org/x/net/context" |
| 34 "google.golang.org/grpc/codes" | 33 "google.golang.org/grpc/codes" |
| 35 ) | 34 ) |
| 36 | 35 |
| 37 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { | 36 func buildLogStreamState(ls *coordinator.LogStream, lst *coordinator.LogStreamSt
ate) *logdog.LogStreamState { |
| 38 st := logdog.LogStreamState{ | 37 st := logdog.LogStreamState{ |
| 39 ProtoVersion: ls.ProtoVersion, | 38 ProtoVersion: ls.ProtoVersion, |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 146 // (non-transactional). | 145 // (non-transactional). |
| 147 ls := &coordinator.LogStream{ID: logStreamID} | 146 ls := &coordinator.LogStream{ID: logStreamID} |
| 148 lst := ls.State(c) | 147 lst := ls.State(c) |
| 149 | 148 |
| 150 if err := ds.Get(c, ls, lst); err != nil { | 149 if err := ds.Get(c, ls, lst); err != nil { |
| 151 if !anyNoSuchEntity(err) { | 150 if !anyNoSuchEntity(err) { |
| 152 log.WithError(err).Errorf(c, "Failed to check for log st
ream.") | 151 log.WithError(err).Errorf(c, "Failed to check for log st
ream.") |
| 153 return nil, err | 152 return nil, err |
| 154 } | 153 } |
| 155 | 154 |
| 156 // The stream is not registered. Perform a transactional registr
ation via | |
| 157 // mutation. | |
| 158 // | |
| 159 // Determine which hierarchy components we need to add. | |
| 160 comps := hierarchy.Components(path, true) | |
| 161 if comps, err = hierarchy.Missing(c, comps); err != nil { | |
| 162 log.WithError(err).Warningf(c, "Failed to probe for miss
ing hierarchy components.") | |
| 163 } | |
| 164 | |
| 165 // Before we go into transaction, try and put these entries. Thi
s should not | |
| 166 // be contested, since components don't share an entity root. | |
| 167 if err := hierarchy.PutMulti(c, comps); err != nil { | |
| 168 log.WithError(err).Errorf(c, "Failed to add missing hier
archy components.") | |
| 169 return nil, grpcutil.Internal | |
| 170 } | |
| 171 | |
| 172 // The stream does not exist. Proceed with transactional registr
ation. | 155 // The stream does not exist. Proceed with transactional registr
ation. |
| 173 lstKey := ds.KeyForObj(c, lst) | 156 lstKey := ds.KeyForObj(c, lst) |
| 174 err = tumble.RunUnbuffered(c, lstKey, func(c context.Context) ([
]tumble.Mutation, error) { | 157 err = tumble.RunUnbuffered(c, lstKey, func(c context.Context) ([
]tumble.Mutation, error) { |
| 175 // Load our state and stream (transactional). | 158 // Load our state and stream (transactional). |
| 176 switch err := ds.Get(c, ls, lst); { | 159 switch err := ds.Get(c, ls, lst); { |
| 177 case err == nil: | 160 case err == nil: |
| 178 // The stream is already registered. | 161 // The stream is already registered. |
| 179 return nil, nil | 162 return nil, nil |
| 180 | 163 |
| 181 case !anyNoSuchEntity(err): | 164 case !anyNoSuchEntity(err): |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 274 }.Errorf(c, "Failed to register LogStream.") | 257 }.Errorf(c, "Failed to register LogStream.") |
| 275 return nil, err | 258 return nil, err |
| 276 } | 259 } |
| 277 } | 260 } |
| 278 | 261 |
| 279 return &logdog.RegisterStreamResponse{ | 262 return &logdog.RegisterStreamResponse{ |
| 280 Id: string(ls.ID), | 263 Id: string(ls.ID), |
| 281 State: buildLogStreamState(ls, lst), | 264 State: buildLogStreamState(ls, lst), |
| 282 }, nil | 265 }, nil |
| 283 } | 266 } |
| OLD | NEW |