카테고리 없음

[Multiprocessing] 큐와 파이프를 사용하여 여러 프로세스에서 결과를 받아오는 방법

광장의꽃향기 2022. 6. 24. 15:29

 

 

시나리오

모델 N 개를 train 한 뒤 모델의 best_metric 을 결과로 얻고 싶다.
방법1로 모델 N개를 만들고 방법2로 모델 N개를 만든다.
=> Process 를 2개 띄우고 각 process 마다 subprocess 를 N개 띄운다. 각각의 process 는 {model_name: metric, model_name2: metric} 형식으로 된 dict 를 반환한다.

또 각 모델의 latency 도 알고 싶다.

mp.Process 2개에 Queue 를 하나씩 주고 각 process는 subprocess N개 를 실행하는 방식으로 구현했다.

코드는 아래와 같다. experiment.py 가 entrypoint 이다.

experiment.py

import multiprocessing as mp
import random
import subprocess
import sys
from unittest import result
import pathlib

N_DIRECTION = 2
N_PROCS = 11
acc_q = mp.Queue()
lat_q = mp.Queue()


def mock_acc_process(q):
    result_dict = {}
    cwd = pathlib.Path(__file__).parent.resolve()
    for id in range(N_PROCS):
        model_id = f"model_{id}"
        subprocess.run([sys.executable, f"train_mock.py",
                       f"--id={model_id}"], cwd=cwd)

        with open(cwd/f"results/{model_id}", "r") as f:
            res = f.readlines()[0]
            idx = res.find(":")
            result_dict[model_id] = float(res[idx+1:])

    q.put(result_dict)


def mock_latency_process(q, n_procs):
    model_lat_pairs = [(f"model_{i}", 150 + 10 * i) for i in range(n_procs)]
    random.shuffle(model_lat_pairs)
    results = {k: v for (k, v) in model_lat_pairs}
    q.put(results)


def postprocess_latency_result(latency_q):
    """
    returns : dict[model_name:lat]
    """
    result_dict = latency_q.get()
    new_dict = {}
    for k, v in result_dict.items():
        new_dict[k] = float(v)
    return new_dict


def postprocess_acc_result(acc_q, N_procs=N_PROCS):
    new_dict = {}
    # while not acc_q.empty():
    cnt = 0
    while cnt < N_DIRECTION:
        dict_ = acc_q.get()
        new_dict = {**dict_, **new_dict}
        cnt += 1
        # for k,v in dict_.items():
        # k, v = list(dict_.items())[0]
        # new_dict[k] = float(v)

    return new_dict


def search_dwr():
    acc_procs = []

    for _ in range(N_DIRECTION):
        p = mp.Process(target=mock_acc_process, args=(acc_q, ))
        acc_procs.append(p)
        p.start()

    lat_process = mp.Process(
        target=mock_latency_process, args=(lat_q, N_PROCS))
    lat_process.start()

    latency_result = postprocess_latency_result(lat_q)  # {model : lat}
    acc_result = postprocess_acc_result(acc_q, len(acc_procs))  # {model : acc}

    for proc in acc_procs + [lat_process]:
        proc.join()

    return latency_result, acc_result


if __name__ == '__main__':
    # print(pathlib.Path(__file__).parent.resolve())
    latency, acc = search_dwr()
    assert len(latency.keys()) == len(acc.keys())

    model_lat_acc_pairs = []
    for model_id, lat in latency.items():
        # model_id = int(k.strip("model"))
        model_lat_acc_pairs.append((model_id, lat, acc[model_id]))

    print(sorted(model_lat_acc_pairs))

 

train_mock.py

import argparse
import os

import pathlib
import random
import shutil

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--id", type=str)

    args = parser.parse_args()

    cwd = pathlib.Path(__file__).parent.resolve()
    shutil.rmtree(cwd/f"results/", ignore_errors=True)
    os.makedirs(cwd/f"results/", exist_ok=True)
    with open(cwd/f"results/{args.id}", "w") as f:
        f.write(f"top_1_accuracy:{60 + random.random()*10}")

 

Issues

1. Queue' object is not iterable

import multiprocessing as mp

acc_q = mp.Queue()
def search_dwr():
    acc_procs = []
    
    for _ in range(2):
        p = mp.Process(target=mock_acc_process, args=(acc_q))

이 코드의 문제점이 무엇일까? (acc_q) 가 아니라 (acc_q, ) 로 해야한다. 에러에 Queue' object is not iterable 만 나와서 메시지 내용만 보면 찾기 어려운 에러다.

 

2. queue 에서 결과 가져올 때

process 들이 끝나고 queue 에서 결과를 가져오겠다는 생각에 .join() 을 통해 process 들이 끝나길 기다리고 queue.get() 을 했었다. 이러면 에러난다. queue.get() 을 먼저하고 .join() 을 나중에 해야 한다. (https://docs.python.org/ko/3/library/multiprocessing.html 참고)

 

3. subprocess  에서 output 가져오기

stdout 을 통해서만 output 을 주고 받을 수 있다. 이게 불편해서 subprocess 에 의해 실행되는 함수가 결과파일을 생성하도록 했고 subprocess 가 graceful 하게 종료되면 파일 내용을 읽도록 했다.

4. process 안에서 multiprocess 실행하려면 안된다.

def mock_acc_process(q, direction):
    result_dict = {}
    cwd = pathlib.Path(__file__).parent.resolve()

    with mp.Pool(N_PROCS) as pool:
        for i in range(N_PROCS):
            pool.apply_async(train, (model_name(i, direction), cwd))
    
        # multiple_results = [pool.apply_async(train, (model_name(i, direction), cwd)) for i in range(N_PROCS)]
    for i in range(N_PROCS):
        name = model_name(i, direction)
        with open(cwd/f"results/{name}", "r") as f:
            res = f.readlines()[0]
            idx = res.find(":")
            result_dict[name] = float(res[idx+1:])

    q.put(result_dict)

위 코드는 에러가 난다. __main__ 에 접근이 안되기 때문이다. 

 

개선 아이디어

1. 입력-출력 형태를 미리 정해두고 하는게 좋은 것 같다.

어디서는 f"model_name_{id}" 로 model 을 표현하고 어디서는 f"{id} 로 model 을 표현하니 아귀가 안맞아 에러가 나는 부분이 생긴다.자료형을 하나 만들어놔야겠다. ex) {ModelName(id):acc} 이런식으로. 이러면 나중에 수정할 때도 여기만 고치면 될듯.

2. subprocess 가 많으니 mock 해서 실행하는데도 오래걸린다.

Asyncio 의 subprocess run 을 사용해봐야겠다.