[Python] 동시성과 병렬성 문법 - GIL과 Multithreading
1. 멀티 스레드, 멀티 프로세스 개념
[운영체제] Thread 에서 정리한 적이 있음.
a. Processs vs Thread
Process: 컴퓨터에서 연속적으로 실행되고 있는 컴퓨터 프로그램, 즉 메모리에 올라와 실행되고 있는 독립적인 프로그램 인스턴스
Thread: 프로세스 내에서 동작되는 여러 실행의 흐름 단위
이렇게 하나의 프로세스 내에서 생성된 thread끼리 해당 프로세스의 영역을 공유한다.
- Processs는 독립적, Thread는 프로세스의 서브넷
- Process는 각각 독립적인 자원을 가진다. Thread는 프로세스의 자원을 공유한다. Process는 별도의 Code, Data, Stack, Heap을 독립적으로 가지는 반면 Thread는 Stack만 독립적으로 가지고 Code, Data, Heap은 서로 공유한다.
- Process는 자신만의 주소 영역을 가진다. Thread는 주소 영역을 공유한다.
- Process간에는 IPC기법으로 통신해야하지만 Thread는 필요없다.
b. multi process vs multi thread
아래와 같이 다양한 방식으로 실행하게 운영체제를 구성할 수 있다.
1) multi thread
멀티 스레드의 경우 한 개의 단일 어플리케이션을 여러 스레드로 구성후 작업 처리하는 것을 말한다.
장점은,
1. 사용자에 대한 응답성 향상
어떤 process가 두 역할을 한다고 가정해보자. 첫 번째 기능은 1~100만까지 더하는 기능, 2 번째 기능은 지금까지 더한 수를 전달해 주는 기능
이 두 기능을 하나의 process로 만들면 코드 상 1~100만까지 다 더하는 for문 코드가 다 돌고 나서 2번째 기능의 코드가 실행될 가능성이 크다. 그러면 응답시간이 길어진다.
하지만 첫번째 기능을 thread1, 두번째 기능을 thread2로 나눠서 생각하면 멀티 태스킹이나 멀티 프로세싱(병렬 처리)로 다른 cpu 코어에서 동시 처리가 가능해진다. 그러면 응답시간이 짧아질 수 있다.
2. 자원공유의 효율성
thread는 하나의 프로세스 내에서 자원 공유가 가능하기 때문에 프로세스의 데이터를 모두 접근 가능하다.
각각의 프로세스로 나뉘어져있다면 IPC기법과 같이 프로세스간 자원 공유를 위해 번거로운 작업이 필요없다.
추가적으로, 만약 동시 실행을 위해 6개의 프로세스를 만들었다면 6개의 프로세스 공간이 필요해진다. (하나의 프로세스당 4G라면 총 24G)
그런데 하나의 프로세스당 6개의 쓰레드를 만들면 4G만 있으면 된다. (자원 효율성)
3. 작업이 분리되어 코드가 간결하지만 이는 코드를 어떻게 작성하느냐에 따라 달라지는 문제이긴 하다.
단점은
1. thread는 하나의 프로세스 안에서 작동하기때문에 thread중 한 thread만 문제가 생겨도, 전체 프로세스가 영향을 받는다.
2. thread를 많이 생성하면, Context Switching이 많이 일어나 성능이 저하되기도 한다.
예를 들어, Linux OS에서는 Thread를 Process와 거의 동일하게 다룬다. 그래서 thread역시 많이 생성하면 모든 Thread를 스케쥴링 해야하므로 Context Switching이 빈번할 수 밖에 없다.
3. 그 외에도 자원 공유에서 문제가 발생할 수도 있고(교착 상태), 디버깅이 어렵고, 단일 프로세스에서 멀티 스레드는 효과가 약하다 라는 점 등이 있다.
2) multi process
- 한 개의 단일 어플리케이션을 여러 프로세스로 구성 후 작업 처리하는 것을 말한다.
장점은
1. 한 개의 프로세스 문제 발생은 다른 프로세스에 영향을 주지 않는다.
단점은
2. 복잡한 구현, 복잡한 통신 방식 사용
3. 오버 헤드 발생 가능성
2. Python의 GIL(Global Interpretor Lock)이란?
GIL에 대해서는 <왜 Python에는 GIL이 있는가> 와 <What Is the Python Global Interpreter Lock (GIL)?>, <Memory Management in Python>글을 보고 정리하였다.
In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe.
python에서는 멀티 스레드로 코딩을 했어도, 한 타이밍에서는 한 thread만 Python object에접근 가능하록 해놓음.
즉, 단일 스레드만이 Python object에 접근하게 제한하는 mutex.
왜냐하면 Cpython에서는 메모리 관리가 문제가 생길 때 이를 해결하기 위해선 즉, thread를 safe하게 만들기 위해서는 Gil이 필요하다고 판단하기 때문이다.
메모리를 페이지 수가 정해져있는 빈 책이라고 생각해보자. 메모리 크기가 정해져있고, 아직 아무 것도 저장되어있지 않다면 그렇게 비유 할 수 있을 것이다. 그리고 데이터를 메모리에 저장해야하는 application이나 process가 이 책의 작성자가 되고,
작성자가 해당 책에서 어느 페이지, 어느 곳에 작성할 수 있는지 결정하는 관리자가 메모리 관리자가 될 것이다.
그리고 오래되어 아무도 보지 않는 책의 내용을 삭제하고 새로운 내용을 쓸 수 있도록 하는 것이 있다면 그게 바로 garbage collector이다.
python 코드의 메모리 관리는 OS위에 있는 application중 python application에서 처리하는데, Default Python Implementation가 CPython이다. CPython은 C로 작성되었으며 Python bytecode를 interpret한다.
다시말해 CPython의 코드에 메모리를 관리하는 알고리즘과 구조가 구현되어 있는 것이고, 이를 통해 python 코드의 메모리 관리가 된다.
메모리 관리(메모리 할당 및 해제)에서 중요한 것은 ' thread들은 process가 공유하는 메모리에 접근할 수 있는데, 메모리는 컴퓨터의 공유 자원이기 때문에 여러개의 프로세스가 동시에 메모리의 같은 위치를 사용하려고 하면 안 된다'는 것이다.
여러 thread가 공유된 데이터에 접근해서 변경함으로서 race condition이 발생할 수 있고 각 thread들이 race condition을 발생시키지 않으면서 각자의 일을 잘 수행하는 것이 thread-safe하다. 그럼 그렇지 않은게 not thread-safe이다.
그럼 CPython의 메모리 관리 방식이 어떻길래 not thread-safe하다는 것일까?
Python은 모든 것이 객체이다. 그리고 그 모든 객체는PyObject라는 struct타입이다.
PyObject는
- ob_refcnt: reference count
- ob_type: pointer to another type
을 가지는데 CPython은 생성되는 객체 PyObject의 reference를 세어가면서(ob_refcnt) 메모리 관리를 한다.
그런데 객체의 reference를 세다가 race condition이 발생한다던지 할 수 있기 때문에 메모리 관리의 필요성이 있다. 이 방식대로라면 not thread-safe하다..
이러한 위험을 CPython의 메모리 관리에서는 어떻게 처리하고 있을까?
ob_refcnt가 바뀔 때 마다 객체 하나하나마다 mutex를 통해 보호해야할까? 이 방식은 성능적으로도 좋지 않고 deadlock을 발생 시킬 수도 있다. 대신 CPython은 GIL를 사용해서 하나의 thread만 실행 상태에 있을 수 있게 한다. multi thread를 사용하는 코드라도 한 thread만 python code를 실행 할 수 있도록 하는 것이다. (그래서 단일 thread 프로그램을 실행할 때는 GIL의 영향을 모르겠지만 multi thread code에서는 성능 병목 현상이 발생할 수 있다.)
그래서 multi thread 방식으로 짜면 GIL이란 것이 있다.
3. Multithreading
1) 기초 thread 생성 예제 - Main(Parents) Thread vs Sub(Child) Thread
Main(Parents) Thread에서 새로운 Sub(Child) Thread를 생성 했을 때 Sub(Child) Thread가 끝날 때까지 대기
import logging
import threading
import time
# 스레드 실행 함수
def thread_func(name):
logging.info("Sub-Thread: %s starting", name)
time.sleep(3) # sub thread에서 실행할 작업을 구현해줌 (일단 sleep 으로 해보겠음)
#3초가 지난 후 sub thread가 끝난다.
logging.info("Sub-Thread: %s finishing", name)
# 메인 영역
if __name__ == "__main__":
# Loggin format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
logging.info("Main-Thread: before creating thread") # 서브 thread가 만들어 지기 전
# 함수 인자 확인
x = threading.Thread(target=thread_func, args=('First',)) # 서브 thread 만들어짐
logging.info("Main-Thread: before running thread") # 서브 thread가 실행 되기 전
x.start() # 서브 thread 실행
logging.info("Main-Thread: wait for the thread to finish") # 서브 thread가 실행 될 때까지 기다림
logging.info("Main-Thread: all threads are done") # 모든 thread 실행 끝
실행 결과는
17:13:30: Main-Thread: before creating thread
17:13:30: Main-Thread: before running thread
17:13:30: Sub-Thread : First starting
17:13:30: Main-Thread: wait for the thread to finish
17:13:30: Main-Thread: all threads are done # 즉 여기서 main thread는 끝났는데 sub thread는 자기 할 일(3초 sleep하기)를 계속하고
17:13:33: Sub-Thread: First finishing # 여기서 끝남
2) Daemon thread
- 백 그라운드에서 실행
- 1)에서 살펴본 것과 다르게 main thread 종료시 sub thread도 즉시 종료된다.
- 주로 백그라운드 무한 대기 이벤트 발생을 실행하는 부분을 담당한다. ex) main thread영역에 while문 등으로 계속 sub thread가 생성, 실행되게 해 놓고 main thread가 종료되면 그 때부터는 모든 실행이 종료되도록 하는 경우
import logging
import threading
import time
# 스레드 실행 함수
def thread_func(name, d):
logging.info("Sub-Thread: %s starting", name)
for i in d:
print(i)# sub thread에서 실행할 작업을 구현해줌
logging.info("Sub-Thread: %s finishing", name)
# 메인 영역
if __name__ == "__main__":
# Loggin format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
logging.info("Main-Thread: before creating thread") # 서브 thread가 만들어 지기 전
# 함수 인자 확인
x = threading.Thread(target=thread_func, args=('First', range(20000), daemon=True) # 서브 thread 만들어짐 #daemon이라는 옵션을 true로 해주면 daemon thread
y = threading.Thread(target=thread_func, args=('Second', range(10000), daemon=True) # 서브 thread 만들어짐
# 생성한 sub thread가 Daemon thread인지 아닌지 확인하려면
print(x.isDaemon())
logging.info("Main-Thread: before running thread") # 서브 thread가 실행 되기 전
x.start() # 서브 thread 실행
y.start()
logging.info("Main-Thread: wait for the thread to finish") # 서브 thread가 실행 될 때까지 기다림
logging.info("Main-Thread: all threads are done") # 모든 thread 실행 끝
실행 결과는
17:13:30: Main-Thread: before creating thread
17:13:30: Main-Thread: before running thread
17:13:30: Sub-Thread : First starting
17:13:30: Main-Thread: wait for the thread to finish
1
17:13:30: Main-Thread: all threads are done # 즉 여기서 main thread는 끝났는데 sub thread는 자기 할 일(3초 sleep하기)를 계속하고
1 # main thread 종료시 sub thread가 1만 출력하고 같이 바로 종료되어 버림
3) ThreadPoolExecutor
여러 thread를 생성할 때 유용한 방법인 concurrent.futures 패키지 이용
- 대기 중이니 작업 --> Queue -> 완료 상태 조사 -> 결과 또는 예외
# 그룹 thread
import logging
from concurrent.futures import ThreadPoolExecutor #multi threading을 편하게 해주는 패키지
import times
def task(name):
logging.info("Sub-Thread: %s starting", name)
result = 0
for i in range(10000):
result = result + i # sub thread에서 실행할 작업을 구현해줌
logging.info("Sub-Thread: %s finishing. result: %d", name, result)
return result
def main():
# Loggin format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
logging.info("Main-Thread: before creating thread") # 서브 thread가 만들어 지기 전
logging.info("Main-Thread: before running thread") # 서브 thread가 실행 되기 전
# 실행 방법1: max_workers(thread 개수) 설정 해서 하는 방법.
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(task, ("First", ))
task2 = executor.submit(task, ("Second", ))
# 결과 returun값이 있는 경우
print()
print(task1.result())
print(task2.result())
if __name__ == "__main__":
main()
# 그룹 thread
import logging
from concurrent.futures import ThreadPoolExecutor #multi threading을 편하게 해주는 패키지
import times
def task(name):
logging.info("Sub-Thread: %s starting", name)
result = 0
for i in range(10000):
result = result + i # sub thread에서 실행할 작업을 구현해줌
logging.info("Sub-Thread: %s finishing. result: %d", name, result)
return result
def main():
# Loggin format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
logging.info("Main-Thread: before creating thread") # 서브 thread가 만들어 지기 전
logging.info("Main-Thread: before running thread") # 서브 thread가 실행 되기 전
# 실행 방법2: with 컨텍스트 관리자 사용
with ThreadPoollExecutor(max_workers=3) as executor:
tasks = executor.map(task, ['First', 'Second'])
# 결과 returun값이 있는 경우
print()
print(tasks))
if __name__ == "__main__":
main()
실행 결과는
17:13:30: Main-Thread: before creating thread
17:13:30: Main-Thread: before running thread
17:13:30: Sub-Thread :("First", ) starting
17:13:30: Sub-Thread: ("First", ) finishing. result: 5005000
17:13:30: Sub-Thread :("Second", ) starting
5005000
17:13:30: Sub-Thread: ("Second", ) finishing. result: 5005000
5005000
4) Lock, Deadlock
[운영체제] Semaphore와 deadlock, starvation
1. Semaphore:프로세스간 공유 된 자원에 접근 시 교착상태를 예방하고 프로그램이 정상적으로 작동하게 하기 위한 방법으로 , 한 개의 프로세스만 접근 처리
2. Mutex: 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는 것
- Thread Synchronization이 안되서 문제 발생하는 예시)
# 그룹 thread
import logging
from concurrent.futures import ThreadPoolExecutor #multi threading을 편하게 해주는 패키지
import times
class FakeDataStore:
# 공유 변수
# Data, Heap 영역에서 해당 값 공유(thread는 stack만 별도 할당이고, 나머지는 공유)
def __init__(self):
self.value = 0 #이 값은 공유한다.
# 변수 업데이트 함수
# thread 끼리 분리해서 사용하는 Stack 영역 thread값이 함수를 실행할 때 각각의 범위가 필요하기 떼문에 독립적(thread는 stack만 별도 할당이고, 나머지는 공유)
def update(self, n):
logging.info("Thread %s : starting update", n)
Mutex & Lock 등 Thread synchronization이 필요한 곳
local_copy = self.value # 자신만의 local 영역 즉, stack영역에 공유 변수를 복사하고,
local_copy += 1 # 값 변경
time.sleep(0.1)
self.value = local_copy # 변경 된 값을 공유 변수에 반영
# ================위의 코드 4줄.. 이렇게만 하면 동기화 되지 않은 self.value를 각 thread가 동시에 읽어가서 문제가 발생한다. 그래서 Thread synchronization이 필요허다.======================
logging.info("Thread %s : finishing update", n)
if __name__ == "__main__":
# Loggin format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
# 객체 생성
store = FakeDataStore()
logging.info("Testing update. Starting value is %d", store.value)
# With Context 시작
with ThreadPoolExcutor(max_workers=2) as executor:
for n in ['First', 'Second', 'Third']: # thread가 3개 생성
executor.submit(store.update, n)
logging.info("Testing update. Ending value is %d', store.value)
- 동기화 문제 해결
방법 1.
# 그룹 thread
import logging
from concurrent.futures import ThreadPoolExecutor #multi threading을 편하게 해주는 패키지
import times
import threading
class FakeDataStore:
# 공유 변수
# Data, Heap 영역에서 해당 값 공유(thread는 stack만 별도 할당이고, 나머지는 공유)
def __init__(self):
self.value = 0 #이 값은 공유한다.
self._lock = threadinng.Lock()
# 변수 업데이트 함수
# thread 끼리 분리해서 사용하는 Stack 영역 thread값이 함수를 실행할 때 각각의 범위가 필요하기 떼문에 독립적(thread는 stack만 별도 할당이고, 나머지는 공유)
def update(self, n):
logging.info("Thread %s : starting update", n)
# Lock 획득
self._lock.acquire()
logging.info("Thread %s has lock", n)
# Mutex & Lock 등 Thread synchronization이 필요한 곳
local_copy = self.value # 자신만의 local 영역 즉, stack영역에 공유 변수를 복사하고,
local_copy += 1 # 값 변경
time.sleep(0.1)
self.value = local_copy # 변경 된 값을 공유 변수에 반영
# Lock 반환
self._lock.release()
logging.info("Thread %s release lock", n)
logging.info("Thread %s : finishing update", n)
if __name__ == "__main__":
# Loggin format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
# 객체 생성
store = FakeDataStore()
logging.info("Testing update. Starting value is %d", store.value)
# With Context 시작
with ThreadPoolExcutor(max_workers=2) as executor:
for n in ['First', 'Second', 'Third']: # thread가 3개 생성
executor.submit(store.update, n)
logging.info("Testing update. Ending value is %d", store.value)
방법2.
# 그룹 thread
import logging
from concurrent.futures import ThreadPoolExecutor #multi threading을 편하게 해주는 패키지
import times
import threading
class FakeDataStore:
# 공유 변수
# Data, Heap 영역에서 해당 값 공유(thread는 stack만 별도 할당이고, 나머지는 공유)
def __init__(self):
self.value = 0 #이 값은 공유한다.
self._lock = threadinng.Lock()
# 변수 업데이트 함수
# thread 끼리 분리해서 사용하는 Stack 영역 thread값이 함수를 실행할 때 각각의 범위가 필요하기 떼문에 독립적(thread는 stack만 별도 할당이고, 나머지는 공유)
def update(self, n):
logging.info("Thread %s : starting update", n)
with self._lock: # context manager을 사용하면 위의 방법보단 매우 편리
logging.info("Thread %s has lock", n)
# Mutex & Lock 등 Thread synchronization이 필요한 곳
local_copy = self.value # 자신만의 local 영역 즉, stack영역에 공유 변수를 복사하고,
local_copy += 1 # 값 변경
time.sleep(0.1)
self.value = local_copy # 변경 된 값을 공유 변수에 반영
logging.info("Thread %s release lock", n)
logging.info("Thread %s : finishing update", n)
if __name__ == "__main__":
# Loggin format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, dateformat="%H:%M:%S")
# 객체 생성
store = FakeDataStore()
logging.info("Testing update. Starting value is %d", store.value)
# With Context 시작
with ThreadPoolExcutor(max_workers=2) as executor:
for n in ['First', 'Second', 'Third']: # thread가 3개 생성
executor.submit(store.update, n)
logging.info("Testing update. Ending value is %d', store.value)
5) Producer Consumer pattern (Producer and Consumer Pattern Using Queue)
멀티 스레드 디자인 패턴의 정석
Python의 이벤트 객체를 사용한다.
- 0으로 초기화 되어있는 Flag
- Set() 호출 시 1로 바뀜, Clear() 호출시 0으로 바뀜, Wait() 호출시 1일 땐 리턴하고 0일 땐 대기한다. isSet() 호출시 현 플래그 상태를 리턴
import queue
import random
import threading
import time
# 생산자
def producer(pipeline, event):
"""데이터를 생산하는 쪽은 네트워크 대기 상태라 가정 """
while not event.is_set():
message = random.randint(1, 11)
logging.info('Producer got message: %s', message)
pipeline.put(message)
logging.info('Producer received event Exiting')
# 소비자
def consumer():
""" 응답 받고 소비하는 것으로 가정 """
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info('Consumor store message: %s (size: %d)', message, queue.qsize())
logging.info('Producer received event Exiting')
if __name__ == '__main__':
# Logging format 설정
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
# queue의 사이즈 설정(중요 이슈)
pipeline = queue.Queue(maxsize=10)
# 이벤트 Flag 초기값 0
event = threading.Event()
# With Context 시작
with concurrennt.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event) # Thread 하나는 producer
executor.submit(consumer, pipeline, event) # 다른 Thread 하나는 consumer
# 실행 시간 조정
# while True:
# 서버 프로그램은 항상 켜져있어야 해서 보통 while True를 사용하지만, 지금은 일단 0.1초만 실행함
time.sleep(0.1)
logging.info("Main: about to set event")
# Set() 호출: 1(True)
event.set()
Code Reference
고수가 되는 파이썬 : 동시성과 병렬성 문법 배우기 Feat. 멀티스레딩 vs 멀티프로세싱