from __future__ import print_function, division
import os
import sys
import gzip
import itertools
import random
import inspect
import time
import copy
import numpy
import contextlib
import argparse
import h5py
import glob
from . import errors
import struct
import json
import socket
import binascii
import re
### ARGUMENT HANDLING
#python3 support
PY3 = sys.version_info > (3,)
if PY3:
import pickle
import io
import configparser
else:
import StringIO as io
import Queue as queue
import cPickle as pickle
import ConfigParser as configparser
[docs]class OrderedOperation(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
oops = getattr(namespace, 'ordered_operations', [])
oops.append((self.dest, values))
setattr(namespace, 'ordered_operations', oops)
[docs]def argparse_common_arguments(parser, *args):
for arg in args:
# (ORDERED) OPERATIONS
if arg == 'project':
parser.add_argument('-p', '--project', metavar='AXIS', action=OrderedOperation, help='project space on AXIS')
elif arg == 'slice':
parser.add_argument('--slice', nargs=2, metavar=('AXIS', 'START:STOP'), action=OrderedOperation, help="slice AXIS from START to STOP (replace minus signs by 'm')")
elif arg == 'pslice':
parser.add_argument('--pslice', nargs=2, metavar=('AXIS', 'START:STOP'), action=OrderedOperation, help="like slice, but also project on AXIS after slicing")
elif arg == 'transform':
parser.add_argument('--transform', metavar='VAR@RES=EXPR;VAR2@RES2=EXPR2;...', action=OrderedOperation, help='perform coordinate transformation, rebinning data on new axis named VAR with resolution RES defined by EXPR, example: Q@0.1=sqrt(H**2+K**2+L**2)')
elif arg == 'rebin':
parser.add_argument('--rebin', metavar='N,M,...', action=OrderedOperation, help='reduce binsize by factor N in first dimension, M in second, etc')
# SUBTRACT
elif arg == 'subtract':
parser.add_argument('--subtract', metavar='SPACE', help='subtract SPACE from input file')
# PRESENTATION
elif arg == 'nolog':
parser.add_argument('--nolog', action='store_true', help='do not use logarithmic axis')
elif arg == 'clip':
parser.add_argument('-c', '--clip', metavar='FRACTION', default=0.00, help='clip color scale to remove FRACTION datapoints')
# OUTPUT
elif arg == 'savepdf':
parser.add_argument('-s', '--savepdf', action='store_true', help='save output as pdf, automatic file naming')
elif arg == 'savefile':
parser.add_argument('--savefile', metavar='FILENAME', help='save output as FILENAME, autodetect filetype')
# ERROR!
else:
raise ValueError("unsupported argument '{0}'".format(arg))
[docs]def handle_ordered_operations(space, args, auto3to2=False):
info = []
for command, opts in getattr(args, 'ordered_operations', []):
if command == 'slice' or command == 'pslice':
ax, key = opts
axindex = space.axes.index(ax)
axlabel = space.axes[axindex].label
if ':' in key:
start, stop = key.split(':')
if start:
start = float(start.replace('m', '-'))
else:
start = space.axes[axindex].min
if stop:
stop = float(stop.replace('m', '-'))
else:
stop = space.axes[axindex].max
key = slice(start, stop)
info.append('sliced in {0} from {1} to {2}'.format(axlabel, start, stop))
else:
key = float(key.replace('m', '-'))
info.append('sliced in {0} at {1}'.format(axlabel, key))
space = space.slice(axindex, key)
if command == 'pslice':
try:
projectaxis = space.axes.index(ax)
except ValueError:
pass
else:
info.append('projected on {0}'.format(space.axes[projectaxis].label))
space = space.project(projectaxis)
elif command == 'project':
projectaxis = space.axes.index(opts)
info.append('projected on {0}'.format(space.axes[projectaxis].label))
space = space.project(projectaxis)
elif command == 'transform':
labels, resolutions, exprs = list(zip(*parse_transform_args(opts)))
transformation = transformation_from_expressions(space, exprs)
info.append('transformed to {0}'.format(', '.join('{0} = {1}'.format(label, expr) for (label, expr) in zip(labels, exprs))))
space = space.transform_coordinates(resolutions, labels, transformation)
elif command == 'rebin':
if ',' in opts:
factors = tuple(int(i) for i in opts.split(','))
else:
factors = (int(opts),) * space.dimension
space = space.rebin(factors)
else:
raise ValueError("unsported Ordered Operation '{0}'".format(command))
if auto3to2 and space.dimension == 3: # automatic projection on smallest axis
projectaxis = numpy.argmin(space.photons.shape)
info.append('projected on {0}'.format(space.axes[projectaxis].label))
space = space.project(projectaxis)
return space, info
### STATUS LINES
_status_line_length = 0
[docs]def status(line, eol=False):
"""Prints a status line to sys.stdout, overwriting the previous one.
Set eol to True to append a newline to the end of the line"""
global _status_line_length
sys.stdout.write('\r{0}\r{1}'.format(' '*_status_line_length, line))
if eol:
sys.stdout.write('\n')
_status_line_length = 0
else:
_status_line_length = len(line)
sys.stdout.flush()
[docs]def statusnl(line):
"""Shortcut for status(..., eol=True)"""
return status(line, eol=True)
[docs]def statuseol():
"""Starts a new status line, keeping the previous one intact"""
global _status_line_length
_status_line_length = 0
sys.stdout.write('\n')
sys.stdout.flush()
[docs]def statuscl():
"""Clears the status line, shortcut for status('')"""
return status('')
### Dispatcher, projection and input finder
[docs]def get_backends():
modules = glob.glob(os.path.join(os.path.dirname(__file__), 'backends', '*.py'))
names = list()
for module in modules:
if not module.endswith('__init__.py'):
names.append(os.path.splitext(os.path.basename(module))[0])
return names
[docs]def get_projections(module):
from . import backend
return get_base(module, backend.ProjectionBase)
[docs]def get_dispatchers():
from . import dispatcher
from inspect import isclass
items = dir(dispatcher)
options = []
for item in items:
obj = getattr(dispatcher, item)
if isclass(obj):
if issubclass(obj, dispatcher.DispatcherBase):
options.append(item)
return options
[docs]def get_base(modname, base):
from inspect import isclass
if modname not in get_backends():
raise KeyError("{0} is not an available backend".format(modname))
try:
backends = __import__('backends.{0}'.format(modname), globals(), locals(), [], 1)
except ImportError as e:
raise ImportError("Unable to import module backends.{0}: {1}".format(modname, e))
backend = getattr(backends, modname)
items = dir(backend)
options = []
for item in items:
obj = getattr(backend, item)
if isclass(obj):
if issubclass(obj, base):
options.append(item)
return options
### Dispatcher, projection and input configuration options finder
[docs]def get_dispatcher_configkeys(classname):
from . import dispatcher
cls = getattr(dispatcher, classname)
return get_configkeys(cls)
[docs]def get_projection_configkeys(modname, classname):
return get_backend_configkeys(modname, classname)
[docs]def get_backend_configkeys(modname, classname):
backends = __import__('backends.{0}'.format(modname), globals(), locals(), [], 1)
backend = getattr(backends, modname)
cls = getattr(backend, classname)
return get_configkeys(cls)
[docs]def get_configkeys(cls):
from inspect import getsource
items = list()
while hasattr(cls, 'parse_config'):
code = getsource(cls.parse_config)
for line in code.split('\n'):
key = parse_configcode(line)
if key:
if key not in items:
items.append(key)
cls = cls.__base__
return items
[docs]def parse_configcode(line):
try:
comment = '#'.join(line.split('#')[1:])
line = line.split('#')[0]
index = line.index('config.pop')
item = line[index:].split('\'')[1]
if item == 'action':
return # action is reserved for internal use!
return item, comment
except ValueError:
pass
### CONFIGURATION MANAGEMENT
[docs]def parse_range(r):
if '-' in r:
a, b = r.split('-')
return list(range(int(a), int(b)+1))
elif r:
return [int(r)]
else:
return []
[docs]def parse_multi_range(s):
if not s:
return s
out = []
ranges = s.split(',')
for r in ranges:
out.extend(parse_range(r))
return out
[docs]def parse_tuple(s, length=None, type=str):
if not s:
return s
t = tuple(type(i) for i in s.split(','))
if length is not None and len(t) != length:
raise ValueError('invalid tuple length: expected {0} got {1}'.format(length, len(t)))
return t
[docs]def parse_bool(s):
l = s.lower()
if l in ('1', 'true', 'yes', 'on'):
return True
elif l in ('0', 'false', 'no', 'off'):
return False
raise ValueError("invalid input for boolean: '{0}'".format(s))
[docs]def parse_pairs(s):
if not s:
return s
limits = []
for lim in re.findall('\[(.*?)\]', s):
parsed = []
for pair in re.split(',', lim):
mi, ma = tuple(m.strip() for m in pair.split(':'))
if mi == '' and ma == '':
parsed.append(slice(None))
elif mi == '':
parsed.append(slice(None, float(ma)))
elif ma == '':
parsed.append(slice(float(mi), None))
else:
if float(ma) < float(mi):
raise ValueError("invalid input. maximum is larger than minimum: '{0}'".format(s))
else:
parsed.append(slice(float(mi), float(ma)))
limits.append(parsed)
return limits
[docs]def limit_to_filelabel(s):
return tuple('[{0}]'.format(lim.replace('-', 'm').replace(':', '-').replace(' ', '')) for lim in re.findall('\[(.*?)\]', s))
#Contains the unparsed config dicts
[docs]class ConfigFile(MetaBase):
def __init__(self, origin='n/a', command=[]):
self.origin = origin
self.command = command
super(ConfigFile, self).__init__()
self.sections = ['dispatcher', 'projection', 'input']
for section in self.sections:
setattr(self, section, dict())
[docs] @classmethod
def fromfile(cls, filename):
if isinstance(filename, str):
if not os.path.exists(filename):
raise IOError('Error importing configuration file. filename {0} does not exist'.format(filename))
configobj = cls(str(filename))
with open_h5py(filename, 'r') as fp:
try:
config = fp['configuration']
if 'command' in config.attrs:
configobj.command = json.loads(config.attrs['command'].decode('utf8'))
for section in config:
if isinstance(config[section], h5py._hl.group.Group): # new
setattr(configobj, section, dict((key, config[section][key].value) for key in config[section]))
else: # old
setattr(configobj, section, dict(config[section]))
except KeyError as e:
pass # when config is not present, proceed without Error
return configobj
[docs] @classmethod
def fromtxtfile(cls, filename, command=[], overrides=[]):
if not os.path.exists(filename):
raise IOError('Error importing configuration file. filename {0} does not exist'.format(filename))
config = configparser.RawConfigParser()
config.read(filename)
for section, option, value in overrides:
config.set(section, option, value)
configobj = cls(filename, command=command)
for section in configobj.sections:
setattr(configobj, section, dict((k, v.split('#')[0].strip()) for (k, v) in config.items(section)))
return configobj
[docs] def tofile(self, filename):
with open_h5py(filename, 'w') as fp:
conf = fp.create_group('configuration')
conf.attrs['origin'] = str(self.origin)
conf.attrs['command'] = json.dumps(self.command)
for section in self.sections:
sectiongroup = conf.create_group(section)
s = getattr(self, section)
for key in list(s.keys()):
sectiongroup.create_dataset(key, data=s[key])
[docs] def totxtfile(self, filename):
with open(filename, 'w') as fp:
fp.write('# Configurations origin: {}\n'.format(self.origin))
for section in self.sections:
fp.write('[{}]\n'.format(section))
s = getattr(self, section)
for entry in s:
fp.write('{} = {}\n'.format(entry, s[entry]))
def __repr__(self):
str = super(ConfigFile, self).__repr__()
str += 'origin = {0}\n'.format(self.origin)
str += 'command = {0}'.format(','.join(self.command))
return str
#contains one parsed dict, for distribution to dispatcher, input or projection class
[docs]class ConfigSection(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
[docs] def copy(self):
return copy.deepcopy(self)
#contains the parsed configsections
[docs]class ConfigSectionGroup(object):
def __init__(self, origin='n/a'):
self.origin = origin
self.sections = 'dispatcher', 'projection', 'input'
for section in self.sections:
setattr(self, section, ConfigSection())
self.configfile = ConfigFile()
[docs]class ConfigurableObject(object):
def __init__(self, config):
if isinstance(config, ConfigSection):
self.config = config
elif not isinstance(config, dict):
raise ValueError('expecting dict or Configsection, not: {0}'.format(type(config)))
else:
self.config = ConfigSection()
try:
allkeys = list(config.keys())
self.parse_config(config)
except KeyError as exc:
raise errors.ConfigError("Configuration option {0} is missing from the configuration file. Please specify this option in the configuration file".format(exc))
except Exception as exc:
missing = set(key for key in allkeys if key not in list(self.config.__dict__.keys())) - set(config.keys())
exc.args = errors.addmessage(exc.args, ". Unable to parse configuration option '{0}'. The error can quite likely be solved by modifying the option in the configuration file.".format(','.join(missing)))
raise
for k in config:
print('warning: unrecognized configuration option {0} for {1}'.format(k, self.__class__.__name__))
self.config.class_ = self.__class__
[docs] def parse_config(self, config):
# every known option should be pop()'ed from config, converted to a
# proper type and stored as property in self.config, for example:
# self.config.foo = int(config.pop('foo', 1))
pass
### FILES
[docs]def best_effort_atomic_rename(src, dest):
if sys.platform == 'win32' and os.path.exists(dest):
os.remove(dest)
os.rename(src, dest)
[docs]def filename_enumerator(filename, start=0):
base, ext = os.path.splitext(filename)
for count in itertools.count(start):
yield '{0}_{2}{1}'.format(base, ext, count)
[docs]def find_unused_filename(filename):
if not os.path.exists(filename):
return filename
for f in filename_enumerator(filename, 2):
if not os.path.exists(f):
return f
[docs]def label_enumerator(label, start=0):
for count in itertools.count(start):
yield '{0}_{1}'.format(label, count)
[docs]def find_unused_label(label, labellist):
for l in label_enumerator(label):
if not l in labellist:
return l
[docs]def yield_when_exists(filelist, timeout=None):
"""Wait for files in 'filelist' to appear, for a maximum of 'timeout' seconds,
yielding them in arbitrary order as soon as they appear.
If 'filelist' is a set, it will be modified in place, and on timeout it will
contain the files that have not appeared yet."""
if not isinstance(filelist, set):
filelist = set(filelist)
delay = loop_delayer(5)
start = time.time()
while filelist:
next(delay)
exists = set(f for f in filelist if os.path.exists(f))
for e in exists:
yield e
filelist -= exists
if timeout is not None and time.time() - start > timeout:
break
[docs]def wait_for_files(filelist, timeout=None):
"""Wait until the files in 'filelist' have appeared, for a maximum of 'timeout' seconds.
Returns True on success, False on timeout."""
filelist = set(filelist)
for i in yield_when_exists(filelist, timeout):
pass
return not filelist
[docs]def wait_for_file(file, timeout=None):
return wait_for_files([file], timeout=timeout)
[docs]def space_to_edf(space, filename):
from PyMca import EdfFile
header = {}
for a in space.axes:
header[str(a.label)] = '{0} {1} {2}'.format(a.min, a.max, a.res)
edf = EdfFile.EdfFile(filename)
edf.WriteImage(header, space.get_masked().filled(0), DataType="Float")
[docs]def space_to_txt(space, filename):
data = [coord.flatten() for coord in space.get_grid()]
data.append(space.get_masked().filled(0).flatten())
data = numpy.array(data).T
with open(filename, 'w') as fp:
fp.write('\t'.join(ax.label for ax in space.axes))
fp.write('\tintensity\n')
numpy.savetxt(fp, data, fmt='%.6g', delimiter='\t')
[docs]@contextlib.contextmanager
def open_h5py(fn, mode):
if isinstance(fn, h5py._hl.group.Group):
yield fn
else:
with h5py.File(fn, mode) as fp:
if mode == 'w':
fp.create_group('binoculars')
yield fp['binoculars']
if mode == 'r':
if 'binoculars' in fp:
yield fp['binoculars']
else:
yield fp
### VARIOUS
[docs]def uniqid():
return '{0:08x}'.format(random.randint(0, 2**32-1))
[docs]def grouper(iterable, n):
while True:
chunk = list(itertools.islice(iterable, n))
if not chunk:
break
yield chunk
_python_executable = None
[docs]def register_python_executable(scriptname):
global _python_executable
_python_executable = sys.executable, scriptname
[docs]def get_python_executable():
return _python_executable
[docs]def chunk_slicer(count, chunksize):
"""yields slice() objects that split an array of length 'count' into equal sized chunks of at most 'chunksize'"""
chunkcount = int(numpy.ceil(float(count) / chunksize))
realchunksize = int(numpy.ceil(float(count) / chunkcount))
for i in range(chunkcount):
yield slice(i*realchunksize, min(count, (i+1)*realchunksize))
[docs]def cluster_jobs(jobs, target_weight):
jobs = sorted(jobs, key=lambda job: job.weight)
# we cannot split jobs here, so just yield away all jobs that are overweight or just right
while jobs and jobs[-1].weight >= target_weight:
yield [jobs.pop()]
while jobs:
cluster = [jobs.pop()] # take the biggest remaining job
size = cluster[0].weight
for i in range(len(jobs)-1, -1, -1): # and exhaustively search for all jobs that can accompany it (biggest first)
if size + jobs[i].weight <= target_weight:
size += jobs[i].weight
cluster.append(jobs.pop(i))
yield cluster
[docs]def cluster_jobs2(jobs, target_weight):
"""Taking the first n jobs that together add up to target_weight.
Here as opposed to cluster_jobs the total number of jobs does not have to be known beforehand
"""
jobslist = []
for job in jobs:
jobslist.append(job)
if sum(j.weight for j in jobslist) >= target_weight:
yield jobslist[:]
jobslist = []
if len(jobslist) > 0: # yield the remainder of the jobs
yield jobslist[:]
[docs]def loop_delayer(delay):
"""Delay a loop such that it runs at most once every 'delay' seconds. Usage example:
delay = loop_delayer(5)
while some_condition:
next(delay)
do_other_tasks
"""
def generator():
polltime = 0
while 1:
diff = time.time() - polltime
if diff < delay:
time.sleep(delay - diff)
polltime = time.time()
yield
return generator()
### GZIP PICKLING (zpi)
# handle old zpi's
def _pickle_translate(module, name):
if module in ('__main__', 'ivoxoar.space') and name in ('Space', 'Axis'):
return 'BINoculars.space', name
return module, name
if inspect.isbuiltin(pickle.Unpickler):
# real cPickle: cannot subclass
def _find_global(module, name):
module, name = _pickle_translate(module, name)
__import__(module)
return getattr(sys.modules[module], name)
def pickle_load(fileobj):
unpickler = pickle.Unpickler(fileobj)
unpickler.find_global = _find_global
return unpickler.load()
else:
# pure python implementation
class _Unpickler(pickle.Unpickler):
def find_class(self, module, name):
module, name = _pickle_translate(module, name)
return pickle.Unpickler.find_class(self, module, name)
[docs] def pickle_load(fileobj):
unpickler = _Unpickler(fileobj)
return unpickler.load()
[docs]@contextlib.contextmanager
def atomic_write(filename):
"""Atomically write data into 'filename' using a temporary file and os.rename()
Rename on success, clean up on failure (any exception).
Example:
with atomic_write(filename) as tmpfile
with open(tmpfile, 'w') as fp:
fp.write(...)
"""
if isinstance(filename, h5py._hl.group.Group):
yield filename
else:
tmpfile = '{0}-{1}.tmp'.format(os.path.splitext(filename)[0], uniqid())
try:
yield tmpfile
except:
raise
else:
best_effort_atomic_rename(tmpfile, filename)
finally:
if os.path.exists(tmpfile):
os.remove(tmpfile)
[docs]def zpi_save(obj, filename):
with atomic_write(filename) as tmpfile:
fp = gzip.open(tmpfile, 'wb')
try:
pickle.dump(obj, fp, pickle.HIGHEST_PROTOCOL)
finally:
fp.close()
[docs]def zpi_load(filename):
if hasattr(filename, 'read'):
fp = gzip.GzipFile(filename.name, fileobj=filename)
else:
fp = gzip.open(filename, 'rb')
try:
return pickle_load(fp)
finally:
fp.close()
[docs]def serialize(space, command):
# first 48 bytes contain length of the message, whereby the first 8 give the length of the command, the second 8 the length of the configfile etc..
message = io.StringIO()
message.write(struct.pack('QQQQQQ', 0, 0, 0, 0, 0, 0))
message.write(command)
commandlength = message.len - 48
message.write(space.config.serialize())
configlength = message.len - commandlength - 48
message.write(space.metadata.serialize())
metalength = message.len - configlength - commandlength - 48
numpy.save(message, space.axes.toarray())
arraylength = message.len - metalength - configlength - commandlength - 48
numpy.save(message, space.photons)
photonlength = message.len - arraylength - metalength - configlength - commandlength - 48
numpy.save(message, space.contributions)
contributionlength = message.len - photonlength - arraylength - metalength - configlength - commandlength - 48
message.seek(0)
message.write(struct.pack('QQQQQQ', commandlength, configlength, metalength, arraylength, photonlength, contributionlength))
message.seek(0)
return message
[docs]def packet_slicer(length, size=1024): # limit the communication to 1024 bytes
while length > size:
length -= size
yield size
yield length
[docs]def socket_send(ip, port, mssg):
try:
mssglengths = struct.unpack('QQQQQQ', mssg.read(48)) # the lengths of all the components
mssg.seek(0)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
sock.send(mssg.read(48))
for l in mssglengths:
for packet in packet_slicer(l):
sock.send(mssg.read(packet))
sock.close()
except socket.error: # in case of failure to send. The data will be saved anyway so any loss of communication unfortunate but not critical
pass
[docs]def socket_recieve(RequestHandler): # pass one the handler to deal with incoming data
def get_msg(length):
msg = io.StringIO()
for packet in packet_slicer(length):
p = RequestHandler.request.recv(packet, socket.MSG_WAITALL) # wait for full mssg
msg.write(p)
if msg.len != length:
raise errors.CommunicationError('recieved message is too short. expected length {0}, recieved length {1}'.format(length, msg.len))
msg.seek(0)
return msg
command, config, metadata, axes, photons, contributions = tuple(get_msg(msglength) for msglength in struct.unpack('QQQQQQ', RequestHandler.request.recv(48, socket.MSG_WAITALL)))
return command.read(), config.read(), metadata.read(), numpy.load(axes), numpy.load(photons), numpy.load(contributions)