Python Multiprocessing: fork(), Process Class, Pool, and Queue Communication

Process vs Program

A program is code that has been written but not yet executed. When code is running, it becomes a process. A process contains not only the executable code but also the runtime environment and system resources.

In operating systems, a process is the smallest unit of resource allocation.

Creating Processes with fork()

The os module in Python provides a wrapper for common system calls, including fork(), which enables creating child processes:

import os

# fork() only works on Unix/Linux/Mac systems, not Windows
child_pid = os.fork()

if child_pid == 0:
    print('Child process executing')
else:
    print('Parent process executing')

How fork() Works

When os.fork() is called, the operating system creates a new process (child process) and copies all information from the parent process. Both parent and child then continue execution from the fork point.

Key points about fork():

  • Ordinary function calls execute once and return once
  • fork() executes once but returns twice—once in the parent and once in the child
  • The child process always receives 0 as the return value
  • The parent process receives the child's PID (process ID)

This design allows the parent to track all its children by their PIDs, while each child can retrieve the parent's PID using getppid().
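
Both return values can be observed from a single script. The following is a minimal, Unix-only sketch: the helper name fork_and_report and the use of a pipe to carry the child's report back to the parent are assumptions made for illustration, not part of the original example.

```python
import os


def fork_and_report():
    """Fork once and collect what each side observed (Unix only)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()

    if pid == 0:
        # Child branch: fork() returned 0 here.
        os.close(read_fd)
        report = f"{os.getpid()} {os.getppid()}"
        os.write(write_fd, report.encode())
        os.close(write_fd)
        os._exit(0)  # leave immediately, skipping the parent's code path

    # Parent branch: fork() returned the child's PID here.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        child_pid, child_ppid = map(int, reader.read().split())
    os.waitpid(pid, 0)  # reap the child so it does not linger as a zombie
    return {
        "fork_return_in_parent": pid,
        "pid_seen_by_child": child_pid,
        "ppid_seen_by_child": child_ppid,
    }


if __name__ == "__main__":
    info = fork_and_report()
    print(info["fork_return_in_parent"] == info["pid_seen_by_child"])
    print(info["ppid_seen_by_child"] == os.getpid())
```

The value fork() hands to the parent matches what the child gets from os.getpid(), and the child's os.getppid() matches the parent's own PID.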

Identifying Processes: getpid() and getppid()

import os

try:
    result_pid = os.fork()
except OSError as err:
    # os.fork() raises OSError on failure; it never returns a negative value
    raise SystemExit(f"Fork operation failed: {err}")

if result_pid == 0:
    print(f"Child process PID: {os.getpid()}, Parent PID: {os.getppid()}")
else:
    print(f"Parent process PID: {os.getpid()}, Child PID: {result_pid}")

print("Both processes execute this code")

Sample output:

Parent process PID: 19360, Child PID: 19361
Both processes execute this code
Child process PID: 19361, Parent PID: 19360
Both processes execute this code

Process Resource Cleanup with wait()

When a child process completes execution, the parent process should reclaim its resources using os.wait():

import os
import time

pid = os.fork()

if pid == 0:
    for i in range(5):
        print(f"Child {os.getpid()} working...")
        time.sleep(1)
else:
    print(f"Parent {os.getpid()} waiting...")
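    # wait() returns a tuple: (child_pid, exit_status)
    # exit_status is a 16-bit value: the low byte holds the terminating signal
    # (0 if none) and the high byte holds the exit code, so 0 means a clean exit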
    child_pid, exit_status = os.wait()
    print(f"Reclaimed child: {child_pid}, status: {exit_status}")
    print("Cleanup complete")
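
The raw status returned by os.wait() can be decoded with the os.WIF* helpers instead of being compared with 0 directly. The sketch below is Unix-only; describe_status and spawn_and_wait are hypothetical helper names, and sending SIGKILL is just one convenient way to produce a signal-terminated child.

```python
import os
import signal
import time


def describe_status(status):
    """Turn the raw status from os.wait()/os.waitpid() into readable text."""
    if os.WIFEXITED(status):
        return f"exited with code {os.WEXITSTATUS(status)}"
    if os.WIFSIGNALED(status):
        return f"killed by signal {os.WTERMSIG(status)}"
    return f"unrecognised status {status}"


def spawn_and_wait(exit_code=None):
    """Fork a child that exits with exit_code, or is killed if it is None."""
    pid = os.fork()
    if pid == 0:
        if exit_code is None:
            time.sleep(60)  # idle until the parent sends SIGKILL
        os._exit(exit_code or 0)
    if exit_code is None:
        os.kill(pid, signal.SIGKILL)
    _, status = os.waitpid(pid, 0)
    return describe_status(status)


if __name__ == "__main__":
    print(spawn_and_wait(3))
    print(spawn_and_wait())
```

Using os.waitpid(pid, 0) rather than os.wait() makes the parent wait for one specific child, which matters once there is more than one.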

Orphan and Zombie Processes

Orphan Processes

An orphan process occurs when the parent terminates before the child completes execution. On Unix/Linux systems, orphaned children are adopted by the init process (PID 1), or on some modern systems by a designated subreaper process. Because the adopting process reaps them when they exit, orphan processes pose no significant danger.

import os
import time

pid = os.fork()

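if pid == 0:
    # The child outlives the parent; its reported parent PID changes
    # once the original parent has exited and the child is adopted
    for i in range(100):
        print(f"Child {os.getpid()}, Parent PID: {os.getppid()}")
        time.sleep(1)
    print(f"Child {os.getpid()} completed")
else:
    print(f"Parent {os.getpid()} exiting before the child")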

Zombie Processes

A zombie process is created when a child finishes execution but the parent hasn't called wait() to collect its exit status. The child's memory is released, but its entry stays in the process table and keeps its PID occupied. A large number of zombies can exhaust the process table and prevent new processes from being created, so they should be avoided.

import os
import time

pid = os.fork()

if pid == 0:
    print(f"Child {os.getpid()} exiting")
else:
    while True:
        print(f"Parent {os.getpid()} running indefinitely")
        time.sleep(1)
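
One way for a long-running parent to avoid the situation above is to reap finished children periodically without blocking. This is a minimal Unix-only sketch of that idea; reap_finished_children and spawn_and_reap are hypothetical helper names, and the short sleep stands in for the parent's real work.

```python
import os
import time


def reap_finished_children():
    """Collect every child that has already exited, without blocking."""
    reaped = []
    while True:
        try:
            pid, _status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            break  # this process has no children left
        if pid == 0:
            break  # children exist, but none has exited yet
        reaped.append(pid)
    return reaped


def spawn_and_reap(count):
    """Fork count short-lived children and reap them while 'working'."""
    children = set()
    for _ in range(count):
        pid = os.fork()
        if pid == 0:
            os._exit(0)  # child finishes immediately
        children.add(pid)

    reaped = set()
    while not children <= reaped:
        reaped.update(reap_finished_children())
        time.sleep(0.05)  # stand-in for the parent's real work
    return sorted(children)


if __name__ == "__main__":
    print(f"Reaped {len(spawn_and_reap(3))} children, no zombies left")
```

os.WNOHANG makes waitpid() return (0, 0) instead of blocking when no child has exited yet, so the parent can keep doing its own work between checks.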

Global Variables in Multiprocessing

#coding=utf-8
import os
import time

counter = 0

pid = os.fork()

if pid == 0:
    counter += 1
    print(f'Child process: counter={counter}')
else:
    time.sleep(1)
    counter += 1
    print(f'Parent process: counter={counter}')

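Each process has its own memory space, including its own copy of every global variable, so both processes print counter=1: a modification in one process is not visible in the other. (fork() typically makes this copy lazily using copy-on-write: memory pages are shared until one side writes to them.)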

Cross-Platform Multiprocessing with multiprocessing Module

Windows lacks native fork() support, so Python provides the multiprocessing module as a cross-platform solution for creating multiple processes.

Basic Process Creation

#coding=utf-8
from multiprocessing import Process
import os

def run_task(name):
    print(f'Child process running: name={name}, pid={os.getpid()}')

if __name__ == '__main__':
    print(f'Parent process: {os.getpid()}')
    p = Process(target=run_task, args=('test',))
    print('Child process starting')
    p.start()
    p.join()
    print('Child process finished')

Process Constructor Parameters

Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
  • target: The callable object executed by the process
  • args: Tuple of arguments passed to the target
  • kwargs: Dictionary of keyword arguments passed to the target
  • name: Custom name for the process instance
  • group: Reserved for future extensions (should always be None)
  • daemon: Whether the process is a daemon (keyword-only; inherited from the creating process if None)
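
As an illustration of these parameters, the sketch below builds a Process with keyword arguments and inspects it before and after start(). The function greet and the helper build_process are hypothetical names chosen for the example.

```python
from multiprocessing import Process


def greet(greeting, name, punctuation="!"):
    print(f"{greeting}, {name}{punctuation}")


def build_process():
    """Create (but do not start) a Process using keyword arguments."""
    return Process(
        target=greet,
        name="greeter",               # available afterwards as p.name
        args=("Hello", "world"),      # positional arguments for greet()
        kwargs={"punctuation": "?"},  # keyword arguments for greet()
    )


if __name__ == "__main__":
    p = build_process()
    # pid is None and is_alive() is False until start() is called
    print(p.name, p.pid, p.is_alive())
    p.start()
    p.join()
    print(p.pid is not None, p.exitcode)
```

Passing everything by keyword avoids having to remember the positional order, in which group comes first.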

Process Instance Methods

Method           Description
start()          Starts the process
join([timeout])  Waits for the process to finish (at most timeout seconds, if given)
is_alive()       Checks whether the process is still running
run()            Called in the child by start(); calls target by default, override it in a subclass
terminate()      Forcefully terminates the process

Process Instance Properties

Property  Description
name      Human-readable label for the process (default: Process-N); not a unique identifier
pid       Process ID of the instance (None until the process is started)

Complete Process Example

from multiprocessing import Process
import os
from time import sleep

def run_task(name, age, **kwargs):
    for i in range(10):
        print(f'Child running: name={name}, age={age}, pid={os.getpid()}')
        print(f'Extra args: {kwargs}')
        sleep(0.5)

if __name__ == '__main__':
    print(f'Parent process: {os.getpid()}')
    p = Process(target=run_task, args=('test', 18), kwargs={"m": 20})
    print('Child process starting')
    p.start()
    sleep(1)
    p.terminate()
    p.join()
    print('Child process terminated')

Output:

Parent process: 21378
Child process starting
Child running: name=test, age=18, pid=21379
Extra args: {'m': 20}
Child running: name=test, age=18, pid=21379
Extra args: {'m': 20}
Child process terminated

Multi-Process Coordination Example

from multiprocessing import Process
import time
import os

def worker_one(interval):
    print(f"Worker 1 - Parent: {os.getppid()}, Current: {os.getpid()}")
    start = time.time()
    time.sleep(interval)
    end = time.time()
    print(f"Worker 1 completed in {end - start:.2f} seconds")

def worker_two(interval):
    print(f"Worker 2 - Parent: {os.getppid()}, Current: {os.getpid()}")
    start = time.time()
    time.sleep(interval)
    end = time.time()
    print(f"Worker 2 completed in {end - start:.2f} seconds")

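# The guard is required on platforms that start children by re-importing
# this module (e.g. Windows); without it each child would spawn more children
if __name__ == "__main__":
    print(f"Main process ID: {os.getpid()}")

    p1 = Process(target=worker_one, args=(2,))
    p2 = Process(target=worker_two, name="WorkerTwo", args=(1,))

    p1.start()
    p2.start()

    print(f"p2 running: {p2.is_alive()}")

    print(f"p1.name={p1.name}, p1.pid={p1.pid}")
    print(f"p2.name={p2.name}, p2.pid={p2.pid}")

    p1.join()
    print(f"p1 still running: {p1.is_alive()}")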

Output:

Main process ID: 19866
p2 running: True
p1.name=Process-1, p1.pid=19867
p2.name=WorkerTwo, p2.pid=19868
Worker 1 - Parent: 19866, Current: 19867
Worker 2 - Parent: 19866, Current: 19868
Worker 2 completed in 1.00 seconds
Worker 1 completed in 2.00 seconds
p1 still running: False

Subclassing Process

Define a custom process class by inheriting from Process:

from multiprocessing import Process
import time
import os

class CustomProcess(Process):
    def __init__(self, interval):
        # Initialize parent class to ensure proper setup
        Process.__init__(self)
        self.interval = interval

    def run(self):
        print(f"Child {os.getpid()} starting, parent: {os.getppid()}")
        start = time.time()
        time.sleep(self.interval)
        end = time.time()
        print(f"{os.getpid()} finished in {end - start:.2f}s")

if __name__ == "__main__":
    start = time.time()
    print(f"Main process: {os.getpid()}")
    p1 = CustomProcess(2)
    p1.start()
    p1.join()
    end = time.time()
    print(f"Total time: {end - start:.2f}s")

Process Pool with Pool Class

For managing numerous child processes, Pool provides convenient process reuse:

from multiprocessing import Pool
import os
import time
import random

def worker(task_id):
    start = time.time()
    print(f"Task {task_id} starting, pid={os.getpid()}")
    time.sleep(random.random() * 2)
    end = time.time()
    print(f"Task {task_id} done in {end - start:.2f}s")

if __name__ == "__main__":  # needed where workers re-import this module (e.g. Windows)
    pool = Pool(3)  # Maximum 3 concurrent processes
    for i in range(10):
        pool.apply_async(worker, (i,))

    print("=== Pool started ===")
    pool.close()  # No new tasks accepted
    pool.join()   # Wait for all tasks to complete
    print("=== All tasks finished ===")

Sample output:

=== Pool started ===
Task 0 starting, pid=21466
Task 1 starting, pid=21468
Task 2 starting, pid=21467
Task 0 done in 1.01s
Task 3 starting, pid=21466
Task 2 done in 1.24s
Task 4 starting, pid=21467
...
=== All tasks finished ===

Pool Methods

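Method                   Description
apply_async(func, args)  Non-blocking call; returns an AsyncResult immediately
apply(func, args)        Blocking synchronous call
close()                  Stop accepting new tasks
terminate()              Stop worker processes immediately, without finishing outstanding work
join()                   Wait for worker processes to exit (call close() or terminate() first)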
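
The examples so far only print from inside the workers. To get a return value back, keep the AsyncResult object that apply_async() returns and call get() on it. A minimal sketch, with square and square_all as hypothetical names:

```python
from multiprocessing import Pool


def square(n):
    return n * n


def square_all(numbers, workers=3):
    """Square each number in a pool and return results in input order."""
    with Pool(workers) as pool:
        # Submit everything first, then collect; get() blocks per result
        pending = [pool.apply_async(square, (n,)) for n in numbers]
        return [result.get(timeout=10) for result in pending]


if __name__ == "__main__":
    print(square_all([1, 2, 3, 4]))  # prints [1, 4, 9, 16]
```

Leaving the with block calls terminate() on the pool, which is safe here because every result has already been collected.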

Synchronous vs Asynchronous Execution

Synchronous execution blocks until completion:

from multiprocessing import Pool
import os
import time
import random

def worker(task_id):
    start = time.time()
    print(f"Task {task_id} starting, pid={os.getpid()}")
    time.sleep(random.random() * 2)
    end = time.time()
    print(f"Task {task_id} done in {end - start:.2f}s")

if __name__ == "__main__":  # needed where workers re-import this module (e.g. Windows)
    pool = Pool(3)
    for i in range(10):
        pool.apply(worker, (i,))  # Blocks until complete

    pool.close()
    pool.join()

Tasks execute sequentially when using apply():

Task 0 starting, pid=21532
Task 0 done in 1.91s
Task 1 starting, pid=21534
...
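
The difference can also be measured. The sketch below times the same batch of sleeping tasks with apply() and with apply_async(); the function names and the chosen task count and duration are assumptions for illustration, and the exact timings will vary by machine.

```python
import time
from multiprocessing import Pool


def nap(seconds):
    time.sleep(seconds)
    return seconds


def time_apply(pool, tasks, seconds):
    """Run the tasks one after another with the blocking apply()."""
    start = time.perf_counter()
    for _ in range(tasks):
        pool.apply(nap, (seconds,))
    return time.perf_counter() - start


def time_apply_async(pool, tasks, seconds):
    """Submit all tasks at once, then wait for every result."""
    start = time.perf_counter()
    pending = [pool.apply_async(nap, (seconds,)) for _ in range(tasks)]
    for result in pending:
        result.get()
    return time.perf_counter() - start


def compare(tasks=4, seconds=0.3):
    with Pool(tasks) as pool:
        blocking = time_apply(pool, tasks, seconds)
        overlapped = time_apply_async(pool, tasks, seconds)
    return blocking, overlapped


if __name__ == "__main__":
    blocking, overlapped = compare()
    print(f"apply: {blocking:.2f}s, apply_async: {overlapped:.2f}s")
```

With apply() the total is at least the sum of the task durations, while with apply_async() the tasks overlap across the pool's workers.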

Callback Functions

Callbacks execute in the parent process after the async task completes:

from multiprocessing import Pool
import time
import os

def background_task():
    print(f"Pool process: pid={os.getpid()}, parent={os.getppid()}")
    for i in range(3):
        print(f"Working on {i}")
        time.sleep(1)
    return "completed"

def callback(result):
    print(f"Callback executing in: pid={os.getpid()}")
    print(f"Result: {result}")

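if __name__ == "__main__":  # needed where workers re-import this module (e.g. Windows)
    pool = Pool(3)
    pool.apply_async(func=background_task, callback=callback)

    pool.close()  # No new tasks accepted
    pool.join()   # Wait for the task and its callback to finish
    print(f"Main process: pid={os.getpid()}")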

Output:

Pool process: pid=9401, parent=9400
Working on 0
Working on 1
Working on 2
Callback executing in: pid=9400
Result: completed
Main process: pid=9400
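
A callback only fires when the task succeeds. apply_async() also accepts an error_callback, which receives the exception instance if the task raises. A minimal sketch, with divide and run_divisions as hypothetical names:

```python
from multiprocessing import Pool


def divide(a, b):
    return a / b


def run_divisions(pairs):
    """Run divide() on each pair, collecting results and errors separately."""
    results, errors = [], []
    with Pool(2) as pool:
        pending = [
            pool.apply_async(
                divide,
                pair,
                callback=results.append,       # called with the return value
                error_callback=errors.append,  # called with the exception
            )
            for pair in pairs
        ]
        for task in pending:
            task.wait()  # returns once the task and its callback are done
    # Callbacks fire in completion order, so sort for a stable result
    return sorted(results), errors


if __name__ == "__main__":
    values, failures = run_divisions([(6, 3), (1, 0), (9, 3)])
    print(values)
    print([type(exc).__name__ for exc in failures])
```

Both callbacks run in a helper thread of the parent process, so they should return quickly; a slow callback delays the handling of other results.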

Inter-Process Communication with Queue

Basic Queue Operations

import queue  # provides the Full and Empty exceptions raised by Queue
from multiprocessing import Queue

q = Queue(3)  # Maximum 3 messages
q.put("Message 1")
q.put("Message 2")
print(q.full())  # False
q.put("Message 3")
print(q.full())  # True

# Blocking put with a 2-second timeout
try:
    q.put("Message 4", True, 2)
except queue.Full:
    print(f"Queue full, current size: {q.qsize()}")

# Non-blocking put
try:
    q.put_nowait("Message 4")
except queue.Full:
    print(f"Queue full, current size: {q.qsize()}")

# Safe writing
if not q.full():
    q.put_nowait("Message 4")

# Reading messages
if not q.empty():
    for _ in range(q.qsize()):
        print(q.get_nowait())

Output:

False
True
Queue full, current size: 3
Queue full, current size: 3
Message 1
Message 2
Message 3

Queue Methods Reference

Method                       Description
qsize()                      Approximate number of messages in queue (raises NotImplementedError on macOS)
empty()                      Returns True if queue is empty
full()                       Returns True if queue is full
get([block, timeout])        Retrieve message (blocks by default; raises queue.Empty on timeout)
get_nowait()                 Non-blocking retrieve
put(item, [block, timeout])  Add message (blocks by default; raises queue.Full on timeout)
put_nowait(item)             Non-blocking add
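
Because empty() and qsize() are only snapshots, a common alternative is to call get() with a timeout and treat queue.Empty as the end of the data. The sketch below assumes that approach; drain is a hypothetical helper name and the 0.5-second default is arbitrary.

```python
import queue  # multiprocessing.Queue raises queue.Empty and queue.Full
from multiprocessing import Queue


def drain(q, timeout=0.5):
    """Read items until the queue stays empty for `timeout` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == "__main__":
    q = Queue(maxsize=2)
    q.put("a")
    q.put("b")
    try:
        q.put("c", timeout=0.1)  # the queue is full, so this times out
    except queue.Full:
        print("queue full")
    print(drain(q))
```

The timeout is a trade-off: too short and a slow producer looks like an empty queue, too long and the reader lingers after the data has run out.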

Producer-Consumer Pattern

from multiprocessing import Process, Queue
import os
import time
import random

def producer(q):
    for value in ['A', 'B', 'C']:
        print(f'Producing: {value}')
        q.put(value)
        time.sleep(random.random())

def consumer(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print(f'Consumed: {value}')
            time.sleep(random.random())
        else:
            break

if __name__ == '__main__':
    q = Queue()
    producer_process = Process(target=producer, args=(q,))
    consumer_process = Process(target=consumer, args=(q,))

    producer_process.start()
    producer_process.join()

    consumer_process.start()
    consumer_process.join()

    print('All data produced and consumed')
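
The example above runs the producer to completion before the consumer starts, so the two never overlap. When both should run at the same time, the consumer cannot rely on empty() to decide when to stop, because the queue may be momentarily empty while the producer is still working. A common alternative, sketched here under the assumption that None never appears as real data, is to send a sentinel value; run_pipeline and the second results queue are hypothetical additions for the example.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumed never to occur as a real work item


def producer(work, items):
    for item in items:
        work.put(item)
    work.put(SENTINEL)  # tell the consumer there is nothing more to come


def consumer(work, results):
    while True:
        item = work.get()  # blocks until the producer sends something
        if item is SENTINEL:
            break
        results.put(item.lower())
    results.put(SENTINEL)


def run_pipeline(items):
    """Run producer and consumer concurrently; return the consumed items."""
    work, results = Queue(), Queue()
    processes = [
        Process(target=producer, args=(work, items)),
        Process(target=consumer, args=(work, results)),
    ]
    for process in processes:
        process.start()

    collected = []
    while True:
        item = results.get()
        if item is SENTINEL:
            break
        collected.append(item)

    # Join only after the queues are drained, so no child is left
    # blocked trying to flush data nobody reads
    for process in processes:
        process.join()
    return collected


if __name__ == "__main__":
    print(run_pipeline(["A", "B", "C"]))
```

With several consumers, one sentinel per consumer is needed so that each of them sees a stop signal.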

Queue in Process Pools

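When passing a queue to Pool workers as a task argument, use multiprocessing.Manager().Queue() instead of Queue(). A plain multiprocessing.Queue cannot be pickled into a task and raises RuntimeError, whereas the Manager version is a proxy object that can be passed to workers: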

from multiprocessing import Manager, Pool
import os
import time

def reader(q):
    print(f"Reader started: pid={os.getpid()}, parent={os.getppid()}")
    for _ in range(q.qsize()):
        print(f"Read: {q.get(True)}")

def writer(q):
    print(f"Writer started: pid={os.getpid()}, parent={os.getppid()}")
    for char in "Python":
        q.put(char)

if __name__ == "__main__":
    print(f"Main process: {os.getpid()}")
    q = Manager().Queue()
    pool = Pool()

    pool.apply(writer, (q,))
    pool.apply(reader, (q,))

    pool.close()
    pool.join()
    print("Done")

Output:

Main process: 21156
Writer started: pid=21162, parent=21156
Reader started: pid=21162, parent=21156
Read: P
Read: y
Read: t
Read: h
Read: o
Read: n
Done
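
The example above uses the blocking apply(), so the writer finishes before the reader starts. The sketch below is one possible concurrent variant using apply_async(); the marker string DONE and the helper transfer are assumptions for illustration, not part of the original code.

```python
from multiprocessing import Manager, Pool

DONE = "<done>"  # hypothetical end-of-data marker; must not be real data


def writer(q, text):
    for char in text:
        q.put(char)
    q.put(DONE)


def reader(q):
    chars = []
    while True:
        item = q.get()  # blocks until the writer sends the next item
        if item == DONE:
            break
        chars.append(item)
    return "".join(chars)


def transfer(text):
    """Send text through a Manager queue between two pool workers."""
    with Manager() as manager, Pool(2) as pool:
        q = manager.Queue()
        read_result = pool.apply_async(reader, (q,))
        pool.apply_async(writer, (q, text)).get()  # surface writer errors
        return read_result.get(timeout=10)


if __name__ == "__main__":
    print(transfer("Python"))
```

The pool needs at least two workers here: with a single worker the reader would occupy it while waiting for a writer that never gets scheduled.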

Tags: python multiprocessing fork process pool Queue

Posted on Mon, 18 May 2026 19:32:41 +0000 by ReeceSayer