import io
import logging
import os
import shutil
import uuid
import ruamel.yaml
import xapian
import jinja2
from pkg_resources import resource_string
from . import types
# GitPython is an optional dependency: when it cannot be imported the
# git-related features are disabled (callers check ``git_available``)
# instead of failing at import time.
try:
    import git

    git_available = True
except ImportError:
    git_available = False
class Entry(object):
    """
    A single entry of a collection.

    An entry holds a mapping of field values (``data``), the basename of
    the file it is stored in (``fname``) and an entry id (``eid``).
    """

    def __init__(self, collection, data=None, fname=None):
        """
        Build an entry belonging to ``collection``.

        :param collection: the owning collection; its ``settings``,
            ``fields`` and ``yaml`` attributes are used by this class.
        :param data: mapping of field values; when empty or ``None`` a
            skeleton is generated with :meth:`empty_data`.
        :param fname: basename of the file backing this entry; when
            missing it is derived from the entry id.
        """
        self.collection = collection
        # Any falsy value (None, {}) means "new entry": start from the
        # skeleton described by the collection settings.
        self.data = data or self.empty_data()
        self.fname = fname
        self.eid = self.data.get('eid', None)
        if not self.eid:
            if self.fname:
                # No explicit eid: use the file name without extension.
                self.eid, _ext = os.path.splitext(
                    os.path.basename(self.fname)
                )
            else:
                self.eid = uuid.uuid4().hex
        if not self.fname:
            self.fname = self.eid + '.yaml'

    def __str__(self):
        """
        Return the ``entry_label`` setting rendered as a jinja2 template
        with the entry data, or the eid when no label is configured.
        """
        label = self.collection.settings.get('entry_label', None)
        if label:
            return jinja2.Template(label).render(**self.get_data())
        return self.eid

    def get_data(self):
        """
        Return a shallow copy of the entry data with the additional
        ``eid``, ``fname`` and ``short_id`` keys.
        """
        d = self.data.copy()
        d['eid'] = self.eid
        d['fname'] = self.fname
        d['short_id'] = self.short_id
        return d

    def empty_data(self):
        """
        Build the skeleton data for a new entry.

        A YAML document is assembled from the ``fields`` setting of the
        collection (help text as a comment, then the default value or an
        empty value depending on the field type) and loaded with the
        collection's YAML loader.

        Returns an empty dict when the resulting document is empty
        (e.g. the collection defines no fields).
        """
        chunks = []
        # .get() so that a collection without a 'fields' setting is
        # handled the same way Collection._load_field_types handles it.
        for field in self.collection.settings.get('fields', []):
            if field.get('help', None) is not None:
                chunks.append(
                    "# {name} ({type}): {help}\n".format(**field)
                )
            t = field['type']
            if field.get('default', None):
                chunks.append("{name}: '{default}'\n".format(**field))
            elif t == 'string':
                chunks.append("{name}: ''\n".format(**field))
            elif t == 'text':
                chunks.append("{name}: |\n .\n".format(**field))
            elif t == 'integer':
                chunks.append("{name}: 0\n".format(**field))
            elif t == 'float':
                chunks.append("{name}: 0.0\n".format(**field))
            elif t == 'decimal':
                chunks.append("{name}: 0.00\n".format(**field))
            elif t == 'list':
                chunks.append("{name}: []\n".format(**field))
            else:
                chunks.append("{name}: \n".format(**field))
        loaded = self.collection.yaml.load(''.join(chunks))
        # An empty document loads as None, but the rest of the class
        # needs a mapping.
        return loaded if loaded is not None else {}

    @property
    def yaml_data(self):
        """
        The entry data serialized to a YAML string with the
        collection's YAML dumper.
        """
        to_dump = self.data.copy()
        # Decimal fields can't be represented by
        # ruamel.yaml.RoundTripDumper, but transforming them to strings
        # should be enough for all cases that we need.
        for field in self.collection.settings.get('fields', []):
            if field['type'] == 'decimal':
                v = to_dump.get(field['name'], '')
                if v:
                    to_dump[field['name']] = str(v)
        s_io = io.StringIO()
        self.collection.yaml.dump(to_dump, s_io)
        return s_io.getvalue()

    @property
    def idterm(self):
        """The term identifying this entry in the index: ``Q`` + eid."""
        return "Q" + self.eid

    @property
    def short_id(self):
        """The first 8 characters of the eid."""
        return self.eid[:8]

    def validate(self):
        """
        Load every value through the corresponding field type.

        Each value in ``self.data`` is replaced by the result of the
        field's ``load``; values that raise ``LesanaValueError`` are
        left untouched and reported.

        Returns a ``(valid, errors)`` tuple, where ``errors`` is a list
        of dicts with the keys ``field`` and ``error``.
        """
        errors = []
        valid = True
        for name, field in self.collection.fields.items():
            value = self.data.get(name, None)
            try:
                self.data[name] = field.load(value)
            except types.LesanaValueError as e:
                valid = False
                errors.append(
                    {
                        'field': name,
                        'error': e,
                    }
                )
        return valid, errors

    def render(self, template, searchpath='.'):
        """
        Render this entry with the jinja2 template ``template``, looked
        up in ``searchpath``; the entry is available as ``entry``.

        Raises ``TemplatingError`` on template syntax errors.
        """
        jtemplate = self.collection.get_template(template, searchpath)
        try:
            return jtemplate.render(entry=self)
        except jinja2.exceptions.TemplateSyntaxError as e:
            # Chain the original error so that the jinja2 traceback is
            # not lost.
            raise TemplatingError('Template Syntax Error: ' + str(e)) from e
class Collection(object):
    """
    A collection of entries stored as YAML files and indexed with xapian.

    The collection lives in a base directory which contains an optional
    ``settings.yaml``, a directory of entry files (``items`` by default)
    and the xapian database under ``.lesana/xapian``.
    """

    PARSER_FLAGS = (
        xapian.QueryParser.FLAG_BOOLEAN
        | xapian.QueryParser.FLAG_PHRASE  # noqa: W503
        | xapian.QueryParser.FLAG_LOVEHATE  # noqa: W503
        | xapian.QueryParser.FLAG_WILDCARD  # noqa: W503
    )

    def __init__(self, directory=None, itemdir='items'):
        """
        Open the collection in ``directory`` (default: the current
        working directory), with entries stored in the ``itemdir``
        subdirectory.

        A missing or empty ``settings.yaml`` results in empty settings.
        The ``.lesana`` directory is created if needed.
        """
        self.basedir = directory or os.getcwd()
        self.itemdir = os.path.join(self.basedir, itemdir)
        self.yaml = ruamel.yaml.YAML()
        self.yaml.preserve_quotes = True
        try:
            with open(os.path.join(self.basedir, 'settings.yaml')) as fp:
                self.settings = self.yaml.load(fp)
        except FileNotFoundError:
            self.settings = self.yaml.load("{}")
        if self.settings is None:
            # An empty settings.yaml loads as None; treat it like a
            # missing file instead of failing on the first .get().
            self.settings = self.yaml.load("{}")
        self.fields = self._load_field_types()
        os.makedirs(os.path.join(self.basedir, '.lesana'), exist_ok=True)
        if 'lang' in self.settings:
            try:
                self.stemmer = xapian.Stem(self.settings['lang'])
            except xapian.InvalidArgumentError:
                logging.warning(
                    "Invalid language %s, in settings.yaml: using english.",
                    self.settings['lang'],
                )
                self.stemmer = xapian.Stem('english')
        else:
            self.stemmer = xapian.Stem('english')
        self._enquire = None
        self.entry_class = Entry

    def _get_subsubclasses(self, cls):
        """Yield all direct and indirect subclasses of ``cls``."""
        for c in cls.__subclasses__():
            yield c
            yield from self._get_subsubclasses(c)

    def _load_field_types(self):
        """
        Return a dict mapping each field name in the settings to an
        instance of the type class registered under the field's type.
        """
        type_loaders = {}
        for t in self._get_subsubclasses(types.LesanaType):
            type_loaders[t.name] = t
        fields = {}
        for i, field in enumerate(self.settings.get('fields', [])):
            try:
                fields[field['name']] = type_loaders[field['type']](
                    field,
                    type_loaders,
                    # value slot 0 is used to store the filename, and we
                    # reserve a few more slots just in case they are
                    # needed by lesana or some derivative
                    value_index=i + 16,
                )
            except KeyError:
                # unknown fields are treated as if they were
                # (unvalidated) generic YAML to support working with
                # collections based on lesana derivatives
                logging.warning(
                    "Unknown field type %s in field %s",
                    field['type'],
                    field['name'],
                )
                fields[field['name']] = types.LesanaYAML(field, type_loaders)
        return fields

    def _index_file(self, fname, cache):
        """
        Load, validate and index the file ``fname`` (a basename inside
        ``self.itemdir``) into the writable xapian database ``cache``.

        Requires ``self.indexer`` to be set (see :meth:`update_cache`).

        Returns a ``(valid, errors)`` tuple; invalid entries are not
        indexed.
        """
        with open(os.path.join(self.itemdir, fname)) as fp:
            data = self.yaml.load(fp)
        entry = self.entry_class(self, data, fname)
        valid, errors = entry.validate()
        if not valid:
            logging.warning(
                "Not indexing {fname}: invalid data".format(fname=fname)
            )
            return False, errors

        doc = xapian.Document()
        self.indexer.set_document(doc)

        for field, loader in self.fields.items():
            loader.index(doc, self.indexer, entry.data.get(field))

        doc.set_data(entry.yaml_data)
        doc.add_boolean_term(entry.idterm)
        # value slot 0 holds the file name (read back in _doc_to_entry)
        doc.add_value(0, entry.fname.encode('utf-8'))

        # replacing by idterm updates an already indexed entry
        cache.replace_document(entry.idterm, doc)
        return True, []

    @property
    def indexed_fields(self):
        """
        List of dicts (``prefix``, ``name``, ``free_search``, ``multi``)
        describing the fields whose ``index`` setting is ``free`` or
        ``field``.
        """
        fields = []
        # .get() for consistency with _load_field_types: a collection
        # without fields simply has no indexed fields.
        for field in self.settings.get('fields', []):
            if field.get('index', '') in ['free', 'field']:
                prefix = field.get('prefix', 'X' + field['name'].upper())
                fields.append(
                    {
                        'prefix': prefix,
                        'name': field['name'],
                        'free_search': field['index'] == 'free',
                        'multi': field['type'] in ['list'],
                    }
                )
        return fields

    def update_cache(self, fnames=None, reset=False):
        """
        Update the xapian db with the data in files.

        ``fnames`` is a list of *basenames* of files in ``self.itemdir``.

        If no files have been passed (``None`` or an empty list), add
        everything.

        If ``reset`` is true the existing xapian db is deleted before
        indexing.

        Return the number of files that have been added to the cache.
        """
        cache_dir = os.path.join(self.basedir, '.lesana')
        if reset:
            shutil.rmtree(cache_dir)
        os.makedirs(cache_dir, exist_ok=True)
        cache = xapian.WritableDatabase(
            os.path.join(self.basedir, '.lesana/xapian'),
            xapian.DB_CREATE_OR_OPEN,
        )
        # Close the database explicitly on every exit path, so that the
        # changes are written and the write lock is released without
        # relying on garbage collection.
        try:
            self.indexer = xapian.TermGenerator()
            self.indexer.set_stemmer(self.stemmer)
            if not fnames:
                try:
                    fnames = os.listdir(self.itemdir)
                except FileNotFoundError:
                    logging.warning(
                        "No such file or directory: {}, not updating cache".format(
                            self.itemdir
                        )
                    )
                    return 0
            updated = 0
            for fname in fnames:
                try:
                    valid, errors = self._index_file(fname, cache)
                except IOError as e:
                    logging.warning(
                        "Could not load file {}: {}".format(fname, str(e))
                    )
                else:
                    if valid:
                        updated += 1
                    else:
                        logging.warning(
                            "File {fname} could not be indexed: {errors}".format(
                                fname=fname, errors=errors
                            )
                        )
            return updated
        finally:
            cache.close()

    def save_entries(self, entries=None):
        """
        Write each of ``entries`` to its file inside ``self.itemdir``,
        overwriting existing files.
        """
        for e in entries or []:
            complete_name = os.path.join(self.itemdir, e.fname)
            with open(complete_name, 'w') as fp:
                fp.write(e.yaml_data)

    def git_add_files(self, files=None):
        """
        Add ``files`` to the index of the git repository containing the
        collection.

        Returns ``True`` if the files have been added, ``False`` if git
        support is not available, not enabled in the settings, or no
        repository has been found.
        """
        if not git_available:
            logging.warning(
                "python3-git not available, could not initalise "
                + "the git repository."  # noqa: W503
            )
            return False
        if not self.settings.get('git', False):
            logging.info("This collection is configured not to use git")
            return False
        try:
            repo = git.Repo(self.basedir, search_parent_directories=True)
        except git.exc.InvalidGitRepositoryError:
            logging.warning(
                "Could not find a git repository in {}".format(self.basedir)
            )
            return False
        repo.index.add(files or [])
        return True

    def _get_cache(self):
        """
        Return the xapian database opened read-only, indexing all
        entries first if the database can't be opened.
        """
        try:
            cache = xapian.Database(
                os.path.join(self.basedir, '.lesana/xapian'),
            )
        except xapian.DatabaseOpeningError:
            logging.info("No database found, indexing entries.")
            self.update_cache()
            cache = xapian.Database(
                os.path.join(self.basedir, '.lesana/xapian'),
            )
        return cache

    def start_search(self, querystring, sort_by=None):
        """
        Prepare a search for querystring.

        ``sort_by`` is an optional iterable of field names, each
        optionally prefixed by ``+`` (ascending, the default) or ``-``
        (descending); results with equal keys are sorted by relevance.

        The results are then read with :meth:`get_search_results` or
        :meth:`get_all_search_results`.
        """
        cache = self._get_cache()
        queryparser = xapian.QueryParser()
        queryparser.set_stemmer(self.stemmer)
        queryparser.set_database(cache)

        for field in self.indexed_fields:
            queryparser.add_prefix(field['name'], field['prefix'])

        query = queryparser.parse_query(querystring, self.PARSER_FLAGS)

        self._enquire = xapian.Enquire(cache)
        self._enquire.set_query(query)

        if sort_by:
            keymaker = xapian.MultiValueKeyMaker()
            for k in sort_by:
                reverse = k.startswith('-')
                # strip the optional direction marker
                name = k[1:] if k.startswith(('+', '-')) else k
                slot = self.fields[name].value_index
                keymaker.add_value(slot, reverse)
            self._enquire.set_sort_by_key_then_relevance(keymaker, False)

    def get_search_results(self, offset=0, pagesize=12):
        """
        Yield up to ``pagesize`` entries matching the current search,
        starting from ``offset``; yield nothing if no search has been
        started.
        """
        if not self._enquire:
            return
        for match in self._enquire.get_mset(offset, pagesize):
            yield self._match_to_entry(match)

    def get_all_search_results(self):
        """
        Yield all the entries matching the current search; yield nothing
        if no search has been started.
        """
        if not self._enquire:
            return
        offset = 0
        pagesize = 100
        # read the results one page at a time until an empty page
        while True:
            mset = self._enquire.get_mset(offset, pagesize)
            if mset.size() == 0:
                break
            for match in mset:
                yield self._match_to_entry(match)
            offset += pagesize

    def get_all_documents(self):
        """Yield an entry for every document in the xapian database."""
        cache = self._get_cache()
        # the empty term matches all documents
        postlist = cache.postlist("")
        for post in postlist:
            doc = cache.get_document(post.docid)
            yield self._doc_to_entry(doc)

    def _match_to_entry(self, match):
        """Return the entry for a xapian search match."""
        return self._doc_to_entry(match.document)

    def _doc_to_entry(self, doc):
        """
        Build an entry from a xapian document, reading the file name
        from value slot 0 and the data from the document data.
        """
        fname = doc.get_value(0).decode('utf-8')
        data = self.yaml.load(doc.get_data())
        entry = self.entry_class(self, data=data, fname=fname)
        return entry

    def entry_from_eid(self, eid):
        """
        Return the indexed entry with the given ``eid``, or ``None`` if
        there is no such entry.
        """
        cache = self._get_cache()
        postlist = cache.postlist('Q' + eid)
        for pitem in postlist:
            return self._doc_to_entry(cache.get_document(pitem.docid))
        return None

    def entries_from_short_eid(self, seid):
        """
        Return the list of entries whose file name starts with ``seid``.

        The list contains ``None`` for files which have no corresponding
        entry in the index.
        """
        # It would be better to search for partial UIDs inside xapian,
        # but I still can't find a way to do it, so this is a workable
        # workaround on repos where the eids are stored in the
        # filenames.
        potential_eids = [
            os.path.splitext(f)[0]
            for f in os.listdir(self.itemdir)
            if f.startswith(seid)
        ]
        return [self.entry_from_eid(u) for u in potential_eids if u]

    def remove_entries(self, eids):
        """
        Remove from the index and from the filesystem all the entries
        whose file name starts with one of ``eids``.
        """
        cache = xapian.WritableDatabase(
            os.path.join(self.basedir, '.lesana/xapian'),
            xapian.DB_CREATE_OR_OPEN,
        )
        # make sure the database is closed even if removing a file fails
        try:
            for eid in eids:
                for entry in self.entries_from_short_eid(eid):
                    if entry is not None:
                        cache.delete_document(entry.idterm)
                        self.remove_file(entry.fname)
                    else:
                        logging.warning(
                            "No such entry: {}, ignoring".format(eid)
                        )
            cache.commit()
        finally:
            cache.close()

    def remove_file(self, fname):
        """
        Delete the file ``fname`` from ``self.itemdir``, removing it
        from the git index first when git support is available and
        enabled.

        Returns ``False``, without deleting the file, if git is enabled
        but no repository can be found.
        """
        f_path = os.path.join(self.itemdir, fname)
        if git_available and self.settings.get('git', False):
            try:
                repo = git.Repo(self.basedir, search_parent_directories=True)
            except git.exc.InvalidGitRepositoryError:
                logging.warning(
                    "Could not find a git repository in {}".format(
                        self.basedir
                    )
                )
                return False
            repo.index.remove([f_path])
        os.remove(f_path)

    def update_field(self, query, field, value):
        """
        Set ``field`` to ``value`` in all the entries matching
        ``query``, then save them, add them to git and reindex them.
        """
        self.start_search(query)
        changed = []
        for e in self.get_all_search_results():
            e.data[field] = value
            changed.append(e)
        if not changed:
            # Nothing to save; note that update_cache() with an empty
            # list would reindex the whole collection.
            return
        self.save_entries(changed)
        self.git_add_files(
            [os.path.join(self.itemdir, e.fname) for e in changed]
        )
        self.update_cache([e.fname for e in changed])

    def get_template(self, template_fname, searchpath='.'):
        """
        Return the jinja2 template ``template_fname`` loaded from
        ``searchpath``.

        Raises ``TemplatingError`` if the template can't be found.
        """
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(
                searchpath=searchpath,
                followlinks=True,
            ),
            # TODO: add autoescaping settings
        )
        try:
            template = env.get_template(template_fname)
        except jinja2.exceptions.TemplateNotFound as e:
            raise TemplatingError('Could not find template ' + str(e)) from e
        return template

    @classmethod
    def init(
        cls, directory=None, git_enabled=True, edit_file=None, settings=None
    ):
        """
        Initialize a lesana repository

        directory defaults to .

        if git_enabled is True, git support is enabled and if possible a git
        repository is initialized.

        edit_file is a synchronous function that runs on a filename
        (possibly opening the file in an editor) and should manage its
        own errors.

        settings is an optional mapping of values that override the
        ones in the skeleton ``settings.yaml``; it is only used when the
        file doesn't exist yet.

        Returns the new collection.
        """
        c_dir = os.path.abspath(directory or '.')
        os.makedirs(c_dir, exist_ok=True)
        repo = None
        if git_enabled:
            # Try to initialize a git repo
            if git_available:
                repo = git.Repo.init(c_dir, bare=False)
            else:
                logging.warning(
                    "python3-git not available, could not initalise "
                    + "the git repository."  # noqa: W503
                )
            # Add .lesana directory to .gitignore and add it to the
            # staging
            ignore_path = os.path.join(c_dir, '.gitignore')
            try:
                with open(ignore_path, 'r') as fp:
                    ignored = fp.read()
            except FileNotFoundError:
                ignored = ''
            if '.lesana' not in ignored:
                with open(ignore_path, 'a') as fp:
                    if ignored and not ignored.endswith('\n'):
                        # don't glue our comment to an existing last
                        # line that lacks its newline
                        fp.write('\n')
                    fp.write('#Added by lesana init\n.lesana\n')
                if repo:
                    repo.index.add(['.gitignore'])
            # TODO: Add hook to index files as they are pulled
        # If it doesn't exist, create a skeleton of settings.yaml file
        # then open settings.yaml for editing
        filepath = os.path.join(c_dir, 'settings.yaml')
        if not os.path.exists(filepath):
            skel = resource_string('lesana', 'data/settings.yaml').decode(
                'utf-8'
            )
            yaml = ruamel.yaml.YAML()
            skel_dict = yaml.load(skel)
            skel_dict['git'] = git_enabled
            skel_dict.update(settings or {})
            with open(filepath, 'w') as fp:
                yaml.dump(skel_dict, stream=fp)
        if edit_file:
            edit_file(filepath)
        if git_enabled and repo:
            repo.index.add(['settings.yaml'])

        coll = cls(c_dir)
        # coll.itemdir already includes the (absolute) base directory
        os.makedirs(os.path.join(coll.basedir, coll.itemdir), exist_ok=True)
        return coll
class TemplatingError(Exception):
    """
    Raised when there are errors rendering a jinja template
    """