크기가 고르지 않은 두 개의 정렬된 명명된 파이프를 병합하는 방법은 무엇입니까?

크기가 고르지 않은 두 개의 정렬된 명명된 파이프를 병합하는 방법은 무엇입니까?

내 질문은 "동일한 필드의 정렬 값을 기준으로 두 개의 정렬된 파일을 병합합니다."그러나 이를 명명된 파이프로 확장합니다.

정렬된 정수가 포함된 두 개의 텍스트 파일이 있고 이를 병합하고 싶다고 가정해 보겠습니다. 이를 사용하여 sort -nm file1.txt file2.txt > merged.txt일회성 비차단 병합을 수행할 수 있습니다.

이제 이러한 파일이 실제로 제가 만들고 Python에서 채우는 명명된 파이프(FIFO)라고 가정해 보겠습니다. 한 파이프에 번갈아 쓰고 다음 파이프에 쓰는 한 이 작업을 잘 수행할 수 있습니다. 이 코드는 두 개의 정렬된 정수 목록을 생성하고 sort하위 프로세스에서 읽은 명명된 파이프에 기록하는 데 사용됩니다. 그러면 결합된 결과가 단일 파일로 출력됩니다.

import tempfile
import subprocess
import os
import sys


# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn"  # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)


# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' +  fifo_path1 + " " + fifo_path2 + " > " +
    outfile, shell=True)


fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')

nlines1 = 0
nlines2 = 0

# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.

for i in range(1,100000):
    print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
    line_to_write = (str(i) + "\n")
    if i % 2:
        nlines1 +=1
        fifo_writer2.write(line_to_write)
    else:
        nlines2 +=1
        fifo_writer1.write(line_to_write)

# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()

정리된 결과를 얻었습니다. 하지만 이제 목록을 로 i % 2변경 i % 3. 원래 버전에서는 fifo1, fifo2, fifo1, fifo2 등으로 인쇄되었습니다. 수정된 버전에서는 두 파이프 중 하나에 두 배의 라인 수를 인쇄합니다.

실행하면 i % 3다음과 같은 결과가 나타납니다.

...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398

항상 같은 자리에 멈춥니다. strace를 사용하면 다음을 볼 수 있습니다.

writePython 프로세스가 호출 지점 4에서 중단됩니다.write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100

그러나 프로세스는 sort호출 지점 3에서 중단됩니다.readread(3,

lsof -n -p프로세스 의 출력을 보면 sort값이 도착하기를 기다리고 있는 fifo1반면 write프로세스는 값이 기록되기를 기다리고 있음을 알 수 있습니다 fifo2.

sort    23330 nsheff  txt    REG  259,2   110040 10769142 /usr/bin/sort
sort    23330 nsheff  mem    REG  259,2  2981280 10752335 /usr/lib/locale/locale-archive
sort    23330 nsheff  mem    REG  259,2  1868984  6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort    23330 nsheff  mem    REG  259,2   138696  6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort    23330 nsheff  mem    REG  259,2   162632  6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort    23330 nsheff    0u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    1w   REG  259,2        0  4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort    23330 nsheff    2u   CHR  136,1      0t0        4 /dev/pts/1
sort    23330 nsheff    3r  FIFO  259,2      0t0   786463 /tmp/tmph1ilvegn/fifo1
sort    23330 nsheff    4r  FIFO  259,2      0t0   786465 /tmp/tmph1ilvegn/fifo2

따라서 어떤 이유로 sort프로세스 *가 수신을 중지 fifo2하여 프로세스가 중단되었습니다.

fifo2이제 발행하여 별도의 리스너를 배치하면... cat fifo2프로세스가 다시 시작되어 수천 번의 반복을 거쳐... 이제 다른 임의 지점(반복 53733)에서 중지됩니다.

버퍼링된 파이프를 사용하면 내가 이해할 수 없는 일이 벌어지고 있고, sort한 스트림에서 다음 스트림으로 읽는 방식이 어떻게 바뀌는지 생각합니다. 나에게 이상한 점은 결정적이며 정확히 동일한 위치에서 실패하고 목록의 균형이 맞지 않을 때만 실패한다는 것입니다.

이 문제를 해결할 수 있는 방법이 있나요?

답변1

분명히 두 개의 명명된 파이프에 서로 다른 양의 데이터를 쓰면 프로그램이 교착 상태에 빠질 것입니다. 1 fifo2(버퍼가 가득 찼음)에 대해 프로그램이 write차단되고 fifo1 (버퍼가 비어 있음) sort에 대해 프로세스가 차단됩니다 .read

sort당신은 그것을 실현하는 방법을 모릅니다 . 파일을 더 큰 덩어리로 읽은 다음 메모리에서 데이터를 처리하여 보다 효율적으로 처리할 수 있습니다. 데이터를 읽는 함수를 사용하면 sort버퍼링이 자동으로 발생할 수도 있습니다 .stdio.h

명명된(및 명명되지 않은) 파이프는 데이터 버퍼를 사용합니다.
버퍼가 가득 차면 읽기 프로세스가 일부 데이터를 읽거나 끝을 닫을 때까지 쓰기 프로세스가 차단됩니다.
버퍼가 비어 있으면 쓰기 프로세스가 일부 데이터를 쓰거나 끝날 때까지 읽기 프로세스가 차단됩니다.

각 사이클에서 fifo1에 한 줄, fifo2에 두 줄을 쓰면 fifo2의 버퍼는 채워지는 반면 fifo1의 버퍼는 절반만 채워집니다.

sort프로그램이 fifo에 쓰는 데이터의 양과 읽고 싶은 데이터의 양 에 따라 이는 분명히 sortfifo1에서 무언가를 읽고 싶은데 프로그램에 빈 버퍼가 있고 프로그램이 쓰기를 원하는 상황으로 이어질 수 있습니다. 전체 버퍼가 있는 fifo2입니다.

sort파이프 버퍼의 크기는 고정되어 있고 프로그램도 고정된 크기를 가지며 고정된 버퍼 크기를 사용하여 데이터를 읽거나 쓸 수 있으므로 결과는 결정적입니다 .

GNU 소스 코드를 볼 수 있습니다 sort:
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c

처음에는 모든 파일에 대한 루프의 함수를 사용하여 모든 입력 파일의 입력 버퍼를 채우려고 합니다 fillbuf.

나중에 어떤 경우에는 fillbuf입력 파일을 다시 호출합니다.

함수에 fillbuf주석이 있습니다

          /* Read as many bytes as possible, but do not read so many
             bytes that there might not be enough room for the
             corresponding line array.  The worst case is when the
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */

분명히 sort입력 파일을 선택하고 일정량의 데이터가 필요합니다. 읽기가 차단되면 입력 파일이 전환되지 않습니다.

read이 구현은 작업이 잠시 후 일부 데이터 또는 EOF를 반환하므로 영원히 차단되지 않으므로 일반 파일에 잘 작동합니다 .


두 프로세스/스레드 사이를 차단할 수 있는 것이 두 개 이상인 경우 교착 상태를 피하는 것은 항상 어렵습니다. 귀하의 경우 하나의 파이프만 사용해야 합니다. 항상 fifo1에 쓸 데이터가 있고(fifo2가 차단하는 경우) 반대의 경우에는 비차단 작업을 사용하는 것이 도움이 될 수 있습니다.

두 개의 별도 스레드/프로세스를 사용하여 파이프에 쓰는 경우 두 개의 파이프를 사용하는 것이 작동할 수 있지만 스레드/프로세스가 서로 독립적으로 작동하는 경우에만 가능합니다. 파이프라인1에 쓰기로 되어 있는 스레드 A가 어떻게든 스레드 B(파이프라인2에 쓸 때만 차단됨)를 기다리고 있는 경우 이는 도움이 되지 않습니다.

관련 정보