Connecting to Valkey
Generic Client
This is the client used to connect directly to a standard Valkey node.
- class valkey.Valkey(host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, retry_on_error=None, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, ssl_ca_path=None, ssl_ca_data=None, ssl_check_hostname=False, ssl_password=None, ssl_validate_ocsp=False, ssl_validate_ocsp_stapled=False, ssl_ocsp_context=None, ssl_ocsp_expected_cert=None, ssl_min_version=None, ssl_ciphers=None, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, lib_name='valkey-py', lib_version='6.1.1', username=None, retry=None, valkey_connect_func=None, credential_provider=None, protocol=2, cache_enabled=False, client_cache=None, cache_max_size=10000, cache_ttl=0, cache_policy=EvictionPolicy.LRU, cache_deny_list=['BF.CARD', 'BF.DEBUG', 'BF.EXISTS', 'BF.INFO', 'BF.MEXISTS', 'BF.SCANDUMP', 'CF.COMPACT', 'CF.COUNT', 'CF.DEBUG', 'CF.EXISTS', 'CF.INFO', 'CF.MEXISTS', 'CF.SCANDUMP', 'CMS.INFO', 'CMS.QUERY', 'DUMP', 'EXPIRETIME', 'FT.AGGREGATE', 'FT.ALIASADD', 'FT.ALIASDEL', 'FT.ALIASUPDATE', 'FT.CURSOR', 'FT.EXPLAIN', 'FT.EXPLAINCLI', 'FT.GET', 'FT.INFO', 'FT.MGET', 'FT.PROFILE', 'FT.SEARCH', 'FT.SPELLCHECK', 'FT.SUGGET', 'FT.SUGLEN', 'FT.SYNDUMP', 'FT.TAGVALS', 'FT._ALIASADDIFNX', 'FT._ALIASDELIFX', 'HRANDFIELD', 'JSON.DEBUG', 'PEXPIRETIME', 'PFCOUNT', 'PTTL', 'SRANDMEMBER', 'TDIGEST.BYRANK', 'TDIGEST.BYREVRANK', 'TDIGEST.CDF', 'TDIGEST.INFO', 'TDIGEST.MAX', 'TDIGEST.MIN', 'TDIGEST.QUANTILE', 'TDIGEST.RANK', 'TDIGEST.REVRANK', 'TDIGEST.TRIMMED_MEAN', 'TOPK.INFO', 'TOPK.LIST', 'TOPK.QUERY', 'TOUCH', 'TTL'], cache_allow_list=['BITCOUNT', 'BITFIELD_RO', 'BITPOS', 'EXISTS', 'GEODIST', 'GEOHASH', 'GEOPOS', 'GEORADIUSBYMEMBER_RO', 'GEORADIUS_RO', 'GEOSEARCH', 
'GET', 'GETBIT', 'GETRANGE', 'HEXISTS', 'HGET', 'HGETALL', 'HKEYS', 'HLEN', 'HMGET', 'HSTRLEN', 'HVALS', 'JSON.ARRINDEX', 'JSON.ARRLEN', 'JSON.GET', 'JSON.MGET', 'JSON.OBJKEYS', 'JSON.OBJLEN', 'JSON.RESP', 'JSON.STRLEN', 'JSON.TYPE', 'LCS', 'LINDEX', 'LLEN', 'LPOS', 'LRANGE', 'MGET', 'SCARD', 'SDIFF', 'SINTER', 'SINTERCARD', 'SISMEMBER', 'SMEMBERS', 'SMISMEMBER', 'SORT_RO', 'STRLEN', 'SUBSTR', 'SUNION', 'TS.GET', 'TS.INFO', 'TS.RANGE', 'TS.REVRANGE', 'TYPE', 'XLEN', 'XPENDING', 'XRANGE', 'XREAD', 'XREVRANGE', 'ZCARD', 'ZCOUNT', 'ZDIFF', 'ZINTER', 'ZINTERCARD', 'ZLEXCOUNT', 'ZMSCORE', 'ZRANGE', 'ZRANGEBYLEX', 'ZRANGEBYSCORE', 'ZRANK', 'ZREVRANGE', 'ZREVRANGEBYLEX', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZSCORE', 'ZUNION'])[source]¶
Implementation of the Valkey protocol.
This abstract class provides a Python interface to all Valkey commands and an implementation of the Valkey protocol.
Pipelines derive from this, implementing how commands are sent to and responses received from the Valkey server. Based on configuration, an instance will use either a ConnectionPool or a Connection object to talk to Valkey.
It is not safe to pass PubSub or Pipeline objects between threads.
- Parameters:
credential_provider (CredentialProvider | None)
protocol (int | None)
cache_enabled (bool)
client_cache (AbstractCache | None)
cache_max_size (int)
cache_ttl (int)
cache_policy (str)
cache_deny_list (List[str])
cache_allow_list (List[str])
- classmethod from_pool(connection_pool)
Return a Valkey client from the given connection pool. The Valkey client will take ownership of the connection pool and close it when the Valkey client is closed.
- Parameters:
connection_pool (ConnectionPool)
- Return type:
- classmethod from_url(url, **kwargs)
Return a Valkey client object configured from the given URL.
For example:
valkey://[[username]:[password]]@localhost:6379/0
valkeys://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
Three URL schemes are supported:
valkey:// creates a TCP socket connection.
valkeys:// creates an SSL-wrapped TCP socket connection.
unix:// creates a Unix Domain Socket connection.
The username, password, hostname, path and all querystring values are passed through urllib.parse.unquote in order to replace any percent-encoded values with their corresponding characters.
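As a small illustration of the percent-encoding rule, the following sketch uses only the standard library to build a URL whose password contains reserved characters and then recovers the original value with urllib.parse.unquote. The username, password and host are made-up values, and the sketch does not call valkey-py itself.

```python
from urllib.parse import quote, unquote, urlparse

# Hypothetical credentials; the password contains characters that are
# reserved in URLs and therefore must be percent-encoded.
username = "app-user"
password = "p@ss:w/rd"

# safe="" makes quote() encode "/" as well, which it leaves alone by default.
url = f"valkey://{quote(username, safe='')}:{quote(password, safe='')}@localhost:6379/0"

# urlparse() keeps the password percent-encoded; unquote() restores it.
parsed = urlparse(url)
decoded_password = unquote(parsed.password)

print(url)
print(decoded_password)
```

A URL built this way should be safe to hand to from_url, since the decoding step described above reverses the encoding.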
There are several ways to specify a database number. The first value found will be used:
1. A db querystring option, e.g. valkey://localhost?db=0
2. If using the valkey:// or valkeys:// schemes, the path argument of the url, e.g. valkey://localhost/0
3. A db keyword argument to this function.
If none of these options are specified, the default db=0 is used.
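The lookup order for the database number can be sketched with the standard library alone. The helper below, resolve_db, is a hypothetical name used only for illustration; it mirrors the documented precedence and is not the library's own parsing code.

```python
from urllib.parse import parse_qs, unquote, urlparse


def resolve_db(url: str, **kwargs) -> int:
    """Pick a database number using the documented order of precedence."""
    parsed = urlparse(url)

    # 1. A db querystring option wins over everything else.
    query = parse_qs(parsed.query)
    if "db" in query:
        return int(query["db"][0])

    # 2. For the TCP schemes, the URL path is the next candidate.
    if parsed.scheme in ("valkey", "valkeys"):
        path = unquote(parsed.path).replace("/", "")
        if path:
            return int(path)

    # 3. A db keyword argument is consulted last.
    if "db" in kwargs:
        return int(kwargs["db"])

    # Nothing was specified, so fall back to the default database.
    return 0


print(resolve_db("valkey://localhost/2?db=5"))
print(resolve_db("valkey://localhost/2", db=7))
print(resolve_db("valkey://localhost", db=7))
print(resolve_db("valkey://localhost"))
```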
All querystring options are cast to their appropriate Python types. Boolean arguments can be specified with string values “True”/“False” or “Yes”/“No”. Values that cannot be properly cast cause a ValueError to be raised. Once parsed, the querystring arguments and keyword arguments are passed to the ConnectionPool’s class initializer. In the case of conflicting arguments, querystring arguments always win.
- Parameters:
url (str)
- Return type:
- load_external_module(funcname, func)
This function can be used to add externally defined Valkey modules, and their namespaces, to the Valkey client.
funcname - A string containing the name of the function to create.
func - The function being added to this class.
For example, assume that one has a custom Valkey module named foomod that creates commands named ‘foo.do_thing’ and ‘foo.another_thing’ in Valkey. To load these functions into this namespace:
    from valkey import Valkey
    from foomodule import F
    r = Valkey()
    r.load_external_module("foo", F)
    r.foo().do_thing('your', 'arguments')
For a concrete example see the reimport of the redisjson module in tests/test_connection.py::test_loading_external_modules
- Return type:
None
- lock(name, timeout=None, sleep=0.1, blocking=True, blocking_timeout=None, lock_class=None, thread_local=True)
Return a new Lock object using key name that mimics the behavior of threading.Lock.
If specified, timeout indicates a maximum life for the lock. By default, it will remain locked until release() is called.
sleep indicates the amount of time to sleep per loop iteration when the lock is in blocking mode and another client is currently holding the lock.
blocking indicates whether calling acquire should block until the lock has been acquired or fail immediately, causing acquire to return False and the lock not being acquired. Defaults to True. Note this value can be overridden by passing a blocking argument to acquire.
blocking_timeout indicates the maximum amount of time in seconds to spend trying to acquire the lock. A value of None indicates continue trying forever. blocking_timeout can be specified as a float or integer, both representing the number of seconds to wait.
lock_class forces the specified lock implementation. Note that the only lock class we implement is Lock (which is a Lua-based lock). So, it’s unlikely you’ll need this parameter, unless you have created your own custom lock class.
thread_local indicates whether the lock token is placed in thread-local storage. By default, the token is placed in thread-local storage so that a thread only sees its token, not a token set by another thread. Consider the following timeline:
- time: 0, thread-1 acquires my-lock, with a timeout of 5 seconds. thread-1 sets the token to “abc”
- time: 1, thread-2 blocks trying to acquire my-lock using the Lock instance.
- time: 5, thread-1 has not yet completed. Valkey expires the lock key.
- time: 5, thread-2 acquired my-lock now that it’s available. thread-2 sets the token to “xyz”
- time: 6, thread-1 finishes its work and calls release(). If the token is not stored in thread-local storage, then thread-1 would see the token value as “xyz” and would be able to successfully release thread-2’s lock.
In some use cases it’s necessary to disable thread local storage. For example, if you have code where one thread acquires a lock and passes that lock instance to a worker thread to release later. If thread local storage isn’t disabled in this case, the worker thread won’t see the token set by the thread that acquired the lock. Our assumption is that these cases aren’t common and as such default to using thread local storage.
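The visibility difference described above can be reproduced without a server. The sketch below uses a toy TokenHolder class (a hypothetical stand-in, not the library's Lock) to show that a token stored in threading.local() is invisible to a worker thread, while a token stored on a plain shared object is not.

```python
import threading
from types import SimpleNamespace


class TokenHolder:
    """Toy stand-in for a lock object that remembers the token it acquired."""

    def __init__(self, thread_local: bool = True):
        # threading.local() gives every thread its own attribute namespace;
        # SimpleNamespace is a plain object shared by all threads.
        self.local = threading.local() if thread_local else SimpleNamespace()


def token_seen_by_worker(thread_local: bool):
    """Set a token in the main thread and report what a worker thread sees."""
    holder = TokenHolder(thread_local=thread_local)
    holder.local.token = "abc"  # set by the acquiring (main) thread

    seen = []
    worker = threading.Thread(
        target=lambda: seen.append(getattr(holder.local, "token", None))
    )
    worker.start()
    worker.join()
    return seen[0]


print(token_seen_by_worker(thread_local=True))   # None: the worker has its own storage
print(token_seen_by_worker(thread_local=False))  # abc: the storage is shared
```

This is why handing a lock to a worker thread for release calls for thread_local=False: with thread-local storage the worker has no token to release with.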
- Parameters:
name (str)
timeout (float | None)
sleep (float)
blocking (bool)
blocking_timeout (float | None)
lock_class (None | Any)
thread_local (bool)
- parse_response(connection, command_name, **options)
Parses a response from the Valkey server
- pipeline(transaction=True, shard_hint=None)
Return a new pipeline object that can queue multiple commands for later execution. transaction indicates whether all commands should be executed atomically. Apart from making a group of operations atomic, pipelines are useful for reducing the back-and-forth overhead between the client and server.
- Return type:
Pipeline
- pubsub(**kwargs)
Return a Publish/Subscribe object. With this object, you can subscribe to channels and listen for messages that get published to them.
- set_response_callback(command, callback)
Set a custom Response Callback
- Parameters:
command (str)
callback (Callable)
- Return type:
None
- transaction(func, *watches, **kwargs)
Convenience method for executing the callable func as a transaction while watching all keys specified in watches. The ‘func’ callable should expect a single argument which is a Pipeline object.
- Parameters:
func (Callable[[Pipeline], None])
- Return type:
None
Sentinel Client
Valkey Sentinel provides high availability for Valkey. There are commands that can only be executed against a Valkey node running in sentinel mode. Connecting to those nodes and executing commands against them requires a Sentinel connection.
Connection example (assumes Valkey exists on the ports listed below):
>>> from valkey import Sentinel
>>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
>>> sentinel.discover_master('mymaster')
('127.0.0.1', 6379)
>>> sentinel.discover_slaves('mymaster')
[('127.0.0.1', 6380)]
Sentinel
- class valkey.sentinel.Sentinel(sentinels, min_other_sentinels=0, sentinel_kwargs=None, **connection_kwargs)
Valkey Sentinel cluster client
>>> from valkey.sentinel import Sentinel
>>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
>>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
>>> master.set('foo', 'bar')
>>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
>>> slave.get('foo')
b'bar'
sentinels is a list of sentinel nodes. Each node is represented by a pair (hostname, port).
min_other_sentinels defines a minimum number of peers for a sentinel. When querying a sentinel, if it doesn’t meet this threshold, responses from that sentinel won’t be considered valid.
sentinel_kwargs is a dictionary of connection arguments used when connecting to sentinel instances. Any argument that can be passed to a normal Valkey connection can be specified here. If sentinel_kwargs is not specified, any socket_timeout and socket_keepalive options specified in connection_kwargs will be used.
connection_kwargs are keyword arguments that will be used when establishing a connection to a Valkey server.
- discover_master(service_name)
Asks sentinel servers for the Valkey master’s address corresponding to the service labeled service_name.
Returns a pair (address, port) or raises MasterNotFoundError if no master is found.
- execute_command(*args, **kwargs)
Execute Sentinel command in sentinel nodes.
once - If set to True, then execute the resulting command on a single node at random, rather than across the entire sentinel cluster.
- master_for(service_name, valkey_class=<class 'valkey.client.Valkey'>, connection_pool_class=<class 'valkey.sentinel.SentinelConnectionPool'>, **kwargs)
Returns a Valkey client instance for the service_name master.
A SentinelConnectionPool class is used to retrieve the master’s address before establishing a new connection.
NOTE: If the master’s address has changed, any cached connections to the old master are closed.
By default clients will be a Valkey instance. Specify a different class to the valkey_class argument if you desire something different.
The connection_pool_class specifies the connection pool to use. The SentinelConnectionPool will be used by default.
All other keyword arguments are merged with any connection_kwargs passed to this class and passed to the connection pool as keyword arguments to be used to initialize Valkey connections.
- slave_for(service_name, valkey_class=<class 'valkey.client.Valkey'>, connection_pool_class=<class 'valkey.sentinel.SentinelConnectionPool'>, **kwargs)
Returns a Valkey client instance for the service_name slave(s).
A SentinelConnectionPool class is used to retrieve the slave’s address before establishing a new connection.
By default clients will be a Valkey instance. Specify a different class to the valkey_class argument if you desire something different.
The connection_pool_class specifies the connection pool to use. The SentinelConnectionPool will be used by default.
All other keyword arguments are merged with any connection_kwargs passed to this class and passed to the connection pool as keyword arguments to be used to initialize Valkey connections.
SentinelConnectionPool
Cluster Client
This client is used for connecting to a Valkey Cluster.
ValkeyCluster
- class valkey.cluster.ValkeyCluster(host=None, port=6379, startup_nodes=None, cluster_error_retry_attempts=3, retry=None, require_full_coverage=False, reinitialize_steps=5, read_from_replicas=False, dynamic_startup_nodes=True, url=None, address_remap=None, **kwargs)
- Parameters:
host (str | None)
port (int)
startup_nodes (List[ClusterNode] | None)
cluster_error_retry_attempts (int)
retry (Retry | None)
require_full_coverage (bool)
reinitialize_steps (int)
read_from_replicas (bool)
dynamic_startup_nodes (bool)
url (str | None)
address_remap (Callable[[Tuple[str, int]], Tuple[str, int]] | None)
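The signature lists address_remap as a callable that takes a (host, port) tuple and returns a (host, port) tuple; its purpose is not described here. Assuming it is meant to translate addresses reported by the cluster into addresses the client can actually reach, a callable of that shape might look like the sketch below. The addresses and the name remap_address are hypothetical.

```python
from typing import Dict, Tuple

Address = Tuple[str, int]

# Hypothetical mapping from addresses the cluster advertises to addresses
# reachable from the client (for example through local port forwarding).
FORWARDED: Dict[Address, Address] = {
    ("10.0.0.1", 6379): ("127.0.0.1", 7001),
    ("10.0.0.2", 6379): ("127.0.0.1", 7002),
}


def remap_address(address: Address) -> Address:
    """Return the client-reachable address, or the input if no mapping exists."""
    return FORWARDED.get(address, address)


print(remap_address(("10.0.0.1", 6379)))
print(remap_address(("192.168.1.5", 6379)))
```

Such a function would be passed as the address_remap keyword argument when constructing the client.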
- determine_slot(*args)
Figure out what slot to use based on args.
Raises a ValkeyClusterException if there’s a missing key and we can’t determine what slots to map the command to; or, if the keys don’t all map to the same key slot.
- execute_command(*args, **kwargs)
Wrapper for ERRORS_ALLOW_RETRY error handling.
It will try the number of times specified by the config option “self.cluster_error_retry_attempts”, which defaults to 3 unless manually configured.
If that number of attempts is reached without success, the command raises the exception.
Key argument target_nodes can be passed with the following types:
nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
ClusterNode
list<ClusterNode>
dict<Any, ClusterNode>
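The bounded-retry behavior can be pictured with a generic helper. This is only a sketch of the pattern: call_with_retries is a hypothetical name, the set of retryable exceptions is an assumption, and the library's own wrapper may differ in detail.

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    attempts: int = 3,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Call func up to `attempts` times, re-raising the last retryable error."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            # Out of attempts: let the caller see the original exception.
            if attempt == attempts:
                raise


# Simulated command that fails twice before succeeding.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated failure")
    return "OK"


result = call_with_retries(flaky, attempts=3)
print(result, calls["count"])
```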
- classmethod from_url(url, **kwargs)
Return a Valkey client object configured from the given URL.
For example:
valkey://[[username]:[password]]@localhost:6379/0
valkeys://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
Three URL schemes are supported:
valkey:// creates a TCP socket connection.
valkeys:// creates an SSL-wrapped TCP socket connection.
unix:// creates a Unix Domain Socket connection.
The username, password, hostname, path and all querystring values are passed through urllib.parse.unquote in order to replace any percent-encoded values with their corresponding characters.
There are several ways to specify a database number. The first value found will be used:
1. A db querystring option, e.g. valkey://localhost?db=0
2. If using the valkey:// or valkeys:// schemes, the path argument of the url, e.g. valkey://localhost/0
3. A db keyword argument to this function.
If none of these options are specified, the default db=0 is used.
All querystring options are cast to their appropriate Python types. Boolean arguments can be specified with string values “True”/“False” or “Yes”/“No”. Values that cannot be properly cast cause a ValueError to be raised. Once parsed, the querystring arguments and keyword arguments are passed to the ConnectionPool’s class initializer. In the case of conflicting arguments, querystring arguments always win.
- get_node_from_key(key, replica=False)
Get the node that holds the key’s slot. If replica is set to True but the slot doesn’t have any replicas, None is returned.
- keyslot(key)
Calculate keyslot for a given key. See Keys distribution model in https://redis.io/topics/cluster-spec
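The keys distribution model in the referenced cluster specification maps a key to a slot as CRC16(key) mod 16384, hashing only the hash tag when the key contains a non-empty {...} section. The standalone sketch below follows that description using binascii.crc_hqx from the standard library; the function name key_slot is hypothetical, and whether keyslot() is implemented exactly this way is an assumption.

```python
from binascii import crc_hqx

SLOT_COUNT = 16384


def key_slot(key) -> int:
    """Compute a cluster hash slot as CRC16(key) mod 16384, honoring hash tags."""
    data = key.encode("utf-8") if isinstance(key, str) else bytes(key)

    # If the key contains "{...}" with at least one character between the
    # first "{" and the next "}", only that substring is hashed.
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end != -1 and end != start + 1:
            data = data[start + 1:end]

    # crc_hqx with an initial value of 0 is the CRC16 (XMODEM) variant.
    return crc_hqx(data, 0) % SLOT_COUNT


print(key_slot("foo"))
print(key_slot("{user1000}.following") == key_slot("{user1000}.followers"))
```

Keys that share a hash tag land in the same slot, which is what lets multi-key commands avoid the different-slot error mentioned under determine_slot.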
- load_external_module(funcname, func)
This function can be used to add externally defined Valkey modules, and their namespaces, to the Valkey client.
funcname - A string containing the name of the function to create.
func - The function being added to this class.
- lock(name, timeout=None, sleep=0.1, blocking=True, blocking_timeout=None, lock_class=None, thread_local=True)
Return a new Lock object using key name that mimics the behavior of threading.Lock.
If specified, timeout indicates a maximum life for the lock. By default, it will remain locked until release() is called.
sleep indicates the amount of time to sleep per loop iteration when the lock is in blocking mode and another client is currently holding the lock.
blocking indicates whether calling acquire should block until the lock has been acquired or fail immediately, causing acquire to return False and the lock not being acquired. Defaults to True. Note this value can be overridden by passing a blocking argument to acquire.
blocking_timeout indicates the maximum amount of time in seconds to spend trying to acquire the lock. A value of None indicates continue trying forever. blocking_timeout can be specified as a float or integer, both representing the number of seconds to wait.
lock_class forces the specified lock implementation. Note that the only lock class we implement is Lock (which is a Lua-based lock). So, it’s unlikely you’ll need this parameter, unless you have created your own custom lock class.
thread_local indicates whether the lock token is placed in thread-local storage. By default, the token is placed in thread-local storage so that a thread only sees its token, not a token set by another thread. Consider the following timeline:
- time: 0, thread-1 acquires my-lock, with a timeout of 5 seconds. thread-1 sets the token to “abc”
- time: 1, thread-2 blocks trying to acquire my-lock using the Lock instance.
- time: 5, thread-1 has not yet completed. Valkey expires the lock key.
- time: 5, thread-2 acquired my-lock now that it’s available. thread-2 sets the token to “xyz”
- time: 6, thread-1 finishes its work and calls release(). If the token is not stored in thread-local storage, then thread-1 would see the token value as “xyz” and would be able to successfully release thread-2’s lock.
In some use cases it’s necessary to disable thread local storage. For example, if you have code where one thread acquires a lock and passes that lock instance to a worker thread to release later. If thread local storage isn’t disabled in this case, the worker thread won’t see the token set by the thread that acquired the lock. Our assumption is that these cases aren’t common and as such default to using thread local storage.
- monitor(target_node=None)
Returns a Monitor object for the specified target node. The default cluster node will be selected if no target node was specified. Monitor is useful for handling the MONITOR command to the Valkey server. The next_command() method returns one command from monitor; the listen() method yields commands from monitor.
- on_connect(connection)
Initialize the connection, authenticate and select a database and send READONLY if it is set during object initialization.
- pipeline(transaction=None, shard_hint=None)
Cluster impl: Pipelines do not work in cluster mode the same way they do in normal mode. Create a clone of this object so that simulating pipelines will work correctly. Each command is called directly when used, and calling execute() only returns the result stack.
- pubsub(node=None, host=None, port=None, **kwargs)
Allows passing a ClusterNode, or host and port, to get a pubsub instance connected to the specified node.
ClusterNode
Async Client
This client is used for communicating with Valkey, asynchronously.
- class valkey.asyncio.client.Valkey(*, host='localhost', port=6379, db=0, password=None, socket_timeout=5, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', decode_responses=False, retry_on_timeout=False, retry_on_error=None, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, ssl_ca_data=None, ssl_check_hostname=False, ssl_min_version=None, ssl_ciphers=None, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, lib_name='valkey-py', lib_version='6.1.1', username=None, retry=None, auto_close_connection_pool=None, valkey_connect_func=None, credential_provider=None, protocol=2, cache_enabled=False, client_cache=None, cache_max_size=100, cache_ttl=0, cache_policy=EvictionPolicy.LRU, cache_deny_list=['BF.CARD', 'BF.DEBUG', 'BF.EXISTS', 'BF.INFO', 'BF.MEXISTS', 'BF.SCANDUMP', 'CF.COMPACT', 'CF.COUNT', 'CF.DEBUG', 'CF.EXISTS', 'CF.INFO', 'CF.MEXISTS', 'CF.SCANDUMP', 'CMS.INFO', 'CMS.QUERY', 'DUMP', 'EXPIRETIME', 'FT.AGGREGATE', 'FT.ALIASADD', 'FT.ALIASDEL', 'FT.ALIASUPDATE', 'FT.CURSOR', 'FT.EXPLAIN', 'FT.EXPLAINCLI', 'FT.GET', 'FT.INFO', 'FT.MGET', 'FT.PROFILE', 'FT.SEARCH', 'FT.SPELLCHECK', 'FT.SUGGET', 'FT.SUGLEN', 'FT.SYNDUMP', 'FT.TAGVALS', 'FT._ALIASADDIFNX', 'FT._ALIASDELIFX', 'HRANDFIELD', 'JSON.DEBUG', 'PEXPIRETIME', 'PFCOUNT', 'PTTL', 'SRANDMEMBER', 'TDIGEST.BYRANK', 'TDIGEST.BYREVRANK', 'TDIGEST.CDF', 'TDIGEST.INFO', 'TDIGEST.MAX', 'TDIGEST.MIN', 'TDIGEST.QUANTILE', 'TDIGEST.RANK', 'TDIGEST.REVRANK', 'TDIGEST.TRIMMED_MEAN', 'TOPK.INFO', 'TOPK.LIST', 'TOPK.QUERY', 'TOUCH', 'TTL'], cache_allow_list=['BITCOUNT', 'BITFIELD_RO', 'BITPOS', 'EXISTS', 'GEODIST', 'GEOHASH', 'GEOPOS', 'GEORADIUSBYMEMBER_RO', 'GEORADIUS_RO', 'GEOSEARCH', 'GET', 'GETBIT', 'GETRANGE', 'HEXISTS', 'HGET', 'HGETALL', 'HKEYS', 'HLEN', 'HMGET', 'HSTRLEN', 'HVALS', 'JSON.ARRINDEX', 
'JSON.ARRLEN', 'JSON.GET', 'JSON.MGET', 'JSON.OBJKEYS', 'JSON.OBJLEN', 'JSON.RESP', 'JSON.STRLEN', 'JSON.TYPE', 'LCS', 'LINDEX', 'LLEN', 'LPOS', 'LRANGE', 'MGET', 'SCARD', 'SDIFF', 'SINTER', 'SINTERCARD', 'SISMEMBER', 'SMEMBERS', 'SMISMEMBER', 'SORT_RO', 'STRLEN', 'SUBSTR', 'SUNION', 'TS.GET', 'TS.INFO', 'TS.RANGE', 'TS.REVRANGE', 'TYPE', 'XLEN', 'XPENDING', 'XRANGE', 'XREAD', 'XREVRANGE', 'ZCARD', 'ZCOUNT', 'ZDIFF', 'ZINTER', 'ZINTERCARD', 'ZLEXCOUNT', 'ZMSCORE', 'ZRANGE', 'ZRANGEBYLEX', 'ZRANGEBYSCORE', 'ZRANK', 'ZREVRANGE', 'ZREVRANGEBYLEX', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZSCORE', 'ZUNION'])[source]¶
Implementation of the Valkey protocol.
This abstract class provides a Python interface to all Valkey commands and an implementation of the Valkey protocol.
Pipelines derive from this, implementing how commands are sent to and responses received from the Valkey server. Based on configuration, an instance will use either a ConnectionPool or a Connection object to talk to Valkey.
- Parameters:
host (str)
port (int)
db (str | int)
password (str | None)
socket_timeout (float | None)
socket_connect_timeout (float | None)
socket_keepalive (bool | None)
socket_keepalive_options (Mapping[int, int | bytes] | None)
connection_pool (ConnectionPool | None)
unix_socket_path (str | None)
encoding (str)
encoding_errors (str)
decode_responses (bool)
retry_on_timeout (bool)
retry_on_error (list | None)
ssl (bool)
ssl_keyfile (str | None)
ssl_certfile (str | None)
ssl_cert_reqs (str)
ssl_ca_certs (str | None)
ssl_ca_data (str | None)
ssl_check_hostname (bool)
ssl_min_version (TLSVersion | None)
ssl_ciphers (str | None)
max_connections (int | None)
single_connection_client (bool)
health_check_interval (int)
client_name (str | None)
lib_name (str | None)
lib_version (str | None)
username (str | None)
retry (Retry | None)
auto_close_connection_pool (bool | None)
credential_provider (CredentialProvider | None)
protocol (int | None)
cache_enabled (bool)
client_cache (AbstractCache | None)
cache_max_size (int)
cache_ttl (int)
cache_policy (str)
cache_deny_list (List[str])
cache_allow_list (List[str])
- async aclose(close_connection_pool=None)
Closes Valkey client connection
- Parameters:
close_connection_pool (bool | None) – decides whether to close the connection pool used by this Valkey client, overriding Valkey.auto_close_connection_pool. By default, let Valkey.auto_close_connection_pool decide whether to close the connection pool.
- Return type:
None
- close(close_connection_pool=None)
Alias for aclose(), for backwards compatibility
- Parameters:
close_connection_pool (bool | None)
- Return type:
None
- classmethod from_pool(connection_pool)
Return a Valkey client from the given connection pool. The Valkey client will take ownership of the connection pool and close it when the Valkey client is closed.
- Parameters:
connection_pool (ConnectionPool)
- Return type:
- classmethod from_url(url, single_connection_client=False, auto_close_connection_pool=None, **kwargs)
Return a Valkey client object configured from the given URL.
For example:
valkey://[[username]:[password]]@localhost:6379/0
valkeys://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
Three URL schemes are supported:
valkey:// creates a TCP socket connection.
valkeys:// creates an SSL-wrapped TCP socket connection.
unix:// creates a Unix Domain Socket connection.
The username, password, hostname, path and all querystring values are passed through urllib.parse.unquote in order to replace any percent-encoded values with their corresponding characters.
There are several ways to specify a database number. The first value found will be used:
1. A db querystring option, e.g. valkey://localhost?db=0
2. If using the valkey:// or valkeys:// schemes, the path argument of the url, e.g. valkey://localhost/0
3. A db keyword argument to this function.
If none of these options are specified, the default db=0 is used.
All querystring options are cast to their appropriate Python types. Boolean arguments can be specified with string values “True”/“False” or “Yes”/“No”. Values that cannot be properly cast cause a ValueError to be raised. Once parsed, the querystring arguments and keyword arguments are passed to the ConnectionPool’s class initializer. In the case of conflicting arguments, querystring arguments always win.
- Parameters:
url (str)
single_connection_client (bool)
auto_close_connection_pool (bool | None)
kwargs (Any)
- load_external_module(funcname, func)
This function can be used to add externally defined Valkey modules, and their namespaces, to the Valkey client.
funcname - A string containing the name of the function to create.
func - The function being added to this class.
For example, assume that one has a custom Valkey module named foomod that creates commands named ‘foo.do_thing’ and ‘foo.another_thing’ in Valkey. To load these functions into this namespace:
    from valkey import Valkey
    from foomodule import F
    r = Valkey()
    r.load_external_module("foo", F)
    r.foo().do_thing('your', 'arguments')
For a concrete example see the reimport of the redisjson module in tests/test_connection.py::test_loading_external_modules
- lock(name, timeout=None, sleep=0.1, blocking=True, blocking_timeout=None, lock_class=None, thread_local=True)
Return a new Lock object using key name that mimics the behavior of threading.Lock.
If specified, timeout indicates a maximum life for the lock. By default, it will remain locked until release() is called.
sleep indicates the amount of time to sleep per loop iteration when the lock is in blocking mode and another client is currently holding the lock.
blocking indicates whether calling acquire should block until the lock has been acquired or fail immediately, causing acquire to return False and the lock not being acquired. Defaults to True. Note this value can be overridden by passing a blocking argument to acquire.
blocking_timeout indicates the maximum amount of time in seconds to spend trying to acquire the lock. A value of None indicates continue trying forever. blocking_timeout can be specified as a float or integer, both representing the number of seconds to wait.
lock_class forces the specified lock implementation. Note that the only lock class we implement is Lock (which is a Lua-based lock). So, it’s unlikely you’ll need this parameter, unless you have created your own custom lock class.
thread_local indicates whether the lock token is placed in thread-local storage. By default, the token is placed in thread-local storage so that a thread only sees its token, not a token set by another thread. Consider the following timeline:
- time: 0, thread-1 acquires my-lock, with a timeout of 5 seconds. thread-1 sets the token to “abc”
- time: 1, thread-2 blocks trying to acquire my-lock using the Lock instance.
- time: 5, thread-1 has not yet completed. Valkey expires the lock key.
- time: 5, thread-2 acquired my-lock now that it’s available. thread-2 sets the token to “xyz”
- time: 6, thread-1 finishes its work and calls release(). If the token is not stored in thread-local storage, then thread-1 would see the token value as “xyz” and would be able to successfully release thread-2’s lock.
In some use cases it’s necessary to disable thread local storage. For example, if you have code where one thread acquires a lock and passes that lock instance to a worker thread to release later. If thread local storage isn’t disabled in this case, the worker thread won’t see the token set by the thread that acquired the lock. Our assumption is that these cases aren’t common and as such default to using thread local storage.
- Parameters:
name (bytes | str | memoryview)
timeout (float | None)
sleep (float)
blocking (bool)
blocking_timeout (float | None)
lock_class (Type[Lock] | None)
thread_local (bool)
- Return type:
Lock
- async parse_response(connection, command_name, **options)
Parses a response from the Valkey server
- Parameters:
connection (Connection)
command_name (str | bytes)
- pipeline(transaction=True, shard_hint=None)
Return a new pipeline object that can queue multiple commands for later execution. transaction indicates whether all commands should be executed atomically. Apart from making a group of operations atomic, pipelines are useful for reducing the back-and-forth overhead between the client and server.
- Parameters:
transaction (bool)
shard_hint (str | None)
- Return type:
Pipeline
- pubsub(**kwargs)
Return a Publish/Subscribe object. With this object, you can subscribe to channels and listen for messages that get published to them.
- Parameters:
kwargs (Any)
- Return type:
PubSub
- set_response_callback(command, callback)
Set a custom Response Callback
- Parameters:
command (str)
callback (ResponseCallbackProtocol | AsyncResponseCallbackProtocol)
- async transaction(func, *watches, shard_hint=None, value_from_callable=False, watch_delay=None)
Convenience method for executing the callable func as a transaction while watching all keys specified in watches. The ‘func’ callable should expect a single argument which is a Pipeline object.
- Parameters:
func (Callable[[Pipeline], Any | Awaitable[Any]])
watches (bytes | str | memoryview)
shard_hint (str | None)
value_from_callable (bool)
watch_delay (float | None)
Async Cluster Client
ValkeyCluster (Async)
- class valkey.asyncio.cluster.ValkeyCluster(host=None, port=6379, startup_nodes=None, require_full_coverage=True, read_from_replicas=False, dynamic_startup_nodes=True, reinitialize_steps=5, cluster_error_retry_attempts=3, connection_error_retry_attempts=3, max_connections=2147483648, db=0, path=None, credential_provider=None, username=None, password=None, client_name=None, lib_name='valkey-py', lib_version='6.1.1', encoding='utf-8', encoding_errors='strict', decode_responses=False, health_check_interval=0, socket_connect_timeout=None, socket_keepalive=False, socket_keepalive_options=None, socket_timeout=5, retry=None, retry_on_error=None, ssl=False, ssl_ca_certs=None, ssl_ca_data=None, ssl_cert_reqs='required', ssl_certfile=None, ssl_check_hostname=False, ssl_keyfile=None, ssl_min_version=None, ssl_ciphers=None, protocol=2, address_remap=None, cache_enabled=False, client_cache=None, cache_max_size=100, cache_ttl=0, cache_policy=EvictionPolicy.LRU, cache_deny_list=['BF.CARD', 'BF.DEBUG', 'BF.EXISTS', 'BF.INFO', 'BF.MEXISTS', 'BF.SCANDUMP', 'CF.COMPACT', 'CF.COUNT', 'CF.DEBUG', 'CF.EXISTS', 'CF.INFO', 'CF.MEXISTS', 'CF.SCANDUMP', 'CMS.INFO', 'CMS.QUERY', 'DUMP', 'EXPIRETIME', 'FT.AGGREGATE', 'FT.ALIASADD', 'FT.ALIASDEL', 'FT.ALIASUPDATE', 'FT.CURSOR', 'FT.EXPLAIN', 'FT.EXPLAINCLI', 'FT.GET', 'FT.INFO', 'FT.MGET', 'FT.PROFILE', 'FT.SEARCH', 'FT.SPELLCHECK', 'FT.SUGGET', 'FT.SUGLEN', 'FT.SYNDUMP', 'FT.TAGVALS', 'FT._ALIASADDIFNX', 'FT._ALIASDELIFX', 'HRANDFIELD', 'JSON.DEBUG', 'PEXPIRETIME', 'PFCOUNT', 'PTTL', 'SRANDMEMBER', 'TDIGEST.BYRANK', 'TDIGEST.BYREVRANK', 'TDIGEST.CDF', 'TDIGEST.INFO', 'TDIGEST.MAX', 'TDIGEST.MIN', 'TDIGEST.QUANTILE', 'TDIGEST.RANK', 'TDIGEST.REVRANK', 'TDIGEST.TRIMMED_MEAN', 'TOPK.INFO', 'TOPK.LIST', 'TOPK.QUERY', 'TOUCH', 'TTL'], cache_allow_list=['BITCOUNT', 'BITFIELD_RO', 'BITPOS', 'EXISTS', 'GEODIST', 'GEOHASH', 'GEOPOS', 'GEORADIUSBYMEMBER_RO', 'GEORADIUS_RO', 'GEOSEARCH', 'GET', 'GETBIT', 'GETRANGE', 'HEXISTS', 'HGET', 'HGETALL', 
'HKEYS', 'HLEN', 'HMGET', 'HSTRLEN', 'HVALS', 'JSON.ARRINDEX', 'JSON.ARRLEN', 'JSON.GET', 'JSON.MGET', 'JSON.OBJKEYS', 'JSON.OBJLEN', 'JSON.RESP', 'JSON.STRLEN', 'JSON.TYPE', 'LCS', 'LINDEX', 'LLEN', 'LPOS', 'LRANGE', 'MGET', 'SCARD', 'SDIFF', 'SINTER', 'SINTERCARD', 'SISMEMBER', 'SMEMBERS', 'SMISMEMBER', 'SORT_RO', 'STRLEN', 'SUBSTR', 'SUNION', 'TS.GET', 'TS.INFO', 'TS.RANGE', 'TS.REVRANGE', 'TYPE', 'XLEN', 'XPENDING', 'XRANGE', 'XREAD', 'XREVRANGE', 'ZCARD', 'ZCOUNT', 'ZDIFF', 'ZINTER', 'ZINTERCARD', 'ZLEXCOUNT', 'ZMSCORE', 'ZRANGE', 'ZRANGEBYLEX', 'ZRANGEBYSCORE', 'ZRANK', 'ZREVRANGE', 'ZREVRANGEBYLEX', 'ZREVRANGEBYSCORE', 'ZREVRANK', 'ZSCORE', 'ZUNION'])[source]¶
Create a new ValkeyCluster client.
Pass one of the following parameters:
host & port
startup_nodes
Use await initialize() to find cluster nodes & create connections.
Use await close() to disconnect connections & close the client.
Many commands support the target_nodes kwarg. It can be one of the NODE_FLAGS:
PRIMARIES
REPLICAS
ALL_NODES
RANDOM
DEFAULT_NODE
Note: This client is not thread/process/fork safe.
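The lifecycle described above can be sketched as follows. This is a minimal illustration, not a definitive recipe: it assumes valkey-py is installed and that a cluster node is reachable at localhost:7000, which is a hypothetical address. The import is deferred into the coroutine so the snippet can be loaded without a running cluster.

```python
import asyncio


async def main() -> None:
    # Deferred import: requires the valkey-py package to be installed.
    from valkey.asyncio.cluster import ValkeyCluster

    # localhost:7000 is an assumed startup node, not a library default.
    rc = ValkeyCluster(host="localhost", port=7000)
    await rc.initialize()  # find cluster nodes & create connections
    try:
        await rc.set("greeting", "hello")
        print(await rc.get("greeting"))
        # target_nodes sends the command to every primary instead of
        # the single node that owns a key's slot.
        print(await rc.ping(target_nodes=ValkeyCluster.PRIMARIES))
    finally:
        await rc.close()  # disconnect connections & close the client


# To run against a live cluster: asyncio.run(main())
```

Because the client is not thread/process/fork safe, a sketch like this keeps one client per event loop and closes it in a finally block.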
- Parameters:
host (str | None) –
Can be used to point to a startup node
port (str | int) –
Port used if host is provided
startup_nodes (List[ClusterNode] | None) –
ClusterNode to be used as a startup node
require_full_coverage (bool) –
When set to False: the client will not require a full coverage of the slots. However, if not all slots are covered, and at least one node has cluster-require-full-coverage set to yes, the server will throw a ClusterDownError for some key-based commands.
When set to True: all slots must be covered to construct the cluster client. If not all slots are covered, ValkeyClusterException will be thrown.
read_from_replicas (bool) –
Enable reads from replicas in READONLY mode. You may read stale data. When set to true, read commands are distributed between the primary and its replicas in a round-robin manner.
dynamic_startup_nodes (bool) –
Set the ValkeyCluster’s startup nodes to all the discovered nodes. If true (default value), the cluster’s discovered nodes will be used to determine the cluster nodes-slots mapping in the next topology refresh. It will remove the initially passed startup nodes if their endpoints aren’t listed in the CLUSTER SLOTS output. If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false.
reinitialize_steps (int) –
Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs and the cluster does not need to be reinitialized while handling that error, only the MOVED slot will be patched with the redirected node. To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1. To avoid reinitializing the cluster on MOVED errors, set reinitialize_steps to 0.
cluster_error_retry_attempts (int) –
Number of times to retry before raising an error when TimeoutError, ConnectionError or ClusterDownError is encountered
connection_error_retry_attempts (int) –
Number of times to retry before reinitializing when TimeoutError or ConnectionError is encountered. The default backoff strategy will be set if a Retry object is not passed (see default_backoff in backoff.py). To change it, pass a custom Retry object using the “retry” keyword.
max_connections (int) –
Maximum number of connections per node. If there are no free connections & the maximum number of connections has already been created, a MaxConnectionsError is raised. This error may be retried as defined by connection_error_retry_attempts
address_remap (Callable[[Tuple[str, int]], Tuple[str, int]] | None) –
An optional callable which, when provided with an internal network address of a node, e.g. a (host, port) tuple, will return the address where the node is reachable. This can be used to map the addresses at which the nodes _think_ they are, to addresses at which a client may reach them, such as when they sit behind a proxy.
db (str | int)
path (str | None)
credential_provider (CredentialProvider | None)
username (str | None)
password (str | None)
client_name (str | None)
lib_name (str | None)
lib_version (str | None)
encoding (str)
encoding_errors (str)
decode_responses (bool)
health_check_interval (float)
socket_connect_timeout (float | None)
socket_keepalive (bool)
socket_keepalive_options (Mapping[int, int | bytes] | None)
socket_timeout (float | None)
retry (Retry | None)
retry_on_error (List[Type[Exception]] | None)
ssl (bool)
ssl_ca_certs (str | None)
ssl_ca_data (str | None)
ssl_cert_reqs (str)
ssl_certfile (str | None)
ssl_check_hostname (bool)
ssl_keyfile (str | None)
ssl_min_version (TLSVersion | None)
ssl_ciphers (str | None)
protocol (int | None)
cache_enabled (bool)
client_cache (AbstractCache | None)
cache_max_size (int)
cache_ttl (int)
cache_policy (str)
cache_deny_list (List[str])
cache_allow_list (List[str])
Rest of the arguments will be passed to the Connection instances when created
- Raises:
ValkeyClusterException – if any arguments are invalid or unknown, e.g.:
db != 0 or None
path argument for unix socket connection
none of the host/port & startup_nodes were provided
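As an illustration of the address_remap parameter, here is a minimal sketch of a remapping callable. The addresses and the PROXY_MAP and remap names are hypothetical; the only thing taken from the description above is the shape of the callable, which receives a (host, port) tuple and returns one.

```python
from typing import Dict, Tuple

Address = Tuple[str, int]

# Hypothetical mapping from the addresses the nodes advertise to the
# addresses a client can reach, e.g. ports forwarded by a local proxy.
PROXY_MAP: Dict[Address, Address] = {
    ("10.0.0.1", 6379): ("127.0.0.1", 7001),
    ("10.0.0.2", 6379): ("127.0.0.1", 7002),
}


def remap(address: Address) -> Address:
    """Return the client-reachable address for an advertised (host, port)."""
    # Fall back to the advertised address when no mapping is known.
    return PROXY_MAP.get(address, address)
```

A callable like this would be passed as address_remap=remap when constructing the client.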
- classmethod from_url(url, **kwargs)[source]¶
Return a Valkey client object configured from the given URL.
For example:
valkey://[[username]:[password]]@localhost:6379/0
valkeys://[[username]:[password]]@localhost:6379/0
Two URL schemes are supported:
valkey:// creates a TCP socket connection.
valkeys:// creates an SSL-wrapped TCP socket connection.
The username, password, hostname, path and all querystring values are passed through urllib.parse.unquote in order to replace any percent-encoded values with their corresponding characters.
All querystring options are cast to their appropriate Python types. Boolean arguments can be specified with string values “True”/“False” or “Yes”/“No”. Values that cannot be properly cast cause a ValueError to be raised. Once parsed, the querystring arguments and keyword arguments are passed to Connection when created. In the case of conflicting arguments, querystring arguments are used.
- Parameters:
url (str)
kwargs (Any)
- Return type:
ValkeyCluster
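The percent-decoding and boolean casting described for from_url can be sketched with the standard library alone. This is an approximation under stated assumptions, not the library's parser: parse_url, to_bool and BOOL_OPTIONS are hypothetical names, the set of boolean options is an illustrative subset, and case-insensitive matching is an assumption.

```python
from urllib.parse import parse_qs, unquote, urlparse

# Option names treated as booleans here are an illustrative subset.
BOOL_OPTIONS = {"decode_responses", "ssl_check_hostname"}


def to_bool(value: str) -> bool:
    """Cast "True"/"False" or "Yes"/"No" to bool (case-insensitive here)."""
    lowered = value.lower()
    if lowered in ("true", "yes"):
        return True
    if lowered in ("false", "no"):
        return False
    raise ValueError(f"cannot cast {value!r} to bool")


def parse_url(url: str) -> dict:
    """Split a valkey:// or valkeys:// URL into decoded connection options."""
    parsed = urlparse(url)
    options = {
        # urlparse leaves percent-encoding in place, so decode explicitly.
        "username": unquote(parsed.username) if parsed.username else None,
        "password": unquote(parsed.password) if parsed.password else None,
        "host": unquote(parsed.hostname) if parsed.hostname else None,
        "port": parsed.port,
        "ssl": parsed.scheme == "valkeys",
    }
    # parse_qs percent-decodes querystring values itself.
    for name, values in parse_qs(parsed.query).items():
        value = values[0]
        options[name] = to_bool(value) if name in BOOL_OPTIONS else value
    return options
```

Passing a password that contains reserved characters therefore means percent-encoding it in the URL, for example p%40ss for p@ss.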
- async initialize()[source]¶
Get all nodes from the startup nodes & create connections if not initialized.
- Return type:
ValkeyCluster
- get_nodes()[source]¶
Get all nodes of the cluster.
- Return type:
List[ClusterNode]
- get_primaries()[source]¶
Get the primary nodes of the cluster.
- Return type:
List[ClusterNode]
- get_replicas()[source]¶
Get the replica nodes of the cluster.
- Return type:
List[ClusterNode]
- set_default_node(node)[source]¶
Set the default node of the client.
- Raises:
DataError – if None is passed or node does not exist in cluster.
- Parameters:
node (ClusterNode)
- Return type:
None
- get_node(host=None, port=None, node_name=None)[source]¶
Get node by (host, port) or node_name.
- Parameters:
host (str | None)
port (int | None)
node_name (str | None)
- Return type:
ClusterNode | None
- get_node_from_key(key, replica=False)[source]¶
Get the cluster node corresponding to the provided key.
- Parameters:
key (str)
replica (bool) –
Indicates if a replica should be returned.
None will be returned if no replica holds this key.
- Raises:
SlotNotCoveredError – if the key is not covered by any slot.
- Return type:
ClusterNode | None
- keyslot(key)[source]¶
Find the keyslot for a given key.
See: https://valkey.io/docs/manual/scaling/#valkey-cluster-data-sharding
- Parameters:
key (bytes | memoryview | str | int | float)
- Return type:
int
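The slot mapping that keyslot computes follows the cluster data-sharding rule: CRC16 of the key modulo 16384, where only the hash tag is hashed when the key contains a non-empty {...} section. The sketch below reproduces that rule with the standard library; key_slot is a hypothetical helper name, and unlike the method above it only accepts str and bytes.

```python
import binascii
from typing import Union

SLOT_COUNT = 16384


def key_slot(key: Union[str, bytes]) -> int:
    """Compute the cluster hash slot for a key, honouring hash tags."""
    data = key.encode("utf-8") if isinstance(key, str) else bytes(key)
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty tag between the first "{" and the next "}" counts.
        if end > start + 1:
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is CRC-16/XMODEM.
    return binascii.crc_hqx(data, 0) % SLOT_COUNT
```

Keys that share a hash tag, such as {user1000}.following and {user1000}.followers, land in the same slot, which is what makes multi-key commands on them possible in a cluster.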
- get_connection_kwargs()[source]¶
Get the kwargs passed to Connection.
- Return type:
Dict[str, Any | None]
- set_response_callback(command, callback)[source]¶
Set a custom response callback.
- Parameters:
command (str)
callback (ResponseCallbackProtocol | AsyncResponseCallbackProtocol)
- Return type:
None
- async execute_command(*args, **kwargs)[source]¶
Execute a raw command on the appropriate cluster node or target_nodes.
It will retry the command as specified by cluster_error_retry_attempts & then raise an exception.
- Parameters:
args (bytes | memoryview | str | int | float) –
Raw command args
kwargs (Any) –
target_nodes: NODE_FLAGS or ClusterNode or List[ClusterNode] or Dict[Any, ClusterNode]
Rest of the kwargs are passed to the Valkey connection
- Raises:
ValkeyClusterException – if target_nodes is not provided & the command can’t be mapped to a slot
- Return type:
Any
- pipeline(transaction=None, shard_hint=None)[source]¶
Create & return a new ClusterPipeline object.
Cluster implementation of pipeline does not support transaction or shard_hint.
- Raises:
ValkeyClusterException – if transaction or shard_hint are truthy values
- Parameters:
transaction (Any | None)
shard_hint (Any | None)
- Return type:
ClusterPipeline
- lock(name, timeout=None, sleep=0.1, blocking=True, blocking_timeout=None, lock_class=None, thread_local=True)[source]¶
Return a new Lock object using key name that mimics the behavior of threading.Lock.
If specified, timeout indicates a maximum life for the lock. By default, it will remain locked until release() is called.
sleep indicates the amount of time to sleep per loop iteration when the lock is in blocking mode and another client is currently holding the lock.
blocking indicates whether calling acquire should block until the lock has been acquired or fail immediately, causing acquire to return False and the lock not being acquired. Defaults to True. Note this value can be overridden by passing a blocking argument to acquire.
blocking_timeout indicates the maximum amount of time in seconds to spend trying to acquire the lock. A value of None indicates continue trying forever. blocking_timeout can be specified as a float or integer, both representing the number of seconds to wait.
lock_class forces the specified lock implementation. Note that the only lock class we implement is Lock (which is a Lua-based lock). So, it’s unlikely you’ll need this parameter, unless you have created your own custom lock class.
thread_local indicates whether the lock token is placed in thread-local storage. By default, the token is placed in thread local storage so that a thread only sees its token, not a token set by another thread. Consider the following timeline:
- time: 0, thread-1 acquires my-lock, with a timeout of 5 seconds. thread-1 sets the token to “abc”
- time: 1, thread-2 blocks trying to acquire my-lock using the Lock instance.
- time: 5, thread-1 has not yet completed. valkey expires the lock key.
- time: 5, thread-2 acquired my-lock now that it’s available. thread-2 sets the token to “xyz”
- time: 6, thread-1 finishes its work and calls release(). If the token is not stored in thread local storage, then thread-1 would see the token value as “xyz” and would be able to successfully release the thread-2’s lock.
In some use cases it’s necessary to disable thread local storage. For example, if you have code where one thread acquires a lock and passes that lock instance to a worker thread to release later. If thread local storage isn’t disabled in this case, the worker thread won’t see the token set by the thread that acquired the lock. Our assumption is that these cases aren’t common and as such default to using thread local storage.
- Parameters:
name (bytes | str | memoryview)
timeout (float | None)
sleep (float)
blocking (bool)
blocking_timeout (float | None)
lock_class (Type[Lock] | None)
thread_local (bool)
- Return type:
Lock
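The effect of thread_local on token visibility can be demonstrated without a server. The sketch below is a standalone illustration using only the standard library: TokenStore and token_seen_by_worker are hypothetical names, and the class only models where a token is kept, not how a lock is acquired or released.

```python
import threading
from types import SimpleNamespace


class TokenStore:
    """Holds a lock token either per thread or shared across threads."""

    def __init__(self, thread_local: bool = True) -> None:
        # threading.local gives each thread its own attribute namespace;
        # SimpleNamespace is a plain object that every thread shares.
        self.local = threading.local() if thread_local else SimpleNamespace()
        self.local.token = None


def token_seen_by_worker(store: TokenStore):
    """Return the token that a freshly started worker thread observes."""
    seen = []
    worker = threading.Thread(
        target=lambda: seen.append(getattr(store.local, "token", None))
    )
    worker.start()
    worker.join()
    return seen[0]


isolated = TokenStore(thread_local=True)
isolated.local.token = "abc"  # set by the acquiring (main) thread

shared = TokenStore(thread_local=False)
shared.local.token = "abc"
```

With thread-local storage the worker sees no token, which is why a lock handed to another thread for release needs thread_local=False.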
ClusterNode (Async)¶
- class valkey.asyncio.cluster.ClusterNode(host, port, server_type=None, *, max_connections=2147483648, connection_class=<class 'valkey.asyncio.connection.Connection'>, **connection_kwargs)[source]¶
Create a new ClusterNode.
Each ClusterNode manages multiple Connection objects for the (host, port).
- Parameters:
host (str)
port (str | int)
server_type (str | None)
max_connections (int)
connection_class (Type[Connection])
connection_kwargs (Any)
ClusterPipeline (Async)¶
- class valkey.asyncio.cluster.ClusterPipeline(client)[source]¶
Create a new ClusterPipeline object.
Usage:
    result = await (
        rc.pipeline()
        .set("A", 1)
        .get("A")
        .hset("K", "F", "V")
        .hgetall("K")
        .mset_nonatomic({"A": 2, "B": 3})
        .get("A")
        .get("B")
        .delete("A", "B", "K")
        .execute()
    )
    # result = [True, "1", 1, {"F": "V"}, True, True, "2", "3", 1, 1, 1]
Note: For commands DELETE, EXISTS, TOUCH, UNLINK, mset_nonatomic, which are split across multiple nodes, you’ll get multiple results for them in the array.
- Retryable errors:
- Redirection errors:
- Parameters:
client (ValkeyCluster) –
Existing ValkeyCluster client
- execute_command(*args, **kwargs)[source]¶
Append a raw command to the pipeline.
- Parameters:
args (bytes | str | memoryview | int | float) –
Raw command args
kwargs (Any) –
target_nodes: NODE_FLAGS or ClusterNode or List[ClusterNode] or Dict[Any, ClusterNode]
Rest of the kwargs are passed to the Valkey connection
- Return type:
ClusterPipeline
- async execute(raise_on_error=True, allow_redirections=True)[source]¶
Execute the pipeline.
It will retry the commands as specified by cluster_error_retry_attempts & then raise an exception.
- Parameters:
raise_on_error (bool) –
Raise the first error if there are any errors
allow_redirections (bool) –
Whether to retry each failed command individually in case of redirection errors
- Raises:
ValkeyClusterException – if target_nodes is not provided & the command can’t be mapped to a slot
- Return type:
List[Any]
Connection¶
Connection¶
Connection (Async)¶
- class valkey.asyncio.connection.Connection(*, host='localhost', port=6379, socket_keepalive=False, socket_keepalive_options=None, socket_type=0, **kwargs)[source]¶
Manages TCP communication to and from a Valkey server
- Parameters:
host (str)
port (str | int)
socket_keepalive (bool)
socket_keepalive_options (Mapping[int, int | bytes] | None)
socket_type (int)
Connection Pools¶
ConnectionPool¶
- class valkey.connection.ConnectionPool(connection_class=<class 'valkey.connection.Connection'>, max_connections=None, **connection_kwargs)[source]¶
Create a connection pool.
If max_connections is set, then this object raises ConnectionError when the pool’s limit is reached.
By default, TCP connections are created unless connection_class is specified. Use UnixDomainSocketConnection for unix sockets.
Any additional keyword arguments are passed to the constructor of connection_class.
- Parameters:
max_connections (int | None)
- disconnect(inuse_connections=True)[source]¶
Disconnects connections in the pool
If inuse_connections is True, disconnect connections that are currently in use, potentially by other threads. Otherwise only disconnect connections that are idle in the pool.
- Parameters:
inuse_connections (bool)
- Return type:
None
- classmethod from_url(url, **kwargs)[source]¶
Return a connection pool configured from the given URL.
For example:
valkey://[[username]:[password]]@localhost:6379/0
valkeys://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
Three URL schemes are supported:
valkey:// creates a TCP socket connection.
valkeys:// creates an SSL-wrapped TCP socket connection.
unix:// creates a Unix Domain Socket connection.
The username, password, hostname, path and all querystring values are passed through urllib.parse.unquote in order to replace any percent-encoded values with their corresponding characters.
There are several ways to specify a database number. The first value found will be used:
A db querystring option, e.g. valkey://localhost?db=0
If using the valkey:// or valkeys:// schemes, the path argument of the url, e.g. valkey://localhost/0
A db keyword argument to this function.
If none of these options are specified, the default db=0 is used.
All querystring options are cast to their appropriate Python types. Boolean arguments can be specified with string values “True”/“False” or “Yes”/“No”. Values that cannot be properly cast cause a ValueError to be raised. Once parsed, the querystring arguments and keyword arguments are passed to the ConnectionPool’s class initializer. In the case of conflicting arguments, querystring arguments always win.
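The database-number precedence described above (querystring, then URL path, then keyword argument, then the default of 0) can be sketched as a small standalone function. resolve_db is a hypothetical helper written only to illustrate the ordering; it is not the library's parser and skips its error handling.

```python
from urllib.parse import parse_qs, urlparse


def resolve_db(url: str, **kwargs) -> int:
    """Pick the database number using the documented lookup order."""
    parsed = urlparse(url)
    query = parse_qs(parsed.query)
    # 1. A db querystring option is used first.
    if "db" in query:
        return int(query["db"][0])
    # 2. For valkey:// and valkeys://, the URL path is the next candidate.
    #    For unix:// the path is the socket file, so it is skipped.
    if parsed.scheme in ("valkey", "valkeys"):
        path = parsed.path.lstrip("/")
        if path:
            return int(path)
    # 3. Then a db keyword argument, else the default db=0.
    return int(kwargs.get("db", 0))
```

For example, resolve_db("valkey://localhost/3?db=2", db=5) picks the querystring value, and dropping ?db=2 makes the path value apply instead.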
- get_connection(command_name, *keys, **options)[source]¶
Get a connection from the pool
- Parameters:
command_name (str)
- Return type:
Connection
- release(connection)[source]¶
Releases the connection back to the pool
- Parameters:
connection (Connection)
- Return type:
None
ConnectionPool (Async)¶
- class valkey.asyncio.connection.ConnectionPool(connection_class=<class 'valkey.asyncio.connection.Connection'>, max_connections=None, **connection_kwargs)[source]¶
Create a connection pool.
If max_connections is set, then this object raises ConnectionError when the pool’s limit is reached.
By default, TCP connections are created unless connection_class is specified. Use UnixDomainSocketConnection for unix sockets.
Any additional keyword arguments are passed to the constructor of connection_class.
- Parameters:
connection_class (Type[AbstractConnection])
max_connections (int | None)
- can_get_connection()[source]¶
Return True if a connection can be retrieved from the pool.
- Return type:
bool
- async disconnect(inuse_connections=True)[source]¶
Disconnects connections in the pool
If inuse_connections is True, disconnect connections that are currently in use, potentially by other tasks. Otherwise only disconnect connections that are idle in the pool.
- Parameters:
inuse_connections (bool)
- async ensure_connection(connection)[source]¶
Ensure that the connection object is connected and valid
- Parameters:
connection (AbstractConnection)
- classmethod from_url(url, **kwargs)[source]¶
Return a connection pool configured from the given URL.
For example:
valkey://[[username]:[password]]@localhost:6379/0
valkeys://[[username]:[password]]@localhost:6379/0
unix://[username@]/path/to/socket.sock?db=0[&password=password]
Three URL schemes are supported:
valkey:// creates a TCP socket connection.
valkeys:// creates an SSL-wrapped TCP socket connection.
unix:// creates a Unix Domain Socket connection.
The username, password, hostname, path and all querystring values are passed through urllib.parse.unquote in order to replace any percent-encoded values with their corresponding characters.
There are several ways to specify a database number. The first value found will be used:
A db querystring option, e.g. valkey://localhost?db=0
If using the valkey:// or valkeys:// schemes, the path argument of the url, e.g. valkey://localhost/0
A db keyword argument to this function.
If none of these options are specified, the default db=0 is used.
All querystring options are cast to their appropriate Python types. Boolean arguments can be specified with string values “True”/“False” or “Yes”/“No”. Values that cannot be properly cast cause a ValueError to be raised. Once parsed, the querystring arguments and keyword arguments are passed to the ConnectionPool’s class initializer. In the case of conflicting arguments, querystring arguments always win.
- Parameters:
url (str)
- Return type:
_CP
- get_available_connection()[source]¶
Get a connection from the pool, without making sure it is connected