| OLD | NEW |
| 1 # Copyright 2016 The LUCI Authors. All rights reserved. | 1 # Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 import logging | 5 import logging |
| 6 import time |
| 6 | 7 |
| 7 import gae_event_mon | 8 import gae_event_mon |
| 8 | 9 |
| 10 from server import task_result |
| 11 |
| 12 |
# (dimension name, converter) pairs. Users of this table look the name up on a
# dimensions proto and pass each raw value through the converter before
# appending it. Only 'cores' is converted with int; every other dimension is
# converted with unicode.
DIMENSIONS = (('cores', int),) + tuple(
    (name, unicode) for name in (
        'cpu',
        'device_os',
        'device_type',
        'gpu',
        'hidpi',
        'machine_type',
        'os',
        'pool',
        'xcode_version',
        'zone',
    ))
| 26 |
| 27 |
| 28 def _to_timestamp(dt): |
| 29 return int(time.mktime(dt.timetuple())) |
| 30 |
| 31 |
| 32 def _files_ref_to_proto(files_ref, proto): |
| 33 if files_ref.isolated: |
| 34 proto.isolated = files_ref.isolated |
| 35 if files_ref.isolatedserver: |
| 36 proto.isolatedserver = files_ref.isolatedserver |
| 37 if files_ref.namespace: |
| 38 proto.namespace = files_ref.namespace |
| 39 |
| 40 |
| 41 def _cipd_package_to_proto(cipd_package, package_proto): |
| 42 if cipd_package.package_name: |
| 43 package_proto.package_name = cipd_package.package_name |
| 44 if cipd_package.version: |
| 45 package_proto.version = cipd_package.version |
| 46 if package_proto.path: |
| 47 package_proto.path = cipd_package.path |
| 48 |
| 49 |
def _task_summary_to_proto(summary, event):
  """Populates event.proto.swarming_task_event from a task result summary.

  Args:
    summary: TaskResultSummary-like object. Its task id, request (including
        properties), state, bot data, timestamps, children, outputs and cost
        attributes are read.
    event: event whose proto.swarming_task_event message is filled in place.

  An unrecognized summary.state is logged as an error and the proto state is
  left unset.
  """
  task_event = event.proto.swarming_task_event
  task_event.id = summary.task_id

  # --- Request -------------------------------------------------------------
  request = summary.request
  request_proto = task_event.request
  if request.parent_task_id:
    request_proto.parent_task_id = request.parent_task_id
  request_proto.name = request.name
  request_proto.created_ts = _to_timestamp(request.created_ts)
  request_proto.expiration_ts = _to_timestamp(request.expiration_ts)
  request_proto.priority = request.priority
  if request.pubsub_topic:
    request_proto.pubsub_topic = request.pubsub_topic

  # --- Request properties --------------------------------------------------
  task_properties = request.properties
  properties_proto = request_proto.properties

  if task_properties.inputs_ref:
    _files_ref_to_proto(task_properties.inputs_ref, properties_proto.inputs_ref)

  if task_properties.cipd_input:
    cipd_input = task_properties.cipd_input
    cipd_proto = properties_proto.cipd_input
    cipd_proto.server = cipd_input.server
    _cipd_package_to_proto(cipd_input.client_package, cipd_proto.client_package)
    for package in cipd_input.packages:
      _cipd_package_to_proto(package, cipd_proto.packages.add())

  # Requested dimensions hold a single value per name; it is appended to the
  # repeated proto field of the same name after conversion.
  for name, convert in DIMENSIONS:
    if name in task_properties.dimensions:
      getattr(properties_proto.dimensions, name).append(
          convert(task_properties.dimensions[name]))

  if task_properties.execution_timeout_secs:
    properties_proto.execution_timeout_s = (
        task_properties.execution_timeout_secs)
  if task_properties.grace_period_secs:
    properties_proto.grace_period_s = task_properties.grace_period_secs
  if task_properties.io_timeout_secs:
    properties_proto.io_timeout_s = task_properties.io_timeout_secs
  properties_proto.idempotent = task_properties.idempotent

  # --- State ---------------------------------------------------------------
  # The handled states carry the same name on task_result.State and on the
  # proto enum, so a single list of names drives the translation.
  state_enum = task_event.State.DESCRIPTOR.values_by_name
  for state_name in (
      'COMPLETED', 'CANCELED', 'BOT_DIED', 'TIMED_OUT', 'EXPIRED'):
    if summary.state == getattr(task_result.State, state_name):
      task_event.state = state_enum[state_name].number
      break
  else:
    logging.error('Unhandled task state %r', summary.state)

  # --- Bot -----------------------------------------------------------------
  # NOTE(review): presumably a task that never reached a bot (EXPIRED and
  # CANCELED are both handled above) has no bot or exit-code data. Assigning
  # None to a proto scalar field raises, which would make the caller drop the
  # whole event, so unset values are skipped. Confirm against
  # TaskResultSummary.
  if summary.bot_id is not None:
    task_event.bot_id = summary.bot_id
  if summary.bot_version is not None:
    task_event.bot_version = summary.bot_version

  # Bot dimensions hold a list of values per name.
  bot_dimensions = summary.bot_dimensions or {}
  for name, convert in DIMENSIONS:
    for value in bot_dimensions.get(name, []):
      getattr(task_event.bot_dimensions, name).append(convert(value))

  for version in summary.server_versions:
    task_event.server_versions.append(version)

  # --- Result --------------------------------------------------------------
  task_event.internal_failure = summary.internal_failure
  if summary.exit_code is not None:
    task_event.exit_code = summary.exit_code
  if summary.started_ts:
    task_event.started_ts = _to_timestamp(summary.started_ts)
  if summary.completed_ts:
    task_event.completed_ts = _to_timestamp(summary.completed_ts)
  if summary.abandoned_ts:
    task_event.abandoned_ts = _to_timestamp(summary.abandoned_ts)

  for task_id in summary.children_task_ids:
    task_event.children_task_ids.append(task_id)

  if summary.outputs_ref:
    _files_ref_to_proto(summary.outputs_ref, task_event.outputs_ref)

  # --- Cost and dedup ------------------------------------------------------
  task_event.cost_usd = summary.cost_usd
  if summary.cost_saved_usd:
    task_event.cost_saved_usd = summary.cost_saved_usd
  if summary.deduped_from:
    task_event.deduped_from = summary.deduped_from
  if summary.try_number is not None:
    task_event.try_number = summary.try_number
| 142 |
| 9 | 143 |
def initialize():
  """Initializes gae_event_mon, passing 'swarming' as its argument."""
  gae_event_mon.initialize('swarming')
| 12 | 146 |
| 13 | 147 |
def send_task_event(summary):
  """Reports a swarming task to event_mon as a 'POINT' event.

  The event is currently delivered by sending an HTTP request.

  Args:
    summary: TaskResultSummary object.
  """
  try:
    point_event = gae_event_mon.Event('POINT')
    _task_summary_to_proto(summary, point_event)
    point_event.send()
  except Exception:
    # Problems in the monitoring pipeline must not cause an outage of
    # swarming, so any failure while building or sending the event is logged
    # and swallowed here.
    logging.exception('Caught exception while sending event')
| OLD | NEW |