Asynchronous client examples

File: async_key_value.py.

Basic usage

The asynchronous client and cache (AioClient and AioCache) have mostly the same API as their synchronous counterparts (Client and Cache), but there are some peculiarities.

Basic key-value

Firstly, import dependencies.

from pyignite import AioClient

Let’s connect to cluster and perform key-value queries.

# Create client and connect.
client = AioClient()
async with client.connect('127.0.0.1', 10800):
    # Create cache
    cache = await client.get_or_create_cache('test_async_cache')

    # Load data concurrently.
    await asyncio.gather(
        *[cache.put(f'key_{i}', f'value_{i}') for i in range(0, 20)]
    )

    # Key-value queries.
    print(await cache.get('key_10'))
    # value_10
    pprint(await cache.get_all([f'key_{i}' for i in range(0, 10)]))
    # {'key_0': 'value_0',
    #  'key_1': 'value_1',
    #  'key_2': 'value_2',
    #  'key_3': 'value_3',
    #  'key_4': 'value_4',
    #  'key_5': 'value_5',
    #  'key_6': 'value_6',
    #  'key_7': 'value_7',
    #  'key_8': 'value_8',
    #  'key_9': 'value_9'}

Scan

The scan() method returns an AioScanCursor, which yields the resulting rows.

# Scan query.
async with cache.scan() as cursor:
    async for k, v in cursor:
        print(f'key = {k}, value = {v}')
# key = key_42, value = value_42
# key = key_43, value = value_43
# key = key_40, value = value_40
# key = key_41, value = value_41
# key = key_37, value = value_37
# key = key_51, value = value_51
# key = key_20, value = value_20
# ......

ExpiryPolicy

File: expiry_policy.py.

You can enable an expiry policy (TTL) in either of two ways.

Firstly, expiry policy can be set for entire cache by setting PROP_EXPIRY_POLICY in cache settings dictionary on creation.

ttl_cache = await client.create_cache({
    PROP_NAME: 'test',
    PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
})
await ttl_cache.put(1, 1)
await asyncio.sleep(0.5)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = 1
await asyncio.sleep(1.2)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = None

Secondly, an expiry policy can be set for all cache operations performed through a decorated cache wrapper. To create one, use with_expire_policy().

ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
await ttl_cache.put(1, 1)
await asyncio.sleep(0.5)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = 1
await asyncio.sleep(1.7)
value = await ttl_cache.get(1)
print(f"key = {1}, value = {value}")
# key = 1, value = None

Transactions

File: transactions.py.

Client transactions are supported for caches with TRANSACTIONAL mode. They are supported only on Python 3.7+.

Let’s create transactional cache:

cache = await client.get_or_create_cache({
    PROP_NAME: 'tx_cache',
    PROP_CACHE_ATOMICITY_MODE: CacheAtomicityMode.TRANSACTIONAL
})

Let’s start a transaction and commit it:

key = 1
async with client.tx_start(
        isolation=TransactionIsolation.REPEATABLE_READ,
        concurrency=TransactionConcurrency.PESSIMISTIC
) as tx:
    await cache.put(key, 'success')
    await tx.commit()

Let’s check that the transaction was committed successfully:

val = await cache.get(key)
print(f"key={key} value={val}")

Let’s check that raising exception inside async with block leads to transaction’s rollback

try:
    async with client.tx_start(
            isolation=TransactionIsolation.REPEATABLE_READ,
            concurrency=TransactionConcurrency.PESSIMISTIC
    ):
        await cache.put(key, 'fail')
        raise RuntimeError('test')
except RuntimeError:
    pass

# key=1 value=success
val = await cache.get(key)
print(f"key={key} value={val}")

Let’s check that timed out transaction is successfully rolled back

try:
    async with client.tx_start(timeout=1000, label='long-tx') as tx:
        await cache.put(key, 'fail')
        await asyncio.sleep(2.0)
        await tx.commit()
except CacheError as e:
    # Cache transaction timed out: GridNearTxLocal[...timeout=1000, ... label=long-tx]
    print(e)

# key=1 value=success
val = await cache.get(key)
print(f"key={key} value={val}")

See more info about transaction parameters in the documentation of tx_start().

SQL

File: async_sql.py.

First let us establish a connection.

client = AioClient()
async with client.connect('127.0.0.1', 10800):

Then create the tables. Begin with the Country table, then proceed with the related tables City and CountryLanguage.

COUNTRY_CREATE_TABLE = '''CREATE TABLE Country (
    Code CHAR(3) PRIMARY KEY,
    Name CHAR(52),
    Continent CHAR(50),
    Region CHAR(26),
    SurfaceArea DECIMAL(10,2),
    IndepYear SMALLINT(6),
    Population INT(11),
    LifeExpectancy DECIMAL(3,1),
    GNP DECIMAL(10,2),
    GNPOld DECIMAL(10,2),
    LocalName CHAR(45),
    GovernmentForm CHAR(45),
    HeadOfState CHAR(60),
    Capital INT(11),
    Code2 CHAR(2)
)'''
CITY_CREATE_TABLE = '''CREATE TABLE City (
    ID INT(11),
    Name CHAR(35),
    CountryCode CHAR(3),
    District CHAR(20),
    Population INT(11),
    PRIMARY KEY (ID, CountryCode)
) WITH "affinityKey=CountryCode"'''
LANGUAGE_CREATE_TABLE = '''CREATE TABLE CountryLanguage (
    CountryCode CHAR(3),
    Language CHAR(30),
    IsOfficial BOOLEAN,
    Percentage DECIMAL(4,1),
    PRIMARY KEY (CountryCode, Language)
) WITH "affinityKey=CountryCode"'''
    for query in [
        Query.COUNTRY_CREATE_TABLE,
        Query.CITY_CREATE_TABLE,
        Query.LANGUAGE_CREATE_TABLE,
    ]:
        await client.sql(query)

Create indexes.

CITY_CREATE_INDEX = 'CREATE INDEX idx_country_code ON city (CountryCode)'
LANGUAGE_CREATE_INDEX = 'CREATE INDEX idx_lang_country_code ON CountryLanguage (CountryCode)'
for query in [Query.CITY_CREATE_INDEX, Query.LANGUAGE_CREATE_INDEX]:
    await client.sql(query)

Fill tables with data.

COUNTRY_INSERT = '''INSERT INTO Country(
    Code, Name, Continent, Region,
    SurfaceArea, IndepYear, Population,
    LifeExpectancy, GNP, GNPOld,
    LocalName, GovernmentForm, HeadOfState,
    Capital, Code2
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
CITY_INSERT = '''INSERT INTO City(
    ID, Name, CountryCode, District, Population
) VALUES (?, ?, ?, ?, ?)'''
LANGUAGE_INSERT = '''INSERT INTO CountryLanguage(
    CountryCode, Language, IsOfficial, Percentage
) VALUES (?, ?, ?, ?)'''
await asyncio.gather(*[
    client.sql(Query.COUNTRY_INSERT, query_args=row) for row in TestData.COUNTRY
])

await asyncio.gather(*[
    client.sql(Query.CITY_INSERT, query_args=row) for row in TestData.CITY
])

await asyncio.gather(*[
    client.sql(Query.LANGUAGE_INSERT, query_args=row) for row in TestData.LANGUAGE
])

Now let us answer some questions.

What are the 10 largest cities in our data sample (population-wise)?

async with client.sql('SELECT name, population FROM City ORDER BY population DESC LIMIT 10') as cursor:
    print('Most 10 populated cities:')
    async for row in cursor:
        print(row)
# Most 10 populated cities:
# ['Mumbai (Bombay)', 10500000]
# ['Shanghai', 9696300]
# ['New York', 8008278]
# ['Peking', 7472000]
# ['Delhi', 7206704]
# ['Chongqing', 6351600]
# ['Tianjin', 5286800]
# ['Calcutta [Kolkata]', 4399819]
# ['Wuhan', 4344600]
# ['Harbin', 4289800]

The sql() method returns an AioSqlFieldsCursor, which yields the resulting rows.

What are the 10 most populated cities throughout the 3 chosen countries?

If you set the include_field_names argument to True, the sql() method will yield a list of column names as its first result. Unfortunately, there is no asynchronous equivalent of next(), but you can await __anext__() on the AioSqlFieldsCursor.

most_populated_in_3_countries = '''
SELECT country.name as country_name, city.name as city_name, MAX(city.population) AS max_pop FROM country
    JOIN city ON city.countrycode = country.code
    WHERE country.code IN ('USA','IND','CHN')
    GROUP BY country.name, city.name ORDER BY max_pop DESC LIMIT 10
'''

async with client.sql(most_populated_in_3_countries, include_field_names=True) as cursor:
    print('Most 10 populated cities in USA, India and China:')
    table_str_pattern = '{:15}\t| {:20}\t| {}'
    print(table_str_pattern.format(*await cursor.__anext__()))
    print('*' * 50)
    async for row in cursor:
        print(table_str_pattern.format(*row))
# Most 10 populated cities in USA, India and China:
# COUNTRY_NAME   	| CITY_NAME           	| MAX_POP
# **************************************************
# India          	| Mumbai (Bombay)     	| 10500000
# China          	| Shanghai            	| 9696300
# United States  	| New York            	| 8008278
# China          	| Peking              	| 7472000
# India          	| Delhi               	| 7206704
# China          	| Chongqing           	| 6351600
# China          	| Tianjin             	| 5286800
# India          	| Calcutta [Kolkata]  	| 4399819
# China          	| Wuhan               	| 4344600
# China          	| Harbin              	| 4289800

Display all the information about a given city

async with client.sql('SELECT * FROM City WHERE id = ?', query_args=[3802], include_field_names=True) as cursor:
    field_names = await cursor.__anext__()
    field_data = await cursor.__anext__()

    print('City info:')
    for field_name, field_value in zip(field_names * len(field_data), field_data):
        print('{}: {}'.format(field_name, field_value))
# City info:
# ID: 3802
# NAME: Detroit
# COUNTRYCODE: USA
# DISTRICT: Michigan
# POPULATION: 951270

Finally, delete the tables used in this example with the following queries:

DROP_TABLE = 'DROP TABLE {} IF EXISTS'
await asyncio.gather(*[
    client.sql(Query.DROP_TABLE.format(table_name.value)) for table_name in TableNames
])