OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env ruby |
| 2 |
| 3 # Copyright 2015, Google Inc. |
| 4 # All rights reserved. |
| 5 # |
| 6 # Redistribution and use in source and binary forms, with or without |
| 7 # modification, are permitted provided that the following conditions are |
| 8 # met: |
| 9 # |
| 10 # * Redistributions of source code must retain the above copyright |
| 11 # notice, this list of conditions and the following disclaimer. |
| 12 # * Redistributions in binary form must reproduce the above |
| 13 # copyright notice, this list of conditions and the following disclaimer |
| 14 # in the documentation and/or other materials provided with the |
| 15 # distribution. |
| 16 # * Neither the name of Google Inc. nor the names of its |
| 17 # contributors may be used to endorse or promote products derived from |
| 18 # this software without specific prior written permission. |
| 19 # |
| 20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 |
| 32 # pubsub_demo demos accesses the Google PubSub API via its gRPC interface |
| 33 # |
| 34 # $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \ |
| 35 # path/to/pubsub_demo.rb \ |
| 36 # [--action=<chosen_demo_action> ] |
| 37 # |
| 38 # There are options related to the chosen action, see #parse_args below. |
| 39 # - the possible actions are given by the method names of NamedAction class |
| 40 # - the default action is list_some_topics |
| 41 |
| 42 this_dir = File.expand_path(File.dirname(__FILE__)) |
| 43 lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') |
| 44 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) |
| 45 $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) |
| 46 |
| 47 require 'optparse' |
| 48 |
| 49 require 'grpc' |
| 50 require 'googleauth' |
| 51 require 'google/protobuf' |
| 52 |
| 53 require 'google/protobuf/empty' |
| 54 require 'tech/pubsub/proto/pubsub' |
| 55 require 'tech/pubsub/proto/pubsub_services' |
| 56 |
| 57 # creates a SSL Credentials from the production certificates. |
| 58 def ssl_creds |
| 59 GRPC::Core::ChannelCredentials.new() |
| 60 end |
| 61 |
| 62 # Builds the metadata authentication update proc. |
| 63 def auth_proc(opts) |
| 64 auth_creds = Google::Auth.get_application_default |
| 65 return auth_creds.updater_proc |
| 66 end |
| 67 |
| 68 # Creates a stub for accessing the publisher service. |
| 69 def publisher_stub(opts) |
| 70 address = "#{opts.host}:#{opts.port}" |
| 71 stub_clz = Tech::Pubsub::PublisherService::Stub # shorter |
| 72 GRPC.logger.info("... access PublisherService at #{address}") |
| 73 call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts)) |
| 74 combined_creds = ssl_creds.compose(call_creds) |
| 75 stub_clz.new(address, creds: combined_creds, |
| 76 GRPC::Core::Channel::SSL_TARGET => opts.host) |
| 77 end |
| 78 |
| 79 # Creates a stub for accessing the subscriber service. |
| 80 def subscriber_stub(opts) |
| 81 address = "#{opts.host}:#{opts.port}" |
| 82 stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter |
| 83 GRPC.logger.info("... access SubscriberService at #{address}") |
| 84 call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts)) |
| 85 combined_creds = ssl_creds.compose(call_creds) |
| 86 stub_clz.new(address, creds: combined_creds, |
| 87 GRPC::Core::Channel::SSL_TARGET => opts.host) |
| 88 end |
| 89 |
| 90 # defines methods corresponding to each interop test case. |
| 91 class NamedActions |
| 92 include Tech::Pubsub |
| 93 |
| 94 # Initializes NamedActions |
| 95 # |
| 96 # @param pub [Stub] a stub for accessing the publisher service |
| 97 # @param sub [Stub] a stub for accessing the publisher service |
| 98 # @param args [Args] provides access to the command line |
| 99 def initialize(pub, sub, args) |
| 100 @pub = pub |
| 101 @sub = sub |
| 102 @args = args |
| 103 end |
| 104 |
| 105 # Removes the test topic if it exists |
| 106 def remove_topic |
| 107 name = test_topic_name |
| 108 p "... removing Topic #{name}" |
| 109 @pub.delete_topic(DeleteTopicRequest.new(topic: name)) |
| 110 p "removed Topic: #{name} OK" |
| 111 rescue GRPC::BadStatus => e |
| 112 p "Could not delete a topics: rpc failed with '#{e}'" |
| 113 end |
| 114 |
| 115 # Creates a test topic |
| 116 def create_topic |
| 117 name = test_topic_name |
| 118 p "... creating Topic #{name}" |
| 119 resp = @pub.create_topic(Topic.new(name: name)) |
| 120 p "created Topic: #{resp.name} OK" |
| 121 rescue GRPC::BadStatus => e |
| 122 p "Could not create a topics: rpc failed with '#{e}'" |
| 123 end |
| 124 |
| 125 # Lists topics in the project |
| 126 def list_some_topics |
| 127 p 'Listing topics' |
| 128 p '-------------_' |
| 129 list_project_topics.topic.each { |t| p t.name } |
| 130 rescue GRPC::BadStatus => e |
| 131 p "Could not list topics: rpc failed with '#{e}'" |
| 132 end |
| 133 |
| 134 # Checks if a topics exists in a project |
| 135 def check_exists |
| 136 name = test_topic_name |
| 137 p "... checking for topic #{name}" |
| 138 exists = topic_exists?(name) |
| 139 p "#{name} is a topic" if exists |
| 140 p "#{name} is not a topic" unless exists |
| 141 rescue GRPC::BadStatus => e |
| 142 p "Could not check for a topics: rpc failed with '#{e}'" |
| 143 end |
| 144 |
| 145 # Publishes some messages |
| 146 def random_pub_sub |
| 147 topic_name, sub_name = test_topic_name, test_sub_name |
| 148 create_topic_if_needed(topic_name) |
| 149 @sub.create_subscription(Subscription.new(name: sub_name, |
| 150 topic: topic_name)) |
| 151 msg_count = rand(10..30) |
| 152 msg_count.times do |x| |
| 153 msg = PubsubMessage.new(data: "message #{x}") |
| 154 @pub.publish(PublishRequest.new(topic: topic_name, message: msg)) |
| 155 end |
| 156 p "Sent #{msg_count} messages to #{topic_name}, checking for them now." |
| 157 batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name, |
| 158 max_events: msg_count)) |
| 159 ack_ids = batch.pull_responses.map { |x| x.ack_id } |
| 160 p "Got #{ack_ids.size} messages; acknowledging them.." |
| 161 @sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name, |
| 162 ack_id: ack_ids)) |
| 163 p "Test messages were acknowledged OK, deleting the subscription" |
| 164 del_req = DeleteSubscriptionRequest.new(subscription: sub_name) |
| 165 @sub.delete_subscription(del_req) |
| 166 rescue GRPC::BadStatus => e |
| 167 p "Could not do random pub sub: rpc failed with '#{e}'" |
| 168 end |
| 169 |
| 170 private |
| 171 |
| 172 # test_topic_name is the topic name to use in this test. |
| 173 def test_topic_name |
| 174 unless @args.topic_name.nil? |
| 175 return "/topics/#{@args.project_id}/#{@args.topic_name}" |
| 176 end |
| 177 now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L') |
| 178 "/topics/#{@args.project_id}/#{ENV['USER']}-#{now_text}" |
| 179 end |
| 180 |
| 181 # test_sub_name is the subscription name to use in this test. |
| 182 def test_sub_name |
| 183 unless @args.sub_name.nil? |
| 184 return "/subscriptions/#{@args.project_id}/#{@args.sub_name}" |
| 185 end |
| 186 now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L') |
| 187 "/subscriptions/#{@args.project_id}/#{ENV['USER']}-#{now_text}" |
| 188 end |
| 189 |
| 190 # determines if the topic name exists |
| 191 def topic_exists?(name) |
| 192 topics = list_project_topics.topic.map { |t| t.name } |
| 193 topics.include?(name) |
| 194 end |
| 195 |
| 196 def create_topic_if_needed(name) |
| 197 return if topic_exists?(name) |
| 198 @pub.create_topic(Topic.new(name: name)) |
| 199 end |
| 200 |
| 201 def list_project_topics |
| 202 q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})" |
| 203 @pub.list_topics(ListTopicsRequest.new(query: q)) |
| 204 end |
| 205 end |
| 206 |
| 207 # Args is used to hold the command line info. |
| 208 Args = Struct.new(:host, :port, :action, :project_id, :topic_name, |
| 209 :sub_name) |
| 210 |
| 211 # validates the the command line options, returning them as an Arg. |
| 212 def parse_args |
| 213 args = Args.new('pubsub-staging.googleapis.com', |
| 214 443, 'list_some_topics', 'stoked-keyword-656') |
| 215 OptionParser.new do |opts| |
| 216 opts.on('--server_host SERVER_HOST', 'server hostname') do |v| |
| 217 args.host = v |
| 218 end |
| 219 opts.on('--server_port SERVER_PORT', 'server port') do |v| |
| 220 args.port = v |
| 221 end |
| 222 |
| 223 # instance_methods(false) gives only the methods defined in that class. |
| 224 scenes = NamedActions.instance_methods(false).map { |t| t.to_s } |
| 225 scene_list = scenes.join(',') |
| 226 opts.on("--action CODE", scenes, {}, 'pick a demo action', |
| 227 " (#{scene_list})") do |v| |
| 228 args.action = v |
| 229 end |
| 230 |
| 231 # Set the remaining values. |
| 232 %w(project_id topic_name sub_name).each do |o| |
| 233 opts.on("--#{o} VALUE", "#{o}") do |v| |
| 234 args[o] = v |
| 235 end |
| 236 end |
| 237 end.parse! |
| 238 _check_args(args) |
| 239 end |
| 240 |
| 241 def _check_args(args) |
| 242 %w(host port action).each do |a| |
| 243 if args[a].nil? |
| 244 raise OptionParser::MissingArgument.new("please specify --#{a}") |
| 245 end |
| 246 end |
| 247 args |
| 248 end |
| 249 |
| 250 def main |
| 251 args = parse_args |
| 252 pub, sub = publisher_stub(args), subscriber_stub(args) |
| 253 NamedActions.new(pub, sub, args).method(args.action).call |
| 254 end |
| 255 |
| 256 main |
OLD | NEW |