CSDN Challenge 2
Participation Topic: Study Notes
I. Process Object Methods
start: Start the process
run: Executes the task function specified by target
from multiprocessing import Process
import time
def task():
for i in range(6):
print(f"Main process {i} working")
time.sleep(1)
def study():
for i in range(5):
print(f"Main process {i} studying")
time.sleep(1)
if __name__ == '__main__':
p1 = Process(target=task)
p2 = Process(target=study)
p1.start()
p2.start()
print('Main process completed')
for i in range(6):
print('------------------------hahaha')
time.sleep(1)
Main process completed
------------------------hahaha
Main proces 0 working
Main process 0 studying
------------------------hahaha
Main process 1 working
Main process 1 studying
------------------------hahaha
Main process 2 working
Main process 2 studying
------------------------hahaha
Main process 3 working
Main process 3 studying
------------------------hahaha
Main process 4 working
Main process 4 studying
------------------------hahaha
Main process 5 working
II. Sharing Data Between Processes
Example One, Processes Are Independent and Do Not Affect Global Variables
from multiprocessing import Process
import time
n = 0
def task():
global n
n = 1000 + n
print('task', n)
def study():
global n
n = 10000 + n
print('study', n)
if __name__ == '__main__':
p1 = Process(target=task)
p2 = Process(target=study)
p1.start()
p2.start()
print('Main process completed', n)
Main process completed 0
task 1000
study 10000
Example Two, Using JoinableQueue for Inter-Process Communication
The creation of processes is controlled by the user, while the scheduling, execution, and blocking are managed by the operating system
data=q.get(timeout=1): If no data is available in the queue within 1 second, the thread will block
from multiprocessing import Process, JoinableQueue, Manager
import time
def producer(queue):
for i in range(5):
for j in range(20):
queue.put(f'Produced data{j}')
print(f'【Produced data{j}】')
time.sleep(1)
def consumer(queue):
while True:
for i in range(4):
try:
data = queue.get(timeout=1)
except:
return
else:
print('Received data:', data)
queue.task_done()
time.sleep(1)
if __name__ == '__main__':
q = JoinableQueue()
start_time = time.time()
p = Process(target=producer, kwargs={'queue': q})
p.start()
threads = []
for _ in range(3):
t = Process(target=consumer, kwargs={'queue': q})
t.start()
threads.append(t)
# Wait for production to complete
p.join()
for t in threads:
t.join()
# Wait for all queue items to be processed
q.join()
print('Main thread completed')
end_time = time.time()
print('Execution time:', end_time - start_time)
1. The queue module's Queue is only used within a single process (data sharing between threads)
2. multiprocessing.Queue is designed for data sharing between multiple processes (Process)
3. multiprocessing.Manager().Queue() is specifically used for communication in a process pool
III. Using a Process Pool
**t1 = ProcessPoolExecutor(max_workers=5): Create a process pool that can execute up to a certain number of tasks simultaneously
t1.submit(*args, kwargs): Submit a task to the process pool
t1.shutdown(): Wait for all tasks in the process pool to complete before proceeding
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
import time
import threading
def task(name):
for i in range(6):
print(f"Process 1 working---{name}")
time.sleep(1)
def study(name):
for i in range(5):
print(f"Process 2 studying---{name}")
time.sleep(1)
if __name__ == '__main__':
# Create a process pool
pool = ProcessPoolExecutor(max_workers=5)
start_time = time.time()
# Submit tasks to the process pool
pool.submit(task, 'kobe')
pool.submit(study, 'james')
# Wait for all tasks in the process pool to complete
pool.shutdown()
end_time = time.time()
print('Execution time:', end_time - start_time)
IV. Submitting Multiple Tasks to a Pool
t1.map(func1, list): Submit multiple tasks too the pool
Equivalent to:
for item in list:
t1.submit(func1, item)
Each iteratoin submits one task to the pool
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
import time
import threading
def func(item):
for i in range(2):
print('Executing task {}, round {}'.format(item, i))
time.sleep(0.25)
if __name__ == '__main__':
# Create a process pool
pool = ProcessPoolExecutor(max_workers=3)
data_list = [11, 6, 8, 24, 22] # Test data
# Submit multiple tasks to the process pool
pool.map(func, data_list)
# Equivalent to
# for item in data_list:
# pool.submit(func, item)
V. Using with Statement on Thread and Process Pools
from concurrent.futures.process import ProcessPoolExecutor
import time
import threading
def func(item):
for i in range(2):
print('Executing task {}, round {}'.format(item, i))
time.sleep(0.25)
if __name__ == '__main__':
start_time = time.time()
with ProcessPoolExecutor(max_workers=2) as executor:
executor.map(func, [11, 22, 33])
end_time = time.time()
print('Duration:', end_time - start_time)
VI. Inter-Process Communication
q = Manager().Queue(): Use a queue specifically for process pools for data transfer
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing import Manager
def process_one(queue):
num = queue.get()
num += 100
print("--------process_one-----number---", num)
queue.put(num)
def process_two(queue):
num = queue.get()
num += 66
print("--------process_two-----number---", num)
queue.put(num)
if __name__ == '__main__':
initial_value = 100
# Use a queue specifically for process pools
queue = Manager().Queue()
queue.put(initial_value)
with ProcessPoolExecutor(max_workers=3) as executor:
executor.submit(process_one, queue)
executor.submit(process_two, queue)
final_value = queue.get()
print('Main process value', final_value)
--------process_one-----number— 200
--------process_two-----number— 266
Main process value 266