import os
import sys
import argparse
from . import space, backend, util, errors
def parse_args(args):
    """Parse the ``binoculars process`` command line.

    Recognised arguments:

    * ``-c SECTION:OPTION=VALUE`` (repeatable): extra configuration options,
      each converted by ``parse_commandline_config_option`` and collected
      in a list (empty when the flag is not given).
    * ``configfile``: the configuration file.
    * ``command``: zero or more remaining positional words.

    :param args: sequence of argument strings to parse
    :return: the ``argparse.Namespace`` with ``c``, ``configfile`` and
        ``command`` attributes
    """
    cli = argparse.ArgumentParser(prog='binoculars process')
    cli.add_argument(
        '-c',
        metavar='SECTION:OPTION=VALUE',
        action='append',
        type=parse_commandline_config_option,
        default=[],
        help='additional configuration option in the form section:option=value',
    )
    cli.add_argument('configfile', help='configuration file')
    cli.add_argument('command', nargs='*', default=[])
    namespace = cli.parse_args(args)
    return namespace
def parse_commandline_config_option(s):
    """Split a ``section:option=value`` string into its three parts.

    The string is cut at the first ``=``; everything after it (including
    any further ``=`` characters) is the value. The part before it must
    contain exactly one ``:`` separating section from option.

    :param s: the raw command-line specification
    :return: tuple ``(section, option, value)``
    :raises argparse.ArgumentTypeError: if ``s`` has no ``=`` or the key
        does not contain exactly one ``:``
    """
    key, separator, value = s.partition('=')
    key_parts = key.split(':')
    # an empty separator means there was no '=' in the input at all
    if not separator or len(key_parts) != 2:
        raise argparse.ArgumentTypeError("configuration specification '{0}' not in the form section:option=value".format(s))
    section, option = key_parts
    return section, option, value
def multiprocessing_main(xxx_todo_changeme):
    """Entry point that takes its two inputs packed into one argument.

    Accepting a single ``(config, command)`` pair lets this function be
    handed directly to ``map()``-style callers, which pass one item per
    call.

    :param xxx_todo_changeme: a two-element ``(config, command)`` sequence
    :return: whatever ``config.dispatcher.destination.retrieve()`` returns
        after a ``Main`` has been constructed from the pair
    """
    config, command = xxx_todo_changeme
    # the Main instance itself is not needed, only the effects of building it
    Main.from_object(config, command)
    destination = config.dispatcher.destination
    return destination.retrieve()
class Main(object):
    """Builds dispatcher, projection and input from a configuration and runs them.

    Constructing an instance sets up the three backends and then calls
    :meth:`run`. Use :meth:`from_args` to start from a command line or
    :meth:`from_object` to start from an existing configuration object.
    """

    def __init__(self, config, command):
        """Set up the backends from ``config`` and run ``command``.

        :param config: a ``util.ConfigSectionGroup`` or ``util.ConfigFile``
        :param command: the command words passed on to the input backend
            and the dispatcher
        :raises ValueError: if ``config`` is neither of the accepted types
        """
        if isinstance(config, util.ConfigSectionGroup):
            self.config = config.configfile.copy()
        elif isinstance(config, util.ConfigFile):
            self.config = config.copy()
        else:
            raise ValueError('Configfile is the wrong type')

        # distribute the configfile to space and to the metadata instance
        spaceconf = self.config.copy()

        # input from either the configfile or the configsectiongroup is valid
        self.dispatcher = backend.get_dispatcher(config.dispatcher, self, default='local')
        self.projection = backend.get_projection(config.projection)
        self.input = backend.get_input(config.input)

        destination = self.dispatcher.config.destination
        destination.set_final_options(self.input.get_destination_options(command))
        if 'limits' in self.config.projection:
            destination.set_limits(self.config.projection['limits'])
        if command:
            destination.set_config(spaceconf)
        # NOTE(review): run() is kept unconditional because
        # multiprocessing_main relies on construction doing the work;
        # confirm against the original indentation.
        self.run(command)

    @staticmethod
    def _is_gzipped(fp):
        """Return True if binary stream ``fp`` starts with the gzip marker.

        The stream is rewound to offset 0 afterwards so that it can be
        handed to a loader unchanged.
        """
        magic = fp.read(2)
        fp.seek(0)
        # bytes literal: a file opened in binary mode yields bytes, which
        # never compare equal to a text string on Python 3
        return magic == b'\x1f\x8b'

    @classmethod
    def from_args(cls, args):
        """Create an instance from command-line arguments.

        The configuration file is loaded with ``util.zpi_load`` when it
        starts with the gzip marker and with
        ``util.ConfigFile.fromtxtfile`` otherwise. Only the text loader
        receives the command and the ``-c`` overrides.

        :param args: sequence of argument strings, see ``parse_args``
        :raises errors.FileError: if the configuration file does not exist
        """
        args = parse_args(args)

        if not os.path.exists(args.configfile):
            # wait up to 10 seconds if it is a zpi, it might take a while for the file to appear across the network
            if not args.configfile.endswith('.zpi') or not util.wait_for_file(args.configfile, 10):
                raise errors.FileError("configuration file '{0}' does not exist".format(args.configfile))

        configobj = None
        with open(args.configfile, 'rb') as fp:
            if cls._is_gzipped(fp):
                configobj = util.zpi_load(fp)
        if not configobj:
            # reopen args.configfile as text
            configobj = util.ConfigFile.fromtxtfile(args.configfile, command=args.command, overrides=args.c)
        return cls(configobj, args.command)

    @classmethod
    def from_object(cls, config, command):
        """Create an instance from an existing configuration object.

        ``command`` is stored on ``config`` as ``config.command`` before
        the instance is built.
        """
        config.command = command
        return cls(config, command)

    def run(self, command):
        """Execute ``command`` through the dispatcher.

        If the dispatcher reports a specific task, that task is run and
        nothing else happens. Otherwise jobs are generated by the input
        backend, processed by the dispatcher and summed into
        ``self.result``, which is then stored in the destination unless
        it is ``True`` or an ``EmptySpace`` (the latter is reported on
        stderr).
        """
        if self.dispatcher.has_specific_task():
            self.dispatcher.run_specific_task(command)
        else:
            jobs = self.input.generate_jobs(command)
            tokens = self.dispatcher.process_jobs(jobs)
            self.result = self.dispatcher.sum(tokens)

            if self.result is True:
                pass
            elif isinstance(self.result, space.EmptySpace):
                sys.stderr.write('error: output is an empty dataset\n')
            else:
                self.dispatcher.config.destination.store(self.result)

    def process_job(self, job):
        """Process one job and return the summed result.

        Every ``(intensity, weights, params)`` item from the input
        backend is projected and turned into a ``Multiverse``: a single
        space when no limits are configured, otherwise one space per
        entry in the configured limits. The multiverses are combined
        with ``space.chunked_sum`` and the input metadata is attached to
        every ``Space`` in the result.
        """
        def generator():
            res = self.projection.config.resolution
            labels = self.projection.get_axis_labels()
            for intensity, weights, params in self.input.process_job(job):
                coords = self.projection.project(*params)
                # identity test: '== None' would compare element-wise if
                # limits were ever an array-like object
                if self.projection.config.limits is None:
                    yield space.Multiverse((space.Space.from_image(res, labels, coords, intensity, weights=weights), ))
                else:
                    yield space.Multiverse(space.Space.from_image(res, labels, coords, intensity, weights=weights, limits=limits) for limits in self.projection.config.limits)

        jobverse = space.chunked_sum(generator(), chunksize=25)
        for sp in jobverse.spaces:
            if isinstance(sp, space.Space):
                sp.metadata.add_dataset(self.input.metadata)
        return jobverse

    def clone_config(self):
        """Return a ``ConfigSectionGroup`` describing this instance.

        The dispatcher, projection and input sections are copies;
        ``configfile`` refers to ``self.config`` itself.
        """
        config = util.ConfigSectionGroup()
        config.configfile = self.config
        config.dispatcher = self.dispatcher.config.copy()
        config.projection = self.projection.config.copy()
        config.input = self.input.config.copy()
        return config

    def get_reentrant(self):
        """Return the module-level function ``multiprocessing_main``."""
        return multiprocessing_main
class Split(Main):  # completely ignores the dispatcher, just yields a space per image
    """Variant of ``Main`` that yields results per image instead of summing.

    No dispatcher is created and nothing runs on construction; call
    :meth:`run` and iterate over what it yields.
    """

    def __init__(self, config, command):
        """Store ``command`` and set up the projection and input backends.

        :param config: a ``util.ConfigSectionGroup`` or ``util.ConfigFile``
        :param command: the command words later used by :meth:`run`
        :raises ValueError: if ``config`` is neither of the accepted types
        """
        self.command = command
        if isinstance(config, util.ConfigSectionGroup):
            self.config = config.configfile.copy()
        elif isinstance(config, util.ConfigFile):
            self.config = config.copy()
        else:
            raise ValueError('Configfile is the wrong type')

        # input from either the configfile or the configsectiongroup is valid
        self.projection = backend.get_projection(config.projection)
        self.input = backend.get_input(config.input)

    def process_job(self, job):
        """Yield one result per image produced by the input backend.

        When no limits are configured each result is a single ``Space``;
        otherwise it is a ``Multiverse`` holding one space per entry in
        the configured limits.
        """
        res = self.projection.config.resolution
        labels = self.projection.get_axis_labels()
        for intensity, weights, params in self.input.process_job(job):
            coords = self.projection.project(*params)
            # identity test: '== None' would compare element-wise if
            # limits were ever an array-like object
            if self.projection.config.limits is None:
                yield space.Space.from_image(res, labels, coords, intensity, weights=weights)
            else:
                yield space.Multiverse(space.Space.from_image(res, labels, coords, intensity, weights=weights, limits=limits) for limits in self.projection.config.limits)

    def run(self):
        """Yield every result of every job generated for the stored command."""
        for job in self.input.generate_jobs(self.command):
            for verse in self.process_job(job):
                yield verse