astropy:docs

Source code for astropy.io.ascii.ipac

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""An extensible ASCII table reader and writer.

ipac.py:
  Classes to read IPAC table format

:Copyright: Smithsonian Astrophysical Observatory (2011)
:Author: Tom Aldcroft (aldcroft@head.cfa.harvard.edu)
"""

##
## Redistribution and use in source and binary forms, with or without
## modification, are permitted provided that the following conditions are met:
##     * Redistributions of source code must retain the above copyright
##       notice, this list of conditions and the following disclaimer.
##     * Redistributions in binary form must reproduce the above copyright
##       notice, this list of conditions and the following disclaimer in the
##       documentation and/or other materials provided with the distribution.
##     * Neither the name of the Smithsonian Astrophysical Observatory nor the
##       names of its contributors may be used to endorse or promote products
##       derived from this software without specific prior written permission.
##
## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
## ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
## WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
## DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
## DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
## (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
## LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
## ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
## (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
## SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import re
from collections import defaultdict
from textwrap import wrap
from warnings import warn

from . import core
from . import fixedwidth
from ...utils import OrderedDict
from ...utils.exceptions import AstropyUserWarning
from ...table.pprint import _format_funcs, _auto_format_func


class IpacFormatErrorDBMS(Exception):
    def __str__(self):
        return '{0}\nSee {1}'.format(
            super(Exception, self).__str__(),
            'http://irsa.ipac.caltech.edu/applications/DDGEN/Doc/DBMSrestriction.html')


class IpacFormatError(Exception):
    def __str__(self):
        return '{0}\nSee {1}'.format(
            super(Exception, self).__str__(),
            'http://irsa.ipac.caltech.edu/applications/DDGEN/Doc/ipac_tbl.html')


[docs]class Ipac(fixedwidth.FixedWidth): """Read or write an IPAC format table. See http://irsa.ipac.caltech.edu/applications/DDGEN/Doc/ipac_tbl.html:: \\name=value \\ Comment | column1 | column2 | column3 | column4 | column5 | | double | double | int | double | char | | unit | unit | unit | unit | unit | | null | null | null | null | null | 2.0978 29.09056 73765 2.06000 B8IVpMnHg Or:: |-----ra---|----dec---|---sao---|------v---|----sptype--------| 2.09708 29.09056 73765 2.06000 B8IVpMnHg The comments and keywords defined in the header are available via the output table ``meta`` attribute:: >>> import os >>> from astropy.io import ascii >>> filename = os.path.join(ascii.__path__[0], 'tests/t/ipac.dat') >>> data = ascii.read(filename) >>> print data.meta['comments'] ['This is an example of a valid comment'] >>> for name, keyword in data.meta['keywords'].items(): ... print name, keyword['value'] ... intval 1 floatval 2300.0 date Wed Sp 20 09:48:36 1995 key_continue IPAC keywords can continue across lines Note that there are different conventions for characters occuring below the position of the ``|`` symbol in IPAC tables. By default, any character below a ``|`` will be ignored (since this is the current standard), but if you need to read files that assume characters below the ``|`` symbols belong to the column before or after the ``|``, you can specify ``definition='left'`` or ``definition='right'`` respectively when reading the table (the default is ``definition='ignore'``). The following examples demonstrate the different conventions: * ``definition='ignore'``:: | ra | dec | | float | float | 1.2345 6.7890 * ``definition='left'``:: | ra | dec | | float | float | 1.2345 6.7890 * ``definition='right'``:: | ra | dec | | float | float | 1.2345 6.7890 Parameters ---------- definition : str, optional Specify the convention for characters in the data table that occur directly below the pipe (`|`) symbol in the header column definition: * 'ignore' - Any character beneath a pipe symbol is ignored (default) * 'right' - Character is associated with the column to the right * 'left' - Character is associated with the column to the left DBMS : bool, optional If true, this varifies that written tables adhere (semantically) to the `IPAC/DBMS <http://irsa.ipac.caltech.edu/applications/DDGEN/Doc/DBMSrestriction.html>`_ definiton of IPAC tables. If 'False' it only checks for the (less strict) `IPAC <http://irsa.ipac.caltech.edu/applications/DDGEN/Doc/ipac_tbl.html>`_ definition. """ _format_name = 'ipac' _io_registry_format_aliases = ['ipac'] _io_registry_can_write = True _description = 'IPAC format table' def __init__(self, definition='ignore', DBMS=False): super(fixedwidth.FixedWidth, self).__init__() self.header = IpacHeader(definition=definition) self.data = IpacData() self.data.header = self.header self.header.data = self.data self.header.DBMS = DBMS self.data.splitter.delimiter = ' ' self.data.splitter.delimiter_pad = '' self.data.splitter.bookend = True
[docs] def write(self, table): """Write ``table`` as list of strings. :param table: input table data (astropy.table.Table object) :returns: list of strings corresponding to ASCII table """ core._apply_include_exclude_names(table, self.names, self.include_names, self.exclude_names, self.strict_names) # link information about the columns to the writer object (i.e. self) self.header.cols = table.columns.values() self.data.cols = table.columns.values() # Write header and data to lines list lines = [] # Write meta information if 'comments' in table.meta: for comment in table.meta['comments']: if len(str(comment)) > 78: warn('Comment string > 78 characters was automatically wrapped.', AstropyUserWarning) for line in wrap(str(comment), 80, initial_indent='\\ ', subsequent_indent='\\ '): lines.append(line) if 'keywords' in table.meta: keydict = table.meta['keywords'] for keyword in keydict: try: val = keydict[keyword]['value'] lines.append('\\{0}={1!r}'.format(keyword.strip(), val)) # meta is not standardized: Catch some common Errors. except TypeError: pass # get header and data as strings to find width of each column for i, col in enumerate(table.columns.values()): col.headwidth = max([len(vals[i]) for vals in self.header.str_vals()]) # keep data_str_vals because they take some time to make data_str_vals = self.data.str_vals() for i, col in enumerate(table.columns.values()): col.width = max([len(vals[i]) for vals in data_str_vals]) widths = [max(col.width, col.headwidth) for col in table.columns.values()] # then write table self.header.write(lines, widths) self.data.write(lines, widths, data_str_vals) return lines
class IpacHeaderSplitter(core.BaseSplitter): '''Splitter for Ipac Headers. This splitter is similar its parent when reading, but supports a fixed width format (as required for Ipac table headers) for writing. ''' process_line = None process_val = None delimiter = '|' delimiter_pad = '' skipinitialspace = False comment = r'\s*\\' write_comment = r'\\' col_starts = None col_ends = None def join(self, vals, widths): pad = self.delimiter_pad or '' delimiter = self.delimiter or '' padded_delim = pad + delimiter + pad bookend_left = delimiter + pad bookend_right = pad + delimiter vals = [' ' * (width - len(val)) + val for val, width in zip(vals, widths)] return bookend_left + padded_delim.join(vals) + bookend_right class IpacHeader(fixedwidth.FixedWidthHeader): """IPAC table header""" splitter_class = IpacHeaderSplitter col_type_map = {'int': core.IntType, 'integer': core.IntType, 'long': core.IntType, 'double': core.FloatType, 'float': core.FloatType, 'real': core.FloatType, 'char': core.StrType, 'date': core.StrType, 'i': core.IntType, 'l': core.IntType, 'd': core.FloatType, 'f': core.FloatType, 'r': core.FloatType, 'c': core.StrType} def __init__(self, definition='ignore'): fixedwidth.FixedWidthHeader.__init__(self) if definition in ['ignore', 'left', 'right']: self.ipac_definition = definition else: raise ValueError("definition should be one of ignore/left/right") def process_lines(self, lines): """Generator to yield IPAC header lines, i.e. those starting and ending with delimiter character.""" delim = self.splitter.delimiter for line in lines: if line.startswith(delim) and line.endswith(delim): yield line.strip(delim) def update_meta(self, lines, meta): """ Extract table-level comments and keywords for IPAC table. See: http://irsa.ipac.caltech.edu/applications/DDGEN/Doc/ipac_tbl.html#kw """ def process_keyword_value(val): """ Take a string value and convert to float, int or str, and strip quotes as needed. """ val = val.strip() try: val = int(val) except: try: val = float(val) except: # Strip leading/trailing quote. The spec says that a matched pair # of quotes is required, but this code will allow a non-quoted value. for quote in ('"', "'"): if val.startswith(quote) and val.endswith(quote): val = val[1:-1] break return val table_meta = meta['table'] table_meta['comments'] = [] table_meta['keywords'] = OrderedDict() keywords = table_meta['keywords'] re_keyword = re.compile(r'\\' r'(?P<name> \w+)' r'\s* = (?P<value> .+) $', re.VERBOSE) for line in lines: # Keywords and comments start with "\". Once the first non-slash # line is seen then bail out. if not line.startswith('\\'): break m = re_keyword.match(line) if m: name = m.group('name') val = process_keyword_value(m.group('value')) # IPAC allows for continuation keywords, e.g. # \SQL = 'WHERE ' # \SQL = 'SELECT (25 column names follow in next row.)' if name in keywords and isinstance(val, basestring): prev_val = keywords[name]['value'] if isinstance(prev_val, basestring): val = prev_val + val table_meta['keywords'][name] = {'value': val} else: # Comment is required to start with "\ " if line.startswith('\\ '): val = line[2:].strip() if val: table_meta['comments'].append(val) def get_cols(self, lines): """Initialize the header Column objects from the table ``lines``. Based on the previously set Header attributes find or create the column names. Sets ``self.cols`` with the list of Columns. :param lines: list of table lines :returns: list of table Columns """ header_lines = self.process_lines(lines) # generator returning valid header lines header_vals = [vals for vals in self.splitter(header_lines)] if len(header_vals) == 0: raise ValueError('At least one header line beginning and ending with ' 'delimiter required') elif len(header_vals) > 4: raise ValueError('More than four header lines were found') # Generate column definitions cols = [] start = 1 for i, name in enumerate(header_vals[0]): col = core.Column(name=name.strip(' -')) col.start = start col.end = start + len(name) if len(header_vals) > 1: col.raw_type = header_vals[1][i].strip(' -') col.type = self.get_col_type(col) if len(header_vals) > 2: col.unit = header_vals[2][i].strip() # Can't strip dashes here if len(header_vals) > 3: # The IPAC null value corresponds to the io.ascii bad_value. # In this case there isn't a fill_value defined, so just put # in the minimal entry that is sure to convert properly to the # required type. # # Strip spaces but not dashes (not allowed in NULL row per # https://github.com/astropy/astropy/issues/361) null = header_vals[3][i].strip() fillval = '' if issubclass(col.type, core.StrType) else '0' self.data.fill_values.append((null, fillval, col.name)) start = col.end + 1 cols.append(col) # Correct column start/end based on definition if self.ipac_definition == 'right': col.start -= 1 elif self.ipac_definition == 'left': col.end += 1 self.names = [x.name for x in cols] self.cols = cols def str_vals(self): if self.DBMS: IpacFormatE = IpacFormatErrorDBMS else: IpacFormatE = IpacFormatError namelist = [col.name for col in self.cols] if self.DBMS: countnamelist = defaultdict(int) for col in self.cols: countnamelist[col.name.lower()] += 1 doublenames = [x for x in countnamelist if countnamelist[x] > 1] if doublenames != []: raise IpacFormatE('IPAC DBMS tables are not case sensitive. ' 'This causes duplicate column names: {0}'.format(doublenames)) for name in namelist: m = re.match('\w+', name) if m.end() != len(name): raise IpacFormatE('{0} - Only alphanumaric characters and _ ' 'are allowed in column names.'.format(name)) if self.DBMS and not(name[0].isalpha() or (name[0] == '_')): raise IpacFormatE('Column name cannot start with numbers: {}'.format(name)) if self.DBMS: if name in ['x', 'y', 'z', 'X', 'Y', 'Z']: raise IpacFormatE('{0} - x, y, z, X, Y, Z are reserved names and ' 'cannot be used as column names.'.format(name)) if len(name) > 16: raise IpacFormatE( '{0} - Maximum length for column name is 16 characters'.format(name)) else: if len(name) > 40: raise IpacFormatE( '{0} - Maximum length for column name is 40 characters.'.format(name)) dtypelist = [] unitlist = [] nullist = [] for col in self.cols: if col.dtype.kind in ['i', 'u']: dtypelist.append('long') elif col.dtype.kind == 'f': dtypelist.append('double') else: dtypelist.append('char') if col.unit is None: unitlist.append('') else: unitlist.append(str(col.unit)) null = getattr(col, 'fill_value', 'null') try: format_func = _format_funcs.get(col.format, _auto_format_func) nullist.append((format_func(col.format, null)).strip()) except: # It is pssible that null and the column values have different # data types (e.g. number und null = 'null' (i.e. a string). # This could cause all kinds of exceptions, so a catch all # block is needed here nullist.append(str(null).strip()) return [namelist, dtypelist, unitlist, nullist] def write(self, lines, widths): '''Write header. The width of each column is determined in Ipac.write. Writing the header must be delayed until that time. This function is called from there, once the width information is available.''' for vals in self.str_vals(): lines.append(self.splitter.join(vals, widths)) return lines class IpacData(fixedwidth.FixedWidthData): """IPAC table data reader""" comment = r'[|\\]' def str_vals(self): '''return str vals for each in the table''' vals_list = [] # just to make sure self._set_col_formats() col_str_iters = [col.iter_str_vals() for col in self.cols] for vals in core.izip(*col_str_iters): vals_list.append(vals) return vals_list def write(self, lines, widths, vals_list): """ IPAC writer, modified from FixedWidth writer """ for vals in vals_list: lines.append(self.splitter.join(vals, widths)) return lines

Page Contents