Source code for riemann_client.transport

"""Transports are used for direct communication with the Riemann server. They
are usually used inside a :py:class:`.Client`, and are used to send and receive
protocol buffer objects."""

from __future__ import absolute_import

import abc
import socket
import ssl
import struct

import riemann_client.riemann_pb2


# Default arguments
HOST = 'localhost'
PORT = 5555
TIMEOUT = None


[docs]def socket_recvall(socket, length, bufsize=4096): """A helper method to read of bytes from a socket to a maximum length""" data = b"" while len(data) < length: data += socket.recv(bufsize) return data
[docs]class RiemannError(Exception): """Raised when the Riemann server returns an error message""" pass
[docs]class Transport(object): """Abstract transport definition Subclasses must implement the :py:meth:`.connect`, :py:meth:`.disconnect` and :py:meth:`.send` methods. Can be used as a context manager, which will call :py:meth:`.connect` on entry and :py:meth:`.disconnect` on exit. """ __metaclass__ = abc.ABCMeta def __enter__(self): self.connect() return self def __exit__(self, exc_type, exc_value, traceback): self.disconnect() @abc.abstractmethod
[docs] def connect(self): pass
@abc.abstractmethod
[docs] def disconnect(self): pass
@abc.abstractmethod
[docs] def send(self): pass
[docs]class SocketTransport(Transport): """Provides common methods for Transports that use a sockets""" def __init__(self, host=HOST, port=PORT): self.host = host self.port = port @property def address(self): """ :returns: A tuple describing the address to connect to :rtype: (host, port) """ return self.host, self.port @property def socket(self): """Returns the socket after checking it has been created""" if not hasattr(self, '_socket'): raise RuntimeError("Transport has not been connected!") return self._socket @socket.setter def socket(self, value): self._socket = value
[docs]class UDPTransport(SocketTransport):
[docs] def connect(self): """Creates a UDP socket""" self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
[docs] def disconnect(self): """Closes the socket""" self.socket.close()
[docs] def send(self, message): """Sends a message, but does not return a response :returns: None - can't receive a response over UDP """ self.socket.sendto(message.SerializeToString(), self.address) return None
[docs]class TCPTransport(SocketTransport): def __init__(self, host=HOST, port=PORT, timeout=TIMEOUT): """Communicates with Riemann over TCP :param str host: The hostname to connect to :param int port: The port to connect to :param int timeout: The time in seconds to wait before raising an error """ super(TCPTransport, self).__init__(host, port) self.timeout = timeout
[docs] def connect(self): """Connects to the given host""" self.socket = socket.create_connection(self.address, self.timeout)
[docs] def disconnect(self): """Closes the socket""" self.socket.close()
[docs] def send(self, message): """Sends a message to a Riemann server and returns it's response :param message: The message to send to the Riemann server :returns: The response message from Riemann :raises RiemannError: if the server returns an error """ message = message.SerializeToString() self.socket.sendall(struct.pack('!I', len(message)) + message) length = struct.unpack('!I', self.socket.recv(4))[0] response = riemann_client.riemann_pb2.Msg() response.ParseFromString(socket_recvall(self.socket, length)) if not response.ok: raise RiemannError(response.error) return response
[docs]class TLSTransport(TCPTransport): def __init__(self, host=HOST, port=PORT, timeout=TIMEOUT, ca_certs=None): """Communicates with Riemann over TCP + TLS Options are the same as :py:class:`.TCPTransport` unless noted :param str ca_certs: Path to a CA Cert bundle used to create the socket """ super(TLSTransport, self).__init__(host, port, timeout) self.ca_certs = ca_certs
[docs] def connect(self): """Connects using :py:meth:`TLSTransport.connect` and wraps with TLS""" super(TLSTransport, self).connect() self.socket = ssl.wrap_socket( self.socket, ssl_version=ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_certs)
[docs]class BlankTransport(Transport): """A transport that collects events in a list, and has no connection Used by ``--transport none``, which is useful for testing commands without contacting a Riemann server. This is also used by the automated tests in ``riemann_client/tests/test_riemann_command.py``. """ def __init__(self, *args, **kwargs): self.events = []
[docs] def connect(self): """Creates a list to hold messages""" pass
[docs] def send(self, message): """Adds a message to the list, returning a fake 'ok' response :returns: A response message with ``ok = True`` """ for event in message.events: self.events.append(event) reply = riemann_client.riemann_pb2.Msg() reply.ok = True return reply
[docs] def disconnect(self): """Clears the list of messages""" pass
def __len__(self): return len(self.events)