Understanding Processes
A process can be understood as an executing program or application. In operating systems, processes serve as the fundamental units for resource allocation.
Think of a real-world company as a process: the company provides resources (computers, desks, etc.), while the employees who perform the actual work represent threads.
Key points: a running program has at least one process, each process contains at least one thread, and multiple processes can work on multiple tasks concurrently.
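The "each process contains at least one thread" point can be observed directly in Python: a freshly started process already runs one thread, the main thread. A minimal sketch:

```python
import os
import threading
import multiprocessing

def report():
    # Every process has its own PID and at least one (main) thread.
    print(f"PID {os.getpid()}: {threading.active_count()} thread(s), "
          f"current: {threading.current_thread().name}")

if __name__ == '__main__':
    report()                                        # the parent process
    child = multiprocessing.Process(target=report)  # a second process
    child.start()
    child.join()
```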
Process States
When the number of tasks exceeds available CPU cores, some tasks execute while others wait, leading to different process states:
- Ready State: All conditions for execution are met, waiting for CPU allocation
- Running State: The CPU is currently executing the process
- Waiting State: The process is waiting for certain conditions to be met (e.g., a sleeping process)
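These states can be glimpsed from the parent process. A child blocked in time.sleep() is in the waiting state and still reports as alive; after join() it has terminated. A small sketch (is_alive() only distinguishes alive from terminated, not ready from waiting):

```python
import time
import multiprocessing

def sleeper():
    time.sleep(1)  # the child spends most of its life in the waiting state

if __name__ == '__main__':
    p = multiprocessing.Process(target=sleeper)
    p.start()
    time.sleep(0.1)       # give the child time to reach time.sleep()
    print(p.is_alive())   # True: blocked, waiting for the timer
    p.join()
    print(p.is_alive())   # False: terminated
```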
Python Multiprocessing Fundamentals
For CPU-intensive operations, multiprocessing is preferable to threading, because CPython's Global Interpreter Lock (GIL) allows only one thread per process to execute Python bytecode at a time. For I/O-bound operations, threading generally performs better, since threads release the GIL while waiting. Keep in mind that process switching incurs higher overhead than thread switching, and that processes do not share memory by default.
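The CPU-bound case can be demonstrated by running the same workload on a thread pool and on a process pool. The sketch below uses a deliberately slow recursive Fibonacci; on CPython the process pool typically finishes faster because the GIL serializes the threaded version (exact timings vary by machine and core count):

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def fib(n):
    # Deliberately CPU-bound recursive Fibonacci.
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)

def timed(executor_cls, jobs):
    start = time.perf_counter()
    with executor_cls(max_workers=len(jobs)) as ex:
        results = list(ex.map(fib, jobs))
    return results, time.perf_counter() - start

if __name__ == '__main__':
    jobs = [28, 28, 28, 28]
    _, t_threads = timed(ThreadPoolExecutor, jobs)
    _, t_procs = timed(ProcessPoolExecutor, jobs)
    print(f"threads: {t_threads:.2f}s, processes: {t_procs:.2f}s")
```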
Implementing Multiprocessing
Basic Process Creation
```python
import time
import multiprocessing

def compute_fibonacci(n):
    if n <= 2:
        return 1
    return compute_fibonacci(n - 1) + compute_fibonacci(n - 2)

def perform_delay(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    # Creating a process
    worker = multiprocessing.Process(target=compute_fibonacci, args=(30,))
    worker.start()
    print(f"Process ID: {worker.pid}")
    worker.join()
    print("Main process completed")
```
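Note that start() does not capture the target's return value: the Fibonacci number computed above is simply discarded. One lightweight way to get a single numeric result back (a sketch; the IPC mechanisms covered below are the more general route) is a shared multiprocessing.Value:

```python
import multiprocessing

def compute_fibonacci(n):
    if n <= 2:
        return 1
    return compute_fibonacci(n - 1) + compute_fibonacci(n - 2)

def fib_into(n, result):
    # Store the answer in shared memory, since the return value is lost.
    result.value = compute_fibonacci(n)

if __name__ == '__main__':
    result = multiprocessing.Value('q', 0)  # shared 64-bit signed integer
    worker = multiprocessing.Process(target=fib_into, args=(20, result))
    worker.start()
    worker.join()
    print(result.value)  # → 6765
```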
Process Pool Executor
```python
import time
from concurrent.futures import ProcessPoolExecutor

def process_task(item):
    time.sleep(item)
    return f"Task {item} completed"

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [1, 2, 3, 4, 5]
        results = list(executor.map(process_task, tasks))
        for result in results:
            print(result)
```
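executor.map returns results in input order. When you would rather consume results as they finish, submit tasks individually and iterate with as_completed; a sketch:

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def process_task(item):
    time.sleep(item)
    return f"Task {item} completed"

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_task, n) for n in [3, 1, 2]]
        for future in as_completed(futures):  # completion order, not submit order
            print(future.result())            # the 1-second task arrives first
```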
Process Pool with Different Execution Modes
```python
import time
import multiprocessing

def execute_task(duration):
    time.sleep(duration)
    return duration

if __name__ == '__main__':
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        # Ordered execution: results come back in input order
        for result in pool.imap(execute_task, [1, 5, 3]):
            print(f"Completed task after {result} seconds")
        # Unordered execution: results arrive as tasks complete
        for result in pool.imap_unordered(execute_task, [1, 5, 3]):
            print(f"Task finished in {result} seconds")
```
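Besides imap and imap_unordered, Pool.apply_async submits tasks one at a time and hands back an AsyncResult per task, whose get() blocks until that particular result is ready. A sketch:

```python
import time
import multiprocessing

def execute_task(duration):
    time.sleep(duration)
    return duration

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        # Each submission returns an AsyncResult handle immediately.
        handles = [pool.apply_async(execute_task, (d,)) for d in [1, 3, 2]]
        for handle in handles:
            print(handle.get())  # blocks until that specific task finishes
```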
Inter-Process Communication
Using Queue for Communication
```python
import time
from multiprocessing import Process, Queue

def data_producer(queue):
    queue.put('Data item')
    time.sleep(1)

def data_consumer(queue):
    time.sleep(2)
    item = queue.get()
    print(f"Received: {item}")

if __name__ == '__main__':
    communication_queue = Queue()
    producer = Process(target=data_producer, args=(communication_queue,))
    consumer = Process(target=data_consumer, args=(communication_queue,))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
```
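A single put/get pair extends naturally to a stream of items. A common pattern (a sketch) is a sentinel value that tells the consumer no more data is coming:

```python
from multiprocessing import Process, Queue

SENTINEL = None  # agreed-upon "no more data" marker

def data_producer(queue, items):
    for item in items:
        queue.put(item)
    queue.put(SENTINEL)  # signal completion

def data_consumer(queue):
    while True:
        item = queue.get()
        if item is SENTINEL:
            break
        print(f"Received: {item}")

if __name__ == '__main__':
    q = Queue()
    producer = Process(target=data_producer, args=(q, ['a', 'b', 'c']))
    consumer = Process(target=data_consumer, args=(q,))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
```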
Sharing State with Manager
```python
from multiprocessing import Process, Manager

def modify_data(shared_dict, key, value):
    shared_dict[key] = value

if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        processes = [
            Process(target=modify_data, args=(shared_dict, f"key_{i}", i))
            for i in range(5)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f"Shared data: {dict(shared_dict)}")
```
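Manager proxies also solve a pool-specific problem: a plain multiprocessing.Queue cannot be passed as an argument to Pool workers (it raises a RuntimeError), but a manager.Queue() proxy can. A sketch:

```python
from multiprocessing import Manager, Pool

def log_result(queue, value):
    queue.put(value * value)

if __name__ == '__main__':
    with Manager() as manager:
        queue = manager.Queue()  # proxy object, safe to pass to pool workers
        with Pool(processes=2) as pool:
            pool.starmap(log_result, [(queue, i) for i in range(4)])
        results = sorted(queue.get() for _ in range(4))
        print(results)  # squares of 0..3, sorted
```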
Pipe for Direct Communication
```python
import time
from multiprocessing import Process, Pipe

def send_data(pipe):
    pipe.send('Message from sender')
    time.sleep(1)

def receive_data(pipe):
    time.sleep(2)
    message = pipe.recv()
    print(f"Received: {message}")

if __name__ == '__main__':
    receiver_conn, sender_conn = Pipe()
    sender = Process(target=send_data, args=(sender_conn,))
    receiver = Process(target=receive_data, args=(receiver_conn,))
    sender.start()
    receiver.start()
    sender.join()
    receiver.join()
```
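By default Pipe() is duplex, so either connection end can both send and receive. This allows a request/reply exchange over a single pipe; a sketch:

```python
from multiprocessing import Process, Pipe

def echo_worker(conn):
    request = conn.recv()       # wait for a request
    conn.send(request.upper())  # reply on the same connection
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex: both ends send and receive
    worker = Process(target=echo_worker, args=(child_conn,))
    worker.start()
    parent_conn.send('ping')
    print(parent_conn.recv())  # → PING
    worker.join()
```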