시나리오
모델 N 개를 train 한 뒤 모델의 best_metric 을 결과로 얻고 싶다.
방법1로 모델 N개를 만들고 방법2로 모델 N개를 만든다.
=> Process 를 2개 띄우고 각 process 마다 subprocess 를 N개 띄운다. 각각의 process 는 {model_name: metric, model_name2: metric} 형식으로 된 dict 를 반환한다.
또 각 모델의 latency 도 알고 싶다.
mp.Process 2개에 Queue 를 하나씩 주고 각 process는 subprocess N개 를 실행하는 방식으로 구현했다.
코드는 아래와 같다. experiment.py 가 entrypoint 이다.
experiment.py
import multiprocessing as mp
import random
import subprocess
import sys
from unittest import result
import pathlib
N_DIRECTION = 2
N_PROCS = 11
acc_q = mp.Queue()
lat_q = mp.Queue()
def mock_acc_process(q):
result_dict = {}
cwd = pathlib.Path(__file__).parent.resolve()
for id in range(N_PROCS):
model_id = f"model_{id}"
subprocess.run([sys.executable, f"train_mock.py",
f"--id={model_id}"], cwd=cwd)
with open(cwd/f"results/{model_id}", "r") as f:
res = f.readlines()[0]
idx = res.find(":")
result_dict[model_id] = float(res[idx+1:])
q.put(result_dict)
def mock_latency_process(q, n_procs):
model_lat_pairs = [(f"model_{i}", 150 + 10 * i) for i in range(n_procs)]
random.shuffle(model_lat_pairs)
results = {k: v for (k, v) in model_lat_pairs}
q.put(results)
def postprocess_latency_result(latency_q):
"""
returns : dict[model_name:lat]
"""
result_dict = latency_q.get()
new_dict = {}
for k, v in result_dict.items():
new_dict[k] = float(v)
return new_dict
def postprocess_acc_result(acc_q, N_procs=N_PROCS):
new_dict = {}
# while not acc_q.empty():
cnt = 0
while cnt < N_DIRECTION:
dict_ = acc_q.get()
new_dict = {**dict_, **new_dict}
cnt += 1
# for k,v in dict_.items():
# k, v = list(dict_.items())[0]
# new_dict[k] = float(v)
return new_dict
def search_dwr():
acc_procs = []
for _ in range(N_DIRECTION):
p = mp.Process(target=mock_acc_process, args=(acc_q, ))
acc_procs.append(p)
p.start()
lat_process = mp.Process(
target=mock_latency_process, args=(lat_q, N_PROCS))
lat_process.start()
latency_result = postprocess_latency_result(lat_q) # {model : lat}
acc_result = postprocess_acc_result(acc_q, len(acc_procs)) # {model : acc}
for proc in acc_procs + [lat_process]:
proc.join()
return latency_result, acc_result
if __name__ == '__main__':
# print(pathlib.Path(__file__).parent.resolve())
latency, acc = search_dwr()
assert len(latency.keys()) == len(acc.keys())
model_lat_acc_pairs = []
for model_id, lat in latency.items():
# model_id = int(k.strip("model"))
model_lat_acc_pairs.append((model_id, lat, acc[model_id]))
print(sorted(model_lat_acc_pairs))
train_mock.py
import argparse
import os
import pathlib
import random
import shutil
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--id", type=str)
args = parser.parse_args()
cwd = pathlib.Path(__file__).parent.resolve()
shutil.rmtree(cwd/f"results/", ignore_errors=True)
os.makedirs(cwd/f"results/", exist_ok=True)
with open(cwd/f"results/{args.id}", "w") as f:
f.write(f"top_1_accuracy:{60 + random.random()*10}")
Issues
1. Queue' object is not iterable
import multiprocessing as mp
acc_q = mp.Queue()
def search_dwr():
acc_procs = []
for _ in range(2):
p = mp.Process(target=mock_acc_process, args=(acc_q))
이 코드의 문제점이 무엇일까? (acc_q) 가 아니라 (acc_q, ) 로 해야한다. 에러에 Queue' object is not iterable 만 나와서 메시지 내용만 보면 찾기 어려운 에러다.
2. queue 에서 결과 가져올 때
process 들이 끝나고 queue 에서 결과를 가져오겠다는 생각에 .join() 을 통해 process 들이 끝나길 기다리고 queue.get() 을 했었다. 이러면 에러난다. queue.get() 을 먼저하고 .join() 을 나중에 해야 한다. (https://docs.python.org/ko/3/library/multiprocessing.html 참고)
3. subprocess 에서 output 가져오기
stdout 을 통해서만 output 을 주고 받을 수 있다. 이게 불편해서 subprocess 에 의해 실행되는 함수가 결과파일을 생성하도록 했고 subprocess 가 graceful 하게 종료되면 파일 내용을 읽도록 했다.
4. process 안에서 multiprocess 실행하려면 안된다.
def mock_acc_process(q, direction):
result_dict = {}
cwd = pathlib.Path(__file__).parent.resolve()
with mp.Pool(N_PROCS) as pool:
for i in range(N_PROCS):
pool.apply_async(train, (model_name(i, direction), cwd))
# multiple_results = [pool.apply_async(train, (model_name(i, direction), cwd)) for i in range(N_PROCS)]
for i in range(N_PROCS):
name = model_name(i, direction)
with open(cwd/f"results/{name}", "r") as f:
res = f.readlines()[0]
idx = res.find(":")
result_dict[name] = float(res[idx+1:])
q.put(result_dict)
위 코드는 에러가 난다. __main__ 에 접근이 안되기 때문이다.
개선 아이디어
1. 입력-출력 형태를 미리 정해두고 하는게 좋은 것 같다.
어디서는 f"model_name_{id}" 로 model 을 표현하고 어디서는 f"{id} 로 model 을 표현하니 아귀가 안맞아 에러가 나는 부분이 생긴다.자료형을 하나 만들어놔야겠다. ex) {ModelName(id):acc} 이런식으로. 이러면 나중에 수정할 때도 여기만 고치면 될듯.
2. subprocess 가 많으니 mock 해서 실행하는데도 오래걸린다.
Asyncio 의 subprocess run 을 사용해봐야겠다.