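threading.Thread constructor parameters:
target: specifies the function the thread will execute
args: positional arguments passed to the target function (a tuple; a single argument needs a trailing comma, e.g. ('alpha',))
kwargs: keyword arguments passed to the target function (a dictionary)
name: gives the thread a name
daemon: if True, the thread is a daemon thread, and the program can exit without waiting for it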
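Thread object methods:
start(): starts the thread; the new thread then calls run()
join(): blocks the calling thread until this thread has finished
run(): the task function is ultimately executed inside the thread's run method (the default run() calls target with args and kwargs)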
```python
import time
import threading

def execute_task(worker_id):
    for idx in range(6):
        print(f"Worker {worker_id} running iteration {idx}")
        time.sleep(1)

def perform_study(student_name):
    for idx in range(5):
        print(f"Student {student_name} studying round {idx}")
        time.sleep(1)

if __name__ == '__main__':
    # args must be a tuple, hence the trailing comma in ('alpha',)
    t1 = threading.Thread(target=execute_task, args=('alpha',), daemon=True)
    t2 = threading.Thread(target=perform_study, args=('beta',), daemon=True)
    t1.start()
    t2.start()
    # join() blocks the main thread until each worker has finished
    t1.join()
    t2.join()
    print('Main thread finished')
```
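The example above passes only args. Below is a minimal sketch of the remaining constructor parameters, kwargs and name. The function describe and the results dictionary are hypothetical names used only for illustration.

```python
import threading

# Hypothetical shared dict: each thread stores its result under its own name
results = {}

def describe(worker_id, repeat=1):
    # current_thread().name is the value passed via the name= parameter
    results[threading.current_thread().name] = [worker_id] * repeat

t = threading.Thread(
    target=describe,
    args=('alpha',),        # positional arguments (note the trailing comma)
    kwargs={'repeat': 2},   # keyword arguments
    name='worker-alpha',    # thread name
)
t.start()
t.join()
print(results)  # {'worker-alpha': ['alpha', 'alpha']}
```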
- Using Multithreading with a Class
Subclass threading.Thread and override run(): calling start() launches the new thread, which then executes your run() method.
```python
import time
import threading

class WorkerThread(threading.Thread):
    def run(self):
        for idx in range(6):
            print(f"Worker iteration {idx}")
            time.sleep(1)

if __name__ == '__main__':
    worker = WorkerThread()
    worker.start()
    worker.join()
    print('Main thread finished')
```
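A subclass often needs its own parameters. One way to do this is sketched below, assuming a hypothetical CountingThread class: it overrides __init__, and it must call Thread.__init__ through super() before start() is used.

```python
import threading

class CountingThread(threading.Thread):
    """Hypothetical Thread subclass that takes its own constructor arguments."""

    def __init__(self, label, steps):
        super().__init__(name=label)  # Thread.__init__ must run before start()
        self.steps = steps
        self.output = []

    def run(self):
        # Executed in the new thread once start() is called
        for idx in range(self.steps):
            self.output.append(f"{self.name} iteration {idx}")

worker = CountingThread('counter-1', 3)
worker.start()
worker.join()
print(worker.output)
# ['counter-1 iteration 0', 'counter-1 iteration 1', 'counter-1 iteration 2']
```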
- Sharing a Global Variable Among Threads
```python
import threading

shared_counter = 0

def increment_small():
    global shared_counter
    shared_counter += 10
    print('increment_small', shared_counter)

def increment_large():
    global shared_counter
    shared_counter += 100
    print('increment_large', shared_counter)

if __name__ == '__main__':
    t1 = threading.Thread(target=increment_small)
    t2 = threading.Thread(target=increment_large)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('Final counter value in main thread:', shared_counter)
```
Example output:
```
increment_small 10
increment_large 110
Final counter value in main thread: 110
```
- Data Corruption When Sharing a Global Variable
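Global Interpreter Lock (GIL) in Python:
Exists at the process level: in standard CPython, all threads of a process share one GIL.
Because of the GIL, threads can run concurrently but not in parallel: at any moment only one thread executes Python bytecode.
This means Python threads cannot use multiple CPU cores to speed up CPU-bound Python code. They remain useful for I/O-bound work, because a thread that is waiting on I/O releases the GIL.
**When does a Python thread switch (release the GIL)?**
- When the thread reaches a blocking I/O operation (waiting, blocking)
- When the thread has run for the switch interval (0.005 seconds by default, adjustable with sys.setswitchinterval())

A statement such as shared_counter += 1 is not atomic: it reads the value, adds 1, and writes the result back. If a switch happens between the read and the write, another thread's update can be overwritten and lost.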
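To see why a switch can land in the middle of an increment, the sketch below prints the switch interval and uses the standard dis module to disassemble a one-line increment. The function increment is hypothetical. Exact instruction names vary between CPython versions (for example INPLACE_ADD in older releases, BINARY_OP in newer ones), but the load and the store are always separate steps.

```python
import dis
import sys

shared_counter = 0

def increment():
    global shared_counter
    shared_counter += 1

# Interval after which a waiting thread asks the running thread to drop the GIL
interval = sys.getswitchinterval()
print('switch interval (s):', interval)  # 0.005 unless changed via sys.setswitchinterval()

# The single statement compiles to separate load, add, and store instructions
opnames = [ins.opname for ins in dis.get_instructions(increment)]
print(opnames)
```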
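```python
import threading

shared_counter = 0

def adder_job_a():
    global shared_counter
    for _ in range(1000000):
        shared_counter += 1  # read, add, write: not atomic
    print('adder_job_a', shared_counter)

def adder_job_b():
    global shared_counter
    for _ in range(1000000):
        shared_counter += 1
    print('adder_job_b', shared_counter)

if __name__ == '__main__':
    t1 = threading.Thread(target=adder_job_a)
    t2 = threading.Thread(target=adder_job_b)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('Final counter in main thread:', shared_counter)
```
Example output (corrupted; the expected final value is 2000000):
```
adder_job_aadder_job_b 1853502
1000000
Final counter in main thread: 1853502
```
The two print calls interleave, and the final value falls short of 2000000 because some updates were lost. The exact numbers differ from run to run and between CPython versions, and on some versions the loss may rarely show up.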
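Because the loss depends on where a switch happens, it can be hard to reproduce. The sketch below forces the unlucky interleaving on every run. A hypothetical threading.Barrier makes both threads read the counter before either one writes, which stands in for a thread switch between the read and the write.

```python
import threading

counter = 0
# Hypothetical barrier used only to force the read/read/write/write interleaving
barrier = threading.Barrier(2)

def racy_increment():
    global counter
    value = counter        # 1. read the shared value (both threads read 0)
    barrier.wait()         # 2. wait until both threads have read it
    counter = value + 1    # 3. write back: the second write overwrites the first

threads = [threading.Thread(target=racy_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('Expected 2, got', counter)  # Expected 2, got 1
```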
- Fixing Data Corruption with a Lock
When multiple threads manipulate the same global resource, the result may become inaccurate because thread switching occurs at unpredictable times.
Solution 1:
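Use a lock to protect the code block that accesses the shared resource. A lock does not stop the interpreter from switching threads, but only the thread holding the lock can execute the protected block, so no other thread can interleave its updates there.
Solution 2:
Use a queue (queue.Queue, which does its own locking internally) to pass data between threads instead of sharing a global variable.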
```python
from threading import Lock, Thread

shared_counter = 0
lock = Lock()

def safe_adder_a():
    global shared_counter
    # "with lock" acquires the lock and always releases it, even on an exception.
    # The lock is held for the whole loop, so the two threads effectively run one after the other.
    with lock:
        for _ in range(1000000):
            shared_counter += 1
        print('safe_adder_a', shared_counter)

def safe_adder_b():
    global shared_counter
    with lock:
        for _ in range(1000000):
            shared_counter += 1
        print('safe_adder_b', shared_counter)

if __name__ == '__main__':
    t1 = Thread(target=safe_adder_a)
    t2 = Thread(target=safe_adder_b)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('Final counter in main thread:', shared_counter)
```
Example output:
```
safe_adder_a 1000000
safe_adder_b 2000000
Final counter in main thread: 2000000
```
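In the lock example, each thread holds the lock for its entire loop, so the threads do not really overlap. The sketch below shows a finer-grained alternative (a design choice, not the author's code): the lock guards only a single read-modify-write. The threads can then interleave while each update stays intact, at the cost of many more acquire/release calls. The iteration count of 100000 is arbitrary.

```python
from threading import Lock, Thread

shared_counter = 0
lock = Lock()

def safe_adder(times):
    global shared_counter
    for _ in range(times):
        with lock:              # held only for one read-modify-write
            shared_counter += 1

threads = [Thread(target=safe_adder, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print('Final counter:', shared_counter)  # Final counter: 200000
```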
- Fixing Data Corruption Using a Queue
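task_done(): tells the queue that one previously retrieved item has been processed.
join(): blocks until all items put into the queue have been processed.
How does the queue decide that all items are processed?
- It keeps a counter of unfinished tasks: every put() increments it and every task_done() decrements it. join() returns as soon as the counter reaches zero; it does not check whether the queue itself is empty.
- Each put() should therefore be matched by exactly one task_done(), called after the item retrieved with get() has been handled.
If 5 items were put into the queue, 3 have been retrieved, and 3 task_done() signals were sent → join() blocks (2 tasks are still unfinished).
If 5 items were put into the queue, only 3 were retrieved, but 5 task_done() signals were sent → join() returns even though 2 items are still in the queue (the mismatch hides unprocessed work). A 6th task_done() call would raise ValueError.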
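The counter behaviour of task_done() and join() can be checked directly in a single thread. This sketch assumes nothing beyond the standard queue module.

```python
from queue import Queue

q = Queue()
for n in range(5):
    q.put(n)          # unfinished-task counter: 5

for _ in range(3):
    q.get()           # 3 items retrieved, 2 still waiting in the queue

for _ in range(5):
    q.task_done()     # 5 signals: the counter drops back to 0

q.join()              # returns immediately, although the queue is not empty
print('items still queued:', q.qsize())  # items still queued: 2

error_message = None
try:
    q.task_done()     # a 6th signal would push the counter below 0
except ValueError as exc:
    error_message = str(exc)
print('ValueError:', error_message)  # ValueError: task_done() called too many times
```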
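```python
from queue import Queue, Empty
import threading
import time

work_queue = Queue()
for i in range(20):
    work_queue.put(f'http://data-source-{i}.example.com')

def fetch_and_process():
    while True:
        # get_nowait() avoids the race in "check qsize(), then get()":
        # another worker may take the last item between the two calls,
        # which would leave this thread blocked in get() forever
        try:
            item = work_queue.get_nowait()
        except Empty:
            break  # queue drained: this worker exits
        print(f'Processing {item}')
        time.sleep(1)  # simulate I/O-bound work
        work_queue.task_done()  # one task_done() per retrieved item

def main():
    for _ in range(4):
        t = threading.Thread(target=fetch_and_process)
        t.start()
    work_queue.join()  # returns once all 20 items have been marked done

if __name__ == '__main__':
    main()
```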
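The example above distributes work through a queue, but it does not return to the counter problem. The sketch below shows one way Solution 2 could be applied to the counter (a hedged illustration, not necessarily the author's approach). Each worker counts in a local variable and sends its partial result through a queue.Queue, and only the main thread combines the results. count_up and results are hypothetical names.

```python
import threading
from queue import Queue

# Thread-safe channel: workers never touch a shared counter directly
results = Queue()

def count_up(times):
    local_total = 0
    for _ in range(times):
        local_total += 1          # private to this thread, so no race
    results.put(local_total)      # hand the partial result to the main thread

threads = [threading.Thread(target=count_up, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All workers have finished, so every partial result is already in the queue
total = 0
while not results.empty():
    total += results.get()
print('Final counter:', total)  # Final counter: 200000
```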
- ThreadPoolExecutor
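pool = ThreadPoolExecutor(max_workers=5): creates a thread pool with at most 5 worker threads, so at most 5 tasks run at the same time.
pool.submit(work, *args, **kwargs): schedules a callable (with optional arguments) on the pool and returns a Future for its result.
pool.shutdown(): by default (wait=True) waits for all submitted tasks to complete before returning; no new tasks can be submitted afterwards.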
```python
from concurrent.futures import ThreadPoolExecutor
import time

def background_job():
    for i in range(6):
        print(f"Background job iteration {i}")
        time.sleep(1)

def training_job():
    for i in range(5):
        print(f"Training job step {i}")
        time.sleep(1)

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    start = time.time()
    executor.submit(background_job)
    executor.submit(training_job)
    executor.shutdown()  # blocks until both jobs have finished
    end = time.time()
    # The jobs run concurrently, so the total is about 6 s (the longer job), not 11 s
    print('Total execution time:', end - start)
```
Example output:
```
Background job iteration 0
Training job step 0
Background job iteration 1Training job step 1
Background job iteration 2Training job step 2
...
Background job iteration 5
Training job step 4
Total execution time: 6.05...
```
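The example above ignores the Future that submit() returns. The sketch below keeps the futures and reads each task's return value with result(), which blocks until that task has finished. square_after_delay is a hypothetical task, and time.sleep stands in for I/O.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def square_after_delay(n):
    time.sleep(0.1)   # stand-in for I/O-bound work
    return n * n

with ThreadPoolExecutor(max_workers=3) as pool:
    # submit() returns immediately with a Future for each task
    futures = [pool.submit(square_after_delay, n) for n in range(5)]
    # result() blocks until that task is done, then returns its return value
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9, 16]
```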
- ThreadPoolExecutor with Arguments
```python
from concurrent.futures import ThreadPoolExecutor
import time

def execute_with_name(name):
    for i in range(6):
        print(f"Executor task for {name} – iteration {i}")
        time.sleep(1)

def study_with_name(name):
    for i in range(5):
        print(f"Study task for {name} – round {i}")
        time.sleep(1)

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    start = time.time()
    executor.submit(execute_with_name, 'alice')
    executor.submit(study_with_name, 'bob')
    executor.shutdown()
    end = time.time()
    print('Total execution time:', end - start)
```
Example output:
```
Executor task for alice – iteration 0
Study task for bob – round 0
...
Executor task for alice – iteration 5
Total execution time: 6.05...
```
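submit() also forwards keyword arguments to the callable. Below is a minimal sketch with a hypothetical greet function that returns a value instead of printing, so the forwarded arguments can be checked through result().

```python
from concurrent.futures import ThreadPoolExecutor

def greet(name, rounds=1, punctuation='!'):
    # Hypothetical task used only to show argument passing
    return [f"Hello {name}{punctuation}" for _ in range(rounds)]

with ThreadPoolExecutor(max_workers=2) as pool:
    # Positional and keyword arguments after the callable are passed to it
    f1 = pool.submit(greet, 'alice')
    f2 = pool.submit(greet, 'bob', rounds=2, punctuation='?')

print(f1.result())  # ['Hello alice!']
print(f2.result())  # ['Hello bob?', 'Hello bob?']
```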
- Submitting Tasks in Bulk with map
pool.map(func, iterable): submits one task per item of iterable and returns an iterator that yields the results in input order.
```python
from concurrent.futures import ThreadPoolExecutor
import time

def process_item(number):
    for rnd in range(2):
        print(f'Processing item {number}, round {rnd}')
        time.sleep(0.25)

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as pool:
        data_items = [11, 6, 8, 24, 22]
        pool.map(process_item, data_items)
```
Example output:
```
Processing item 11, round 0
Processing item 6, round 0
Processing item 8, round 0
...
```
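The iterator returned by map() yields results in the order of the inputs, even if later inputs finish first. The sketch below uses a hypothetical slow_double task whose delay shrinks as the input grows.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_double(n):
    time.sleep(0.05 * (5 - n))   # hypothetical delay: larger inputs finish sooner
    return n * 2

with ThreadPoolExecutor(max_workers=5) as pool:
    doubled = list(pool.map(slow_double, [1, 2, 3, 4]))

print(doubled)  # [2, 4, 6, 8]
```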
- Using the with Statement to Submit Batch Tasks
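```python
from concurrent.futures import ThreadPoolExecutor
import time

def handle(task_id):
    for rnd in range(2):
        print(f'Task {task_id} – cycle {rnd}')
        time.sleep(0.25)

if __name__ == '__main__':
    start = time.time()
    # Leaving the with block calls pool.shutdown(wait=True), so all tasks are done afterwards
    with ThreadPoolExecutor(max_workers=2) as pool:
        pool.map(handle, [101, 202, 303])
    end = time.time()
    # 3 tasks of about 0.5 s on 2 workers need two rounds: roughly 1 s in total
    print('Duration:', end - start)
```
Example output:
```
Task 101 – cycle 0
Task 202 – cycle 0
...
Duration: 1.02...
```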
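For comparison, the with statement is roughly equivalent to calling shutdown(wait=True) in a finally clause. The sketch below assumes only the standard library and uses the built-in abs as a trivial task. It also shows that a pool rejects new work after shutdown by raising RuntimeError.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)
try:
    results = list(pool.map(abs, [-1, -2, 3]))
finally:
    pool.shutdown(wait=True)   # what leaving a with block does automatically

print(results)  # [1, 2, 3]

# After shutdown, the pool refuses new tasks
rejected = False
try:
    pool.submit(abs, -4)
except RuntimeError:
    rejected = True
print('submit after shutdown rejected:', rejected)  # submit after shutdown rejected: True
```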