Source code for plainbox.impl.secure.qualifiers

# This file is part of Checkbox.
#
# Copyright 2013, 2014 Canonical Ltd.
# Written by:
#   Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.

#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.

"""
:mod:`plainbox.impl.secure.qualifiers` -- Job Qualifiers
========================================================

Qualifiers are callable objects that can be used to 'match' a job definition to
some set of rules.
"""

import abc
import functools
import itertools
import logging
import operator
import os
import re
import sre_constants

from plainbox.abc import IJobQualifier
from plainbox.i18n import gettext as _
from plainbox.impl import pod
from plainbox.impl.secure.origin import FileTextSource
from plainbox.impl.secure.origin import Origin
from plainbox.impl.secure.origin import UnknownTextSource


_logger = logging.getLogger("plainbox.secure.qualifiers")


[docs]class SimpleQualifier(IJobQualifier): """ Abstract base class that implements common features of simple (non composite) qualifiers. This allows two concrete subclasses below to have share some code. """ def __init__(self, origin, inclusive=True): if origin is not None and not isinstance(origin, Origin): raise TypeError(_('argument {!a}, expected {}, got {}').format( 'origin', Origin, type(origin))) if not isinstance(inclusive, bool): raise TypeError(_('argument {!a}, expected {}, got {}').format( 'inclusive', bool, type(inclusive))) self._inclusive = inclusive self._origin = origin @property def inclusive(self): return self._inclusive @property def is_primitive(self): return True
[docs] def designates(self, job): return self.get_vote(job) == self.VOTE_INCLUDE
@abc.abstractmethod
[docs] def get_simple_match(self, job): """ Get a simple yes-or-no boolean answer if the given job matches the simple aspect of this qualifier. This method should be overridden by concrete subclasses. """
[docs] def get_vote(self, job): """ Get one of the ``VOTE_IGNORE``, ``VOTE_INCLUDE``, ``VOTE_EXCLUDE`` votes that this qualifier associated with the specified job. :param job: A IJobDefinition instance that is to be visited :returns: * ``VOTE_INCLUDE`` if the job matches the simple qualifier concept embedded into this qualifier and this qualifier is **inclusive**. * ``VOTE_EXCLUDE`` if the job matches the simple qualifier concept embedded into this qualifier and this qualifier is **not inclusive**. * ``VOTE_IGNORE`` otherwise. .. versionadded: 0.5 """ if self.get_simple_match(job): if self.inclusive: return self.VOTE_INCLUDE else: return self.VOTE_EXCLUDE else: return self.VOTE_IGNORE
[docs] def get_primitive_qualifiers(self): """ Return a list of primitives that constitute this qualifier. :returns: A list of IJobQualifier objects that each is the smallest, indivisible entity. Here it just returns a list of one element, itself. .. versionadded: 0.5 """ return [self]
@property def origin(self): """ Origin of this qualifier This property can be used to trace the origin of a qualifier back to its definition point. """ return self._origin
[docs]class RegExpJobQualifier(SimpleQualifier): """ A JobQualifier that designates jobs by matching their id to a regular expression """ def __init__(self, pattern, origin, inclusive=True): """ Initialize a new RegExpJobQualifier with the specified pattern. """ super().__init__(origin, inclusive) try: self._pattern = re.compile(pattern) except sre_constants.error as exc: assert len(exc.args) == 1 # XXX: This is a bit crazy but this lets us have identical error # messages across python3.2 all the way to 3.5. I really really # wish there was a better way at fixing this. exc.args = (re.sub(" at position \d+", "", exc.args[0]), ) raise exc self._pattern_text = pattern
[docs] def get_simple_match(self, job): """ Check if the given job matches this qualifier. This method should not be called directly, it is an implementation detail of SimpleQualifier class. """ return self._pattern.match(job.id) is not None
@property def pattern_text(self): """ text of the regular expression embedded in this qualifier """ return self._pattern_text def __repr__(self): return "{0}({1!r}, inclusive={2})".format( self.__class__.__name__, self._pattern_text, self._inclusive)
[docs]class JobIdQualifier(SimpleQualifier): """ A JobQualifier that designates a single job with a particular id """ def __init__(self, id, origin, inclusive=True): super().__init__(origin, inclusive) self._id = id @property def id(self): """ identifier to match """ return self._id
[docs] def get_simple_match(self, job): """ Check if the given job matches this qualifier. This method should not be called directly, it is an implementation detail of SimpleQualifier class. """ return self._id == job.id
def __repr__(self): return "{0}({1!r}, inclusive={2})".format( self.__class__.__name__, self._id, self._inclusive)
[docs]class NonLocalJobQualifier(SimpleQualifier): """ A JobQualifier that designates only non local jobs """ def __init__(self, origin, inclusive=True): super().__init__(origin, inclusive)
[docs] def get_simple_match(self, job): """ Check if the given job matches this qualifier. This method should not be called directly, it is an implementation detail of SimpleQualifier class. """ return job.plugin != 'local'
def __repr__(self): return "{0}(inclusive={1})".format( self.__class__.__name__, self._inclusive)
[docs]class IMatcher(metaclass=abc.ABCMeta): """ Interface for objects that perform some kind of comparison on a value """ @abc.abstractmethod
[docs] def match(self, value): """ Match (or not) specified value :param value: value to match :returns: True if it matched, False otherwise """
@functools.total_ordering
[docs]class OperatorMatcher(IMatcher): """ A matcher that applies a binary operator to the value """ def __init__(self, op, value): self._op = op self._value = value @property def op(self): """ the operator to use The operator is typically one of the functions from the ``operator`` module. For example. operator.eq corresponds to the == python operator. """ return self._op @property def value(self): """ The right-hand-side value to apply to the operator The left-hand-side is the value that is passed to :meth:`match()` """ return self._value
[docs] def match(self, value): return self._op(self._value, value)
def __repr__(self): return "{0}({1!r}, {2!r})".format( self.__class__.__name__, self._op, self._value) def __eq__(self, other): if isinstance(other, OperatorMatcher): return self.op == other.op and self.value == other.value else: return NotImplemented def __lt__(self, other): if isinstance(other, OperatorMatcher): if self.op < other.op: return True if self.value < other.value: return True return False else: return NotImplemented
[docs]class PatternMatcher(IMatcher): """ A matcher that compares values by regular expression pattern """ def __init__(self, pattern): self._pattern_text = pattern self._pattern = re.compile(pattern) @property def pattern_text(self): return self._pattern_text
[docs] def match(self, value): return self._pattern.match(value) is not None
def __repr__(self): return "{0}({1!r})".format( self.__class__.__name__, self._pattern_text) def __eq__(self, other): if isinstance(other, PatternMatcher): return self.pattern_text == other.pattern_text else: return NotImplemented def __lt__(self, other): if isinstance(other, PatternMatcher): return self.pattern_text < other.pattern_text else: return NotImplemented
[docs]class FieldQualifier(SimpleQualifier): """ A SimpleQualifer that uses matchers to compare particular fields """ def __init__(self, field, matcher, origin, inclusive=True): """ Initialize a new FieldQualifier with the specified field, matcher and inclusive flag :param field: Name of the JobDefinition field to use :param matcher: A IMatcher object :param inclusive: Inclusive selection flag (default: True) """ super().__init__(origin, inclusive) self._field = field self._matcher = matcher @property def field(self): """ Name of the field to match """ return self._field @property def matcher(self): """ The IMatcher-implementing object to use to check for the match """ return self._matcher
[docs] def get_simple_match(self, job): """ Check if the given job matches this qualifier. This method should not be called directly, it is an implementation detail of SimpleQualifier class. """ field_value = getattr(job, str(self._field)) return self._matcher.match(field_value)
def __repr__(self): return "{0}({1!r}, {2!r}, inclusive={3})".format( self.__class__.__name__, self._field, self._matcher, self._inclusive)
[docs]class CompositeQualifier(pod.POD): """ A JobQualifier that has qualifies jobs matching any inclusive qualifiers while not matching all of the exclusive qualifiers """ qualifier_list = pod.Field("qualifier_list", list, pod.MANDATORY) @property def is_primitive(self): return False
[docs] def designates(self, job): return self.get_vote(job) == IJobQualifier.VOTE_INCLUDE
[docs] def get_vote(self, job): """ Get one of the ``VOTE_IGNORE``, ``VOTE_INCLUDE``, ``VOTE_EXCLUDE`` votes that this qualifier associated with the specified job. :param job: A IJobDefinition instance that is to be visited :returns: * ``VOTE_INCLUDE`` if the job matches at least one qualifier voted to select it and no qualifiers voted to deselect it. * ``VOTE_EXCLUDE`` if at least one qualifier voted to deselect it * ``VOTE_IGNORE`` otherwise or if the list of qualifiers is empty. .. versionadded: 0.5 """ if self.qualifier_list: return min([ qualifier.get_vote(job) for qualifier in self.qualifier_list]) else: return IJobQualifier.VOTE_IGNORE
[docs] def get_primitive_qualifiers(self): return get_flat_primitive_qualifier_list(self.qualifier_list)
@property def origin(self): raise NonPrimitiveQualifierOrigin
IJobQualifier.register(CompositeQualifier)
[docs]class NonPrimitiveQualifierOrigin(Exception): """ Exception raised when IJobQualifier.origin is meaningless as it is being requested on a non-primitive qualifier such as the CompositeQualifier """ # NOTE: using CompositeQualifier seems strange but it's a tested proven # component so all we have to ensure is that we read the whitelist files # correctly.
[docs]class WhiteList(CompositeQualifier): """ A qualifier that understands checkbox whitelist files. A whitelist file is a plain text, line oriented file. Each line represents a regular expression pattern that can be matched against the id of a job. The file can contain simple shell-style comments that begin with the pound or hash key (#). Those are ignored. Comments can span both a fraction of a line as well as the whole line. For historical reasons each pattern has an implicit '^' and '$' prepended and appended (respectively) to the actual pattern specified in the file. """ def __init__(self, pattern_list, name=None, origin=None, implicit_namespace=None): """ Initialize a WhiteList object with the specified list of patterns. The patterns must be already mangled with '^' and '$'. """ self._name = name self._origin = origin self._implicit_namespace = implicit_namespace if implicit_namespace is not None: # If we have an implicit namespace then transform all the patterns # without the namespace operator ('::') namespace_pattern = implicit_namespace.replace('.', '\\.') def transform_pattern(maybe_partial_id_pattern): if "::" not in maybe_partial_id_pattern: return "^{}::{}$".format( namespace_pattern, maybe_partial_id_pattern[1:-1]) else: return maybe_partial_id_pattern qualifier_list = [ RegExpJobQualifier( transform_pattern(pattern), origin, inclusive=True) for pattern in pattern_list] else: # Otherwise just use the patterns directly qualifier_list = [ RegExpJobQualifier(pattern, origin, inclusive=True) for pattern in pattern_list] super().__init__(qualifier_list) def __repr__(self): return "<{} name:{!r}>".format(self.__class__.__name__, self.name) @property def name(self): """ name of this WhiteList (might be None) """ return self._name @name.setter def name(self, value): """ set a new name for a WhiteList """ self._name = value @property def origin(self): """ origin object associated with this WhiteList (might be None) """ return self._origin @property def implicit_namespace(self): """ namespace used to qualify patterns without explicit namespace """ return self._implicit_namespace @classmethod
[docs] def from_file(cls, pathname, implicit_namespace=None): """ Load and initialize the WhiteList object from the specified file. :param pathname: file to load :param implicit_namespace: (optional) implicit namespace for jobs that are using partial identifiers (all jobs) :returns: a fresh WhiteList object """ pattern_list, max_lineno = cls._load_patterns(pathname) name = os.path.splitext(os.path.basename(pathname))[0] origin = Origin(FileTextSource(pathname), 1, max_lineno) return cls(pattern_list, name, origin, implicit_namespace)
@classmethod
[docs] def from_string(cls, text, *, filename=None, name=None, origin=None, implicit_namespace=None): """ Load and initialize the WhiteList object from the specified string. :param text: full text of the whitelist :param filename: (optional, keyword-only) filename from which text was read from. This simulates a call to :meth:`from_file()` which properly computes the name and origin of the whitelist. :param name: (optional) name of the whitelist, only used if filename is not specified. :param origin: (optional) origin of the whitelist, only used if a filename is not specified. If omitted a default origin value will be constructed out of UnknownTextSource instance :param implicit_namespace: (optional) implicit namespace for jobs that are using partial identifiers (all jobs) :returns: a fresh WhiteList object The optional filename or a pair of name and origin arguments may be provided in order to have additional meta-data. This is typically needed when the :meth:`from_file()` method cannot be used as the caller already has the full text of the intended file available. """ _logger.debug("Loaded whitelist from %r", filename) pattern_list, max_lineno = cls._parse_patterns(text) # generate name and origin if filename is provided if filename is not None: name = WhiteList.name_from_filename(filename) origin = Origin(FileTextSource(filename), 1, max_lineno) else: # otherwise generate origin if it's not specified if origin is None: origin = Origin(UnknownTextSource(), 1, max_lineno) return cls(pattern_list, name, origin, implicit_namespace)
@classmethod
[docs] def name_from_filename(cls, filename): """ Compute the name of a whitelist based on the name of the file it is stored in. """ return os.path.splitext(os.path.basename(filename))[0]
@classmethod def _parse_patterns(cls, text): """ Load whitelist patterns from the specified text :param text: string of text, including newlines, to parse :returns: (pattern_list, lineno) where lineno is the final line number (1-based) and pattern_list is a list of regular expression strings parsed from the whitelist. """ from plainbox.impl.xparsers import Re from plainbox.impl.xparsers import Visitor from plainbox.impl.xparsers import WhiteList class WhiteListVisitor(Visitor): def __init__(self): self.pattern_list = [] self.lineno = 0 def visit_Re_node(self, node: Re): self.pattern_list.append(r"^{}$".format(node.text.strip())) self.lineno = max(node.lineno, self.lineno) return super().generic_visit(node) visit_ReFixed_node = visit_Re_node visit_RePattern_node = visit_Re_node visit_ReErr_node = visit_Re_node visitor = WhiteListVisitor() visitor.visit(WhiteList.parse(text)) return visitor.pattern_list, visitor.lineno @classmethod def _load_patterns(cls, pathname): """ Load whitelist patterns from the specified file :param pathname: pathname of the file to load and parse :returns: (pattern_list, lineno) where lineno is the final line number (1-based) and pattern_list is a list of regular expression strings parsed from the whitelist. """ with open(pathname, "rt", encoding="UTF-8") as stream: return cls._parse_patterns(stream.read())
[docs]def get_flat_primitive_qualifier_list(qualifier_list): return list(itertools.chain(*[ qual.get_primitive_qualifiers() for qual in qualifier_list]))
[docs]def select_jobs(job_list, qualifier_list): """ Select desired jobs. :param job_list: A list of JobDefinition objects :param qualifier_list: A list of IJobQualifier objects. :returns: A sub-list of JobDefinition objects, selected from job_list. """ # Flatten the qualifier list, so that we can see the fine structure of # composite objects, such as whitelists. flat_qualifier_list = get_flat_primitive_qualifier_list(qualifier_list) # Short-circuit if there are no jobs to select. Min is used later and this # will allow us to assume that the matrix is not empty. if not flat_qualifier_list: return [] # Vote matrix, encodes the vote cast by a particular qualifier for a # particular job. Visually it's a two-dimensional array like this: # # ^ # q | # u | X # a | # l | ........ # i | # f | . # i | . # e | . # r | # -------------------> # job # # The vertical axis represents qualifiers from the flattened qualifier # list. The horizontal axis represents jobs from job list. Dots represent # inclusion, X represents exclusion. # # The result of the select_job() function is a list of jobs that have at # least one inclusion and no exclusions. The resulting list is ordered by # increasing qualifier index. # # The algorithm implemented below is composed of two steps. # # The first step iterates over the vote matrix (row-major, meaning that we # visit all columns for each visit of one row) and constructs two # structures: a set of jobs that got VOTE_INCLUDE and a list of those jobs, # in the order of discovery. All VOTE_EXCLUDE votes are collected in # another set. # # The second step filters-out all items from the excluded job set from the # selected job list. For extra efficiency the algorithm operates on # integers representing the index of a particular job in job_list. # # The final complexity is O(N x M) + O(M), where N is the number of # qualifiers (flattened) and M is the number of jobs. The algorithm assumes # that set lookup is a O(1) operation which is true enough for python. # # A possible optimization would differentiate qualifiers that may select # more than one job and fall-back to the current implementation while # short-circuiting qualifiers that may select at most one job with a # separate set lookup. That would make the algorithm "mostly" linear in the # common case. # # As a separate feature, we might return a list of qualifiers that never # matched anything. That may be helpful for debugging. included_list = [] id_to_index_map = {job.id: index for index, job in enumerate(job_list)} included_set = set() excluded_set = set() for qualifier in flat_qualifier_list: if (isinstance(qualifier, FieldQualifier) and qualifier.field == 'id' and isinstance(qualifier.matcher, OperatorMatcher) and qualifier.matcher.op == operator.eq): # optimize the super-common case where a qualifier refers to # a specific job by using the id_to_index_map to instantly # perform the requested operation on a single job try: j_index = id_to_index_map[qualifier.matcher.value] except KeyError: # The lookup can fail if the pattern is a constant reference to # a generated job that doens't exist yet. To maintain correctness # we should just ignore it, as it would not match anything yet. continue job = job_list[j_index] vote = qualifier.get_vote(job) if vote == IJobQualifier.VOTE_INCLUDE: if j_index in included_set: continue included_set.add(j_index) included_list.append(j_index) elif vote == IJobQualifier.VOTE_EXCLUDE: excluded_set.add(j_index) elif vote == IJobQualifier.VOTE_IGNORE: pass else: for j_index, job in enumerate(job_list): vote = qualifier.get_vote(job) if vote == IJobQualifier.VOTE_INCLUDE: if j_index in included_set: continue included_set.add(j_index) included_list.append(j_index) elif vote == IJobQualifier.VOTE_EXCLUDE: excluded_set.add(j_index) elif vote == IJobQualifier.VOTE_IGNORE: pass return [job_list[index] for index in included_list if index not in excluded_set]