본문 바로가기

AI상식 w.GPT

ComfyUI 구조 리뷰

이벤트 루프 & thread등을 사용해서 여러 클라이언트의 요청에 대응할 수 있도록 만들어졌는데,

 

call되는 순서는 아래와 같고, 자세한 구조를 뜯어보겠다. 

 

  1. @routes.post("/prompt") 외부로 부터 /prompt call을 받으면, server의 이벤트 루프가 이걸 처리하는데, 
    처리과정에서 prompt_queue에 받은 내용을 기입한다. 
               self.prompt_queue.put((number, prompt_id, prompt, extra_data, outputs_to_execute)) 
  2. Prompt_queue에 쌓인 내용은 prompt_worker가 처리하는데, prompt_worker는 별도 쓰레드로 동작한다. 
                threading.Thread(target=prompt_worker, daemon=True, args=(q, server,)).start()
  3. prompt_worker은 아래와 같이 동작한다.  (while True가 있다)
                def prompt_worker(q, server):
                    e = execution.PromptExecutor(server) #
                    last_gc_collect = 0
                    need_gc = False
                    gc_collect_interval = 10.0

             while True:   
                 timeout = 1000.0
                 if need_gc:
                     timeout = max(gc_collect_interval - (current_time - last_gc_collect), 0.0)

                 queue_item = q.get(timeout=timeout) ### 여기서 get을 한다. 

     

 

## 전체 구조를 살짝 뜯어보면. 

 

1. main.py 부분 

아래와 같이 되어있는데,

 

    loop = asyncio.new_event_loop() 
    asyncio.set_event_loop(loop)
    server = server.PromptServer(loop)
    q = execution.PromptQueue(server)

 

  • asyncio.new_event_loop()로 새로운 이벤트 루프를 생성
  • asyncio.set_event_loop(loop)를 통해 해당 루프를 현재 쓰레드의 기본 이벤트 루프로 설정.
  • server.PromptServer가 위 loop를 사용하도록 설정하고, 이를 "server"변수로 정의. 이제 "server"변수는 PromptServer을 의미하며, 비동기 이벤트 루프를 사용하여 외부 요청 작업등을을 처리한다. 
  • 처리에 시간이 걸리는 작업(외부 응답을 받아야하는등)은 await을 해두고, 이벤트 루프에 쌓인 작업을 처리한다. 
  • q 란 PromptQueue인데, 선언을 자세히 뜯어보면, None으로 선언되어있던 PromptServer.prompt_queue를, execution.PromptQueue(server)에서 server을 인자값으로 가져다가,  Prompt_Queue에서 self로 초기화한다. 
  • 이렇게도 코딩을 하나 싶은데, 상위객체를 받아다가 하위객체로 연결을 저렇게 한듯하다. 
  • 그러니까 PromptServer의 prompt_queue는, PromptQueue 초기화시에 할당된다. PromptQueue는 PromptServer의 prompt_queue를 통해 접근한다. 
    • class PromptServer():
      ...
                self.prompt_queue = None
       
      class PromptQueue:
          def __init__(self, server):
              self.server = server
      ...
              server.prompt_queue = self
    •  
    • PromptSever에서 loop는 아래와 같이 사용된다. 
      PromptServer의 messages변수는 Queue로 선언되어있고,
    • send_sync부분은 event, data, sid를 별로 쓰레드를(call_soon_threadsafe) 통해 메시지 queue에 내용을 삽입한다. loop(이벤트 루프)가 이미 별도 쓰레드로 메시지 queue를 읽고 있는데(이건 밑에설명) 다른 queue에서 내용을삽입하는게 안전하지 않을수 있어서 call_soon_threadsafe를 씀. 
    • 더보기

       

      class PromptServer():
          def __init__(selfloop):
              PromptServer.instance = self

              mimetypes.init()
              mimetypes.types_map['.js'= 'application/javascript; charset=utf-8'

              self.user_manager = UserManager()
              self.supports = ["custom_nodes_from_web"]
              self.prompt_queue = None
              self.loop = loop
              self.messages = asyncio.Queue()
       
          def send_sync(selfeventdatasid=None):
              self.loop.call_soon_threadsafe(
                  self.messages.put_nowait, (eventdatasid))
       
       
           @routes.post("/prompt") ##외부에서 제일 먼저 불림
           async def post_prompt(request):
                  logging.info("got prompt")
                  resp_code = 200
                  out_string = ""
                  json_data =  await request.json()
                  json_data = self.trigger_on_prompt(json_data)

                  if "number" in json_data:
                      number = float(json_data['number'])
                  else:
                      number = self.number
                      if "front" in json_data:
                          if json_data['front']:
                              number = -number

                      self.number += 1

                  if "prompt" in json_data:
                      prompt = json_data["prompt"]
                      valid = execution.validate_prompt(prompt)
                      extra_data = {}
                      if "extra_data" in json_data:
                          extra_data = json_data["extra_data"]

                      if "client_id" in json_data:
                          extra_data["client_id"= json_data["client_id"]
                      if valid[0]:
                          prompt_id = str(uuid.uuid4())
                          outputs_to_execute = valid[2]
                          self.prompt_queue.put((numberprompt_idpromptextra_dataoutputs_to_execute))

loop가 별도 쓰레드로 메시지 queue내용을 처리하고 있는 부분에 대한 코드는 아래와 같다. 

main.py에 아래와 같이 되어있다. 

async def run(server, address='', port=8188, verbose=True, call_on_start=None):
    await asyncio.gather(server.start(address, port, verbose, call_on_start), server.publish_loop())

 

여기서 server.start와 server.pulish_loop를 같이 실행하는데 (asyncio.gather을 통해서) 

server.start는 서버 시작하는거고 포트등.. server.publish_loop코드를 server.py에서 보면, 아래와 같다. 

while True로 되어있고, self.messages.get()( = 큐에서 작업 꺼내서 처리하기를)을 계속 수행함. 

    async def publish_loop(self):
        while True:
            msg = await self.messages.get()
            await self.send(*msg)
 
 
    def send_sync(self, event, data, sid=None):
        self.loop.call_soon_threadsafe(
            self.messages.put_nowait, (event, data, sid))

    def queue_updated(self):
        self.send_sync("status", { "status": self.get_queue_info() })
 
 

 

 

messages는 아래와 같이 선언되어있고.  앞서 말했듯 queue_updated-> send_sync를 통해 메시지들을 넣는다.

self.messages = asyncio.Queue()

 

 

queue_updated는 promptQueue에서 call되는데, 

아래 부분을 보면 put/get하면 queue_updated()가 불리는걸 볼수있다. 

class PromptQueue:
    def __init__(self, server):
        self.server = server
        self.mutex = threading.RLock()
        self.not_empty = threading.Condition(self.mutex)
        self.task_counter = 0
        self.queue = []
        self.currently_running = {}
        self.history = {}
        self.flags = {}
        server.prompt_queue = self

    def put(self, item):
        with self.mutex:
            heapq.heappush(self.queue, item) ## yunjung step2.5 heapq에 넣기
            self.server.queue_updated() 
            self.not_empty.notify()

    def get(self, timeout=None):
        with self.not_empty:
            while len(self.queue) == 0:
                self.not_empty.wait(timeout=timeout)
                if timeout is not None and len(self.queue) == 0:
                    return None
            item = heapq.heappop(self.queue)
            i = self.task_counter
            self.currently_running[i] = copy.deepcopy(item)
            self.task_counter += 1
            self.server.queue_updated()
            return (item, i)

 

thread와 이벤트 루프의 차이 

 

'AI상식 w.GPT' 카테고리의 다른 글

SigLIP 리뷰  (0) 2024.12.06
Attention 모듈 설명 (매우 쉽게 설명)  (0) 2024.10.28
[핵심요약?] On Distillation of Guided Diffusion Models  (1) 2024.10.09
SNR in Diffusion (w. GPT)  (0) 2024.10.03
GmP CLIP MLP. (clip finetuning)  (1) 2024.10.03