Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(230)

Unified Diff: tools/telemetry/third_party/typ/typ/pool.py

Issue 1647513002: Delete tools/telemetry. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/telemetry/third_party/typ/typ/json_results.py ('k') | tools/telemetry/third_party/typ/typ/printer.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/telemetry/third_party/typ/typ/pool.py
diff --git a/tools/telemetry/third_party/typ/typ/pool.py b/tools/telemetry/third_party/typ/typ/pool.py
deleted file mode 100644
index 6200a8a80c98a4b3285c86eac11745586bf8ce69..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/typ/typ/pool.py
+++ /dev/null
@@ -1,204 +0,0 @@
-# Copyright 2014 Google Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-import multiprocessing
-import pickle
-import traceback
-
-from typ.host import Host
-
-
-def make_pool(host, jobs, callback, context, pre_fn, post_fn):
- _validate_args(context, pre_fn, post_fn)
- if jobs > 1:
- return _ProcessPool(host, jobs, callback, context, pre_fn, post_fn)
- else:
- return _AsyncPool(host, jobs, callback, context, pre_fn, post_fn)
-
-
-class _MessageType(object):
- Request = 'Request'
- Response = 'Response'
- Close = 'Close'
- Done = 'Done'
- Error = 'Error'
- Interrupt = 'Interrupt'
-
- values = [Request, Response, Close, Done, Error, Interrupt]
-
-
-def _validate_args(context, pre_fn, post_fn):
- try:
- _ = pickle.dumps(context)
- except Exception as e:
- raise ValueError('context passed to make_pool is not picklable: %s'
- % str(e))
- try:
- _ = pickle.dumps(pre_fn)
- except pickle.PickleError:
- raise ValueError('pre_fn passed to make_pool is not picklable')
- try:
- _ = pickle.dumps(post_fn)
- except pickle.PickleError:
- raise ValueError('post_fn passed to make_pool is not picklable')
-
-
-class _ProcessPool(object):
-
- def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
- self.host = host
- self.jobs = jobs
- self.requests = multiprocessing.Queue()
- self.responses = multiprocessing.Queue()
- self.workers = []
- self.discarded_responses = []
- self.closed = False
- self.erred = False
- for worker_num in range(1, jobs + 1):
- w = multiprocessing.Process(target=_loop,
- args=(self.requests, self.responses,
- host.for_mp(), worker_num,
- callback, context,
- pre_fn, post_fn))
- w.start()
- self.workers.append(w)
-
- def send(self, msg):
- self.requests.put((_MessageType.Request, msg))
-
- def get(self):
- msg_type, resp = self.responses.get()
- if msg_type == _MessageType.Error:
- self._handle_error(resp)
- elif msg_type == _MessageType.Interrupt:
- raise KeyboardInterrupt
- assert msg_type == _MessageType.Response
- return resp
-
- def close(self):
- for _ in self.workers:
- self.requests.put((_MessageType.Close, None))
- self.closed = True
-
- def join(self):
- # TODO: one would think that we could close self.requests in close(),
- # above, and close self.responses below, but if we do, we get
- # weird tracebacks in the daemon threads multiprocessing starts up.
- # Instead, we have to hack the innards of multiprocessing. It
- # seems likely that there's a bug somewhere, either in this module or
- # in multiprocessing.
- # pylint: disable=protected-access
- if self.host.is_python3: # pragma: python3
- multiprocessing.queues.is_exiting = lambda: True
- else: # pragma: python2
- multiprocessing.util._exiting = True
-
- if not self.closed:
- # We must be aborting; terminate the workers rather than
- # shutting down cleanly.
- for w in self.workers:
- w.terminate()
- w.join()
- return []
-
- final_responses = []
- error = None
- interrupted = None
- for w in self.workers:
- while True:
- msg_type, resp = self.responses.get()
- if msg_type == _MessageType.Error:
- error = resp
- break
- if msg_type == _MessageType.Interrupt:
- interrupted = True
- break
- if msg_type == _MessageType.Done:
- final_responses.append(resp[1])
- break
- self.discarded_responses.append(resp)
-
- for w in self.workers:
- w.join()
-
- # TODO: See comment above at the beginning of the function for
- # why this is commented out.
- # self.responses.close()
-
- if error:
- self._handle_error(error)
- if interrupted:
- raise KeyboardInterrupt
- return final_responses
-
- def _handle_error(self, msg):
- worker_num, tb = msg
- self.erred = True
- raise Exception("Error from worker %d (traceback follows):\n%s" %
- (worker_num, tb))
-
-
-# 'Too many arguments' pylint: disable=R0913
-
-def _loop(requests, responses, host, worker_num,
- callback, context, pre_fn, post_fn, should_loop=True):
- host = host or Host()
- try:
- context_after_pre = pre_fn(host, worker_num, context)
- keep_looping = True
- while keep_looping:
- message_type, args = requests.get(block=True)
- if message_type == _MessageType.Close:
- responses.put((_MessageType.Done,
- (worker_num, post_fn(context_after_pre))))
- break
- assert message_type == _MessageType.Request
- resp = callback(context_after_pre, args)
- responses.put((_MessageType.Response, resp))
- keep_looping = should_loop
- except KeyboardInterrupt as e:
- responses.put((_MessageType.Interrupt, (worker_num, str(e))))
- except Exception as e:
- responses.put((_MessageType.Error,
- (worker_num, traceback.format_exc(e))))
-
-
-class _AsyncPool(object):
-
- def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
- self.host = host or Host()
- self.jobs = jobs
- self.callback = callback
- self.context = copy.deepcopy(context)
- self.msgs = []
- self.closed = False
- self.post_fn = post_fn
- self.context_after_pre = pre_fn(self.host, 1, self.context)
- self.final_context = None
-
- def send(self, msg):
- self.msgs.append(msg)
-
- def get(self):
- return self.callback(self.context_after_pre, self.msgs.pop(0))
-
- def close(self):
- self.closed = True
- self.final_context = self.post_fn(self.context_after_pre)
-
- def join(self):
- if not self.closed:
- self.close()
- return [self.final_context]
« no previous file with comments | « tools/telemetry/third_party/typ/typ/json_results.py ('k') | tools/telemetry/third_party/typ/typ/printer.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698