Asynchronous client examples¶
File: async_key_value.py.
Basic usage¶
The asynchronous client and cache (AioClient and AioCache) have mostly the same API as the synchronous ones (Client and Cache), but there are some peculiarities.
Basic key-value¶
Firstly, import dependencies.
from pyignite import AioClient
Let’s connect to the cluster and perform key-value queries.
# Create client and connect.
client = AioClient()
async with client.connect('127.0.0.1', 10800):
# Create cache
cache = await client.get_or_create_cache('test_async_cache')
# Load data concurrently.
await asyncio.gather(
*[cache.put(f'key_{i}', f'value_{i}') for i in range(0, 20)]
)
# Key-value queries.
print(await cache.get('key_10'))
# value_10
pprint(await cache.get_all([f'key_{i}' for i in range(0, 10)]))
# {'key_0': 'value_0',
# 'key_1': 'value_1',
# 'key_2': 'value_2',
# 'key_3': 'value_3',
# 'key_4': 'value_4',
# 'key_5': 'value_5',
# 'key_6': 'value_6',
# 'key_7': 'value_7',
# 'key_8': 'value_8',
# 'key_9': 'value_9'}
Scan¶
The scan() method returns an AioScanCursor that yields the resulting rows.
# Scan query.
async with cache.scan() as cursor:
async for k, v in cursor:
print(f'key = {k}, value = {v}')
# key = key_42, value = value_42
# key = key_43, value = value_43
# key = key_40, value = value_40
# key = key_41, value = value_41
# key = key_37, value = value_37
# key = key_51, value = value_51
# key = key_20, value = value_20
# ......
ExpiryPolicy¶
File: expiry_policy.py.
You can enable an expiry policy (TTL) in two ways.
First, an expiry policy can be set for the entire cache by setting PROP_EXPIRY_POLICY in the cache settings dictionary on creation.
ttl_cache = await client.create_cache({
PROP_NAME: 'test',
PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
})
await ttl_cache.put(1, 1)
await asyncio.sleep(0.5)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = 1
await asyncio.sleep(1.2)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = None
Secondly, an expiry policy can be applied to all cache operations performed through a cache decorator. To create one, use with_expire_policy():
ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
await ttl_cache.put(1, 1)
await asyncio.sleep(0.5)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = 1
await asyncio.sleep(1.7)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = None
Transactions¶
File: transactions.py.
Client transactions are supported for caches with the TRANSACTIONAL atomicity mode.
Only Python 3.7+ is supported.
Let’s create a transactional cache:
cache = await client.get_or_create_cache({
PROP_NAME: 'tx_cache',
PROP_CACHE_ATOMICITY_MODE: CacheAtomicityMode.TRANSACTIONAL
})
Let’s start a transaction and commit it:
key = 1
async with client.tx_start(
isolation=TransactionIsolation.REPEATABLE_READ,
concurrency=TransactionConcurrency.PESSIMISTIC
) as tx:
await cache.put(key, 'success')
await tx.commit()
Let’s check that the transaction was committed successfully:
val = await cache.get(key)
print(f"key={key} value={val}")
Let’s check that raising an exception inside the async with block rolls the transaction back:
try:
async with client.tx_start(
isolation=TransactionIsolation.REPEATABLE_READ,
concurrency=TransactionConcurrency.PESSIMISTIC
):
await cache.put(key, 'fail')
raise RuntimeError('test')
except RuntimeError:
pass
# key=1 value=success
val = await cache.get(key)
print(f"key={key} value={val}")
Let’s check that a timed-out transaction is successfully rolled back:
try:
async with client.tx_start(timeout=1000, label='long-tx') as tx:
await cache.put(key, 'fail')
await asyncio.sleep(2.0)
await tx.commit()
except CacheError as e:
# Cache transaction timed out: GridNearTxLocal[...timeout=1000, ... label=long-tx]
print(e)
# key=1 value=success
val = await cache.get(key)
print(f"key={key} value={val}")
See the documentation of tx_start() for more information about the transaction’s parameters.
SQL¶
File: async_sql.py.
First let us establish a connection.
client = AioClient()
async with client.connect('127.0.0.1', 10800):
Then create the tables. Begin with the Country table, then proceed with the related tables City and CountryLanguage.
COUNTRY_CREATE_TABLE = '''CREATE TABLE Country (
Code CHAR(3) PRIMARY KEY,
Name CHAR(52),
Continent CHAR(50),
Region CHAR(26),
SurfaceArea DECIMAL(10,2),
IndepYear SMALLINT(6),
Population INT(11),
LifeExpectancy DECIMAL(3,1),
GNP DECIMAL(10,2),
GNPOld DECIMAL(10,2),
LocalName CHAR(45),
GovernmentForm CHAR(45),
HeadOfState CHAR(60),
Capital INT(11),
Code2 CHAR(2)
)'''
CITY_CREATE_TABLE = '''CREATE TABLE City (
ID INT(11),
Name CHAR(35),
CountryCode CHAR(3),
District CHAR(20),
Population INT(11),
PRIMARY KEY (ID, CountryCode)
) WITH "affinityKey=CountryCode"'''
LANGUAGE_CREATE_TABLE = '''CREATE TABLE CountryLanguage (
CountryCode CHAR(3),
Language CHAR(30),
IsOfficial BOOLEAN,
Percentage DECIMAL(4,1),
PRIMARY KEY (CountryCode, Language)
) WITH "affinityKey=CountryCode"'''
for query in [
Query.COUNTRY_CREATE_TABLE,
Query.CITY_CREATE_TABLE,
Query.LANGUAGE_CREATE_TABLE,
]:
await client.sql(query)
Create indexes.
CITY_CREATE_INDEX = 'CREATE INDEX idx_country_code ON city (CountryCode)'
LANGUAGE_CREATE_INDEX = 'CREATE INDEX idx_lang_country_code ON CountryLanguage (CountryCode)'
for query in [Query.CITY_CREATE_INDEX, Query.LANGUAGE_CREATE_INDEX]:
await client.sql(query)
Fill tables with data.
COUNTRY_INSERT = '''INSERT INTO Country(
Code, Name, Continent, Region,
SurfaceArea, IndepYear, Population,
LifeExpectancy, GNP, GNPOld,
LocalName, GovernmentForm, HeadOfState,
Capital, Code2
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
CITY_INSERT = '''INSERT INTO City(
ID, Name, CountryCode, District, Population
) VALUES (?, ?, ?, ?, ?)'''
LANGUAGE_INSERT = '''INSERT INTO CountryLanguage(
CountryCode, Language, IsOfficial, Percentage
) VALUES (?, ?, ?, ?)'''
await asyncio.gather(*[
client.sql(Query.COUNTRY_INSERT, query_args=row) for row in TestData.COUNTRY
])
await asyncio.gather(*[
client.sql(Query.CITY_INSERT, query_args=row) for row in TestData.CITY
])
await asyncio.gather(*[
client.sql(Query.LANGUAGE_INSERT, query_args=row) for row in TestData.LANGUAGE
])
Now let us answer some questions.
What are the 10 largest cities in our data sample (population-wise)?¶
async with client.sql('SELECT name, population FROM City ORDER BY population DESC LIMIT 10') as cursor:
print('Most 10 populated cities:')
async for row in cursor:
print(row)
# Most 10 populated cities:
# ['Mumbai (Bombay)', 10500000]
# ['Shanghai', 9696300]
# ['New York', 8008278]
# ['Peking', 7472000]
# ['Delhi', 7206704]
# ['Chongqing', 6351600]
# ['Tianjin', 5286800]
# ['Calcutta [Kolkata]', 4399819]
# ['Wuhan', 4344600]
# ['Harbin', 4289800]
The sql() method returns an AioSqlFieldsCursor that yields the resulting rows.
What are the 10 most populated cities throughout the 3 chosen countries?¶
If you set the include_field_names argument to True, the sql() method will yield a list of column names as its first item. Before Python 3.10 there is no built-in async equivalent of next() (Python 3.10 added anext()), but on any supported version you can await the __anext__() method of AioSqlFieldsCursor directly:
most_populated_in_3_countries = '''
SELECT country.name as country_name, city.name as city_name, MAX(city.population) AS max_pop FROM country
JOIN city ON city.countrycode = country.code
WHERE country.code IN ('USA','IND','CHN')
GROUP BY country.name, city.name ORDER BY max_pop DESC LIMIT 10
'''
async with client.sql(most_populated_in_3_countries, include_field_names=True) as cursor:
print('Most 10 populated cities in USA, India and China:')
table_str_pattern = '{:15}\t| {:20}\t| {}'
print(table_str_pattern.format(*await cursor.__anext__()))
print('*' * 50)
async for row in cursor:
print(table_str_pattern.format(*row))
# Most 10 populated cities in USA, India and China:
# COUNTRY_NAME | CITY_NAME | MAX_POP
# **************************************************
# India | Mumbai (Bombay) | 10500000
# China | Shanghai | 9696300
# United States | New York | 8008278
# China | Peking | 7472000
# India | Delhi | 7206704
# China | Chongqing | 6351600
# China | Tianjin | 5286800
# India | Calcutta [Kolkata] | 4399819
# China | Wuhan | 4344600
# China | Harbin | 4289800
Display all the information about a given city¶
async with client.sql('SELECT * FROM City WHERE id = ?', query_args=[3802], include_field_names=True) as cursor:
field_names = await cursor.__anext__()
field_data = await cursor.__anext__()
print('City info:')
for field_name, field_value in zip(field_names * len(field_data), field_data):
print('{}: {}'.format(field_name, field_value))
# City info:
# ID: 3802
# NAME: Detroit
# COUNTRYCODE: USA
# DISTRICT: Michigan
# POPULATION: 951270
Finally, delete the tables used in this example with the following queries:
DROP_TABLE = 'DROP TABLE {} IF EXISTS'
await asyncio.gather(*[
client.sql(Query.DROP_TABLE.format(table_name.value)) for table_name in TableNames
])