Global Interpreter Lock in Python
The Global Interpreter Lock (GIL) is a mutex that protects access to Python objects, preventing multiple native threads from executing Python bytecode at the same time. In CPython, the standard implementation of Python, the GIL ensures that only one thread executes Python bytecode at any given moment.
This means that even with multiple threads, Python programs cannot achieve true parallelism on multi-core processors for CPU-bound tasks. Each thread must acquire the GIL before executing bytecode, and releases it periodically or during I/O operations.
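How often a running thread is asked to give up the GIL is governed by the interpreter's switch interval. As a small illustration (the helper name below is hypothetical), the value can be read with sys.getswitchinterval(); CPython 3.2 and later default to 0.005 seconds:

```python
import sys


def current_switch_interval():
    """Return the interval, in seconds, after which CPython asks the
    running thread to release the GIL so another thread can run."""
    return sys.getswitchinterval()


interval = current_switch_interval()
print(f"GIL switch interval: {interval} seconds")
```

The interval can be changed with sys.setswitchinterval(), although tuning it is rarely needed.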
The GIL does not make compound operations atomic, so race conditions can still occur when multiple threads modify a shared variable, as the following example demonstrates:
import threading

counter = 0

def increment():
    global counter
    for _ in range(1000000):
        counter += 1

def decrement():
    global counter
    for _ in range(1000000):
        counter -= 1

thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(counter)
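Although the expected result is 0, repeated runs can produce different values (how often this shows up depends on the CPython version). This happens because a thread does not hold the GIL for its entire execution time. It releases the GIL periodically, so another thread can run and modify the shared variable between the moment the first thread reads counter and the moment it writes the new value back.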
Bytecode Explanation
We can examine the bytecode to understand why the result isn't zero. The following code demonstrates this:
import dis

def increment(a):
    a += 1

def decrement(a):
    a -= 1

dis.dis(increment)
dis.dis(decrement)
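The bytecode shows that a += 1 is not a single step: the value is loaded, the addition is performed, and the result is stored back by separate instructions. If the GIL is released between the load and the store of a shared variable, the other thread can update that variable in the meantime, and its update is overwritten when the first thread resumes. This is what leads to inconsistent results.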
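The same check can be made on a function that updates a global, which is closer to the counter example. The sketch below (the variable names are illustrative) lists the instruction names with dis.get_instructions() and locates the separate load and store steps. The exact instruction set differs between CPython versions, so only LOAD_GLOBAL and STORE_GLOBAL are relied on here:

```python
import dis

counter = 0


def increment():
    global counter
    counter += 1


# Collect the instruction names in execution order
opnames = [instruction.opname for instruction in dis.get_instructions(increment)]

# The read and the write of the global are distinct instructions,
# so a thread switch can happen between them
load_index = opnames.index("LOAD_GLOBAL")
store_index = opnames.index("STORE_GLOBAL")
print(opnames)
print(f"load at position {load_index}, store at position {store_index}")
```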
Thread Programming
For I/O-bound operations, multithreading can improve performance by allowing other threads to run while one is waiting for I/O operations to complete.
Here's an example using the threading module to create threads:
import time
import threading

def process_task(task_name, duration):
    print(f"Starting {task_name}")
    time.sleep(duration)
    print(f"Finished {task_name}")

if __name__ == "__main__":
    thread1 = threading.Thread(target=process_task, args=("Task 1", 2))
    thread2 = threading.Thread(target=process_task, args=("Task 2", 2))
    start_time = time.time()
    thread1.start()
    thread2.start()
    print(f"Main thread execution time: {time.time() - start_time:.2f} seconds")
The output shows that the main thread continues execution immediately after starting the child threads, without waiting for them to complete. To ensure the main thread waits for child threads to finish, we can use the join() method.
thread1.join()
thread2.join()
print(f"Total execution time: {time.time() - start_time:.2f} seconds")
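Alternatively, we can mark threads as daemon threads before starting them. Daemon threads do not keep the program alive: they are stopped abruptly when the program exits, which happens once all non-daemon threads (including the main thread) have finished: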
thread1.daemon = True
thread2.daemon = True
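As a small self-contained sketch of the daemon flag (the function and variable names are illustrative), the flag can also be passed to the Thread constructor. Once a thread has been started, trying to change the flag raises RuntimeError:

```python
import threading
import time


def background_task(duration):
    # Stand-in for background work such as polling or housekeeping
    time.sleep(duration)


# daemon=True in the constructor is equivalent to setting .daemon before start()
worker = threading.Thread(target=background_task, args=(0.1,), daemon=True)
worker.start()

# The daemon flag can no longer be changed once the thread has started
try:
    worker.daemon = False
    flag_change_rejected = False
except RuntimeError:
    flag_change_rejected = True

worker.join(timeout=2)
print(f"daemon={worker.daemon}, flag change rejected={flag_change_rejected}")
```

Because daemon threads are stopped without cleanup, they are best kept for work that can safely be abandoned mid-operation.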
Inheritance Approach for Thread Creation
Besides passing a target function to Thread, we can also create custom thread classes by inheriting from the Thread class:
import time
import threading

class DataFetcher(threading.Thread):
    def __init__(self, task_name, duration):
        super().__init__()
        self.task_name = task_name
        self.duration = duration

    def run(self):
        print(f"Starting {self.task_name}")
        time.sleep(self.duration)
        print(f"Finished {self.task_name}")

if __name__ == "__main__":
    thread1 = DataFetcher("Data Fetch 1", 2)
    thread2 = DataFetcher("Data Fetch 2", 4)
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(f"Total execution time: {time.time() - start_time:.2f} seconds")
Thread Communication
Threads can communicate through shared variables or thread-safe data structures like Queues.
Shared Variables (Not Recommended)
Using shared variables is generally unsafe due to race conditions:
import time
import threading

data_queue = []

def process_data():
    while True:
        # Not atomic: another thread may empty the list between the
        # length check and the pop, which then raises IndexError
        if len(data_queue):
            item = data_queue.pop(0)
            print(f"Processing item: {item}")
            time.sleep(0.5)
        else:
            time.sleep(0.1)  # Busy-wait polling while the list is empty

def generate_data():
    print("Generating data...")
    time.sleep(2)
    for i in range(10):
        data_queue.append(f"Item-{i}")
    print("Data generation complete")

if __name__ == "__main__":
    data_thread = threading.Thread(target=generate_data)
    data_thread.start()
    for i in range(3):
        processor = threading.Thread(target=process_data)
        processor.start()
Queue-Based Communication (Thread-Safe)
The Queue class provides thread-safe communication between threads:
from queue import Queue
import time
import threading

def process_data(queue):
    while True:
        item = queue.get()  # Blocks while the queue is empty
        print(f"Processing: {item}")
        time.sleep(0.5)
        queue.task_done()

def generate_data(queue):
    print("Generating data...")
    time.sleep(2)
    for i in range(10):
        queue.put(f"Item-{i}")
    print("Data generation complete")

if __name__ == "__main__":
    data_queue = Queue(maxsize=20)
    producer = threading.Thread(target=generate_data, args=(data_queue,))
    producer.start()
    for i in range(3):
        # Daemon consumers loop forever but do not keep the program alive
        consumer = threading.Thread(target=process_data, args=(data_queue,), daemon=True)
        consumer.start()
    producer.join()
    data_queue.join()  # Wait for all items to be processed
    print("All tasks completed")
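The consumer loop above never returns on its own. One common way to shut consumers down cleanly is to send each of them a sentinel object once the real work has been queued. The following is a minimal sketch of that pattern; the names STOP, worker, and run_pipeline are hypothetical, and doubling a number stands in for real processing:

```python
import queue
import threading

STOP = object()  # Sentinel: a unique object that can never be a real work item


def worker(tasks, results):
    while True:
        item = tasks.get()
        if item is STOP:
            tasks.task_done()
            break  # Leave the loop so the thread can finish
        results.put(item * 2)  # Stand-in for real processing
        tasks.task_done()


def run_pipeline(items, worker_count=3):
    tasks = queue.Queue()
    results = queue.Queue()
    workers = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(worker_count)]
    for thread in workers:
        thread.start()
    for item in items:
        tasks.put(item)
    for _ in workers:
        tasks.put(STOP)  # One sentinel per worker
    for thread in workers:
        thread.join()
    # Workers finish in no particular order, so sort for a stable result
    return sorted(results.get() for _ in range(len(items)))


doubled = run_pipeline(range(5))
print(doubled)  # [0, 2, 4, 6, 8]
```

With sentinels, every thread can be joined normally, so no daemon threads are needed.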
Thread Synchronization
Lock
Locks ensure that only one thread can execute a critical section at a time:
import threading
from threading import Lock

counter = 0
lock = Lock()

def increment():
    global counter
    for _ in range(1000000):
        with lock:  # Context manager for lock acquisition/release
            counter += 1

def decrement():
    global counter
    for _ in range(1000000):
        with lock:
            counter -= 1

thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(counter)
Be aware that locks can cause performance overhead and may lead to deadlocks if not used carefully.
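A typical deadlock arises when two threads each hold one lock and wait for the other's. One common remedy is to always acquire locks in a fixed global order. The sketch below illustrates this with a hypothetical Account class and transfer function, where ordering by account name is an arbitrary choice made for the example:

```python
import threading


class Account:
    """Hypothetical account guarded by its own lock."""

    def __init__(self, name, balance):
        self.name = name
        self.balance = balance
        self.lock = threading.Lock()


def transfer(source, target, amount):
    # Acquire both locks in a fixed order (by name here), so two opposite
    # transfers can never each hold one lock while waiting for the other
    first, second = sorted((source, target), key=lambda account: account.name)
    with first.lock, second.lock:
        source.balance -= amount
        target.balance += amount


account_a = Account("a", 100)
account_b = Account("b", 100)

threads = []
for _ in range(50):
    threads.append(threading.Thread(target=transfer, args=(account_a, account_b, 1)))
    threads.append(threading.Thread(target=transfer, args=(account_b, account_a, 1)))
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(account_a.balance, account_b.balance)  # 100 100
```

If transfer locked source first and target second instead, the two opposite directions could block each other indefinitely.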
RLock (Reentrant Lock)
RLocks allow the same thread to acquire the lock multiple times:
import threading
from threading import RLock

counter = 0
lock = RLock()

def nested_function():
    global counter
    with lock:  # Re-acquired by the thread that already holds it
        counter += 1

def increment():
    global counter
    for _ in range(1000000):
        with lock:
            nested_function()

def decrement():
    global counter
    for _ in range(1000000):
        with lock:
            counter -= 1

thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=decrement)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(counter)
Condition Variables
Condition variables allow threads to wait for certain conditions to be met:
import threading

class Speaker(threading.Thread):
    def __init__(self, condition):
        super().__init__(name="Speaker")
        self.condition = condition

    def run(self):
        with self.condition:
            print(f"{self.name}: Hello")
            self.condition.notify()
            self.condition.wait()
            print(f"{self.name}: How are you?")
            self.condition.notify()
            self.condition.wait()
            print(f"{self.name}: Let's discuss programming")
            self.condition.notify()

class Listener(threading.Thread):
    def __init__(self, condition):
        super().__init__(name="Listener")
        self.condition = condition

    def run(self):
        with self.condition:
            self.condition.wait()
            print(f"{self.name}: Hi there")
            self.condition.notify()
            self.condition.wait()
            print(f"{self.name}: I'm good, thanks!")
            self.condition.notify()
            self.condition.wait()
            print(f"{self.name}: Sounds great!")
            self.condition.notify()

if __name__ == "__main__":
    condition = threading.Condition()
    speaker = Speaker(condition)
    listener = Listener(condition)
    # Start the listener first so that it is already waiting when the speaker
    # sends its first notify(); a notification with no waiter is lost, and
    # both threads would then wait forever
    listener.start()
    speaker.start()
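The conversation above depends on which thread waits first, because notify() wakes only threads that are already waiting. A common way to remove that dependency is to wait on a predicate over shared state with Condition.wait_for(), which checks the predicate before it blocks. A minimal sketch, using a hypothetical Mailbox class:

```python
import threading


class Mailbox:
    """Hypothetical one-way mailbox built on a Condition and a list."""

    def __init__(self):
        self._condition = threading.Condition()
        self._messages = []

    def post(self, message):
        with self._condition:
            self._messages.append(message)
            self._condition.notify()

    def take(self, timeout=None):
        with self._condition:
            # wait_for() returns at once if a message is already present,
            # so a post() that happened earlier is not missed
            has_message = self._condition.wait_for(
                lambda: len(self._messages) > 0, timeout=timeout)
            if not has_message:
                return None  # Timed out
            return self._messages.pop(0)


mailbox = Mailbox()
mailbox.post("early")             # Posted before anyone is waiting
first = mailbox.take(timeout=1)   # Still delivered

received = []
taker = threading.Thread(target=lambda: received.append(mailbox.take(timeout=5)))
taker.start()
mailbox.post("late")
taker.join()
print(first, received)
```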
Semaphores
Semaphores control the number of threads that can access a resource simultaneously:
import threading
import time

class WebCrawler(threading.Thread):
    def __init__(self, url, semaphore):
        super().__init__()
        self.url = url
        self.semaphore = semaphore

    def run(self):
        with self.semaphore:
            time.sleep(2)  # Simulate network request
            print(f"Crawled: {self.url}")

class URLGenerator(threading.Thread):
    def __init__(self, semaphore):
        super().__init__()
        self.semaphore = semaphore

    def run(self):
        urls = [f"https://example.com/page{i}" for i in range(10)]
        for url in urls:
            with self.semaphore:
                crawler = WebCrawler(url, self.semaphore)
                crawler.start()

if __name__ == "__main__":
    semaphore = threading.Semaphore(3)  # Limit to 3 concurrent crawlers
    url_generator = URLGenerator(semaphore)
    url_generator.start()
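To check that a semaphore really caps concurrency, the number of threads inside the guarded section can be counted. The sketch below does this with a hypothetical ConcurrencyTracker class and uses BoundedSemaphore, which additionally raises ValueError if it is released more often than it was acquired:

```python
import threading
import time


class ConcurrencyTracker:
    """Records how many workers are inside the guarded section at once."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def enter(self):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)

    def leave(self):
        with self._lock:
            self.active -= 1


def crawl(url, slots, tracker):
    with slots:  # At most `limit` threads get past this line at the same time
        tracker.enter()
        time.sleep(0.05)  # Simulate a network request
        tracker.leave()


limit = 3
slots = threading.BoundedSemaphore(limit)
tracker = ConcurrencyTracker()
urls = [f"https://example.com/page{i}" for i in range(10)]
crawlers = [threading.Thread(target=crawl, args=(url, slots, tracker)) for url in urls]
for crawler in crawlers:
    crawler.start()
for crawler in crawlers:
    crawler.join()
print(f"Peak concurrency: {tracker.peak} (limit {limit})")
```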
Thread Pools with ThreadPoolExecutor
Thread pools efficiently manage multiple threads by reusing them for different tasks:
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [1, 2, 3]
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Submit tasks
        futures = [executor.submit(fetch_page, url, delay)
                   for url, delay in zip(urls, delays)]
        # Collect results in submission order; result() blocks until the task is done
        for future in futures:
            print(future.result())
Processing Results with as_completed
The as_completed function returns results as tasks complete, regardless of submission order:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [3, 1, 2]  # Different delays
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(fetch_page, url, delay)
                   for url, delay in zip(urls, delays)]
        # Results will be returned in completion order
        for future in as_completed(futures):
            print(future.result())
Using map for Ordered Results
The map function returns results in the same order as the input arguments:
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [3, 1, 2]
    with ThreadPoolExecutor(max_workers=2) as executor:
        # map takes one iterable per parameter of fetch_page
        # and yields results in input order
        for result in executor.map(fetch_page, urls, delays):
            print(result)
Waiting for Specific Tasks with wait
The wait function allows you to wait for specific task completion conditions:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed, FIRST_COMPLETED
import time

def fetch_page(url, delay):
    time.sleep(delay)
    return f"Fetched: {url} in {delay} seconds"

if __name__ == "__main__":
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    delays = [3, 1, 2]
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(fetch_page, url, delay)
                   for url, delay in zip(urls, delays)]
        # Wait for first task to complete
        wait(futures, return_when=FIRST_COMPLETED)
        print("First task completed")
        # Process all results, including the one that already finished
        for future in as_completed(futures):
            print(future.result())
Comparing Threads and Processes
Due to the GIL, Python threads cannot achieve true parallelism for CPU-bound tasks. For such tasks, multiprocessing is more effective:
CPU-Bound Tasks
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

if __name__ == "__main__":
    numbers = [30, 31, 32, 33, 34]
    # Thread pool execution
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(fibonacci, numbers))
    thread_time = time.time() - start_time
    print(f"Threads completed in {thread_time:.2f} seconds")
    # Process pool execution
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(fibonacci, numbers))
    process_time = time.time() - start_time
    print(f"Processes completed in {process_time:.2f} seconds")
I/O-Bound Tasks
For I/O-bound tasks, threads and processes perform similarly, but threads are more lightweight:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def simulate_io(duration):
    time.sleep(duration)
    return duration

if __name__ == "__main__":
    durations = [1] * 10  # Simulate 10 I/O operations
    # Thread pool execution
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(simulate_io, durations))
    thread_time = time.time() - start_time
    print(f"Threads completed in {thread_time:.2f} seconds")
    # Process pool execution
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(simulate_io, durations))
    process_time = time.time() - start_time
    print(f"Processes completed in {process_time:.2f} seconds")
Process Programming with multiprocessing
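The multiprocessing module creates separate processes, each with its own Python interpreter and memory space. At the lowest level, Unix-like systems create a process with os.fork(), which the following example calls directly (os.fork() is not available on Windows):
import os
import time

def child_process():
    print(f"Child process: {os.getpid()}, Parent: {os.getppid()}")
    time.sleep(2)
    print("Child process completed")

if __name__ == "__main__":
    print(f"Main process: {os.getpid()}")
    pid = os.fork()  # Returns 0 in the child and the child's PID in the parent
    if pid == 0:
        child_process()
    else:
        print(f"Created child process with PID: {pid}")
        os.waitpid(pid, 0)  # Wait for child to complete
        # Kept inside the else branch so that only the parent prints it
        print("Main process completed")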
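For comparison, here is a minimal sketch of the same parent and child relationship written with multiprocessing.Process, which also works on platforms without fork. The helper names describe_process and child_task are hypothetical:

```python
import multiprocessing
import os


def describe_process(label):
    """Build a short description of the process that runs this function."""
    return f"{label}: pid={os.getpid()}, parent={os.getppid()}"


def child_task(label):
    print(describe_process(label))


if __name__ == "__main__":
    print(describe_process("main"))
    child = multiprocessing.Process(target=child_task, args=("child",))
    child.start()
    child.join()  # Wait for the child to finish
    print(f"Child exit code: {child.exitcode}")
```

The `if __name__ == "__main__":` guard matters here: with the spawn or forkserver start methods, each child imports the main module, and unguarded process creation would run again in every child.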
Process Pools
The Pool class provides a convenient way to distribute tasks across multiple processes:
import multiprocessing
import time

def process_task(task_id, duration):
    time.sleep(duration)
    return f"Task {task_id} completed in {duration} seconds"

if __name__ == "__main__":
    tasks = [(i, i%2+1) for i in range(5)]  # (task_id, duration)
    with multiprocessing.Pool(processes=2) as pool:
        # starmap unpacks each tuple into arguments and blocks
        # until all results are ready
        results = pool.starmap(process_task, tasks)
    for result in results:
        print(result)
Process Communication
Queue
Queues can be used for communication between processes:
import multiprocessing
import time

def producer(queue):
    for i in range(5):
        queue.put(f"Item-{i}")
        time.sleep(0.5)
    print("Producer finished")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:  # Sentinel value to stop
            break
        print(f"Consumed: {item}")
    print("Consumer finished")

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    prod = multiprocessing.Process(target=producer, args=(queue,))
    cons = multiprocessing.Process(target=consumer, args=(queue,))
    prod.start()
    cons.start()
    prod.join()
    queue.put(None)  # Send sentinel value
    cons.join()
Pipe
Pipes provide a lower-overhead communication channel between exactly two processes:
import multiprocessing

def sender(pipe):
    for i in range(5):
        pipe.send(f"Message-{i}")
    pipe.send(None)  # Sentinel: no more messages follow
    pipe.close()

def receiver(pipe):
    # recv() raises EOFError only after every copy of the sending end is
    # closed, and the parent still holds one, so a sentinel ends the loop
    while True:
        msg = pipe.recv()
        if msg is None:
            break
        print(f"Received: {msg}")
    pipe.close()

if __name__ == "__main__":
    recv_pipe, send_pipe = multiprocessing.Pipe()
    sender_proc = multiprocessing.Process(target=sender, args=(send_pipe,))
    receiver_proc = multiprocessing.Process(target=receiver, args=(recv_pipe,))
    sender_proc.start()
    receiver_proc.start()
    sender_proc.join()
    receiver_proc.join()
Shared Memory
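Manager objects allow sharing complex data structures between processes. The data lives in a separate server process, and each process works with it through a proxy object rather than through memory that is truly shared: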
import multiprocessing

def update_shared_data(shared_dict, key, value):
    shared_dict[key] = value

def main():
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    processes = []
    for i in range(5):
        p = multiprocessing.Process(
            target=update_shared_data,
            args=(shared_dict, f"Key-{i}", f"Value-{i}")
        )
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print("Shared data:", shared_dict)

if __name__ == "__main__":
    main()
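For simple values, multiprocessing also offers Value and Array, which are allocated in memory that the processes actually share. The following is a minimal sketch of a shared counter; the function names add_many and run_workers are hypothetical:

```python
import multiprocessing


def add_many(shared_counter, repetitions):
    for _ in range(repetitions):
        # get_lock() returns the lock that guards this Value;
        # the += on .value is not atomic on its own
        with shared_counter.get_lock():
            shared_counter.value += 1


def run_workers(worker_count=4, repetitions=1000):
    counter = multiprocessing.Value("i", 0)  # "i": C signed int, initial value 0
    workers = [multiprocessing.Process(target=add_many, args=(counter, repetitions))
               for _ in range(worker_count)]
    for process in workers:
        process.start()
    for process in workers:
        process.join()
    return counter.value


if __name__ == "__main__":
    print(run_workers())  # 4000: 4 workers x 1000 increments each
```

Without the lock, increments from different processes could overwrite each other, just as with threads.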