pycassa.pool – Connection Pooling

Connection pooling for Cassandra connections.

class pycassa.pool.ConnectionPool(keyspace, server_list=['localhost:9160'], credentials=None, timeout=0.5, use_threadlocal=True, pool_size=5, prefill=True, socket_factory=<function default_socket_factory>, transport_factory=<function default_transport_factory>, **kwargs)

A pool that maintains a queue of open connections.

All connections in the pool will be opened to keyspace.

server_list is a sequence of servers in the form "host:port" that the pool will connect to. The port defaults to 9160 if excluded. The list will be randomly shuffled before being drawn from sequentially. server_list may also be a function that returns the sequence of servers.

If authentication or authorization is required, credentials must be supplied. This should be a dictionary containing ‘username’ and ‘password’ keys with appropriate string values.
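As a sketch, the credentials dictionary looks like this (the username and password values are placeholders, not real accounts):

```python
# Credentials dict with the two required keys; values are placeholders.
credentials = {'username': 'cassandra_user', 'password': 'example_password'}

# Passed to the pool at construction (requires a running cluster to connect):
# pool = pycassa.ConnectionPool('Keyspace1', credentials=credentials)
print(sorted(credentials))  # ['password', 'username']
```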

timeout specifies in seconds how long individual connections will block before timing out. If set to None, connections will never time out.

If use_threadlocal is set to True, repeated calls to get() within the same application thread will return the same ConnectionWrapper object if one is already checked out from the pool. Be careful when setting use_threadlocal to False in a multithreaded application, especially with retries enabled. Synchronization may be required to prevent the connection from changing while another thread is using it.

The pool will keep up to pool_size open connections in the pool at any time. When a connection is returned to the pool, the connection will be discarded if the pool already contains pool_size connections. The total number of simultaneous connections the pool will allow is pool_size + max_overflow, and the number of “sleeping” connections the pool will allow is pool_size.

A good choice for pool_size is a multiple of the number of servers passed to the ConnectionPool constructor. If a smaller size is chosen, the last (len(server_list) - pool_size) servers may not be used until overflow occurs, a connection is recycled, or a connection fails. Similarly, if pool_size is not a multiple of len(server_list), those same servers will see a decreased load. By default, overflow is disabled.
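The sizing rule above can be sketched as a small calculation (the server addresses and the per-server connection count are illustrative choices, not part of the API):

```python
# Sketch: size the pool as a multiple of the server count so every server
# in server_list gets an equal share of connections.
server_list = ['10.0.0.4:9160', '10.0.0.5:9160', '10.0.0.6:9160']

connections_per_server = 2  # illustrative choice
pool_size = connections_per_server * len(server_list)
print(pool_size)  # 6

# pool = pycassa.ConnectionPool('Keyspace1', server_list=server_list,
#                               pool_size=pool_size)
```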

If prefill is set to True, pool_size connections will be opened when the pool is created.

Example Usage:

>>> pool = pycassa.ConnectionPool(keyspace='Keyspace1', server_list=['10.0.0.4:9160', '10.0.0.5:9160'], prefill=False)
>>> cf = pycassa.ColumnFamily(pool, 'Standard1')
>>> cf.insert('key', {'col': 'val'})
1287785685530679
max_overflow

Whether or not a new connection may be opened when the pool is empty is controlled by max_overflow. This specifies how many additional connections may be opened after the pool has reached pool_size; keep in mind that these extra connections will be discarded upon checkin until the pool is below pool_size. This may be set to -1 to indicate no overflow limit. The default value is 0, which does not allow for overflow.

pool_timeout = 30

If pool_size + max_overflow connections have already been checked out, an attempt to retrieve a new connection from the pool will wait up to pool_timeout seconds for a connection to be returned to the pool before giving up. Note that this setting is only meaningful when you are accessing the pool concurrently, such as with multiple threads. This may be set to 0 to fail immediately or -1 to wait forever. The default value is 30.

recycle = 10000

After a connection has performed recycle operations, it will be replaced when checked back in to the pool. This may be set to -1 to disable connection recycling. The default value is 10,000.

max_retries = 5

When an operation on a connection fails due to a TimedOutException or UnavailableException, which tend to indicate single or multiple node failure, the operation will be retried on different nodes up to max_retries times before a MaximumRetryException is raised. Setting this to 0 disables retries; setting it to -1 allows unlimited retries. The default value is 5.
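Pulling the tunables from this section together, a hypothetical configuration for an unreliable five-node cluster might look like the following. The keyword names are the ConnectionPool parameters documented above; the values are illustrative, not recommendations:

```python
# Hypothetical tuning for a flaky five-node cluster; values are illustrative.
pool_kwargs = dict(
    pool_size=10,     # two connections per node
    max_overflow=5,   # allow five extra connections under load
    pool_timeout=10,  # wait up to 10 seconds when all are checked out
    recycle=20000,    # replace a connection after 20,000 operations
    max_retries=10,   # retry a failed operation on up to 10 other nodes
)
# pool = pycassa.ConnectionPool('Keyspace1', server_list=servers, **pool_kwargs)
```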

logging_name = None

By default, each pool identifies itself in the logs using id(self). If multiple pools are in use for different purposes, setting logging_name will help individual pools to be identified in the logs.

get()

Gets a connection from the pool.

put(conn)

Returns a connection to the pool.

execute(f, *args, **kwargs)

Get a connection from the pool, execute f on it with *args and **kwargs, return the connection to the pool, and return the result of f.
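The contract can be illustrated with a minimal stub (StubPool and its connection are stand-ins written for this sketch, not part of pycassa; the assumption that f receives the connection as its first argument follows the description above):

```python
# Sketch of the check-out / run / check-in pattern that execute() wraps,
# using a stub pool so it runs without a cluster.
class StubPool(object):
    def __init__(self):
        self.checked_out = 0

    def get(self):
        self.checked_out += 1
        return {'name': 'stub-connection'}

    def put(self, conn):
        self.checked_out -= 1

    def execute(self, f, *args, **kwargs):
        # Get a connection, run f on it, and return the connection
        # to the pool even if f raises.
        conn = self.get()
        try:
            return f(conn, *args, **kwargs)
        finally:
            self.put(conn)

pool = StubPool()
result = pool.execute(lambda conn: conn['name'])
print(result, pool.checked_out)  # stub-connection 0
```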

fill()

Adds connections to the pool until at least pool_size connections exist, whether they are currently checked out from the pool or not.

New in version 1.2.0.

dispose()

Closes all checked in connections in the pool.

set_server_list(server_list)

Sets the server list that the pool will make connections to.

server_list should be a sequence of servers in the form "host:port" that the pool will connect to. The list will be randomly permuted before being used. server_list may also be a function that returns the sequence of servers.

size()

Returns the capacity of the pool.

overflow()

Returns the number of overflow connections that are currently open.

checkedin()

Returns the number of connections currently in the pool.

checkedout()

Returns the number of connections currently checked out from the pool.

add_listener(listener)

Add a PoolListener-like object to this pool.

listener may be an object that implements some or all of PoolListener, or a dictionary of callables containing implementations of some or all of the named methods in PoolListener.
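For example, a listener supplied as a plain dict of callables might look like this. The handlers just record events for illustration, and the event dict invoked at the end is hand-built rather than produced by a real pool:

```python
# A listener given as a dict of callables keyed by PoolListener method names.
events = []
dict_listener = {
    'connection_failed': lambda dic: events.append(('failed', dic['server'])),
    'pool_at_max': lambda dic: events.append(('at_max', dic['pool_id'])),
}
# pool.add_listener(dict_listener)

# Simulate the pool invoking one of the hooks:
dict_listener['connection_failed']({'server': '10.0.0.4:9160', 'error': None,
                                    'pool_id': 1, 'level': 'error'})
print(events)  # [('failed', '10.0.0.4:9160')]
```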

exception pycassa.pool.AllServersUnavailable

Raised when none of the servers given to a pool can be connected to.

exception pycassa.pool.NoConnectionAvailable

Raised when there are no connections left in a pool.

exception pycassa.pool.MaximumRetryException

Raised when a ConnectionWrapper has retried the maximum allowed number of times before being returned to the pool; note that the retries need not all be on the same operation.

exception pycassa.pool.InvalidRequestError

Pycassa was asked to do something it can’t do.

This error generally corresponds to runtime state errors.

class pycassa.pool.ConnectionWrapper(pool, max_retries, *args, **kwargs)

Creates a wrapper for a Connection object, adding pooling-related functionality while still allowing access to the Thrift API calls.

These should not be created directly; they are only obtained through ConnectionPool's get() method.

add(*args, **kwargs)

Increment or decrement a counter.

Parameters:
  • key
  • column_parent
  • column
  • consistency_level
atomic_batch_mutate(*args, **kwargs)

Atomically mutate many columns or super columns for many row keys. See also: Mutation.

mutation_map maps key to column family to a list of Mutation objects to take place at that scope.
Parameters:
  • mutation_map
  • consistency_level
batch_mutate(*args, **kwargs)

Mutate many columns or super columns for many row keys. See also: Mutation.

mutation_map maps key to column family to a list of Mutation objects to take place at that scope.
Parameters:
  • mutation_map
  • consistency_level
describe_keyspace(*args, **kwargs)

Describes the specified keyspace.

Parameters:
  • keyspace
get(*args, **kwargs)

Get the Column or SuperColumn at the given column_path. If no value is present, NotFoundException is thrown. (This is the only method that can throw an exception under non-failure conditions.)

Parameters:
  • key
  • column_path
  • consistency_level
get_count(*args, **kwargs)

Returns the number of columns matching predicate for a particular key, ColumnFamily, and optionally SuperColumn.

Parameters:
  • key
  • column_parent
  • predicate
  • consistency_level
get_indexed_slices(*args, **kwargs)

Returns the subset of columns specified in SlicePredicate for the rows matching the IndexClause.

Deprecated: use get_range_slices() with range.row_filter specified instead.

Parameters:
  • column_parent
  • index_clause
  • column_predicate
  • consistency_level
get_keyspace_description(keyspace=None, use_dict_for_col_metadata=False)

Describes the given keyspace.

If use_dict_for_col_metadata is True, the column metadata will be stored as a dictionary instead of a list.

A dictionary of the form {column_family_name: CfDef} is returned.

get_range_slices(*args, **kwargs)

Returns a subset of columns for a contiguous range of keys.

Parameters:
  • column_parent
  • predicate
  • range
  • consistency_level
get_slice(*args, **kwargs)

Get the group of columns contained by column_parent (either a ColumnFamily name or a ColumnFamily/SuperColumn name pair) specified by the given SlicePredicate. If no matching values are found, an empty list is returned.

Parameters:
  • key
  • column_parent
  • predicate
  • consistency_level
insert(*args, **kwargs)

Insert a Column at the given column_parent.column_family and optional column_parent.super_column.

Parameters:
  • key
  • column_parent
  • column
  • consistency_level
multiget_count(*args, **kwargs)

Perform a get_count in parallel on the given list<binary> keys. The return value maps keys to the count found.

Parameters:
  • keys
  • column_parent
  • predicate
  • consistency_level
multiget_slice(*args, **kwargs)

Performs a get_slice for column_parent and predicate for the given keys in parallel.

Parameters:
  • keys
  • column_parent
  • predicate
  • consistency_level
remove(*args, **kwargs)

Remove data from the row specified by key at the granularity specified by column_path, and the given timestamp. Note that all the values in column_path besides column_path.column_family are truly optional: you can remove the entire row by just specifying the ColumnFamily, or you can remove a SuperColumn or a single Column by specifying those levels too.

Parameters:
  • key
  • column_path
  • timestamp
  • consistency_level
remove_counter(*args, **kwargs)

Remove a counter at the specified location. Note that counters have limited support for deletes: if you remove a counter, you must wait to issue any following update until the delete has reached all the nodes and all of them have been fully compacted.

Parameters:
  • key
  • path
  • consistency_level
return_to_pool()

Returns this connection to the pool.

This has the same effect as calling ConnectionPool.put() on the wrapper.

truncate(*args, **kwargs)

Truncate will mark an entire column family as deleted. From the user’s perspective, a successful call to truncate will result in complete data deletion from cfname. Internally, however, disk space will not be immediately released; as with all deletes in Cassandra, this one only marks the data as deleted. The operation succeeds only if all hosts in the cluster are available, and it will throw an UnavailableException if some hosts are down.

Parameters:
  • cfname
class pycassa.pool.PoolListener

Hooks into the lifecycle of connections in a ConnectionPool.

Usage:

class MyListener(PoolListener):
    def connection_created(self, dic):
        '''perform connect operations'''
    # etc.

# create a new pool with a listener
p = ConnectionPool(..., listeners=[MyListener()])

# or add a listener after the fact
p.add_listener(MyListener())

Listeners receive a dictionary that contains event information and is indexed by a string describing that piece of info. For example, all event dictionaries include 'level', so dic['level'] will return the prescribed logging level.

There is no need to subclass PoolListener to handle events. Any class that implements one or more of these methods can be used as a pool listener. The ConnectionPool will inspect the methods provided by a listener object and add the listener to one or more internal event queues based on its capabilities. In terms of efficiency and function call overhead, you’re much better off only providing implementations for the hooks you’ll be using.

Each of the PoolListener methods will be called with a dict as the single parameter. This dict may contain the following fields:

  • connection: The ConnectionWrapper object that persistently manages the connection
  • message: The reason this event happened
  • error: The Exception that caused this event
  • pool_id: The id of the ConnectionPool that this event came from
  • level: The prescribed logging level for this event. Can be ‘debug’, ‘info’, ‘warn’, ‘error’, or ‘critical’

Entries in the dict that are specific to only one event type are detailed with each method.
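As a minimal sketch, a listener that implements just one hook can be exercised with a hand-built event dict carrying the fields above (no real pool involved):

```python
# A listener implementing a single hook; subclassing PoolListener is optional.
class FailureLogger(object):
    def __init__(self):
        self.failures = []

    def connection_failed(self, dic):
        # 'server' is specific to this event; 'level' is a common field.
        self.failures.append((dic['server'], dic['level']))

listener = FailureLogger()
listener.connection_failed({'server': '10.0.0.5:9160', 'error': IOError(),
                            'pool_id': 42, 'level': 'error'})
print(listener.failures)  # [('10.0.0.5:9160', 'error')]
```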

connection_checked_in(dic)

Called when a connection returns to the pool.

Fields: pool_id, level, and connection.

connection_checked_out(dic)

Called when a connection is retrieved from the Pool.

Fields: pool_id, level, and connection.

connection_created(dic)

Called once for each new Cassandra connection.

Fields: pool_id, level, and connection.

connection_disposed(dic)

Called when a connection is closed.

dic['message']: A reason for closing the connection, if any.

Fields: pool_id, level, connection, and message.

connection_failed(dic)

Called when a connection to a single server fails.

dic['server']: The server the connection was made to.

Fields: pool_id, level, error, server, and connection.

connection_recycled(dic)

Called when a connection is recycled.

dic['old_conn']: The ConnectionWrapper that is being recycled

dic['new_conn']: The ConnectionWrapper that is replacing it

Fields: pool_id, level, old_conn, and new_conn.

pool_at_max(dic)

Called when an attempt is made to get a new connection from the pool, but the pool is already at its max size.

dic['pool_max']: The max number of connections the pool will keep open at one time.

Fields: pool_id, pool_max, and level.

pool_disposed(dic)

Called when a pool is disposed.

Fields: pool_id, and level.

server_list_obtained(dic)

Called when the pool finalizes its server list.

dic['server_list']: The randomly permuted list of servers that the pool will choose from.

Fields: pool_id, level, and server_list.