ft_utils

concurrent Module Documentation

The concurrent module provides a set of concurrently scalable data structures and patterns for Python. This module is designed to support high-performance, scalable programming with Free Threaded Python.

ConcurrentDict

A concurrently accessible dictionary.

Methods

Operators

ConcurrentDict also supports the following access methods:

Notes

ConcurrentDict does not support all the API methods of a built-in dict. It is designed for basic key-value store operations in a concurrent environment.

Example

from ft_utils.concurrent import ConcurrentDict

d = ConcurrentDict()
d['key'] = 'value'
print(d['key'])  # prints 'value'
print('key' in d)  # prints True
del d['key']
print('key' in d)  # prints False
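
The single-threaded example above can be extended to several writer threads. The following is a minimal sketch, not taken from the ft_utils test suite: it uses only the item assignment, lookup, and `in` operations shown above, and the names `Store`, `fill`, and `run` are hypothetical. The fallback to a built-in dict is an assumption made only so the snippet runs where ft_utils is not installed.

```python
import threading

try:
    from ft_utils.concurrent import ConcurrentDict as Store
except ImportError:
    # Assumption: a plain dict stands in only so the sketch runs without ft_utils.
    Store = dict


def fill(store, worker_id, count):
    # Each worker writes its own disjoint set of string keys.
    for n in range(count):
        store[f"{worker_id}-{n}"] = worker_id * count + n


def run(workers=4, count=250):
    store = Store()
    threads = [
        threading.Thread(target=fill, args=(store, w, count))
        for w in range(workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return store


store = run()
# Check every expected key with the `in` operator.
missing = [
    f"{w}-{n}" for w in range(4) for n in range(250) if f"{w}-{n}" not in store
]
print(len(missing))      # prints 0
print(store["3-249"])    # prints 999
```

Because the workers write disjoint keys, the check only shows that no write is lost; it says nothing about relative performance.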

AtomicInt64

A 64-bit integer that can be updated atomically.

Methods

In addition, the following numeric methods are implemented:

Example

from ft_utils.concurrent import AtomicInt64

i = AtomicInt64(10)
print(i.get())  # prints 10
i.incr()
print(i.get())  # prints 11
i.add(5)
print(i.get())  # prints 16
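
A shared counter updated from several threads is the typical use. The sketch below relies only on the `get` and `incr` calls shown above. The lock-based fallback class is a hypothetical stand-in, not the ft_utils implementation, and exists only so the snippet runs without ft_utils installed.

```python
import threading

try:
    from ft_utils.concurrent import AtomicInt64
except ImportError:
    class AtomicInt64:
        """Lock-based stand-in (NOT the ft_utils implementation)."""

        def __init__(self, value=0):
            self._value = value
            self._lock = threading.Lock()

        def get(self):
            with self._lock:
                return self._value

        def incr(self):
            # Increment by one and return the new value.
            with self._lock:
                self._value += 1
                return self._value


def count_events(counter, n):
    for _ in range(n):
        counter.incr()


counter = AtomicInt64(0)
threads = [
    threading.Thread(target=count_events, args=(counter, 1000))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = counter.get()
print(total)  # prints 8000
```

No increment is lost because each `incr` call is a single atomic step, so the final value equals the number of calls.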

AtomicReference

A reference that can be updated atomically.

Methods

Using compare_exchange

The compare_exchange method can be used in a loop to atomically update the reference, similar to using the CAS instruction in native programming.

Example

from ft_utils.concurrent import AtomicReference

r = AtomicReference(0)

def increment(r):
    while True:
        current = r.get()
        new_value = current + 1
        if r.compare_exchange(current, new_value):
            break

increment(r)
print(r.get())  # prints 1

In this example, the increment function uses a loop to atomically increment the value of the AtomicReference. The compare_exchange method is used to check if the current value is still the same as the expected value, and if so, updates the value to the new value. If another thread has updated the value in the meantime, the compare_exchange method will return False and the loop will retry.
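
The same retry loop works for any update that derives a new value from the current one. The sketch below wraps it in a hypothetical `update` helper and uses it to append to an immutable tuple from several threads. The fallback class is a lock-based stand-in, not the ft_utils implementation, and its identity comparison (`is`) is an assumption modeled on a native CAS over a pointer.

```python
import threading

try:
    from ft_utils.concurrent import AtomicReference
except ImportError:
    class AtomicReference:
        """Lock-based stand-in (NOT the ft_utils implementation)."""

        def __init__(self, value=None):
            self._value = value
            self._lock = threading.Lock()

        def get(self):
            with self._lock:
                return self._value

        def compare_exchange(self, expected, new_value):
            # Assumption: comparison is by identity, like CAS on a pointer.
            with self._lock:
                if self._value is expected:
                    self._value = new_value
                    return True
                return False


def update(ref, fn):
    """Hypothetical helper: retry fn(current) until the swap succeeds."""
    while True:
        current = ref.get()
        if ref.compare_exchange(current, fn(current)):
            return


def worker(ref, worker_id, n):
    for i in range(n):
        # Build a new tuple each time; the stored tuple is never mutated.
        update(ref, lambda seen: seen + (f"{worker_id}:{i}",))


ref = AtomicReference(())
threads = [threading.Thread(target=worker, args=(ref, w, 100)) for w in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(ref.get()))  # prints 400
```

Storing immutable values matters here: a failed compare_exchange discards the candidate tuple without any side effect, so a retry is always safe.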

AtomicFlag

A boolean flag that can be updated atomically.

Methods

Example

from ft_utils.concurrent import AtomicFlag

flag = AtomicFlag(True)
print(flag)  # prints True
flag.set(False)
print(flag)  # prints False

ConcurrentGatheringIterator

A concurrent iterator that gathers values from multiple threads and yields them in order.

Methods

Notes

Example

from ft_utils.concurrent import ConcurrentGatheringIterator

iterator = ConcurrentGatheringIterator()
iterator.insert(0, 'value0')
iterator.insert(1, 'value1')
iterator.insert(2, 'value2')

for value in iterator.iterator(2):
    print(value)  # prints 'value0', 'value1', 'value2'

A more complex example:

from ft_utils.concurrent import ConcurrentGatheringIterator, AtomicInt64
from concurrent.futures import ThreadPoolExecutor

def insert_value(iterator, atomic_index, value):
    index = atomic_index.incr()
    iterator.insert(index, value)

def test_concurrent_gathering_iterator():
    iterator = ConcurrentGatheringIterator()
    atomic_index = AtomicInt64(-1)

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for i in range(100):
            futures.append(executor.submit(insert_value, iterator, atomic_index, i))

        for future in futures:
            future.result()

    results = list(iterator.iterator(99))
    assert results == list(range(100))

test_concurrent_gathering_iterator()

In this example, we use a ThreadPoolExecutor to insert values into the ConcurrentGatheringIterator from multiple threads. We use an AtomicInt64 to generate the indices in order. After inserting all the values, we retrieve the results from the iterator and check that they are in the correct order.

Note that the insert_value function is a helper function that inserts a value into the iterator at the next available index. The test_concurrent_gathering_iterator function is the main test function that creates the iterator, inserts values from multiple threads, and checks the results.

This example demonstrates that the ConcurrentGatheringIterator can handle concurrent inserts from multiple threads and still produce the correct results in order.
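
To make the ordering guarantee concrete, here is a small standard-library model of the gathering idea: values arrive keyed by index in any order, and the consumer yields them strictly by ascending key, waiting for any key that has not arrived yet. The `GatheringBuffer` class and `produce` function are hypothetical and only illustrate the behavior; this is not how ConcurrentGatheringIterator is implemented.

```python
import threading


class GatheringBuffer:
    """Hypothetical stdlib model of in-order gathering (not ft_utils code)."""

    def __init__(self):
        self._items = {}
        self._cond = threading.Condition()

    def insert(self, key, value):
        with self._cond:
            self._items[key] = value
            self._cond.notify_all()

    def iterator(self, max_key):
        # Yield values for keys 0..max_key in order, blocking until each arrives.
        for key in range(max_key + 1):
            with self._cond:
                self._cond.wait_for(lambda: key in self._items)
                value = self._items.pop(key)
            yield value


buffer = GatheringBuffer()


def produce(keys):
    for key in keys:
        buffer.insert(key, f"value{key}")


# Three producers insert keys in descending, interleaved order.
threads = [
    threading.Thread(target=produce, args=(range(start, -1, -3),))
    for start in (9, 8, 7)
]
for t in threads:
    t.start()

# The consumer runs while producers are still inserting.
results = list(buffer.iterator(9))
for t in threads:
    t.join()

print(results == [f"value{k}" for k in range(10)])  # prints True
```

In this model the consumer does not need to wait for all producers to finish; it only waits for the next key in sequence.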

ConcurrentQueue

A concurrent queue that allows multiple threads to push and pop values.

Methods

Exceptions

Notes

Lock-Free Implementation

The lock-free implementation of the queue uses a combination of atomic operations and careful synchronization to ensure thread safety without locks. This approach can provide better performance and scalability in certain scenarios, particularly those with a large number of readers and writers. Under lightly loaded conditions, it tends to consume more CPU than the lock-based approach.

In general, the lock-free implementation is recommended for scenarios where:

On the other hand, the lock-based implementation is recommended for scenarios where:

Example

from ft_utils.concurrent import ConcurrentQueue

queue = ConcurrentQueue()

queue.push('value0')
queue.push('value1')
queue.push('value2')
queue.push('value3')
queue.push('value4')

print(queue.pop())  # prints 'value0'
print(queue.pop(timeout=0.1))
print(queue.pop_local())  # returns a LocalWrapper object
queue.shutdown()
queue.pop()
# Raises ShutDown
queue.pop()
# Raises ShutDown
queue.push('value5')

StdConcurrentQueue

StdConcurrentQueue follows the same API as queue.Queue. For simple applications, it works as a drop-in replacement for queue.Queue. However, there are subtle differences:

Therefore, in complex applications it may be better to replace only the highly contended queue.Queue instances with StdConcurrentQueue, reviewing each one deliberately. Where the simpler ConcurrentQueue is sufficient, prefer it.
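
For the simple drop-in case, the swap can be limited to the import line. The sketch below assumes only that StdConcurrentQueue provides the `put` and `get` methods of queue.Queue, as stated above. The `producer` and `consumer` functions and the `None` sentinel are illustrative choices, and the fallback import exists so the snippet also runs without ft_utils installed.

```python
import threading

try:
    from ft_utils.concurrent import StdConcurrentQueue as Queue
except ImportError:
    # Fallback so the sketch runs without ft_utils.
    from queue import Queue


def producer(q, items):
    for item in items:
        q.put(item)


def consumer(q, out):
    # Drain the queue until the None sentinel arrives.
    while True:
        item = q.get()
        if item is None:
            break
        out.append(item)


q = Queue()
received = []

consumer_thread = threading.Thread(target=consumer, args=(q, received))
consumer_thread.start()

producers = [
    threading.Thread(target=producer, args=(q, range(base, base + 50)))
    for base in (0, 50)
]
for t in producers:
    t.start()
for t in producers:
    t.join()

q.put(None)  # signal the consumer that no more items will arrive
consumer_thread.join()

print(sorted(received) == list(range(100)))  # prints True
```

The rest of the program is unchanged by the swap, which makes it practical to convert one contended queue at a time and measure the effect.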