| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 80 if !exists { | 80 if !exists { |
| 81 log.Fields{ | 81 log.Fields{ |
| 82 "subscription": sub, | 82 "subscription": sub, |
| 83 }.Errorf(c, "Subscription does not exist.") | 83 }.Errorf(c, "Subscription does not exist.") |
| 84 return errInvalidConfig | 84 return errInvalidConfig |
| 85 } | 85 } |
| 86 log.Fields{ | 86 log.Fields{ |
| 87 "subscription": sub, | 87 "subscription": sub, |
| 88 }.Infof(c, "Successfully validated Pub/Sub subscription.") | 88 }.Infof(c, "Successfully validated Pub/Sub subscription.") |
| 89 | 89 |
| 90 » // Initialize our Storage. | 90 » st, err := a.IntermediateStorage(c) |
| 91 » s, err := a.IntermediateStorage(c) | |
| 92 if err != nil { | 91 if err != nil { |
| 93 log.WithError(err).Errorf(c, "Failed to get storage instance.") | |
| 94 return err | 92 return err |
| 95 } | 93 } |
| 96 » defer s.Close() | 94 » defer st.Close() |
| 97 | 95 |
| 98 // Application shutdown will now operate by cancelling the Collector's | 96 // Application shutdown will now operate by cancelling the Collector's |
| 99 // shutdown Context. | 97 // shutdown Context. |
| 100 shutdownCtx, shutdownFunc := context.WithCancel(c) | 98 shutdownCtx, shutdownFunc := context.WithCancel(c) |
| 101 a.SetShutdownFunc(shutdownFunc) | 99 a.SetShutdownFunc(shutdownFunc) |
| 102 | 100 |
| 103 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT us
e the | 101 // Start an ACK buffer so that we can batch ACKs. Note that we do NOT us
e the |
| 104 // shutdown context here, as we want clean shutdowns to continue to ack
any | 102 // shutdown context here, as we want clean shutdowns to continue to ack
any |
| 105 // buffered messages. | 103 // buffered messages. |
| 106 ab := ackbuffer.New(c, ackbuffer.Config{ | 104 ab := ackbuffer.New(c, ackbuffer.Config{ |
| 107 Ack: ackbuffer.NewACK(ps, sub, 0), | 105 Ack: ackbuffer.NewACK(ps, sub, 0), |
| 108 }) | 106 }) |
| 109 defer ab.CloseAndFlush() | 107 defer ab.CloseAndFlush() |
| 110 | 108 |
| 111 // Initialize our Collector service object using a caching Coordinator | 109 // Initialize our Collector service object using a caching Coordinator |
| 112 // interface. | 110 // interface. |
| 113 coord := coordinator.NewCoordinator(a.Coordinator()) | 111 coord := coordinator.NewCoordinator(a.Coordinator()) |
| 114 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) | 112 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State
CacheExpiration.Duration()) |
| 115 | 113 |
| 116 coll := collector.Collector{ | 114 coll := collector.Collector{ |
| 117 » » Coordinator: coord, | 115 » » Coordinator: coord, |
| 118 » » Storage: s, | 116 » » Storage: st, |
| 119 » » MaxParallelBundles: int(ccfg.Workers), | 117 » » MaxMessageWorkers: int(ccfg.MaxMessageWorkers), |
| 120 » » MaxIngestWorkers: int(ccfg.Workers), | |
| 121 } | 118 } |
| 122 defer coll.Close() | 119 defer coll.Close() |
| 123 | 120 |
| 124 // Execute our main Subscriber loop. It will run until the supplied Cont
ext | 121 // Execute our main Subscriber loop. It will run until the supplied Cont
ext |
| 125 // is cancelled. | 122 // is cancelled. |
| 126 clk := clock.Get(c) | 123 clk := clock.Get(c) |
| 127 engine := subscriber.Subscriber{ | 124 engine := subscriber.Subscriber{ |
| 128 » » S: subscriber.NewSource(ps, sub, 0), | 125 » » S: subscriber.NewSource(ps, sub), |
| 129 » » A: ab, | 126 » » A: ab, |
| 130 | 127 » » Workers: int(ccfg.MaxConcurrentMessages), |
| 131 » » PullWorkers: int(ccfg.TransportWorkers), | |
| 132 » » HandlerWorkers: int(ccfg.Workers), | |
| 133 } | 128 } |
| 134 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { | 129 engine.Run(shutdownCtx, func(msg *pubsub.Message) bool { |
| 135 c := log.SetField(c, "messageID", msg.ID) | 130 c := log.SetField(c, "messageID", msg.ID) |
| 136 log.Fields{ | 131 log.Fields{ |
| 137 "ackID": msg.AckID, | 132 "ackID": msg.AckID, |
| 138 "size": len(msg.Data), | 133 "size": len(msg.Data), |
| 139 }.Infof(c, "Received Pub/Sub Message.") | 134 }.Infof(c, "Received Pub/Sub Message.") |
| 140 | 135 |
| 141 startTime := clk.Now() | 136 startTime := clk.Now() |
| 142 err := coll.Process(c, msg.Data) | 137 err := coll.Process(c, msg.Data) |
| (...skipping 29 matching lines...) Expand all Loading... |
| 172 | 167 |
| 173 log.Debugf(c, "Collector finished.") | 168 log.Debugf(c, "Collector finished.") |
| 174 return nil | 169 return nil |
| 175 } | 170 } |
| 176 | 171 |
| 177 // Entry point. | 172 // Entry point. |
| 178 func main() { | 173 func main() { |
| 179 a := application{} | 174 a := application{} |
| 180 a.Run(context.Background(), a.runCollector) | 175 a.Run(context.Background(), a.runCollector) |
| 181 } | 176 } |
| OLD | NEW |