
Asynchronous Programming in Python: A Deep Dive

Asynchronous programming is essential for building high-performance applications that can handle multiple tasks efficiently. In this guide, we’ll explore Python’s asyncio library and learn how to write asynchronous code.

Understanding Asynchronous Programming

Asynchronous programming lets you run many operations concurrently on a single thread: an event loop switches between coroutines whenever one of them waits on I/O, instead of relying on OS threads. Key concepts include:

  1. Coroutines
  2. Event Loops
  3. Tasks
  4. Futures
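
The four pieces fit together like this: a coroutine can await a Future, a Task wraps a coroutine so the event loop can run it, and the loop resolves the Future later. A minimal sketch (the names `waiter` and `main` are illustrative):

```python
import asyncio

async def waiter(fut: asyncio.Future) -> str:
    # Suspends until some other code sets the future's result
    return await fut

async def main() -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()                 # a Future tied to this loop
    task = asyncio.create_task(waiter(fut))    # a Task wrapping the coroutine
    loop.call_later(0.01, fut.set_result, "done")  # the loop resolves it later
    return await task

result = asyncio.run(main())
print(result)  # -> done
```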

Getting Started with Asyncio

Let’s start with a simple example:

# @filename: main.py
import asyncio


async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    await hello()

asyncio.run(main())

Coroutines and Tasks

Coroutines are the building blocks of async code:

# @filename: main.py
import asyncio
import time

async def fetch_data(delay):
    print(f"Starting fetch_data({delay})")
    await asyncio.sleep(delay)
    print(f"Finished fetch_data({delay})")
    return f"Data from {delay}"

async def main():
    # Create tasks
    task1 = asyncio.create_task(fetch_data(2))
    task2 = asyncio.create_task(fetch_data(1))

    # Wait for both tasks to complete
    start_time = time.time()
    results = await asyncio.gather(task1, task2)
    end_time = time.time()

    print(f"Results: {results}")
    print(f"Total time: {end_time - start_time:.2f} seconds")

asyncio.run(main())
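
`gather` returns results only once everything has finished. If you want each result as soon as it is ready, `asyncio.as_completed` is an alternative; a sketch reusing the same `fetch_data` shape (with short delays so it runs quickly):

```python
import asyncio

async def fetch_data(delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Data from {delay}"

async def main() -> list[str]:
    coros = [fetch_data(0.2), fetch_data(0.1)]
    finished = []
    # as_completed yields awaitables in completion order, not submission order
    for fut in asyncio.as_completed(coros):
        finished.append(await fut)
    return finished

results = asyncio.run(main())
print(results)  # the shorter delay finishes first
```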

Async Context Managers

Create reusable async resources:

# @filename: main.py
import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource")
        await asyncio.sleep(1)

    async def process(self):
        print("Processing data")
        await asyncio.sleep(1)

async def main():
    async with AsyncResource() as resource:
        await resource.process()

asyncio.run(main())
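
For simple acquire/release pairs, the standard library's `contextlib.asynccontextmanager` can replace a full class with `__aenter__`/`__aexit__`. A sketch equivalent to `AsyncResource` above (with shortened sleeps):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource():
    print("Acquiring resource")
    await asyncio.sleep(0.01)
    try:
        yield "resource"          # value bound by `async with ... as`
    finally:
        print("Releasing resource")
        await asyncio.sleep(0.01)

async def main() -> str:
    async with async_resource() as res:
        return res

value = asyncio.run(main())
```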

Real-World Example: Async Web Scraper

Let’s build a web scraper that fetches multiple URLs concurrently:

# @filename: main.py
import asyncio
import time
from typing import List, Dict

import aiohttp
import bs4


class AsyncWebScraper:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_page(self, url: str) -> str:
        """Fetch a single page."""
        async with self.session.get(url) as response:
            return await response.text()

    async def parse_page(self, html: str) -> Dict:
        """Parse page content."""
        soup = bs4.BeautifulSoup(html, 'html.parser')
        return {
            'title': soup.title.string if soup.title else None,
            'h1_tags': [h1.text for h1 in soup.find_all('h1')],
            'links': len(soup.find_all('a')),
        }

    async def process_url(self, url: str) -> Dict:
        """Process a single URL."""
        try:
            html = await self.fetch_page(url)
            data = await self.parse_page(html)
            data['url'] = url
            return data
        except Exception as e:
            return {'url': url, 'error': str(e)}

    async def scrape_urls(self, urls: List[str]) -> List[Dict]:
        """Scrape multiple URLs concurrently."""
        tasks = [self.process_url(url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        'https://python.org',
        'https://github.com',
        'https://stackoverflow.com',
    ]

    start_time = time.time()

    async with AsyncWebScraper() as scraper:
        results = await scraper.scrape_urls(urls)

    end_time = time.time()

    print(f"Scraped {len(results)} pages in {end_time - start_time:.2f} seconds")

    for result in results:
        if 'error' in result:
            print(f"Error scraping {result['url']}: {result['error']}")
        else:
            print(f"\nURL: {result['url']}")
            print(f"Title: {result['title']}")
            print(f"H1 Tags: {result['h1_tags']}")
            print(f"Number of links: {result['links']}")

if __name__ == '__main__':
    asyncio.run(main())
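
With many URLs, `gather` would open every connection at once. One common refinement is to cap in-flight requests with a single shared semaphore. A sketch of the idea; `fake_fetch` stands in for the real network call so the example runs offline, and the names here are illustrative:

```python
import asyncio

async def fake_fetch(url: str) -> str:
    # Stand-in for an aiohttp request
    await asyncio.sleep(0.01)
    return f"<html>{url}</html>"

async def bounded_fetch(sem: asyncio.Semaphore, url: str) -> str:
    async with sem:               # at most `limit` fetches run concurrently
        return await fake_fetch(url)

async def scrape(urls: list[str], limit: int = 2) -> list[str]:
    sem = asyncio.Semaphore(limit)    # one semaphore shared by all tasks
    return await asyncio.gather(*(bounded_fetch(sem, u) for u in urls))

pages = asyncio.run(scrape([f"https://example.com/{i}" for i in range(5)]))
```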

Project: Async Task Queue

Let’s build a task queue system that processes jobs asynchronously:

# @filename: main.py
import asyncio
import logging
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Awaitable, Callable, Dict, List, Optional


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Task:
    id: str
    name: str
    payload: Dict
    status: str
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Optional[Dict] = None
    error: Optional[str] = None

class AsyncTaskQueue:
    def __init__(self, max_workers: int = 3):
        self.max_workers = max_workers
        self.tasks: Dict[str, Task] = {}
        self.queue: asyncio.Queue = asyncio.Queue()
        self.workers: List[asyncio.Task] = []
        self.handlers: Dict[str, Callable[[Dict], Awaitable[Dict]]] = {}

    def register_handler(self, task_name: str, handler: Callable[[Dict], Awaitable[Dict]]):
        """Register a handler for a specific task type."""
        self.handlers[task_name] = handler

    async def add_task(self, name: str, payload: Dict) -> str:
        """Add a new task to the queue."""
        if name not in self.handlers:
            raise ValueError(f"No handler registered for task type: {name}")

        task_id = str(uuid.uuid4())
        task = Task(
            id=task_id,
            name=name,
            payload=payload,
            status='pending',
            created_at=datetime.now()
        )

        self.tasks[task_id] = task
        await self.queue.put(task_id)

        logger.info(f"Added task {task_id} of type {name}")
        return task_id

    async def get_task(self, task_id: str) -> Optional[Task]:
        """Get task status and result."""
        return self.tasks.get(task_id)

    async def process_task(self, task_id: str):
        """Process a single task."""
        task = self.tasks[task_id]
        handler = self.handlers[task.name]

        try:
            task.status = 'running'
            task.started_at = datetime.now()

            logger.info(f"Processing task {task_id}")
            result = await handler(task.payload)

            task.status = 'completed'
            task.result = result

        except Exception as e:
            logger.error(f"Error processing task {task_id}: {e}")
            task.status = 'failed'
            task.error = str(e)

        finally:
            task.completed_at = datetime.now()

    async def worker(self):
        """Worker process that handles tasks from the queue."""
        while True:
            try:
                task_id = await self.queue.get()
                await self.process_task(task_id)
                self.queue.task_done()
            except Exception as e:
                logger.error(f"Worker error: {e}")

    async def start(self):
        """Start the task queue workers."""
        self.workers = [
            asyncio.create_task(self.worker())
            for _ in range(self.max_workers)
        ]
        logger.info(f"Started {self.max_workers} workers")

    async def stop(self):
        """Stop the task queue workers."""
        for worker in self.workers:
            worker.cancel()

        await asyncio.gather(*self.workers, return_exceptions=True)
        logger.info("Stopped all workers")

# Example usage
async def example_handler(payload: Dict) -> Dict:
    """Example task handler that simulates processing."""
    await asyncio.sleep(2)  # Simulate work
    return {'processed': payload}

async def main():
    # Create and start task queue
    queue = AsyncTaskQueue(max_workers=3)
    queue.register_handler('example', example_handler)
    await queue.start()

    try:
        # Add some tasks
        tasks = []
        for i in range(5):
            task_id = await queue.add_task('example', {'data': f'task_{i}'})
            tasks.append(task_id)

        # Wait for tasks to complete
        while True:
            incomplete = False
            for task_id in tasks:
                task = await queue.get_task(task_id)
                if task.status not in ('completed', 'failed'):
                    incomplete = True
                    break

            if not incomplete:
                break

            await asyncio.sleep(0.5)

        # Print results
        for task_id in tasks:
            task = await queue.get_task(task_id)
            print(f"\nTask {task_id}:")
            print(f"Status: {task.status}")
            print(f"Result: {task.result}")
            print(f"Error: {task.error}")
            print(f"Duration: {(task.completed_at - task.started_at).total_seconds():.2f}s")

    finally:
        await queue.stop()

if __name__ == '__main__':
    asyncio.run(main())
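
The polling loop in `main` works, but `asyncio.Queue` already tracks outstanding work: because each worker calls `task_done()`, awaiting `queue.join()` blocks until every queued item has been processed. A self-contained sketch of that pattern:

```python
import asyncio

async def main() -> int:
    queue: asyncio.Queue = asyncio.Queue()
    done = 0

    async def worker():
        nonlocal done
        while True:
            item = await queue.get()
            await asyncio.sleep(0)     # simulate handling the item
            done += 1
            queue.task_done()          # lets queue.join() make progress

    workers = [asyncio.create_task(worker()) for _ in range(3)]
    for i in range(5):
        queue.put_nowait(i)

    await queue.join()                 # returns once all 5 items are done
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done

processed = asyncio.run(main())
```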

Best Practices

  1. Error Handling
# @filename: utils.py
async def safe_operation():
    try:
        async with asyncio.timeout(5):  # set a timeout (Python 3.11+)
            await potentially_long_operation()
    except asyncio.TimeoutError:
        print("Operation timed out")
    except Exception as e:
        print(f"Operation failed: {e}")
  2. Resource Management
# @filename: utils.py
async def manage_resources():
    async with AsyncResource() as resource:
        try:
            await resource.process()
        except Exception:
            # Resource will be properly released
            raise
  3. Concurrency Control
# @filename: utils.py
semaphore = asyncio.Semaphore(5)  # shared limit on concurrent operations

async def controlled_concurrency():
    # The semaphore must be shared across tasks; creating a new one
    # inside each call would not limit anything.
    async with semaphore:
        await limited_operation()
  4. Task Cancellation
# @filename: utils.py
async def handle_cancellation():
    try:
        await long_operation()
    except asyncio.CancelledError:
        # Clean up resources
        raise  # Re-raise to propagate cancellation
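
`asyncio.timeout` is only available on Python 3.11+; on older versions, `asyncio.wait_for` is the usual alternative. A runnable sketch with a short deadline (`slow` is illustrative):

```python
import asyncio

async def slow() -> str:
    await asyncio.sleep(10)
    return "never"

async def main() -> str:
    try:
        # wait_for cancels the inner coroutine when the deadline passes
        return await asyncio.wait_for(slow(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

outcome = asyncio.run(main())
```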

Common Patterns

  1. Producer-Consumer
# @filename: utils.py
async def producer(queue):
    for i in range(5):
        await queue.put(i)
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Processed {item}")
        queue.task_done()
  2. Fan-out/Fan-in
# @filename: utils.py
async def fan_out_fan_in(items):
    # Fan out
    tasks = [process_item(item) for item in items]

    # Fan in
    results = await asyncio.gather(*tasks)
    return results
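
The producer/consumer snippets above still need wiring to run: create the queue, start both sides as tasks, wait for the queue to drain with `join()`, then cancel the consumer, since it loops forever. A runnable sketch with shortened sleeps:

```python
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(5):
        await queue.put(i)
        await asyncio.sleep(0.01)

async def consumer(queue: asyncio.Queue, seen: list) -> None:
    while True:
        item = await queue.get()
        seen.append(item)
        queue.task_done()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    consumer_task = asyncio.create_task(consumer(queue, seen))
    await producer(queue)       # produce everything
    await queue.join()          # wait until the consumer drains the queue
    consumer_task.cancel()      # consumer loops forever, so cancel it
    return seen

items = asyncio.run(main())
```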

Conclusion

Asynchronous programming in Python enables:

  • Efficient I/O operations
  • Better resource utilization
  • Improved application performance
  • Scalable concurrent operations

Keep exploring asyncio and its ecosystem to build robust async applications.

Further Reading

Python Programming Best Practices


AI-Assisted Content

This article includes AI-assisted content that has been reviewed for accuracy. Always test code snippets before use.