이 작업을 어느 정도 수행하는 Python 스크립트가 있습니다.
current_tasks = TaskManager()
MAXPROCS = 8
while len(outstanding_tasks) > 0:
if len(current_tasks.running) < MAXPROCS:
current_tasks.addTask(outstanding_tasks.next())
else:
current_tasks.wait_for_one_finish()
Outstanding_tasks.next()는 기본적으로 다음과 같습니다.
p = subprocess.Popen([task], stdout=OUTFILE, stderr=subprocess.PIPE)
그리고 current_tasks.wait_for_one_finish()
:
waiting = True
while waiting:
for t in tasks:
ret = t.poll()
if ret not None:
handle_stderr(t)
waiting = False
break
매우 간단합니다. 한 번에 8개의 작업을 실행할 때까지 요청 시 작업을 생성한 다음 한 번에 하나씩 완료될 때까지 차단한 다음 더 많은 작업을 생성합니다.
문제는 이것이다:
stderr=subprocess.PIPE
각 하위 프로세스는 stderr을 파이프에 씁니다. 충돌이 발생하여 파이프에 큰 로그 메시지 또는 기타 항목을 쓰려고 하고 메시지가 파이프 버퍼의 크기를 초과하는 경우 write()가 차단됩니다. 프로세스가 완료되지 않으므로 내 제어 프로세스는 반환 값을 확인하지 않고 poll()
stderr에서 읽습니다.
분명히 이 문제를 해결하는 몇 가지 방법이 있습니다.
- 내 하위 프로세스에서 임시 파일로 stderr 리디렉션
- 실행 중인 모든 작업의 stderr 파일 설명자에서 읽고 이를 메모리에 버퍼링하는 Python 스레드를 생성합니다.
- 내 작은 임시 이벤트 루프에는 select() 또는 다른 것이 있습니다.
하지만 이 모든 것은 내 애플리케이션 코드에서 처리해야 하는 작업입니다. 내가 알고 싶은 것은: 파이프의 동작을 얻을 수 있는 방법이 있습니까? 하지만 자식 프로세스가 항상 stderr에 성공적인 write()를 수행한 다음 내가 볼 필요 없이 종료할 수 있도록 크고 탄력적인 버퍼를 사용하는 방법이 있습니까? 끝날 때까지?
답변1
짧은 대답은: 아니오입니다.
하위 프로세스 파이프를 통해 전송된 대규모 데이터를 처리하는 데 필요한 해결 방법을 강조했습니다. "훌륭하고 큰 탄력적 버퍼" 파이프라인이 존재하지 않습니다. 이것은 ... 불리운다하위 프로세스 관리 Python 문서교착 상태의 잠재적 원인으로 추가된 솔루션을 호출하여 proc.communicate()
stderr에서 읽을 수 있습니다. 귀하의 경우 문제는 communicate()
모든 프로세스를 동시에 호출할 수 없으며 모든 데이터를 읽을 때까지 메서드가 차단된다는 것입니다.
저라면 아마도 루프 대신 select()
모든 프로세스에 대한 호출을 사용할 것입니다. 모든 프로세스가 무언가를 수행할 때까지 차단할 수 있으며, 프로세스가 종료되면 stderr 파이프를 닫으므로 일석이조입니다(데이터가 stderr에 기록되는 시기를 알고 프로세스가 종료되는 시기를 알 수 있음).stderr
proc.poll()
select()