Programming Language/Python

[Python] 동시성과 병렬성 문법 - Multiprocess

HJChung 2021. 8. 15. 16:00

1. join 과 is_alive을 이용한 process 실행 큰 틀

한 프로세스만 생성하여 실행해보기 위한 큰 틀은 아래와 같다. 

join 메소드는 파이썬에게 프로세스가 종료 될 때까지 대기하도록 지시하고, is_alive는 process가 실행되고 있는지 여부를 반환한다. 

join([timeout])

If the optional argument timeout is None (the default), the method blocks until the process whose join() method is called terminates. If timeout is a positive number, it blocks at most timeout seconds. Note that the method returns None if its process terminates or if the method times out. Check the process’s exitcode to determine if it terminated.
A process can be joined many times.
A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.

is_alive()

Return whether the process is alive.
Roughly, a process object is alive from the moment the start() method returns until the child process terminates.
from multiprocessing import Process
import time
import logging

# main 
def main():
    # Logging format 설정
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    
    # 함수 인자 확인
    p = Process(target=process_function, args=('First',))
    
    logging.info("Main-Process: before creating Process")
    # 프로세스 시작
    p.start()
    logging.info("Main-Process: During Process...")
    
    # 프로세스 종료 될 때까지 대기
    logging.info("Main-Process: Joined Process")
    p.join()
    
    # 프로세스 상태 확인
    print(f'Process p is alive: {p.is_alive()}')


# multiprocess로 실행할 함수
def process_function(name):
    print("This is Sub-Process {}: starting".format(name))
    time.sleep(3) # 3초간 쉬어주고
    print("This is Sub-Process {}: finishing".format(name))
    

if __name__ == '__main__':
	main()

그러면 실행 결과는

11:11:20: Main-Process: before creating Process
11:11:20: Main-Process: During Process...
11:11:20: Main-Process: Joined Process
This is Sub-Process First: starting

(여기서 3초가 흐른 뒤)
This is Sub-Process First: finishing
Process p is alive: False

가 된다. 

 

※ Process 강제 종료

어떤 특정 조건때 process를 강제 종료시켜야 할 경우 .terminate()를 사용한다. 

logging.info("Main-Process: Terminated Process:")
p.terminate()

 

2. Process

앞서 한 프로세스만 생성하여 실행해보기 위한 큰 틀을 배워보았고, 이번에는 다중으로 process를 생성하여 동시성, 병렬성으로 실행

Python multiprocessing Process class is an abstraction that sets up another Python process, provides it to run code and a way for the parent application to control execution.

 

from multiprocessing import Process, current_process
import os
import random
import time
import logging

# multi process로 할 함수
def square(n):
  time.sleep(3) 
  process_id = os.getpid()
  process_name = current_process().name

  result = n * n

  print(f'Process ID: {process_id}, Process name: {process_name}, result is: {result}')


if __name__ == '__main__':
  # parent process id 확인
  parent_process_id = os.getpid() # 현재 실행 중인 process의 Id를 알 수 있다. 
  print(f'Parent process id: {parent_process_id}')

  # process list를 선언하고, 이 list에 여러 process를 담아서 실행하는데 사용
  processes = list()

  # process 생성 및 실행
  for i in range(1, 10):
    p = Process(name='name'+str(i), target=square, args=(i,)) # name 지정
    processes.append(p)

    # 여기서 process를 시작하고, 
    p.start()

  # 여기서 join으로 기다려준다. 
  for process in processes:
      process.join() 
  
  # 종료
  print("parent process is done")

실행 결과

Parent process id: 15061
Process ID: 15062, Process name: name1, result is: 1
Process ID: 15063, Process name: name2, result is: 4
Process ID: 15064, Process name: name3, result is: 9
Process ID: 15065, Process name: name4, result is: 16
Process ID: 15066, Process name: name5, result is: 25
Process ID: 15067, Process name: name6, result is: 36
Process ID: 15068, Process name: name7, result is: 49
Process ID: 15069, Process name: name8, result is: 64
Process ID: 15070, Process name: name9, result is: 81
parent process is done

 

3. 병렬 작업 실행 - ProcessPoolExecutor

동시에 여러 웹사이트에 방문해서 어떤 값을 가져와보는 것을 multiprocessing으로 구현해보자. 

from concurrent.futures import ProcessPoolExecutor, as_completed
import urllib.request

# 조회 URLS
URLS = [
  'http://www.daum.net/',
  'http://www.cnn.com/',
  'http://naver.com/',
  'http://google.com/'
]

# 실행 함수
def load_url(url, timeout):
  with urllib.request.urlopen(url, timeout=timeout) as conn:
    return conn.read()

# main 함수
def main():
  with ProcessPoolExecutor(max_workers=4) as executor: # HERE!
    # Future(후에 할 일을 예약) 로드
    future_to_url = {executor.submit(load_url, url, timeout=60):  url for url in URLS}

    # 실행
    for future in as_completed(future_to_url):
      url = future_to_url[future]

      try:
        data = len(future.result())
      except Exception as exc:
        print(f'{url} generated an exception: {exc}')
      else:
        print(f'{url} page is {data} bytes')


if __name__ == '__main__':
  main()

 

4. 프로세스간 상태 공유  - 공유 메모리

- Value

동시성 프로그래밍을 할 때 보통 가능한 한 공유된 상태를 사용하지 않는 것이 좋지만 공유 데이터를 사용해야한다면 공유 메모리를 사용하는 방식이 있다. 

공유 메모리

공유할 데이터를 선언할 때  Value 또는 Array를 사용하여 공유 메모리 맵에 저장하고 사용한다. 

  • Value : multiprocessing.Value(typecode_or_type, *args, lock=True)
  • Array: multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

공유 메모리를 사용하지 않았을 때 (왼쪽)의 코드를 보면 share_value = 0으로 해 놓고 공유해서 사용하려고 시도하지만 제대로 되지 않는다. 

프로세스간 공유된 상태를 사용해야 한다면 공유 메모리를 사용하여 오른쪽 코드처럼 공유 데이터를 선언할 때는 Value(또는 배열일 경우 Array)를 사용하여 뭘 공유 할 것인지 명시해주어야 한다. 

from multiprocessing import Process, current_process, Value, Array
import os

# 예제1) multi process에서 process간 메모리 공유하는 경우

# 실행 함수
def generate_update_number(v: Value):
  for _ in range(50):
    v.value += 1
  print(current_process().name, "data", v.value)

# main 함수
def main():
  # paent process id
  parent_process_id = os.getpid()
  print(f'Parent process ID: {parent_process_id}')

  # process list 선언
  processes = list()

  # process 메모리 공유 변수
  share_value = Value('i', 0) 
  # 그냥 share_value = 0 이 아니라 정확하게 뭘 공유 할 것인지 Value를 사용해서 명시해주어야 한다. 

  for _ in range(1, 10):
    p = Process(target=generate_update_number, args=(share_value,))
    processes.append(p)

    # 실행
    p.start()
  
  # join
  for p in processes:
    p.join()

  # 최종 프로세스 부모 변수 확인
  print('Final Data in parent process: ', share_value.value)

if __name__ == '__main__':
  main()

실행결과

Parent process ID: 18145
Process-2 data 65
Process-1 data 50
Process-3 data 100
Process-4 data 150
Process-5 data 200
Process-6 data 250
Process-7 data 300
Process-8 data 350
Process-9 data 400
Final Data in parent process:  400

 

5. 프로세스간 통신 - Queue와 Pipe

여러 프로세스를 사용할 때, 일반적으로 프로세스 간 통신을 위해 메세지 전달 방식

1) Queue

from multiprocessing import Process, Queue, current_process
import time
import os

def worker(id, baseNum, q):
  process_id = os.getpid()
  process_name = current_process().name

  sub_total = 0
  for i in range(baseNum):
    sub_total += 1
  
  # Produce
  q.put(sub_total)
  # 정보 출력
  print(f'Process ID: {process_id} Process Name: {process_name}')
  print(f'Result is: {sub_total}')

def main():
  parent_process_id = os.getpid()
  print(f'Parent process ID: {parent_process_id}')

  processes = list()
  
  # 성능 측정을 위한 시작 시간 기록
  start_time = time.time()

  # Queue 선언
  q = Queue()
  for i in range(5):
    p = Process(name=str(i), target=worker, args=(i, 10000, q))

    processes.append(p)

    p.start()
  
  for process in processes: 
    process.join()

  # 순수 계산 시간
  print("---- %s seconds ----" % (time.time() - start_time))

  # 종료 플래그
  q.put('exit')
  total = 0
  while True:
    tmp = q.get()
    if tmp == 'exit':
      break
    else:
      total += tmp 

  print(f'Main-Processing Total Count: {total}') 
  print ("Multi process is Done! ")                                                                                                                                                                                                            


if __name__ == "__main__":
  main()

실행결과

Parent process ID: 18812
Process ID: 18814 Process Name: 1
Result is: 10000
Process ID: 18813 Process Name: 0
Result is: 10000
Process ID: 18815 Process Name: 2
Result is: 10000
Process ID: 18816 Process Name: 3
Result is: 10000
Process ID: 18817 Process Name: 4
Result is: 10000
---- 0.13417410850524902 seconds ----
Main-Processing Total Count: 50000
Multi process is Done!

2) Pipe

from multiprocessing import Process, Pipe, current_process
import time
import os

def worker(id, baseNum, conn):
  process_id = os.getpid()
  process_name = current_process().name

  sub_total = 0
  for i in range(baseNum):
    sub_total += 1
  
  # Produce
  conn.send(sub_total) # child가 send로 보내면
  conn.close() # pipeline 잠그기

  # 정보 출력
  print(f'Process ID: {process_id} Process Name: {process_name}')
  print(f'Result is: {sub_total}')

def main():
  parent_process_id = os.getpid()
  print(f'Parent process ID: {parent_process_id}')

  # 성능 측정을 위한 시작 시간 기록
  start_time = time.time()

  # Pipe 선언
  parent_conn, child_conn = Pipe() 

  p = Process(name='child', target=worker, args=('child', 10000, child_conn))

  p.start()

  p.join()

  # 순수 계산 시간
  print("---- %s seconds ----" % (time.time() - start_time))

  print('Main-Processing Total Count: {}'.format(parent_conn.recv()))  # parent가 receive한다. 
  print ("Multi process is Done! ")                                                                                                                                                                                                            


if __name__ == "__main__":
  main()

실행 결과

Parent process ID: 82739
Process ID: 82740 Process Name: child
Result is: 10000
---- 0.08413195610046387 seconds ----
Main-Processing Total Count: 10000
Multi process is Done!

 

 

 

 

Code Reference

고수가 되는 파이썬 : 동시성과 병렬성 문법 배우기 Feat. 멀티스레딩 vs 멀티프로세싱