이벤트 루프 & thread등을 사용해서 여러 클라이언트의 요청에 대응할 수 있도록 만들어졌는데,
call되는 순서는 아래와 같고, 자세한 구조를 뜯어보겠다.
- @routes.post("/prompt") 외부로 부터 /prompt call을 받으면, server의 이벤트 루프가 이걸 처리하는데,
처리과정에서 prompt_queue에 받은 내용을 기입한다.self.prompt_queue.put((number, prompt_id, prompt, extra_data, outputs_to_execute)) - Prompt_queue에 쌓인 내용은 prompt_worker가 처리하는데, prompt_worker는 별도 쓰레드로 동작한다.
threading.Thread(target=prompt_worker, daemon=True, args=(q, server,)).start() - prompt_worker은 아래와 같이 동작한다. (while True가 있다)
def prompt_worker(q, server):e = execution.PromptExecutor(server) #last_gc_collect = 0need_gc = Falsegc_collect_interval = 10.0
while True:timeout = 1000.0if need_gc:timeout = max(gc_collect_interval - (current_time - last_gc_collect), 0.0)
queue_item = q.get(timeout=timeout) ### 여기서 get을 한다.
## 전체 구조를 살짝 뜯어보면.
1. main.py 부분
아래와 같이 되어있는데,
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
server = server.PromptServer(loop)
q = execution.PromptQueue(server)
- asyncio.new_event_loop()로 새로운 이벤트 루프를 생성
- asyncio.set_event_loop(loop)를 통해 해당 루프를 현재 쓰레드의 기본 이벤트 루프로 설정.
- server.PromptServer가 위 loop를 사용하도록 설정하고, 이를 "server"변수로 정의. 이제 "server"변수는 PromptServer을 의미하며, 비동기 이벤트 루프를 사용하여 외부 요청 작업등을을 처리한다.
- 처리에 시간이 걸리는 작업(외부 응답을 받아야하는등)은 await을 해두고, 이벤트 루프에 쌓인 작업을 처리한다.
- q 란 PromptQueue인데, 선언을 자세히 뜯어보면, None으로 선언되어있던 PromptServer.prompt_queue를, execution.PromptQueue(server)에서 server을 인자값으로 가져다가, Prompt_Queue에서 self로 초기화한다.
- 이렇게도 코딩을 하나 싶은데, 상위객체를 받아다가 하위객체로 연결을 저렇게 한듯하다.
- 그러니까 PromptServer의 prompt_queue는, PromptQueue 초기화시에 할당된다. PromptQueue는 PromptServer의 prompt_queue를 통해 접근한다.
-
class PromptServer():...class PromptQueue:def __init__(self, server):self.server = server...server.prompt_queue = self
- PromptSever에서 loop는 아래와 같이 사용된다.
PromptServer의 messages변수는 Queue로 선언되어있고, - send_sync부분은 event, data, sid를 별로 쓰레드를(call_soon_threadsafe) 통해 메시지 queue에 내용을 삽입한다. loop(이벤트 루프)가 이미 별도 쓰레드로 메시지 queue를 읽고 있는데(이건 밑에설명) 다른 queue에서 내용을삽입하는게 안전하지 않을수 있어서 call_soon_threadsafe를 씀.
-
더보기class PromptServer():def __init__(self, loop):PromptServer.instance = self
mimetypes.init()mimetypes.types_map['.js'] = 'application/javascript; charset=utf-8'
self.user_manager = UserManager()self.supports = ["custom_nodes_from_web"]self.prompt_queue = Noneself.loop = loopself.messages = asyncio.Queue()def send_sync(self, event, data, sid=None):self.loop.call_soon_threadsafe(self.messages.put_nowait, (event, data, sid))@routes.post("/prompt") ##외부에서 제일 먼저 불림async def post_prompt(request):logging.info("got prompt")resp_code = 200out_string = ""json_data = await request.json()json_data = self.trigger_on_prompt(json_data)
if "number" in json_data:number = float(json_data['number'])else:number = self.numberif "front" in json_data:if json_data['front']:number = -number
self.number += 1
if "prompt" in json_data:prompt = json_data["prompt"]valid = execution.validate_prompt(prompt)extra_data = {}if "extra_data" in json_data:extra_data = json_data["extra_data"]
if "client_id" in json_data:extra_data["client_id"] = json_data["client_id"]if valid[0]:prompt_id = str(uuid.uuid4())outputs_to_execute = valid[2]self.prompt_queue.put((number, prompt_id, prompt, extra_data, outputs_to_execute))
-
loop가 별도 쓰레드로 메시지 queue내용을 처리하고 있는 부분에 대한 코드는 아래와 같다.
main.py에 아래와 같이 되어있다.
async def run(server, address='', port=8188, verbose=True, call_on_start=None):
await asyncio.gather(server.start(address, port, verbose, call_on_start), server.publish_loop())
여기서 server.start와 server.pulish_loop를 같이 실행하는데 (asyncio.gather을 통해서)
server.start는 서버 시작하는거고 포트등.. server.publish_loop코드를 server.py에서 보면, 아래와 같다.
while True로 되어있고, self.messages.get()( = 큐에서 작업 꺼내서 처리하기를)을 계속 수행함.
async def publish_loop(self):
while True:
msg = await self.messages.get()
await self.send(*msg)
def send_sync(self, event, data, sid=None):
self.loop.call_soon_threadsafe(
self.messages.put_nowait, (event, data, sid))
def queue_updated(self):
self.send_sync("status", { "status": self.get_queue_info() })
messages는 아래와 같이 선언되어있고. 앞서 말했듯 queue_updated-> send_sync를 통해 메시지들을 넣는다.
self.messages = asyncio.Queue()
queue_updated는 promptQueue에서 call되는데,
아래 부분을 보면 put/get하면 queue_updated()가 불리는걸 볼수있다.
class PromptQueue:
def __init__(self, server):
self.server = server
self.mutex = threading.RLock()
self.not_empty = threading.Condition(self.mutex)
self.task_counter = 0
self.queue = []
self.currently_running = {}
self.history = {}
self.flags = {}
server.prompt_queue = self
def put(self, item):
with self.mutex:
heapq.heappush(self.queue, item) ## yunjung step2.5 heapq에 넣기
self.server.queue_updated()
self.not_empty.notify()
def get(self, timeout=None):
with self.not_empty:
while len(self.queue) == 0:
self.not_empty.wait(timeout=timeout)
if timeout is not None and len(self.queue) == 0:
return None
item = heapq.heappop(self.queue)
i = self.task_counter
self.currently_running[i] = copy.deepcopy(item)
self.task_counter += 1
self.server.queue_updated()
return (item, i)
thread와 이벤트 루프의 차이
'AI상식 w.GPT' 카테고리의 다른 글
SigLIP 리뷰 (0) | 2024.12.06 |
---|---|
Attention 모듈 설명 (매우 쉽게 설명) (0) | 2024.10.28 |
[핵심요약?] On Distillation of Guided Diffusion Models (1) | 2024.10.09 |
SNR in Diffusion (w. GPT) (0) | 2024.10.03 |
GmP CLIP MLP. (clip finetuning) (1) | 2024.10.03 |