BrokerConnection

class kafka.BrokerConnection(host, port, afi, **configs)[source]

Initialize a Kafka broker connection

Keyword Arguments:
client_id (str): a name for this client. This string is passed in
each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host. Default: 50.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per broker connection. Default: 5.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
socket_options (list): List of tuple-arguments to socket.setsockopt
to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
ssl_check_hostname (bool): flag to configure whether ssl handshake
should verify that the certificate matches the brokers hostname. default: True.
ssl_cafile (str): optional filename of ca file to use in certificate
veriication. default: None.
ssl_certfile (str): optional filename of file in pem format containing
the client certificate, as well as any ca certificates needed to establish the certificate’s authenticity. default: None.
ssl_keyfile (str): optional filename containing the client private key.
default: None.
ssl_password (callable, str, bytes, bytearray): optional password or
callable function that returns a password, for decrypting the client private key. Default: None.
ssl_crlfile (str): optional filename containing the CRL to check for
certificate expiration. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. default: None.
api_version (tuple): Specify which Kafka API version to use.
Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10). Default: (0, 8, 2)
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker api version. Only applies if api_version is None
state_change_callback (callable): function to be called when the
connection state changes from CONNECTING to CONNECTED etc.
metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None.

metric_group_prefix (str): Prefix for metric names. Default: ‘’ sasl_mechanism (str): string picking sasl mechanism when security_protocol

is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. Default: None
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): password for sasl PLAIN authentication.
Default: None
blacked_out()[source]

Return true if we are disconnected from the given node and can’t re-establish a connection yet

can_send_more()[source]

Return True unless there are max_in_flight_requests_per_connection.

check_version(timeout=2, strict=False)[source]

Attempt to guess the broker version.

Note: This is a blocking call.

Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), …

close(error=None)[source]

Close socket and fail all in-flight-requests.

Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception. Default: kafka.errors.ConnectionError.
connect()[source]

Attempt to connect and return ConnectionState

connected()[source]

Return True iff socket is connected.

connecting()[source]

Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).

disconnected()[source]

Return True iff socket is closed

recv()[source]

Non-blocking network receive.

Return response if available

send(request)[source]

send request, return Future()

Can block on network if request is larger than send_buffer_bytes