__author__ = "Christopher Tomkins-Tinch"
__copyright__ = "Copyright 2015, Christopher Tomkins-Tinch"
__email__ = "tomkinsc@broadinstitute.org"
__license__ = "MIT"
import os
import re
import collections
import shutil
import email.utils
from contextlib import contextmanager
# module-specific
from snakemake.remote import AbstractRemoteProvider, DomainObject
from snakemake.exceptions import HTTPFileException, WorkflowError
from snakemake.logging import logger
from snakemake.utils import os_sync
try:
# third-party modules
import requests
except ImportError as e:
raise WorkflowError(
"The Python 3 package 'requests' "
+ "must be installed to use HTTP(S) remote() file functionality. %s" % e.msg
)
[docs]class RemoteProvider(AbstractRemoteProvider):
def __init__(
self, *args, keep_local=False, stay_on_remote=False, is_default=False, **kwargs
):
super(RemoteProvider, self).__init__(
*args,
keep_local=keep_local,
stay_on_remote=stay_on_remote,
is_default=is_default,
**kwargs
)
@property
def default_protocol(self):
"""The protocol that is prepended to the path when no protocol is specified."""
return "https://"
@property
def available_protocols(self):
"""List of valid protocols for this remote provider."""
return ["http://", "https://"]
[docs] def remote(self, value, *args, insecure=None, **kwargs):
if isinstance(value, str):
values = [value]
elif isinstance(value, collections.abc.Iterable):
values = value
else:
raise TypeError(
"Invalid type ({}) passed to remote: {}".format(type(value), value)
)
for i, file in enumerate(values):
match = re.match("^(https?)://.+", file)
if match:
(protocol,) = match.groups()
if protocol == "https" and insecure:
raise SyntaxError(
"insecure=True cannot be used with a https:// url"
)
if protocol == "http" and insecure not in [None, False]:
raise SyntaxError(
"insecure=False cannot be used with a http:// url"
)
else:
if insecure:
values[i] = "http://" + file
else:
values[i] = "https://" + file
return super(RemoteProvider, self).remote(values, *args, **kwargs)
[docs]class RemoteObject(DomainObject):
""" This is a class to interact with an HTTP server.
"""
def __init__(
self,
*args,
keep_local=False,
provider=None,
additional_request_string="",
allow_redirects=True,
**kwargs
):
super(RemoteObject, self).__init__(
*args,
keep_local=keep_local,
provider=provider,
allow_redirects=allow_redirects,
**kwargs
)
self.additional_request_string = additional_request_string
# === Implementations of abstract class members ===
[docs] @contextmanager # makes this a context manager. after 'yield' is __exit__()
def httpr(self, verb="GET", stream=False):
# if args have been provided to remote(), use them over those given to RemoteProvider()
args_to_use = self.provider.args
if len(self.args):
args_to_use = self.args
# use kwargs passed in to remote() to override those given to the RemoteProvider()
# default to the host and port given as part of the file, falling back to one specified
# as a kwarg to remote() or the RemoteProvider (overriding the latter with the former if both)
kwargs_to_use = {}
kwargs_to_use["username"] = None
kwargs_to_use["password"] = None
kwargs_to_use["auth"] = None
for k, v in self.provider.kwargs.items():
kwargs_to_use[k] = v
for k, v in self.kwargs.items():
kwargs_to_use[k] = v
# Check that in case authentication kwargs are provided, they are either ("username", "password") combination
# or "auth", but not both.
if (
kwargs_to_use["username"]
and kwargs_to_use["password"]
and kwargs_to_use["auth"]
):
raise TypeError(
"Authentication accepts either username and password or requests.auth object"
)
# If "username" and "password" kwargs are provided, use those to construct a tuple for "auth". Neither
# requests.head() nor requests.get() accept them as-is.
if kwargs_to_use["username"] and kwargs_to_use["password"]:
kwargs_to_use["auth"] = (
kwargs_to_use["username"],
kwargs_to_use["password"],
)
# Delete "username" and "password" from kwargs
del kwargs_to_use["username"]
del kwargs_to_use["password"]
url = self.remote_file() + self.additional_request_string
if verb.upper() == "GET":
r = requests.get(url, *args_to_use, stream=stream, **kwargs_to_use)
if verb.upper() == "HEAD":
r = requests.head(url, *args_to_use, **kwargs_to_use)
yield r
r.close()
[docs] def exists(self):
if self._matched_address:
with self.httpr(verb="HEAD") as httpr:
# if a file redirect was found
if httpr.status_code in range(300, 308):
raise HTTPFileException(
"The file specified appears to have been moved (HTTP %s), check the URL or try adding 'allow_redirects=True' to the remote() file object: %s"
% (httpr.status_code, httpr.url)
)
return httpr.status_code == requests.codes.ok
return False
else:
raise HTTPFileException(
"The file cannot be parsed as an HTTP path in form 'host:port/abs/path/to/file': %s"
% self.local_file()
)
[docs] def mtime(self):
if self.exists():
with self.httpr(verb="HEAD") as httpr:
file_mtime = self.get_header_item(httpr, "last-modified", default=None)
logger.debug("HTTP last-modified: {}".format(file_mtime))
epochTime = 0
if file_mtime is not None:
modified_tuple = email.utils.parsedate_tz(file_mtime)
if modified_tuple is None:
logger.debug(
"HTTP last-modified not in RFC2822 format: `{}`".format(
file_mtime
)
)
else:
epochTime = email.utils.mktime_tz(modified_tuple)
return epochTime
else:
raise HTTPFileException(
"The file does not seem to exist remotely: %s" % self.remote_file()
)
[docs] def size(self):
if self.exists():
with self.httpr(verb="HEAD") as httpr:
content_size = int(
self.get_header_item(httpr, "content-size", default=0)
)
return content_size
else:
return self._iofile.size_local
[docs] def download(self, make_dest_dirs=True):
with self.httpr(stream=True) as httpr:
if self.exists():
# Find out if the source file is gzip compressed in order to keep
# compression intact after the download.
# Per default requests decompresses .gz files.
# More detials can be found here: https://stackoverflow.com/questions/25749345/how-to-download-gz-files-with-requests-in-python-without-decoding-it?noredirect=1&lq=1
# Since data transferred with HTTP compression need to be decompressed automatically
# check the header and decode if the content is encoded.
if (
not self.name.endswith(".gz")
and httpr.headers.get("Content-Encoding") == "gzip"
):
# Decode non-gzipped sourcefiles automatically.
# This is needed to decompress uncompressed files that are compressed
# for the transfer by HTTP compression.
httpr.raw.decode_content = True
# if the destination path does not exist
if make_dest_dirs:
os.makedirs(os.path.dirname(self.local_path), exist_ok=True)
with open(self.local_path, "wb") as f:
shutil.copyfileobj(httpr.raw, f)
os_sync() # ensure flush to disk
else:
raise HTTPFileException(
"The file does not seem to exist remotely: %s" % self.remote_file()
)
[docs] def upload(self):
raise HTTPFileException(
"Upload is not permitted for the HTTP remote provider. Is an output set to HTTP.remote()?"
)
@property
def list(self):
raise HTTPFileException(
"The HTTP Remote Provider does not currently support list-based operations like glob_wildcards()."
)