| Index: tools/telemetry/third_party/typ/typ/pool.py
|
| diff --git a/tools/telemetry/third_party/typ/typ/pool.py b/tools/telemetry/third_party/typ/typ/pool.py
|
| deleted file mode 100644
|
| index 6200a8a80c98a4b3285c86eac11745586bf8ce69..0000000000000000000000000000000000000000
|
| --- a/tools/telemetry/third_party/typ/typ/pool.py
|
| +++ /dev/null
|
| @@ -1,204 +0,0 @@
|
| -# Copyright 2014 Google Inc. All rights reserved.
|
| -#
|
| -# Licensed under the Apache License, Version 2.0 (the "License");
|
| -# you may not use this file except in compliance with the License.
|
| -# You may obtain a copy of the License at
|
| -#
|
| -# http://www.apache.org/licenses/LICENSE-2.0
|
| -#
|
| -# Unless required by applicable law or agreed to in writing, software
|
| -# distributed under the License is distributed on an "AS IS" BASIS,
|
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -# See the License for the specific language governing permissions and
|
| -# limitations under the License.
|
| -
|
| -import copy
|
| -import multiprocessing
|
| -import pickle
|
| -import traceback
|
| -
|
| -from typ.host import Host
|
| -
|
| -
|
| -def make_pool(host, jobs, callback, context, pre_fn, post_fn):
|
| - _validate_args(context, pre_fn, post_fn)
|
| - if jobs > 1:
|
| - return _ProcessPool(host, jobs, callback, context, pre_fn, post_fn)
|
| - else:
|
| - return _AsyncPool(host, jobs, callback, context, pre_fn, post_fn)
|
| -
|
| -
|
| -class _MessageType(object):
|
| - Request = 'Request'
|
| - Response = 'Response'
|
| - Close = 'Close'
|
| - Done = 'Done'
|
| - Error = 'Error'
|
| - Interrupt = 'Interrupt'
|
| -
|
| - values = [Request, Response, Close, Done, Error, Interrupt]
|
| -
|
| -
|
| -def _validate_args(context, pre_fn, post_fn):
|
| - try:
|
| - _ = pickle.dumps(context)
|
| - except Exception as e:
|
| - raise ValueError('context passed to make_pool is not picklable: %s'
|
| - % str(e))
|
| - try:
|
| - _ = pickle.dumps(pre_fn)
|
| - except pickle.PickleError:
|
| - raise ValueError('pre_fn passed to make_pool is not picklable')
|
| - try:
|
| - _ = pickle.dumps(post_fn)
|
| - except pickle.PickleError:
|
| - raise ValueError('post_fn passed to make_pool is not picklable')
|
| -
|
| -
|
| -class _ProcessPool(object):
|
| -
|
| - def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
|
| - self.host = host
|
| - self.jobs = jobs
|
| - self.requests = multiprocessing.Queue()
|
| - self.responses = multiprocessing.Queue()
|
| - self.workers = []
|
| - self.discarded_responses = []
|
| - self.closed = False
|
| - self.erred = False
|
| - for worker_num in range(1, jobs + 1):
|
| - w = multiprocessing.Process(target=_loop,
|
| - args=(self.requests, self.responses,
|
| - host.for_mp(), worker_num,
|
| - callback, context,
|
| - pre_fn, post_fn))
|
| - w.start()
|
| - self.workers.append(w)
|
| -
|
| - def send(self, msg):
|
| - self.requests.put((_MessageType.Request, msg))
|
| -
|
| - def get(self):
|
| - msg_type, resp = self.responses.get()
|
| - if msg_type == _MessageType.Error:
|
| - self._handle_error(resp)
|
| - elif msg_type == _MessageType.Interrupt:
|
| - raise KeyboardInterrupt
|
| - assert msg_type == _MessageType.Response
|
| - return resp
|
| -
|
| - def close(self):
|
| - for _ in self.workers:
|
| - self.requests.put((_MessageType.Close, None))
|
| - self.closed = True
|
| -
|
| - def join(self):
|
| - # TODO: one would think that we could close self.requests in close(),
|
| - # above, and close self.responses below, but if we do, we get
|
| - # weird tracebacks in the daemon threads multiprocessing starts up.
|
| - # Instead, we have to hack the innards of multiprocessing. It
|
| - # seems likely that there's a bug somewhere, either in this module or
|
| - # in multiprocessing.
|
| - # pylint: disable=protected-access
|
| - if self.host.is_python3: # pragma: python3
|
| - multiprocessing.queues.is_exiting = lambda: True
|
| - else: # pragma: python2
|
| - multiprocessing.util._exiting = True
|
| -
|
| - if not self.closed:
|
| - # We must be aborting; terminate the workers rather than
|
| - # shutting down cleanly.
|
| - for w in self.workers:
|
| - w.terminate()
|
| - w.join()
|
| - return []
|
| -
|
| - final_responses = []
|
| - error = None
|
| - interrupted = None
|
| - for w in self.workers:
|
| - while True:
|
| - msg_type, resp = self.responses.get()
|
| - if msg_type == _MessageType.Error:
|
| - error = resp
|
| - break
|
| - if msg_type == _MessageType.Interrupt:
|
| - interrupted = True
|
| - break
|
| - if msg_type == _MessageType.Done:
|
| - final_responses.append(resp[1])
|
| - break
|
| - self.discarded_responses.append(resp)
|
| -
|
| - for w in self.workers:
|
| - w.join()
|
| -
|
| - # TODO: See comment above at the beginning of the function for
|
| - # why this is commented out.
|
| - # self.responses.close()
|
| -
|
| - if error:
|
| - self._handle_error(error)
|
| - if interrupted:
|
| - raise KeyboardInterrupt
|
| - return final_responses
|
| -
|
| - def _handle_error(self, msg):
|
| - worker_num, tb = msg
|
| - self.erred = True
|
| - raise Exception("Error from worker %d (traceback follows):\n%s" %
|
| - (worker_num, tb))
|
| -
|
| -
|
| -# 'Too many arguments' pylint: disable=R0913
|
| -
|
| -def _loop(requests, responses, host, worker_num,
|
| - callback, context, pre_fn, post_fn, should_loop=True):
|
| - host = host or Host()
|
| - try:
|
| - context_after_pre = pre_fn(host, worker_num, context)
|
| - keep_looping = True
|
| - while keep_looping:
|
| - message_type, args = requests.get(block=True)
|
| - if message_type == _MessageType.Close:
|
| - responses.put((_MessageType.Done,
|
| - (worker_num, post_fn(context_after_pre))))
|
| - break
|
| - assert message_type == _MessageType.Request
|
| - resp = callback(context_after_pre, args)
|
| - responses.put((_MessageType.Response, resp))
|
| - keep_looping = should_loop
|
| - except KeyboardInterrupt as e:
|
| - responses.put((_MessageType.Interrupt, (worker_num, str(e))))
|
| - except Exception as e:
|
| - responses.put((_MessageType.Error,
|
| - (worker_num, traceback.format_exc(e))))
|
| -
|
| -
|
| -class _AsyncPool(object):
|
| -
|
| - def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
|
| - self.host = host or Host()
|
| - self.jobs = jobs
|
| - self.callback = callback
|
| - self.context = copy.deepcopy(context)
|
| - self.msgs = []
|
| - self.closed = False
|
| - self.post_fn = post_fn
|
| - self.context_after_pre = pre_fn(self.host, 1, self.context)
|
| - self.final_context = None
|
| -
|
| - def send(self, msg):
|
| - self.msgs.append(msg)
|
| -
|
| - def get(self):
|
| - return self.callback(self.context_after_pre, self.msgs.pop(0))
|
| -
|
| - def close(self):
|
| - self.closed = True
|
| - self.final_context = self.post_fn(self.context_after_pre)
|
| -
|
| - def join(self):
|
| - if not self.closed:
|
| - self.close()
|
| - return [self.final_context]
|
|
|