__author__ = "Christopher Tomkins-Tinch"
__copyright__ = "Copyright 2017, Christopher Tomkins-Tinch"
__email__ = "tomkinsc@broadinstitute.org"
__license__ = "MIT"

# built-ins
import time
import os
import re
import json
import logging
import xml.etree.ElementTree as ET

# module-specific
from snakemake.remote import AbstractRemoteObject, AbstractRemoteProvider
from snakemake.exceptions import WorkflowError, NCBIFileException
from snakemake.logging import logger

try:
# third-party modules
from Bio import Entrez
except ImportError as e:
raise WorkflowError(
"The Python package 'biopython' needs to be installed to use NCBI Entrez remote() file functionality. %s"
% e.msg
)


class RemoteProvider(AbstractRemoteProvider):
def __init__(
self,
*args,
keep_local=False,
stay_on_remote=False,
is_default=False,
email=None,
**kwargs
):
super(RemoteProvider, self).__init__(
*args,
keep_local=keep_local,
stay_on_remote=stay_on_remote,
is_default=is_default,
email=email,
**kwargs
)
self._ncbi = NCBIHelper(*args, email=email, **kwargs)

    def remote_interface(self):
return self._ncbi

    @property
def default_protocol(self):
"""The protocol that is prepended to the path when no protocol is specified."""
return "ncbi://"

    @property
def available_protocols(self):
"""List of valid protocols for this remote provider."""
return ["ncbi://"]

    def search(
self, query, *args, db="nuccore", idtype="acc", retmode="json", **kwargs
):
return list(
self._ncbi.search(
query, *args, db=db, idtype=idtype, retmode=retmode, **kwargs
)
)
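

# A minimal usage sketch (the accession, rule name, and e-mail below are
# illustrative placeholders, not values defined in this module):
#
#   from snakemake.remote.NCBI import RemoteProvider as NCBIRemoteProvider
#   NCBI = NCBIRemoteProvider(email="someone@example.com")  # e-mail is required
#
#   rule download_genome:
#       input:
#           NCBI.remote("KY785484.1.fasta", db="nuccore")
#       output:
#           "genomes/KY785484.1.fasta"
#       shell:
#           "cp {input} {output}"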


class RemoteObject(AbstractRemoteObject):
""" This is a class to interact with NCBI / GenBank.
"""
def __init__(
self,
*args,
keep_local=False,
stay_on_remote=False,
provider=None,
email=None,
db=None,
rettype=None,
retmode=None,
**kwargs
):
super(RemoteObject, self).__init__(
*args,
keep_local=keep_local,
stay_on_remote=stay_on_remote,
provider=provider,
email=email,
db=db,
rettype=rettype,
retmode=retmode,
**kwargs
)
if provider:
self._ncbi = provider.remote_interface()
else:
self._ncbi = NCBIHelper(*args, email=email, **kwargs)
if db and not self._ncbi.is_valid_db(db):
raise NCBIFileException(
"DB specified is not valid. Options include: {dbs}".format(
dbs=", ".join(self._ncbi.valid_dbs)
)
)
else:
self.db = db
self.rettype = rettype
self.retmode = retmode
self.kwargs = kwargs

    # === Implementations of abstract class members ===
    def exists(self):
if not self.retmode or not self.rettype:
likely_request_options = self._ncbi.guess_db_options_for_extension(
self.file_ext, db=self.db, rettype=self.rettype, retmode=self.retmode
)
self.db = likely_request_options["db"]
self.retmode = likely_request_options["retmode"]
self.rettype = likely_request_options["rettype"]
return self._ncbi.exists(self.accession, db=self.db)

    def mtime(self):
if self.exists():
return self._ncbi.mtime(self.accession, db=self.db)
else:
raise NCBIFileException(
"The record does not seem to exist remotely: %s" % self.accession
)

    def size(self):
if self.exists():
return self._ncbi.size(self.accession, db=self.db)
else:
return self._iofile.size_local

    def download(self):
if self.exists():
self._ncbi.fetch_from_ncbi(
[self.accession],
                os.path.dirname(self.local_file()),
rettype=self.rettype,
retmode=self.retmode,
file_ext=self.file_ext,
db=self.db,
**self.kwargs
)
else:
raise NCBIFileException(
"The record does not seem to exist remotely: %s" % self.accession
)

    def upload(self):
raise NCBIFileException(
"Upload is not permitted for the NCBI remote provider. Is an output set to NCBI.RemoteProvider.remote()?"
)

    @property
def list(self):
raise NCBIFileException(
"The NCBI Remote Provider does not currently support list-based operations like glob_wildcards()."
)

    @property
def accession(self):
accession, version, file_ext = self._ncbi.parse_accession_str(self.local_file())
return accession + "." + version

    @property
def file_ext(self):
accession, version, file_ext = self._ncbi.parse_accession_str(self.local_file())
return file_ext

    @property
def version(self):
accession, version, file_ext = self._ncbi.parse_accession_str(self.local_file())
return version
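
    # For a remote file named e.g. "KY785484.1.fasta" (an illustrative
    # accession), the properties above decompose the name as:
    #   accession == "KY785484.1", version == "1", file_ext == "fasta"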


class NCBIHelper(object):
def __init__(self, *args, email=None, **kwargs):
if not email:
raise NCBIFileException(
"An e-mail address must be provided to either the remote file or the RemoteProvider() as email=<your_address>. The NCBI requires e-mail addresses for queries."
)
self.email = email
self.entrez = Entrez
self.entrez.email = self.email
self.entrez.tool = "Snakemake"
# valid NCBI Entrez efetch options
# via https://www.ncbi.nlm.nih.gov/books/NBK25499/table/chapter4.T._valid_values_of__retmode_and/?report=objectonly
self.efetch_options = {
"bioproject": [{"rettype": "xml", "retmode": "xml", "ext": "xml"}],
"biosample": [
{"rettype": "full", "retmode": "xml", "ext": "xml"},
{"rettype": "full", "retmode": "text", "ext": "txt"},
],
"biosystems": [{"rettype": "xml", "retmode": "xml", "ext": "xml"}],
"gds": [{"rettype": "summary", "retmode": "text", "ext": "txt"}],
"gene": [
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "null", "retmode": "xml", "ext": "xml"},
{"rettype": "gene_table", "retmode": "text", "ext": "gene_table"},
],
"homologene": [
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "null", "retmode": "xml", "ext": "xml"},
{
"rettype": "alignmentscores",
"retmode": "text",
"ext": "alignmentscores",
},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "homologene", "retmode": "text", "ext": "homologene"},
],
"mesh": [{"rettype": "full", "retmode": "text", "ext": "txt"}],
"nlmcatalog": [
{"rettype": "null", "retmode": "text", "ext": "txt"},
{"rettype": "null", "retmode": "xml", "ext": "xml"},
],
"nuccore": [
{"rettype": "null", "retmode": "text", "ext": "txt"},
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "native", "retmode": "xml", "ext": "xml"},
{"rettype": "acc", "retmode": "text", "ext": "acc"},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "fasta", "retmode": "xml", "ext": "fasta.xml"},
{"rettype": "seqid", "retmode": "text", "ext": "seqid"},
{"rettype": "gb", "retmode": "text", "ext": "gb"},
{"rettype": "gb", "retmode": "xml", "ext": "gb.xml"},
{"rettype": "gbc", "retmode": "xml", "ext": "gbc"},
{"rettype": "ft", "retmode": "text", "ext": "ft"},
{"rettype": "gbwithparts", "retmode": "text", "ext": "gbwithparts"},
{"rettype": "fasta_cds_na", "retmode": "text", "ext": "fasta_cds_na"},
{"rettype": "fasta_cds_aa", "retmode": "text", "ext": "fasta_cds_aa"},
],
"nucest": [
{"rettype": "null", "retmode": "text", "ext": "txt"},
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "native", "retmode": "xml", "ext": "xml"},
{"rettype": "acc", "retmode": "text", "ext": "acc"},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "fasta", "retmode": "xml", "ext": "fasta.xml"},
{"rettype": "seqid", "retmode": "text", "ext": "seqid"},
{"rettype": "gb", "retmode": "text", "ext": "gb"},
{"rettype": "gb", "retmode": "xml", "ext": "gb.xml"},
{"rettype": "gbc", "retmode": "xml", "ext": "gbc"},
{"rettype": "est", "retmode": "text", "ext": "est"},
],
"nucgss": [
{"rettype": "null", "retmode": "text", "ext": "txt"},
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "native", "retmode": "xml", "ext": "xml"},
{"rettype": "acc", "retmode": "text", "ext": "acc"},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "fasta", "retmode": "xml", "ext": "fasta.xml"},
{"rettype": "seqid", "retmode": "text", "ext": "seqid"},
{"rettype": "gb", "retmode": "text", "ext": "gb"},
{"rettype": "gb", "retmode": "xml", "ext": "gb.xml"},
{"rettype": "gbc", "retmode": "xml", "ext": "gbc"},
{"rettype": "gss", "retmode": "text", "ext": "gss"},
],
"protein": [
{"rettype": "null", "retmode": "text", "ext": "txt"},
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "native", "retmode": "xml", "ext": "xml"},
{"rettype": "acc", "retmode": "text", "ext": "acc"},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "fasta", "retmode": "xml", "ext": "fasta.xml"},
{"rettype": "seqid", "retmode": "text", "ext": "seqid"},
{"rettype": "ft", "retmode": "text", "ext": "ft"},
{"rettype": "gp", "retmode": "text", "ext": "gp"},
{"rettype": "gp", "retmode": "xml", "ext": "gp.xml"},
{"rettype": "gpc", "retmode": "xml", "ext": "gpc"},
{"rettype": "ipg", "retmode": "xml", "ext": "xml"},
],
"popset": [
{"rettype": "null", "retmode": "text", "ext": "txt"},
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "native", "retmode": "xml", "ext": "xml"},
{"rettype": "acc", "retmode": "text", "ext": "acc"},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "fasta", "retmode": "xml", "ext": "fasta.xml"},
{"rettype": "seqid", "retmode": "text", "ext": "seqid"},
{"rettype": "gb", "retmode": "text", "ext": "gb"},
{"rettype": "gb", "retmode": "xml", "ext": "gb.xml"},
{"rettype": "gbc", "retmode": "xml", "ext": "gbc"},
],
"pmc": [
{"rettype": "null", "retmode": "xml", "ext": "xml"},
{"rettype": "medline", "retmode": "text", "ext": "medline"},
],
"pubmed": [
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "null", "retmode": "xml", "ext": "xml"},
{"rettype": "medline", "retmode": "text", "ext": "medline"},
{"rettype": "uilist", "retmode": "text", "ext": "uilist"},
{"rettype": "abstract", "retmode": "text", "ext": "abstract"},
],
"sequences": [
{"rettype": "null", "retmode": "text", "ext": "txt"},
{"rettype": "acc", "retmode": "text", "ext": "acc"},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "seqid", "retmode": "text", "ext": "seqid"},
],
"snp": [
{"rettype": "null", "retmode": "asn.1", "ext": "asn1"},
{"rettype": "null", "retmode": "xml", "ext": "xml"},
{"rettype": "flt", "retmode": "text", "ext": "flt"},
{"rettype": "fasta", "retmode": "text", "ext": "fasta"},
{"rettype": "rsr", "retmode": "text", "ext": "rsr"},
{"rettype": "ssexemplar", "retmode": "text", "ext": "ssexemplar"},
{"rettype": "chr", "retmode": "text", "ext": "chr"},
{"rettype": "docset", "retmode": "text", "ext": "docset"},
{"rettype": "uilist", "retmode": "text", "ext": "uilist"},
{"rettype": "uilist", "retmode": "xml", "ext": "uilist.xml"},
],
"sra": [{"rettype": "full", "retmode": "xml", "ext": "xml"}],
"taxonomy": [
{"rettype": "null", "retmode": "xml", "ext": "xml"},
{"rettype": "uilist", "retmode": "text", "ext": "uilist"},
{"rettype": "uilist", "retmode": "xml", "ext": "uilist.xml"},
],
}
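
        # For example, the "nuccore" entry above means a GenBank flat file is
        # requested with rettype="gb" and retmode="text", and is mapped to the
        # file extension ".gb".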

    @property
    def valid_extensions(self):
        extensions = set()
        for db, db_options in self.efetch_options.items():
            for options in db_options:
                extensions.add(options["ext"])
        return list(extensions)

    def dbs_for_options(self, file_ext, rettype=None, retmode=None):
possible_dbs = set()
for db, db_options in self.efetch_options.items():
for option_dict in db_options:
if option_dict["ext"] == file_ext:
if retmode and option_dict["retmode"] != retmode:
continue
if rettype and option_dict["rettype"] != rettype:
continue
                    possible_dbs.add(db)
break
return possible_dbs
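
    # For example, given the efetch_options table above:
    #   dbs_for_options("gb")          -> {"nuccore", "nucest", "nucgss", "popset"}
    #   dbs_for_options("gbwithparts") -> {"nuccore"}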

    def options_for_db_and_extension(self, db, file_ext, rettype=None, retmode=None):
possible_options = []
assert file_ext, "file_ext must be defined"
if not self.is_valid_db(db):
raise NCBIFileException(
"DB specified is not valid. Options include: {dbs}".format(
dbs=", ".join(self.valid_dbs)
)
)
db_options = self.efetch_options[db]
for opt in db_options:
if file_ext == opt["ext"]:
if retmode and opt["retmode"] != retmode:
continue
if rettype and opt["rettype"] != rettype:
continue
possible_options.append(opt)
return possible_options

    def guess_db_options_for_extension(
self, file_ext, db=None, rettype=None, retmode=None
):
if db and rettype and retmode:
if self.is_valid_db_request(db, rettype, retmode):
request_options = {}
request_options["db"] = db
request_options["rettype"] = rettype
request_options["retmode"] = retmode
request_options["ext"] = file_ext
return request_options
possible_dbs = [db] if db else self.dbs_for_options(file_ext, rettype, retmode)
if len(possible_dbs) > 1:
raise NCBIFileException(
                'Ambiguous db for the file extension specified: "{}"; possible databases include: {}'.format(
file_ext, ", ".join(list(possible_dbs))
)
)
elif len(possible_dbs) == 1:
likely_db = possible_dbs.pop()
likely_options = self.options_for_db_and_extension(
likely_db, file_ext, rettype, retmode
)
if len(likely_options) == 1:
request_options = {}
request_options["db"] = likely_db
request_options["rettype"] = likely_options[0]["rettype"]
request_options["retmode"] = likely_options[0]["retmode"]
request_options["ext"] = likely_options[0]["ext"]
return request_options
elif len(likely_options) > 1:
raise NCBIFileException(
"Please clarify the rettype and retmode. Multiple request types are possible for the file extension ({}) specified: {}".format(
file_ext, likely_options
)
)
        # fall through: either no candidate db was found, or no rettype/retmode
        # combination matched for the candidate db
        raise NCBIFileException(
            "No request options found. Please check the file extension ({}), db ({}), rettype ({}), and retmode ({}) specified.".format(
                file_ext, db, rettype, retmode
            )
        )
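
    # For example, given the efetch_options table above,
    # guess_db_options_for_extension("gbwithparts") resolves unambiguously to
    #   {"db": "nuccore", "rettype": "gbwithparts", "retmode": "text", "ext": "gbwithparts"},
    # while guess_db_options_for_extension("gb") raises NCBIFileException,
    # since several databases offer the "gb" extension.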

    def is_valid_db_request(self, db, rettype, retmode):
if not self.is_valid_db(db):
raise NCBIFileException(
"DB specified is not valid. Options include: {dbs}".format(
dbs=", ".join(self.valid_dbs)
)
)
db_options = self.efetch_options[db]
for opt in db_options:
if opt["rettype"] == rettype and opt["retmode"] == retmode:
return True
return False

    @property
def valid_dbs(self):
return self.efetch_options.keys()

    def is_valid_db(self, db):
return db in self.valid_dbs

    def parse_accession_str(self, id_str):
"""
This tries to match an NCBI accession as defined here:
https://www.ncbi.nlm.nih.gov/Sequin/acc.html
"""
m = re.search(
r"(?P<accession>(?:[a-zA-Z]{1,6}|NW_|NC_|NM_|NR_)\d{1,10})(?:\.(?P<version>\d+))?(?:\.(?P<file_ext>\S+))?.*",
id_str,
)
accession, version, file_ext = ("", "", "")
if m:
accession = m.group("accession")
version = m.group("version")
file_ext = m.group("file_ext")
assert (
file_ext
), "file_ext must be defined: {}.{}.<file_ext>. Possible values include: {}".format(
accession, version, ", ".join(list(self.valid_extensions))
)
assert version, "version must be defined: {}.<version>.{}".format(
accession, file_ext
)
return accession, version, file_ext
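
    # For example, parse_accession_str("KY785484.1.fasta") returns
    # ("KY785484", "1", "fasta"); the version and file extension are required.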

    @staticmethod
def _seq_chunks(seq, n):
# https://stackoverflow.com/a/312464/190597 (Ned Batchelder)
""" Yield successive n-sized chunks from seq."""
for i in range(0, len(seq), n):
yield seq[i : i + n]
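
    # For example:
    #   list(NCBIHelper._seq_chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]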

    def _esummary_and_parse(
self,
accession,
xpath_selector,
db="nuccore",
return_type=int,
raise_on_failure=True,
retmode="xml",
**kwargs
):
        result = self.entrez.esummary(db=db, id=accession, retmode=retmode, **kwargs)
root = ET.fromstring(result.read())
nodes = root.findall(xpath_selector)
retval = 0
if len(nodes):
retval = return_type(nodes[0].text)
else:
if raise_on_failure:
raise NCBIFileException("The esummary query failed.")
return retval

    def exists(self, accession, db="nuccore"):
result = self.entrez.esearch(db=db, term=accession, rettype="count")
root = ET.fromstring(result.read())
nodes = root.findall(".//Count")
count = 0
if len(nodes):
count = int(nodes[0].text)
else:
raise NCBIFileException("The esummary query failed.")
        if count == 1:
            return True
        else:
            logger.warning(
                'The accession specified, "{acc}", matched {count} records in '
                'the database "{db}".\nConsider whether you need to specify a '
                'different database via "db=<db_id>".'.format(
                    acc=accession, count=count, db=db
                )
            )
            return False

    def size(self, accession, db="nuccore"):
return self._esummary_and_parse(accession, ".//*[@Name='Length']", db=db)

    def mtime(self, accession, db="nuccore"):
update_date = self._esummary_and_parse(
accession, ".//Item[@Name='UpdateDate']", db=db, return_type=str
)
pattern = "%Y/%m/%d"
epoch_update_date = int(time.mktime(time.strptime(update_date, pattern)))
return epoch_update_date

    def fetch_from_ncbi(
self,
accession_list,
destination_dir,
force_overwrite=False,
rettype="fasta",
retmode="text",
file_ext=None,
combined_file_prefix=None,
remove_separate_files=False,
chunk_size=1,
db="nuccore",
**kwargs
):
"""
This function downloads and saves files from NCBI.
Adapted in part from the BSD-licensed code here:
https://github.com/broadinstitute/viral-ngs/blob/master/util/genbank.py
"""
max_chunk_size = 500
        # Conform to NCBI retrieval guidelines by chunking into 500-accession chunks if
# >500 accessions are specified and chunk_size is set to 1
# Also clamp chunk size to 500 if the user specified a larger value.
if chunk_size > max_chunk_size or (
len(accession_list) > max_chunk_size and chunk_size == 1
):
chunk_size = max_chunk_size
outEx = {"fasta": "fasta", "ft": "tbl", "gb": "gbk"}
output_directory = os.path.abspath(os.path.expanduser(destination_dir))
if not os.path.exists(output_directory):
os.makedirs(output_directory)
output_extension = str(file_ext)
# ensure the extension starts with a ".", also allowing for passed-in
# extensions that already have it
if output_extension[:1] != ".":
output_extension = "." + output_extension
logger.info(
"Fetching {} entries from NCBI: {}\n".format(
str(len(accession_list)), ", ".join(accession_list[:10])
)
)
output_files = []
for chunk_num, chunk in enumerate(self._seq_chunks(accession_list, chunk_size)):
# sleep to throttle requests to 2 per second per NCBI guidelines:
# https://www.ncbi.nlm.nih.gov/books/NBK25497/#chapter2.Usage_Guidelines_and_Requiremen
time.sleep(0.5)
acc_string = ",".join(chunk)
# if the filename would be longer than Linux allows, simply say "chunk-chunk_num"
if len(acc_string) + len(output_extension) <= 254:
output_file_path = os.path.join(
output_directory, acc_string + output_extension
)
else:
output_file_path = os.path.join(
output_directory, "chunk-{}".format(chunk_num) + output_extension
)
            if not force_overwrite:
                logger.info("not overwriting; checking for existence")
                assert not os.path.exists(output_file_path), (
                    """File %s already exists. Consider removing
                    this file or specifying a different output directory. The files for the accessions specified
                    can be overwritten if you add the force_overwrite flag. Processing aborted."""
                    % output_file_path
                )
try_count = 1
while True:
try:
logger.info(
"Fetching file {}: {}, try #{}".format(
chunk_num + 1, acc_string, try_count
)
)
handle = self.entrez.efetch(
db=db, rettype=rettype, retmode=retmode, id=acc_string, **kwargs
)
with open(output_file_path, "w") as outf:
for line in handle:
outf.write(line)
output_files.append(output_file_path)
except IOError:
                    logger.warning(
                        "Error fetching file {}: {}, try #{}; NCBI may be too busy.".format(
                            chunk_num + 1, acc_string, try_count
                        )
                    )
try_count += 1
if try_count > 4:
logger.warning("Tried too many times. Aborting.")
raise
# if the fetch failed, wait a few seconds and try again.
logger.info("Waiting and retrying...")
time.sleep(2)
continue
break
# assert that we are not trying to remove the intermediate files without writing a combined file
if remove_separate_files:
assert combined_file_prefix, """The intermediate files
can only be removed if a combined file is written via combined_file_prefix"""
# build a path to the combined genome file
if combined_file_prefix:
concatenated_genome_file_path = os.path.join(
output_directory, combined_file_prefix + output_extension
)
            if not force_overwrite:
                assert not os.path.exists(concatenated_genome_file_path), (
                    """File %s already exists. Consider removing
                    this file or specifying a different output directory. The files for the accessions specified
                    can be overwritten if you add the force_overwrite flag. Processing aborted."""
                    % concatenated_genome_file_path
                )
# concatenate the files together into one genome file
with open(concatenated_genome_file_path, "w") as outfile:
for file_path in output_files:
with open(file_path) as infile:
for line in infile:
outfile.write(line)
# if the option is specified, remove the intermediate fasta files
if remove_separate_files:
while len(output_files) > 0:
os.unlink(output_files.pop())
# add the combined file to the list of files returned
output_files.append(concatenated_genome_file_path)
# return list of files
return output_files
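
    # A minimal sketch of calling this method directly (it is normally invoked
    # via RemoteObject.download(); the e-mail and accession are placeholders):
    #
    #   helper = NCBIHelper(email="someone@example.com")
    #   helper.fetch_from_ncbi(
    #       ["KY785484.1"], ".", rettype="fasta", retmode="text",
    #       file_ext="fasta", db="nuccore"
    #   )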

    def search(self, query, *args, db="nuccore", idtype="acc", **kwargs):
# enforce JSON return mode
kwargs["retmode"] = "json"
# if the user specifies retmax, use it and limit there
# otherwise page 200 at a time and return all
if "retmax" not in kwargs or not kwargs["retmax"]:
kwargs["retmax"] = 100000
return_all = True
else:
return_all = False
kwargs["retstart"] = kwargs.get("retstart", 0)

        def esearch_json(term, *args, **kwargs):
            handle = self.entrez.esearch(*args, term=term, **kwargs)
            json_result = json.loads(handle.read())
            return json_result

        def result_ids(json_results):
if (
"esearchresult" in json_results
and "idlist" in json_results["esearchresult"]
):
return json_results["esearchresult"]["idlist"]
else:
raise NCBIFileException("ESearch error")

        has_more = True
while has_more:
json_results = esearch_json(
term=query, *args, db=db, idtype=idtype, **kwargs
)
for acc in result_ids(json_results):
yield acc
if return_all and (
"count" in json_results["esearchresult"]
and int(json_results["esearchresult"]["count"])
> kwargs["retmax"] + kwargs["retstart"]
):
kwargs["retstart"] += kwargs["retmax"]
# sleep to throttle requests to <2 per second per NCBI guidelines:
# https://www.ncbi.nlm.nih.gov/books/NBK25497/#chapter2.Usage_Guidelines_and_Requiremen
time.sleep(0.5)
else:
has_more = False
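
# A hypothetical search sketch (assumes a provider configured with a valid
# e-mail; the query and retmax values are illustrative):
#
#   NCBI = RemoteProvider(email="someone@example.com")
#   accessions = NCBI.search(
#       '"Zika virus"[Organism] AND complete genome[Title]', retmax=5
#   )
#   input_files = [acc + ".fasta" for acc in accessions]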