Python Concurrency: Threads, Processes, and Thread Pools

Global Interpreter Lock in Python

The Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time. In CPython, the standard implementation of Python, the GIL ensures that only one thread executes Python bytecode at any given moment.

This means that even with multiple threads, Python programs cannot achieve true parallelism on multi-core processors for CPU-bound tasks. Each thread must acquire the GIL before executing bytecode, and releases it periodically or during I/O operations.
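
How often a running thread is asked to give up the GIL is governed by the interpreter's switch interval. As a small illustration (the value 0.001 below is an arbitrary choice for this sketch), sys.getswitchinterval() reports the interval in seconds and sys.setswitchinterval() changes it:

```python
import sys

# Interval, in seconds, after which CPython asks the running thread
# to release the GIL so that another thread can be scheduled.
interval = sys.getswitchinterval()
print(f"Switch interval: {interval} seconds")

# The interval can be tuned; restore the previous value afterwards.
sys.setswitchinterval(0.001)
shortened = sys.getswitchinterval()
sys.setswitchinterval(interval)
print(f"Temporarily shortened to: {shortened} seconds")
```

A shorter interval makes thread switches more frequent, at the cost of more switching overhead.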

The following example shows that the GIL does not prevent race conditions when multiple threads modify a shared variable:

counter = 0

def increment():
    global counter
    for _ in range(1000000):
        counter += 1

def decrement():
    global counter
    for _ in range(1000000):
        counter -= 1

import threading

thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(counter)

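Although the expected result is 0, repeated runs can produce different values. This happens because threads don't hold the GIL for their entire execution time. They release it periodically, allowing other threads to run and modify the shared variable in the middle of an update. The effect depends on the interpreter version: newer CPython releases switch threads at fewer points, so this particular loop may happen to print 0 there.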

Bytecode Explanation

We can examine the bytecode to understand why the result isn't zero. The following code demonstrates this:

import dis

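counter = 0

def increment():
    global counter
    counter += 1

def decrement():
    global counter
    counter -= 1

dis.dis(increment)
dis.dis(decrement)

The bytecode shows that counter += 1 is not a single step. The interpreter loads the global, performs the addition, and stores the result back as separate instructions (LOAD_GLOBAL, an add instruction, and STORE_GLOBAL; the exact names vary by Python version). If a thread switch happens between the load and the store, the other thread's update is overwritten, which is why the final value is inconsistent.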
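
As a rough sketch of how to check this programmatically, dis.get_instructions() yields the instructions one by one, so the position of the load and the store on a global counter can be compared. The instruction names between them differ across Python versions, so the sketch only looks for LOAD_GLOBAL and STORE_GLOBAL:

```python
import dis

counter = 0

def increment():
    global counter
    counter += 1

# Collect the instruction names for increment(); the full list varies by version
opnames = [instr.opname for instr in dis.get_instructions(increment)]
print(opnames)

load_index = opnames.index("LOAD_GLOBAL")
store_index = opnames.index("STORE_GLOBAL")

# The addition sits between the load and the store, so there is a gap
# in which another thread could run and change counter.
print(f"Instructions from load to store: {store_index - load_index}")
```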

Thread Programming

For I/O-bound operations, multithreading can improve performance by allowing other threads to run while one is waiting for I/O operations to complete.

Here's an example using the threading module to create threads:

import time
import threading

def process_task(task_name, duration):
    print(f"Starting {task_name}")
    time.sleep(duration)
    print(f"Finished {task_name}")

if __name__ == "__main__":
    thread1 = threading.Thread(target=process_task, args=("Task 1", 2))
    thread2 = threading.Thread(target=process_task, args=("Task 2", 2))
    
    start_time = time.time()
    thread1.start()
    thread2.start()
    
    print(f"Main thread execution time: {time.time() - start_time:.2f} seconds")

The output shows that the main thread continues execution immediately after starting the child threads, without waiting for them to complete. To ensure the main thread waits for child threads to finish, we can use the join() method.

thread1.join()
thread2.join()
print(f"Total execution time: {time.time() - start_time:.2f} seconds")

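Alternatively, we can mark threads as daemon threads. The program exits once only daemon threads remain, so they are stopped abruptly when the main thread finishes. The flag must be set before start() is called:

# Set these before calling thread1.start() and thread2.start()
thread1.daemon = True
thread2.daemon = True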
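
A minimal sketch of the same idea, using the daemon argument of the Thread constructor instead of the attribute. The background_worker function is a hypothetical endless task used only for illustration:

```python
import threading
import time

def background_worker():
    # Hypothetical endless task used only for illustration
    while True:
        time.sleep(0.1)

# daemon=True in the constructor has the same effect as setting
# the .daemon attribute before start()
worker = threading.Thread(target=background_worker, daemon=True)
worker.start()

# join() with a timeout returns even though the worker never finishes
worker.join(timeout=0.2)
print(f"daemon={worker.daemon}, still alive={worker.is_alive()}")
```

Because the worker is a daemon thread, the script still exits when the main thread reaches the end.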

Inheritance Approach for Thread Creation

Besides passing a target function to Thread, we can also create custom thread classes by inheriting from the Thread class:

import time
import threading

class DataFetcher(threading.Thread):
    def __init__(self, task_name, duration):
        super().__init__()
        self.task_name = task_name
        self.duration = duration
    
    def run(self):
        print(f"Starting {self.task_name}")
        time.sleep(self.duration)
        print(f"Finished {self.task_name}")

if __name__ == "__main__":
    thread1 = DataFetcher("Data Fetch 1", 2)
    thread2 = DataFetcher("Data Fetch 2", 4)
    
    start_time = time.time()
    thread1.start()
    thread2.start()
    
    thread1.join()
    thread2.join()
    
    print(f"Total execution time: {time.time() - start_time:.2f} seconds")

Thread Communication

Threads can communicate through shared variables or thread-safe data structures like Queues.

Shared Variables (Not Recommended)

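Sharing a plain list between threads is error-prone. In the example below, the length check and the pop are separate steps, so two consumers can both see one remaining item and the slower one raises IndexError. The consumers also have to poll with sleep instead of blocking until data arrives: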

import time
import threading

data_queue = []

def process_data():
    while True:
        if len(data_queue):
            item = data_queue.pop(0)
            print(f"Processing item: {item}")
            time.sleep(0.5)
        else:
            time.sleep(0.1)

def generate_data():
    print("Generating data...")
    time.sleep(2)
    for i in range(10):
        data_queue.append(f"Item-{i}")
    print("Data generation complete")

if __name__ == "__main__":
    data_thread = threading.Thread(target=generate_data)
    data_thread.start()
    
    for i in range(3):
        processor = threading.Thread(target=process_data)
        processor.start()

Queue-Based Communication (Thread-Safe)

The Queue class provides thread-safe communication between threads:

from queue import Queue
import time
import threading

def process_data(queue):
    while True:
        item = queue.get()  # Blocks if queue is empty
        print(f"Processing: {item}")
        time.sleep(0.5)
        queue.task_done()

def generate_data(queue):
    print("Generating data...")
    time.sleep(2)
    for i in range(10):
        queue.put(f"Item-{i}")
    print("Data generation complete")

if __name__ == "__main__":
    data_queue = Queue(maxsize=20)
    
    producer = threading.Thread(target=generate_data, args=(data_queue,))
    producer.start()
    
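    for i in range(3):
        # The consumers loop forever, so mark them as daemon threads
        # to let the program exit once all items are processed
        consumer = threading.Thread(target=process_data, args=(data_queue,), daemon=True)
        consumer.start()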
    
    producer.join()
    data_queue.join()  # Wait for all items to be processed
    print("All tasks completed")
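
Another common way to stop consumers is to send each of them a sentinel value. The following is a sketch under assumptions: the STOP sentinel, the run() helper, and the upper-casing step are illustrative names, not part of the example above.

```python
import threading
from queue import Queue

STOP = None  # Sentinel value (an assumption of this sketch)

def consumer(tasks, results):
    while True:
        item = tasks.get()
        if item is STOP:
            break  # Leave the loop so the thread can finish
        results.put(item.upper())  # Stand-in for real processing

def run(num_consumers=3, num_items=10):
    tasks, results = Queue(), Queue()
    workers = [threading.Thread(target=consumer, args=(tasks, results))
               for _ in range(num_consumers)]
    for worker in workers:
        worker.start()
    for i in range(num_items):
        tasks.put(f"item-{i}")
    for _ in workers:
        tasks.put(STOP)  # One sentinel per consumer
    for worker in workers:
        worker.join()
    # All workers have finished, so every result is already in the queue
    return sorted(results.get() for _ in range(num_items))

if __name__ == "__main__":
    print(run())
```

With sentinels the consumers end on their own, so they can be joined like any other thread and do not need to be daemon threads.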

Thread Synchronization

Lock

Locks ensure that only one thread can execute a critical section at a time:

from threading import Lock

counter = 0
lock = Lock()

def increment():
    global counter
    for _ in range(1000000):
        with lock:  # Context manager for lock acquisition/release
            counter += 1

def decrement():
    global counter
    for _ in range(1000000):
        with lock:
            counter -= 1

import threading

thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(counter)

Be aware that locks can cause performance overhead and may lead to deadlocks if not used carefully.
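
One way to limit the damage of a potential deadlock is to acquire with a timeout instead of blocking forever. A minimal sketch, in which the 0.1 second timeout is an arbitrary choice:

```python
import threading

lock = threading.Lock()
outcome = []

def try_to_acquire():
    # Give up after 0.1 s instead of blocking forever
    acquired = lock.acquire(timeout=0.1)
    outcome.append(acquired)
    if acquired:
        lock.release()

with lock:  # The main thread holds the lock while the worker tries
    worker = threading.Thread(target=try_to_acquire)
    worker.start()
    worker.join()

print(outcome)  # [False]: the worker timed out rather than hanging
```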

RLock (Reentrant Lock)

RLocks allow the same thread to acquire the lock multiple times:

from threading import RLock

counter = 0
lock = RLock()

def nested_function():
    global counter
    with lock:
        counter += 1

def increment():
    global counter
    for _ in range(1000000):
        with lock:
            nested_function()

def decrement():
    global counter
    for _ in range(1000000):
        with lock:
            counter -= 1

import threading

thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(counter)
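
The difference from a plain Lock can be seen with non-blocking acquire calls. This sketch attempts a second acquisition from the same thread on each lock type:

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

with plain:
    # Non-blocking attempt: fails because this thread already holds the Lock.
    # A blocking acquire here would deadlock.
    plain_again = plain.acquire(blocking=False)

with reentrant:
    # Succeeds: an RLock counts nested acquisitions by the owning thread
    reentrant_again = reentrant.acquire(blocking=False)
    if reentrant_again:
        reentrant.release()  # Every acquire needs a matching release

print(plain_again, reentrant_again)  # False True
```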

Condition Variables

Condition variables allow threads to wait for certain conditions to be met:

import threading

class Speaker(threading.Thread):
    def __init__(self, condition):
        super().__init__(name="Speaker")
        self.condition = condition
    
    def run(self):
        with self.condition:
            print(f"{self.name}: Hello")
            self.condition.notify()
            
            self.condition.wait()
            print(f"{self.name}: How are you?")
            self.condition.notify()
            
            self.condition.wait()
            print(f"{self.name}: Let's discuss programming")
            self.condition.notify()

class Listener(threading.Thread):
    def __init__(self, condition):
        super().__init__(name="Listener")
        self.condition = condition
    
    def run(self):
        with self.condition:
            self.condition.wait()
            print(f"{self.name}: Hi there")
            self.condition.notify()
            
            self.condition.wait()
            print(f"{self.name}: I'm good, thanks!")
            self.condition.notify()
            
            self.condition.wait()
            print(f"{self.name}: Sounds great!")
            self.condition.notify()

if __name__ == "__main__":
    condition = threading.Condition()
    speaker = Speaker(condition)
    listener = Listener(condition)
    
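    # Start the listener first so it is already waiting when the speaker
    # sends its first notify(); otherwise that notification could be lost
    listener.start()
    speaker.start()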
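
A conversation built on bare wait() and notify() calls depends on which thread runs first. A more robust pattern, sketched here with assumed names (turn, talk, transcript), is to keep the condition in shared state and wait on it with wait_for(), which re-checks a predicate:

```python
import threading

condition = threading.Condition()
turn = "speaker"  # Shared state that says whose turn it is
transcript = []

def talk(name, other, lines):
    global turn
    for line in lines:
        with condition:
            # wait_for() returns at once if the predicate already holds,
            # so a notification sent before this thread waits is not lost
            condition.wait_for(lambda: turn == name)
            transcript.append(f"{name}: {line}")
            turn = other
            condition.notify_all()

speaker = threading.Thread(
    target=talk, args=("speaker", "listener", ["Hello", "How are you?"]))
listener = threading.Thread(
    target=talk, args=("listener", "speaker", ["Hi there", "I'm good, thanks!"]))

# Start order does not matter with this pattern
speaker.start()
listener.start()
speaker.join()
listener.join()
print("\n".join(transcript))
```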

Semaphores

Semaphores control the number of threads that can access a resource simultaneously:

import threading
import time

class WebCrawler(threading.Thread):
    def __init__(self, url, semaphore):
        super().__init__()
        self.url = url
        self.semaphore = semaphore
    
    def run(self):
        with self.semaphore:
            time.sleep(2)  # Simulate network request
            print(f"Crawled: {self.url}")

class URLGenerator(threading.Thread):
    def __init__(self, semaphore):
        super().__init__()
        self.semaphore = semaphore
    
    def run(self):
        urls = [f"https://example.com/page{i}" for i in range(10)]
        for url in urls:
            # Each crawler acquires the semaphore itself in run(),
            # so the generator does not need to hold a slot
            crawler = WebCrawler(url, self.semaphore)
            crawler.start()

if __name__ == "__main__":
    semaphore = threading.Semaphore(3)  # Limit to 3 concurrent crawlers
    url_generator = URLGenerator(semaphore)
    url_generator.start()
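
To check that the limit is actually respected, the number of threads inside the guarded section can be counted. In this sketch the counters active and peak, and the lock protecting them, are additions for illustration:

```python
import threading
import time

MAX_CONCURRENT = 3
semaphore = threading.Semaphore(MAX_CONCURRENT)
stats_lock = threading.Lock()  # Protects the two counters below
active = 0
peak = 0

def crawl(url):
    global active, peak
    with semaphore:
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # Simulate a network request
        with stats_lock:
            active -= 1

threads = [threading.Thread(target=crawl, args=(f"https://example.com/page{i}",))
           for i in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"Peak concurrency: {peak}")
```

The peak never exceeds MAX_CONCURRENT, even though all ten threads are started immediately.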

Thread Pools with ThreadPoolExecutor

Thread pools efficiently manage multiple threads by reusing them for different tasks:

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [1, 2, 3]
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        futures = [executor.submit(fetch_page, url, delay) 
                  for url, delay in zip(urls, delays)]
        
        # result() blocks until each task finishes; results print in submission order
        for future in futures:
            print(future.result())
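
If a task raises an exception, the exception is stored in the future and raised again when result() is called. A small sketch, where the failure condition for URLs containing "bad" is made up for illustration:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_page(url):
    # Hypothetical failure condition used only for illustration
    if "bad" in url:
        raise ValueError(f"Cannot fetch {url}")
    return f"Fetched: {url}"

urls = ["https://example.com", "https://example.com/bad"]
outcomes = []

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(fetch_page, url) for url in urls]
    for future in futures:
        try:
            outcomes.append(future.result())  # Re-raises the worker's exception
        except ValueError as exc:
            outcomes.append(f"Failed: {exc}")

print(outcomes)
```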

Processing Results with as_completed

The as_completed function returns results as tasks complete, regardless of submission order:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [3, 1, 2]  # Different delays
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(fetch_page, url, delay) 
                  for url, delay in zip(urls, delays)]
        
        # Results will be returned in completion order
        for future in as_completed(futures):
            print(future.result())

Using map for Ordered Results

The map function returns results in the same order as the input arguments:

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [3, 1, 2]
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        # map() takes one iterable per argument; results come back in input order
        for result in executor.map(fetch_page, urls, delays):
            print(result)

Waiting for Specific Tasks with wait

The wait function allows you to wait for specific task completion conditions:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed, FIRST_COMPLETED
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [3, 1, 2]
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(fetch_page, url, delay) 
                  for url, delay in zip(urls, delays)]
        
        # Wait for first task to complete
        wait(futures, return_when=FIRST_COMPLETED)
        print("First task completed")
        
        # Process remaining results
        for future in as_completed(futures):
            print(future.result())
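
wait() also returns two sets, the futures that are done and those that are not, which can be used directly. A short sketch with made-up URLs and delays:

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return url

delays = [0.3, 0.05, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fetch_page, f"https://example.com/{i}", delay)
               for i, delay in enumerate(delays)]

    # Returns as soon as at least one future has finished
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    first_results = [future.result() for future in done]
    print(f"Finished first: {first_results}, still running: {len(not_done)}")

    # Block until everything has finished
    done_all, not_done_all = wait(futures, return_when=ALL_COMPLETED)
    print(f"Finished in total: {len(done_all)}")
```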

Comparing Threads and Processes

Due to the GIL, Python threads cannot achieve true parallelism for CPU-bound tasks. For such tasks, multiprocessing is more effective:

CPU-Bound Tasks

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

if __name__ == "__main__":
    numbers = [30, 31, 32, 33, 34]
    
    # Thread pool execution
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(fibonacci, numbers))
    thread_time = time.time() - start_time
    print(f"Threads completed in {thread_time:.2f} seconds")
    
    # Process pool execution
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(fibonacci, numbers))
    process_time = time.time() - start_time
    print(f"Processes completed in {process_time:.2f} seconds")

I/O-Bound Tasks

For I/O-bound tasks, threads and processes perform similarly, but threads are more lightweight:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def simulate_io(duration):
    time.sleep(duration)
    return duration

if __name__ == "__main__":
    durations = [1] * 10  # Simulate 10 I/O operations
    
    # Thread pool execution
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(simulate_io, durations))
    thread_time = time.time() - start_time
    print(f"Threads completed in {thread_time:.2f} seconds")
    
    # Process pool execution
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(simulate_io, durations))
    process_time = time.time() - start_time
    print(f"Processes completed in {process_time:.2f} seconds")

Process Programming with multiprocessing

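The multiprocessing module creates separate processes, each with its own Python interpreter and memory space. Underneath, on Unix-like systems, a new process can be created with os.fork(), which the following example calls directly (os.fork() is not available on Windows):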

import os
import time

def child_process():
    print(f"Child process: {os.getpid()}, Parent: {os.getppid()}")
    time.sleep(2)
    print("Child process completed")

if __name__ == "__main__":
    print(f"Main process: {os.getpid()}")
    
    pid = os.fork()
    
    if pid == 0:
        child_process()
    else:
        print(f"Created child process with PID: {pid}")
        os.waitpid(pid, 0)  # Wait for child to complete
        print("Main process completed")
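
A portable counterpart using multiprocessing.Process might look like the sketch below. The child_task and main names are illustrative:

```python
import multiprocessing
import os

def child_task(label):
    # Runs in the child process when used as a Process target
    print(f"{label}: pid={os.getpid()}, parent={os.getppid()}")

def main():
    child = multiprocessing.Process(target=child_task, args=("Child",))
    child.start()
    child.join()  # Wait for the child, similar to os.waitpid()
    print(f"Child exit code: {child.exitcode}")
    return child.exitcode

if __name__ == "__main__":
    print(f"Main process: {os.getpid()}")
    main()
```

The __main__ guard matters here: on platforms that start child processes by importing the main module again, unguarded code would run a second time in the child.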

Process Pools

The Pool class provides a convenient way to distribute tasks across multiple processes:

import multiprocessing
import time

def process_task(task_id, duration):
    time.sleep(duration)
    return f"Task {task_id} completed in {duration} seconds"

if __name__ == "__main__":
    tasks = [(i, i%2+1) for i in range(5)]  # (task_id, duration)
    
    with multiprocessing.Pool(processes=2) as pool:
        # starmap() unpacks each tuple into arguments and blocks until all results are ready
        results = pool.starmap(process_task, tasks)
        
        for result in results:
            print(result)

Process Communication

Queue

Queues can be used for communication between processes:

import multiprocessing
import time

def producer(queue):
    for i in range(5):
        queue.put(f"Item-{i}")
        time.sleep(0.5)
    print("Producer finished")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:  # Sentinel value to stop
            break
        print(f"Consumed: {item}")
    print("Consumer finished")

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    
    prod = multiprocessing.Process(target=producer, args=(queue,))
    cons = multiprocessing.Process(target=consumer, args=(queue,))
    
    prod.start()
    cons.start()
    
    prod.join()
    queue.put(None)  # Send sentinel value
    cons.join()

Pipe

A Pipe connects exactly two endpoints and is typically faster than a Queue for communication between two processes:

import multiprocessing

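def sender(pipe):
    for i in range(5):
        pipe.send(f"Message-{i}")
    pipe.send(None)  # Sentinel: tells the receiver to stop
    pipe.close()

def receiver(pipe):
    # Relying on EOFError would hang here: it is raised only once every
    # copy of the sending end is closed, and the parent still holds one
    while True:
        msg = pipe.recv()
        if msg is None:  # Sentinel received
            break
        print(f"Received: {msg}")
    pipe.close()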

if __name__ == "__main__":
    recv_pipe, send_pipe = multiprocessing.Pipe()
    
    sender_proc = multiprocessing.Process(target=sender, args=(send_pipe,))
    receiver_proc = multiprocessing.Process(target=receiver, args=(recv_pipe,))
    
    sender_proc.start()
    receiver_proc.start()
    
    sender_proc.join()
    receiver_proc.join()

Shared Memory

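Manager objects allow sharing complex data structures between processes. The data lives in a separate manager process, and the other processes work with it through proxy objects: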

import multiprocessing

def update_shared_data(shared_dict, key, value):
    shared_dict[key] = value

def main():
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    
    processes = []
    for i in range(5):
        p = multiprocessing.Process(
            target=update_shared_data, 
            args=(shared_dict, f"Key-{i}", f"Value-{i}")
        )
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print("Shared data:", shared_dict)

if __name__ == "__main__":
    main()
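
For simple values, multiprocessing.Value places a C-typed value in memory that is actually shared between processes. The following sketch, with the assumed helper add_many, increments a shared integer from several processes:

```python
import multiprocessing

def add_many(shared_counter, times):
    for _ in range(times):
        # get_lock() guards the read-modify-write on the shared integer
        with shared_counter.get_lock():
            shared_counter.value += 1

def main():
    counter = multiprocessing.Value("i", 0)  # "i" = C signed int, initial value 0
    workers = [multiprocessing.Process(target=add_many, args=(counter, 1000))
               for _ in range(4)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(f"Final value: {counter.value}")
    return counter.value

if __name__ == "__main__":
    main()
```

Without the lock, the increments from different processes could overwrite each other, just as with threads and a shared global.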

Tags: python multithreading multiprocessing Concurrency Thread Pool

Posted on Sun, 07 Jun 2026 17:04:27 +0000 by dhvani