우분투 메이트, 라즈베리파이를 사용합니다. 저는 프로그래밍을 처음 접했고 bash를 처음 접했습니다. 간단히 말해서 USB 데이터를 CSV로 변환하면 스크립트 없이도 무엇이든 시도할 수 있습니다.
편집: 장치가 루프에 있으므로 데이터 수신을 중지하는 것은 허용되지 않습니다. 실시간 데이터 필터링으로 실제 데이터만 수신됩니다.
내 또 다른 비슷한 질문에 대해서는 :Linux 터미널이 파일로 출력되지만 필터링됩니까?
RF 수신기를 만드는 회사에서 제공한 Python 스크립트가 있습니다(끝 부분의 스크립트). 이는 사용자 오류일 가능성이 높습니다.
제가 터미널에 쓴 내용은 다음과 같습니다.
python3 'home/#USER/Desktop/python/script.py' /dev/ttyUSB0 | filter.pl >> output.csv
[1+] Stopped
Filter.pl: command not found
sed 터미널 사용:
python3 'home/#USER/Desktop/python/script.py' /dev/ttyUSB0 | sed -n "s/^b';[0-9]\{3\}];\(.\+\);'/\1/p" >> output_file
결과: 플래시 유형의 것 뒤에 CTRL+C가 옵니다.
Exception ignored in: <_io.TextIOrapper name='<stdout>' mode='w' ecpmdomg='UTF-8'>
BrokenPipeError:[Errno32] Broken pipe.
참고: 저는 더 긴 부분만 원합니다. 거기에서 더 자세히 분석한 다음 AT&T M2X 온라인 서비스로 보내겠습니다.
b';"Anything from 1-9999";'
b';311;'
b';312;'
b';312;00000000;036;552;1014f49;3020;2659;6294;1049;2659;S;'
b';313;'
script.py:
#! / Usr / bin / python
# Coding: UTF-8
################################################## #########################
# (C) Tokyo Cosmos Electric, Inc. (TOCOS) - all rights reserved.
# Terms and Conditions:
# - This source code, as long as the Tokyo Cosmos Electric Co., Ltd. copyright separately there is no source code license description
# We will not hold.
# - This source code is no warranty-free support. Any damage which the present source code and product
Tokyo Cosmos Electric Co., Ltd. does not guarantee also #. Report of trouble etc. will be welcome.
# - This source code is published on the premise that run with TWE series Tokyo Cosmos Electric Co., Ltd. to sell
# doing.
################################################## #########################
### Script to read the TWE-Lite standard application
# ? this script is read-only, to do the reading and writing both will require processing by multiple threads.
from serial import *
from sys import stdout, stdin, stderr, exit
Confirmation of # parameter
# First argument: serial port name
! if len (sys.argv) = 2:
print "% s {serial port name}"% sys.argv [0]
exit (1)
# Open a serial port
try:
ser = Serial (sys.argv [1], 115200)
print "open serial port:% s"% sys.argv [1]
except:
print "can not open serial port:% s"% sys.argv [1]
exit (1)
# Display of other messages (output the payload)
def printPayload (l):
if len (l) <3: return False # check of data size
print "command = 0x% 02x (other)"% l [1]
print "src = 0x% 02x"% l [0]
# Directly outputs the payload
print "payload =",
for c in l [2:]:
print "% 02x"% c,
print "(hex)"
return True
# 0x81 interpretation of the message
def printPayload_0x81 (l):
if len (l) = 23:! return False # check of data size
ladr = l [5] << 24 | l [6] << 16 | l [7] << 8 | l [8]
print "command = 0x% 02x (data arrival)"% l [1]
print "src = 0x% 02x"% l [0]
print "src long = 0x% 08x"% ladr
print "dst = 0x% 02x"% l [9]
print "pktid = 0x% 02x"% l [2]
print "prtcl ver = 0x% 02x"% l [3]
print "LQI =% d /% .2f [dbm]"% (l [4], (7 * l [4] -1970) / 20.)
ts = l [10] << 8 | l [11]
print "time stmp =% .3f [s]"% (ts / 64.0)
print "relay flg =% d"% l [12]
vlt = l [13] << 8 | l [14]
print "volt =% 04d [mV]"% vlt
Data of # DI1..4
dibm = l [16]
dibm_chg = l [17]
di = {} # of the current state
di_chg = {} # 1 Once in Lo (1) at least once
for i in range (1,5):
di [i] = 0 if (dibm & 0x1) == 0 else 1
di_chg [i] = 0 if (dibm_chg & 0x1) == 0 else 1
dibm >> = 1
dibm_chg >> = 1
pass
print "DI1 =% d /% d DI2 =% d /% d DI3 =% d /% d DI4 =% d /% d"% (di [1], di_chg [1], di [2], di_chg [ 2], di [3], di_chg [3], di [4], di_chg [4])
Data of # AD1..4
ad = {}
er = l [22]
for i in range (1,5):
av = l [i + 18 - 1]
if av == 0xFF:
# AD and if the port is unused treatment (approximately 2V or more) -1
ad [i] = -1
else:
# Calculation, including the correction bits
ad [i] = ((av * 4) + (er & 0x3)) * 4
er >> = 2
print "AD1 =% 04d AD2 =% 04d AD3 =% 04d AD4 =% 04d [mV]"% (ad [1], ad [2], ad [3], ad [4])
return True
# Interpret the data line by line
while True:
line = ser.readline (). rstrip () # to read in one line units, and remove the trailing line feed code (blocking read)
if len (line)> 0 and line [0] == ':':
print "\ n% s"% line
else:
continue
try:
lst = map (ord, line [1:]. decode ('hex')) # convert the HEX string after decoding the string, to each ord and () the list
csum = sum (lst) & 0xff # checksum if 0 by adding a total of 8bit calculation OK
lst.pop () remove the # checksum from the list
if csum == 0:
if lst [1] == 0x81:
printPayload_0x81 (lst) # reception of IO-related data
else:
printPayload (lst) # Other data reception
else:
print "checksum ng"
except:
print "skip" # when an error
답변1
첫 번째 샘플에서는
Filter.pl: command not found
쉘이 filter.pl 스크립트를 찾지 못했습니다.
스크립트의 전체 경로를 지정해야 합니다. 현재 디렉토리에 있으면 ./filter.pl
다음과 같이 사용하십시오.
/home/USER/Desktop/python/script.py /dev/ttyUSB0 | ./filter.pl >> output.csv
답변2
이게 생각보다 쉬운 것 같아요. 이는 비슷한 문제를 겪고 있는 사람들을 돕기 위한 것입니다.
터미널에 쓰기
python3 -u ./file.py | tee ./output.file
현재 BASH를 사용하여 추가하고 필터링하는 방법을 연구 중입니다.
이것이 누군가에게 도움이 되기를 바랍니다.
편집: 첨부된 내용은 "a"와 함께 작동합니다. tee 뒤에 -a를 추가하세요.
python3 -u ./file.py | tee -a ./output.file