여러 파일이나 파이프에서 입력을 가져와 읽기를 차단하지 않고 표준 출력에 쓸 수 있고 개별 입력 줄은 그대로 유지할 수 있는 도구가 있습니까? 기본적으로 라인을 끊지 않고 여러 입력을 하나의 출력으로 멀티플렉싱하고 싶습니다.
$ combine file1 <(prog2) ... > nice-output.txt
- 나는 출력 순서에 관심이 없다
- 일부 입력에 데이터가 있는 한 차단되어서는 안 됩니다.
- 효율적이어야 합니다(즉, 귀하의 Perl 문장에 반대표를 던질 수도 있습니다;)
답변1
이 작업을 쉽게 수행 할 수 있어야 합니다 multitail
.
답변2
write
프로세스가 라인 버퍼링(표준 출력이 터미널이 아닌 경우 일반적으로 꺼짐)을 사용해야 하는 프로세스에서 한 번의 호출로 라인을 쓰는 경우 모든 라인을 단일 파이프로 가리킬 수 있습니다.
{ { sleep .1; echo one; sleep .1; echo two; } &
{ echo hello; sleep .15; echo world; };
wait; } | cat
프로세스가 터미널에 쓸 때 라인 버퍼링만 수행하는 경우 를 사용하는 것이 가장 쉽습니다 script
. 파일에만 쓸 수 있습니다.
script -q -c '
{ { sleep .1; echo one; sleep .1; echo two; } &
{ echo hello; sleep .15; echo world; };
wait; }'
tail -n +2 typescript
프로그램이 긴 줄을 쓰거나 줄 버퍼링을 사용하지 않는 경우에는 이 방법이 작동하지 않습니다. 각 입력의 라인을 별도로 읽고 버퍼링하고 라인 끝에서 동기화를 수행할 수 있는 수집기 프로그램이 필요합니다. 이 기능을 갖춘 표준 유틸리티는 없습니다. 나는 두 번째이다갈렙의 조언multitail
.
다음은 여러 명령으로 생성된 줄을 읽고 줄을 끊지 않고 표준 출력으로 내보내는 Python 스크립트입니다. 아직 많이 테스트하지 않았으므로 사용자에게 경고합니다. 나는 그것을 전혀 벤치마킹하지 않았습니다.
#!/usr/bin/env python
import Queue, itertools, os, subprocess, sys, threading
# Queue of (producer_id, line). line==None indicates the end of a producer.
lq = Queue.Queue()
# Line producer
def run_task(i, cmd):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
line = p.stdout.readline()
while line <> "":
lq.put((i, line))
line = p.stdout.readline()
lq.put((i, None))
# Start a producer for each command passed as an argument
for i in range(1,len(sys.argv)):
threading.Thread(target=run_task, args=(i, sys.argv[i])).start()
sources = len(sys.argv) - 1
# Consumer: print lines as they come in, until no producer is left.
while sources > 0:
(k, line) = lq.get()
if line == None: sources -= 1
else: sys.stdout.write(str(k) + ":" + line)
사용 예:
./collect.py 'sleep 1; ls /; sleep 1; ls /' \
'/bin/echo -n foo; sleep 1; /bin/echo -n bar; sleep 1; /bin/echo qux'
답변3
예, 멀티테일링은 터미널의 하위 집합인 "창" 개념과 관련이 있는 것 같습니다. 파이프라인 구성 요소로 제대로 작동하도록 할 수는 없습니다.
그래서 이건 우리가 직접 해야 할 것 같군요갈라진 너클
/* Copyright © 2015 [email protected]
** Use/modify as you see fit but leave this attribution.
** If you change the interface and want to distribute the
** result please change the binary name too! */
#include <err.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/select.h>
/* typedefs are for pussies */
struct {
char *filename; /* for clarity of errors */
char *data;
long len;
long cap;
} saved[FD_SETSIZE] = {0};
void
ewriten(int fd, char *buf, int n)
{
int done = 0, c;
while (done < n) {
if ((c=write(fd, buf + done, n - done)) <= 0 && errno != EINTR) {
err(1, "write");
}
done += c;
}
}
int
empty(fd_set *fdset, int maxfd)
{
int i;
for (i=0; i <= maxfd; i++) {
if (FD_ISSET(i, fdset)) return 0;
}
return 1;
}
void
combine(fd_set *fdset, int maxfd)
{
char buf[4096], *cp;
fd_set ready;
int n, i, fd, left;
while (!empty(fdset, maxfd)) {
ready = *fdset;
/* timeouts are for pussies */
if (select(maxfd + 1, &ready, NULL, NULL, NULL) == -1) err(1, "select");
for (fd=0; fd <= maxfd; fd++) {
if (!FD_ISSET(fd, &ready)) continue;
switch (n=read(fd, &buf, sizeof(buf))) {
case -1:
if (errno == EINTR)
break; /* ignore interrupts; we'll re-read next iteration */
if (saved[fd].filename) err(1, "read: %s", saved[fd].filename);
err(1, "read: %d", fd);
case 0:
if (saved[fd].len > 0) {
/* someone forgot their newline at EOF... */
ewriten(1, saved[fd].data, saved[fd].len);
saved[fd].data[0] = '\n'; /* put it back for them */
ewriten(1, saved[fd].data, 1);
}
free(saved[fd].data);
FD_CLR(fd, fdset);
break;
default:
for (cp=buf + n - 1; cp >= buf && *cp != '\n'; cp--); /* find last newline */
left = n - (cp - buf + 1);
if (cp >= buf) {
/* we found one! first dump any saved data from the last read */
if (saved[fd].len > 0) {
ewriten(1, saved[fd].data, saved[fd].len);
saved[fd].len = 0;
}
ewriten(1, buf, cp - buf + 1);
}
if (left > 0) {
/* now save any leftover data for later */
int need = saved[fd].len + left;
if (saved[fd].cap < need &&
(saved[fd].data=realloc(saved[fd].data, need)) == NULL) {
errx(1, "realloc: failed on %d bytes", need);
/* it was good enough for quake... */
}
saved[fd].cap = need;
memcpy(saved[fd].data + saved[fd].len, buf + n - 1 - left, left);
saved[fd].len += left;
}
}
}
}
}
void
addfd(int fd, fd_set *fdset, int *maxfd)
{
FD_SET(fd, fdset);
if (*maxfd < fd) {
*maxfd = fd;
}
}
int
main(int argc, char **argv)
{
fd_set fdset;
char **arg = argv + 1;
char *cp;
struct stat st;
int fd, maxfd = -1;
FD_ZERO(&fdset);
while (*arg != NULL) {
/* getopt is for pussies */
if (strncmp("-u", *arg, 2) == 0) {
*arg += 2;
if (**arg == '\0' && *++arg == NULL ) errx(1, "-u requires argument (comma separated FD list)");
/* reentrancy is for pussies */
for (cp=strtok(*arg, ","); cp != NULL; cp=strtok(NULL, ",")) {
fd = atoi(cp);
if (fstat(fd, &st) != 0) err(1, "%d", fd);
addfd(fd, &fdset, &maxfd);
}
arg++;
} else if (strcmp("-", *arg) == 0) {
if (fstat(0, &st) != 0) err(1, "stdin", fd);
addfd(0, &fdset, &maxfd);
saved[0].filename = "stdin";
arg++;
} else if (strcmp("--", *arg) == 0) {
arg++;
break;
} else if (**arg == '-') {
errx(1, "unrecognized argument %s", *arg);
} else {
break; /* treat as filename */
}
}
/* remaining args are filenames */
for (; *arg != NULL; arg++) {
/* stdio is for pussies */
if ((fd=open(*arg, O_RDONLY)) == -1) err(1, "open: %s", *arg);
addfd(fd, &fdset, &maxfd);
saved[fd].filename = *arg;
}
combine(&fdset, maxfd);
return 0;
}
아아아 기분 좋다.
(참고: 대략 두 세트의 입력에서 테스트되었습니다. 버그가 존재할 수도 있고 존재하지 않을 수도 있습니다.)