Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(217)

Side by Side Diff: third_party/grpc/src/ruby/bin/apis/pubsub_demo.rb

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #!/usr/bin/env ruby
2
3 # Copyright 2015, Google Inc.
4 # All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are
8 # met:
9 #
10 # * Redistributions of source code must retain the above copyright
11 # notice, this list of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above
13 # copyright notice, this list of conditions and the following disclaimer
14 # in the documentation and/or other materials provided with the
15 # distribution.
16 # * Neither the name of Google Inc. nor the names of its
17 # contributors may be used to endorse or promote products derived from
18 # this software without specific prior written permission.
19 #
20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32 # pubsub_demo demos accesses the Google PubSub API via its gRPC interface
33 #
34 # $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \
35 # path/to/pubsub_demo.rb \
36 # [--action=<chosen_demo_action> ]
37 #
38 # There are options related to the chosen action, see #parse_args below.
39 # - the possible actions are given by the method names of NamedAction class
40 # - the default action is list_some_topics
41
42 this_dir = File.expand_path(File.dirname(__FILE__))
43 lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
44 $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
45 $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
46
47 require 'optparse'
48
49 require 'grpc'
50 require 'googleauth'
51 require 'google/protobuf'
52
53 require 'google/protobuf/empty'
54 require 'tech/pubsub/proto/pubsub'
55 require 'tech/pubsub/proto/pubsub_services'
56
57 # creates a SSL Credentials from the production certificates.
58 def ssl_creds
59 GRPC::Core::ChannelCredentials.new()
60 end
61
62 # Builds the metadata authentication update proc.
63 def auth_proc(opts)
64 auth_creds = Google::Auth.get_application_default
65 return auth_creds.updater_proc
66 end
67
68 # Creates a stub for accessing the publisher service.
69 def publisher_stub(opts)
70 address = "#{opts.host}:#{opts.port}"
71 stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
72 GRPC.logger.info("... access PublisherService at #{address}")
73 call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
74 combined_creds = ssl_creds.compose(call_creds)
75 stub_clz.new(address, creds: combined_creds,
76 GRPC::Core::Channel::SSL_TARGET => opts.host)
77 end
78
79 # Creates a stub for accessing the subscriber service.
80 def subscriber_stub(opts)
81 address = "#{opts.host}:#{opts.port}"
82 stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
83 GRPC.logger.info("... access SubscriberService at #{address}")
84 call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
85 combined_creds = ssl_creds.compose(call_creds)
86 stub_clz.new(address, creds: combined_creds,
87 GRPC::Core::Channel::SSL_TARGET => opts.host)
88 end
89
90 # defines methods corresponding to each interop test case.
91 class NamedActions
92 include Tech::Pubsub
93
94 # Initializes NamedActions
95 #
96 # @param pub [Stub] a stub for accessing the publisher service
97 # @param sub [Stub] a stub for accessing the publisher service
98 # @param args [Args] provides access to the command line
99 def initialize(pub, sub, args)
100 @pub = pub
101 @sub = sub
102 @args = args
103 end
104
105 # Removes the test topic if it exists
106 def remove_topic
107 name = test_topic_name
108 p "... removing Topic #{name}"
109 @pub.delete_topic(DeleteTopicRequest.new(topic: name))
110 p "removed Topic: #{name} OK"
111 rescue GRPC::BadStatus => e
112 p "Could not delete a topics: rpc failed with '#{e}'"
113 end
114
115 # Creates a test topic
116 def create_topic
117 name = test_topic_name
118 p "... creating Topic #{name}"
119 resp = @pub.create_topic(Topic.new(name: name))
120 p "created Topic: #{resp.name} OK"
121 rescue GRPC::BadStatus => e
122 p "Could not create a topics: rpc failed with '#{e}'"
123 end
124
125 # Lists topics in the project
126 def list_some_topics
127 p 'Listing topics'
128 p '-------------_'
129 list_project_topics.topic.each { |t| p t.name }
130 rescue GRPC::BadStatus => e
131 p "Could not list topics: rpc failed with '#{e}'"
132 end
133
134 # Checks if a topics exists in a project
135 def check_exists
136 name = test_topic_name
137 p "... checking for topic #{name}"
138 exists = topic_exists?(name)
139 p "#{name} is a topic" if exists
140 p "#{name} is not a topic" unless exists
141 rescue GRPC::BadStatus => e
142 p "Could not check for a topics: rpc failed with '#{e}'"
143 end
144
145 # Publishes some messages
146 def random_pub_sub
147 topic_name, sub_name = test_topic_name, test_sub_name
148 create_topic_if_needed(topic_name)
149 @sub.create_subscription(Subscription.new(name: sub_name,
150 topic: topic_name))
151 msg_count = rand(10..30)
152 msg_count.times do |x|
153 msg = PubsubMessage.new(data: "message #{x}")
154 @pub.publish(PublishRequest.new(topic: topic_name, message: msg))
155 end
156 p "Sent #{msg_count} messages to #{topic_name}, checking for them now."
157 batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name,
158 max_events: msg_count))
159 ack_ids = batch.pull_responses.map { |x| x.ack_id }
160 p "Got #{ack_ids.size} messages; acknowledging them.."
161 @sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name,
162 ack_id: ack_ids))
163 p "Test messages were acknowledged OK, deleting the subscription"
164 del_req = DeleteSubscriptionRequest.new(subscription: sub_name)
165 @sub.delete_subscription(del_req)
166 rescue GRPC::BadStatus => e
167 p "Could not do random pub sub: rpc failed with '#{e}'"
168 end
169
170 private
171
172 # test_topic_name is the topic name to use in this test.
173 def test_topic_name
174 unless @args.topic_name.nil?
175 return "/topics/#{@args.project_id}/#{@args.topic_name}"
176 end
177 now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
178 "/topics/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
179 end
180
181 # test_sub_name is the subscription name to use in this test.
182 def test_sub_name
183 unless @args.sub_name.nil?
184 return "/subscriptions/#{@args.project_id}/#{@args.sub_name}"
185 end
186 now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
187 "/subscriptions/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
188 end
189
190 # determines if the topic name exists
191 def topic_exists?(name)
192 topics = list_project_topics.topic.map { |t| t.name }
193 topics.include?(name)
194 end
195
196 def create_topic_if_needed(name)
197 return if topic_exists?(name)
198 @pub.create_topic(Topic.new(name: name))
199 end
200
201 def list_project_topics
202 q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})"
203 @pub.list_topics(ListTopicsRequest.new(query: q))
204 end
205 end
206
207 # Args is used to hold the command line info.
208 Args = Struct.new(:host, :port, :action, :project_id, :topic_name,
209 :sub_name)
210
211 # validates the the command line options, returning them as an Arg.
212 def parse_args
213 args = Args.new('pubsub-staging.googleapis.com',
214 443, 'list_some_topics', 'stoked-keyword-656')
215 OptionParser.new do |opts|
216 opts.on('--server_host SERVER_HOST', 'server hostname') do |v|
217 args.host = v
218 end
219 opts.on('--server_port SERVER_PORT', 'server port') do |v|
220 args.port = v
221 end
222
223 # instance_methods(false) gives only the methods defined in that class.
224 scenes = NamedActions.instance_methods(false).map { |t| t.to_s }
225 scene_list = scenes.join(',')
226 opts.on("--action CODE", scenes, {}, 'pick a demo action',
227 " (#{scene_list})") do |v|
228 args.action = v
229 end
230
231 # Set the remaining values.
232 %w(project_id topic_name sub_name).each do |o|
233 opts.on("--#{o} VALUE", "#{o}") do |v|
234 args[o] = v
235 end
236 end
237 end.parse!
238 _check_args(args)
239 end
240
241 def _check_args(args)
242 %w(host port action).each do |a|
243 if args[a].nil?
244 raise OptionParser::MissingArgument.new("please specify --#{a}")
245 end
246 end
247 args
248 end
249
250 def main
251 args = parse_args
252 pub, sub = publisher_stub(args), subscriber_stub(args)
253 NamedActions.new(pub, sub, args).method(args.action).call
254 end
255
256 main
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698