Ich habe ein Arduino an meinen Computer angeschlossen, das eine Schleife ausführt und alle 100 ms einen Wert über den seriellen Port an den Computer zurücksendet.
Ich möchte ein Python-Skript erstellen, das nur alle paar Sekunden von der seriellen Schnittstelle gelesen wird. Ich möchte also, dass nur das letzte vom Arduino gesendete Objekt angezeigt wird.
Wie machst du das in Pyserial?
Hier ist der Code, den ich ausprobiert habe, der nicht funktioniert. Es liest die Zeilen nacheinander.
import serial
import time
ser = serial.Serial('com4',9600,timeout=1)
while 1:
time.sleep(10)
print ser.readline() #How do I get the most recent line sent from the device?
Vielleicht missverstehe ich Ihre Frage falsch, aber da es sich um eine serielle Leitung handelt, müssen Sie alles, was vom Arduino gesendet wird, nacheinander lesen - es wird im Arduino gepuffert, bis Sie es lesen.
Wenn Sie eine Statusanzeige haben möchten, die das zuletzt gesendete Objekt anzeigt, verwenden Sie einen Thread, der den Code in Ihre Frage enthält (ohne den Ruhezustand), und behalten Sie die letzte vollständige Zeile, die als letzte Zeile des Arduino gelesen wird.
Update: mtasic
s Beispielcode ist ziemlich gut, aber wenn der Arduino beim Aufruf von inWaiting()
eine Teillinie gesendet hat, wird eine abgeschnittene Zeile angezeigt. Stattdessen möchten Sie die letzte complete -Zeile in last_received
setzen und die Teillinie in buffer
belassen, damit sie an die nächste Runde der Schleife angehängt werden kann. Etwas wie das:
def receiving(ser):
global last_received
buffer_string = ''
while True:
buffer_string = buffer_string + ser.read(ser.inWaiting())
if '\n' in buffer_string:
lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
last_received = lines[-2]
#If the Arduino sends lots of empty lines, you'll lose the
#last filled line, so you could make the above statement conditional
#like so: if lines[-2]: last_received = lines[-2]
buffer_string = lines[-1]
Zur Verwendung von readline()
: Hier ist, was die Pyserial-Dokumentation zu sagen hat (aus Gründen der Übersichtlichkeit leicht editiert und mit readlines () erwähnt):
Seien Sie vorsichtig, wenn Sie "readline" verwenden. Tun Geben Sie beim Öffnen von .__ ein Timeout an. serielle Schnittstelle, sonst kann .__ blockiert werden. für immer, wenn kein Newline-Zeichen .__ ist. empfangen. Beachten Sie auch, dass "readlines ()" funktioniert nur mit einem Timeout. Es hängt von einem Timeout ab und interpretiert das als EOF (Ende der Datei).
das scheint mir ganz vernünftig!
from serial import *
from threading import Thread
last_received = ''
def receiving(ser):
global last_received
buffer = ''
while True:
# last_received = ser.readline()
buffer += ser.read(ser.inWaiting())
if '\n' in buffer:
last_received, buffer = buffer.split('\n')[-2:]
if __== '__main__':
ser = Serial(
port=None,
baudrate=9600,
bytesize=EIGHTBITS,
parity=PARITY_NONE,
stopbits=STOPBITS_ONE,
timeout=0.1,
xonxoff=0,
rtscts=0,
interCharTimeout=None
)
Thread(target=receiving, args=(ser,)).start()
Diese Lösungen belasten die CPU, während sie auf Zeichen warten.
Sie sollten mindestens einen blockierenden Aufruf zum Lesen durchführen (1).
while True:
if '\n' in buffer:
pass # skip if a line already in buffer
else:
buffer += ser.read(1) # this will block until one more char or timeout
buffer += ser.read(ser.inWaiting()) # get remaining buffered chars
... und mach das Split-Ding wie zuvor.
Sie können ser.flushInput()
verwenden, um alle seriellen Daten zu löschen, die sich aktuell im Puffer befinden.
Nachdem Sie die alten Daten gelöscht haben, können Sie ser.readline () verwenden, um die neuesten Daten vom seriellen Gerät abzurufen.
Ich denke, es ist etwas einfacher als die anderen hier vorgeschlagenen Lösungen. Hat für mich gearbeitet, hoffe es ist für Sie geeignet.
Mit dieser Methode können Sie das Zeitlimit für das Erfassen aller Daten für jede Leitung und ein anderes Zeitlimit für das Warten auf zusätzliche Leitungen separat steuern.
# get the last line from serial port
lines = serial_com()
lines[-1]
def serial_com():
'''Serial communications: get a response'''
# open serial port
try:
serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
except serial.SerialException as e:
print("could not open serial port '{}': {}".format(com_port, e))
# read response from serial port
lines = []
while True:
line = serial_port.readline()
lines.append(line.decode('utf-8').rstrip())
# wait for new data after each line
timeout = time.time() + 0.1
while not serial_port.inWaiting() and timeout > time.time():
pass
if not serial_port.inWaiting():
break
#close the serial port
serial_port.close()
return lines
Die Verwendung von .inWaiting()
in einer Endlosschleife kann problematisch sein. Abhängig von der Implementierung kann es die gesamte CPU belasten. Stattdessen würde ich empfehlen, eine bestimmte Datengröße zum Lesen zu verwenden. In diesem Fall sollten Sie zum Beispiel Folgendes tun:
ser.read(1024)
Sie benötigen eine Schleife, um alles Gesendete zu lesen, wobei der letzte Aufruf von readline () bis zum Timeout blockiert wird. So:
def readLastLine(ser):
last_data=''
while True:
data=ser.readline()
if data!='':
last_data=data
else:
return last_data
Leichte Änderung des Codes von mtasic & Vinay Sajip:
Während ich diesen Code für eine ähnliche Anwendung sehr hilfreich fand, benötigte ich all der Zeilen, die von einem seriellen Gerät zurückkamen, das regelmäßig Informationen sendete.
Ich entschied mich dafür, das erste Element von oben zu platzieren, aufzuzeichnen und die restlichen Elemente dann als neuen Puffer wieder zusammenzufügen und von dort fortzufahren.
Mir ist klar, dass dies nicht nach was Greg gefragt hat, aber ich dachte, es lohnt sich als Randnotiz.
def receiving(ser):
global last_received
buffer = ''
while True:
buffer = buffer + ser.read(ser.inWaiting())
if '\n' in buffer:
lines = buffer.split('\n')
last_received = lines.pop(0)
buffer = '\n'.join(lines)
Zu viele Komplikationen
Was ist der Grund, das Byte-Objekt durch Zeilenumbrüche oder andere Array-Manipulationen aufzuteilen? Ich schreibe die einfachste Methode, die Ihr Problem lösen wird:
import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
last = ''
for byte in s.read(s.inWaiting()): last += chr(byte)
if len(last) > 0:
# Do whatever you want with last
print (bytes(last, "utf-8"))
last = ''
Hier ein Beispiel mit einem Wrapper, mit dem Sie die letzte Zeile ohne 100% CPU lesen können
class ReadLine:
"""
pyserial object wrapper for reading line
source: https://github.com/pyserial/pyserial/issues/216
"""
def __init__(self, s):
self.buf = bytearray()
self.s = s
def readline(self):
i = self.buf.find(b"\n")
if i >= 0:
r = self.buf[:i + 1]
self.buf = self.buf[i + 1:]
return r
while True:
i = max(1, min(2048, self.s.in_waiting))
data = self.s.read(i)
i = data.find(b"\n")
if i >= 0:
r = self.buf + data[:i + 1]
self.buf[0:] = data[i + 1:]
return r
else:
self.buf.extend(data)
s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
print(device.readline())