wake-up-neo.com

pyserial - Lesen der letzten von einem seriellen Gerät gesendeten Zeile

Ich habe ein Arduino an meinen Computer angeschlossen, das eine Schleife ausführt und alle 100 ms einen Wert über den seriellen Port an den Computer zurücksendet. 

Ich möchte ein Python-Skript erstellen, das nur alle paar Sekunden von der seriellen Schnittstelle gelesen wird. Ich möchte also, dass nur das letzte vom Arduino gesendete Objekt angezeigt wird.

Wie machst du das in Pyserial?

Hier ist der Code, den ich ausprobiert habe, der nicht funktioniert. Es liest die Zeilen nacheinander.

import serial
import time

ser = serial.Serial('com4',9600,timeout=1)
while 1:
    time.sleep(10)
    print ser.readline() #How do I get the most recent line sent from the device?
18
Greg

Vielleicht missverstehe ich Ihre Frage falsch, aber da es sich um eine serielle Leitung handelt, müssen Sie alles, was vom Arduino gesendet wird, nacheinander lesen - es wird im Arduino gepuffert, bis Sie es lesen.

Wenn Sie eine Statusanzeige haben möchten, die das zuletzt gesendete Objekt anzeigt, verwenden Sie einen Thread, der den Code in Ihre Frage enthält (ohne den Ruhezustand), und behalten Sie die letzte vollständige Zeile, die als letzte Zeile des Arduino gelesen wird.

Update: mtasics Beispielcode ist ziemlich gut, aber wenn der Arduino beim Aufruf von inWaiting() eine Teillinie gesendet hat, wird eine abgeschnittene Zeile angezeigt. Stattdessen möchten Sie die letzte complete -Zeile in last_received setzen und die Teillinie in buffer belassen, damit sie an die nächste Runde der Schleife angehängt werden kann. Etwas wie das:

def receiving(ser):
    global last_received

    buffer_string = ''
    while True:
        buffer_string = buffer_string + ser.read(ser.inWaiting())
        if '\n' in buffer_string:
            lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
            last_received = lines[-2]
            #If the Arduino sends lots of empty lines, you'll lose the
            #last filled line, so you could make the above statement conditional
            #like so: if lines[-2]: last_received = lines[-2]
            buffer_string = lines[-1]

Zur Verwendung von readline(): Hier ist, was die Pyserial-Dokumentation zu sagen hat (aus Gründen der Übersichtlichkeit leicht editiert und mit readlines () erwähnt):

Seien Sie vorsichtig, wenn Sie "readline" verwenden. Tun Geben Sie beim Öffnen von .__ ein Timeout an. serielle Schnittstelle, sonst kann .__ blockiert werden. für immer, wenn kein Newline-Zeichen .__ ist. empfangen. Beachten Sie auch, dass "readlines ()" funktioniert nur mit einem Timeout. Es hängt von einem Timeout ab und interpretiert das als EOF (Ende der Datei).

das scheint mir ganz vernünftig!

28
Vinay Sajip
from serial import *
from threading import Thread

last_received = ''

def receiving(ser):
    global last_received
    buffer = ''

    while True:
        # last_received = ser.readline()
        buffer += ser.read(ser.inWaiting())
        if '\n' in buffer:
            last_received, buffer = buffer.split('\n')[-2:]

if __==  '__main__':
    ser = Serial(
        port=None,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        timeout=0.1,
        xonxoff=0,
        rtscts=0,
        interCharTimeout=None
    )

    Thread(target=receiving, args=(ser,)).start()
11
mtasic85

Diese Lösungen belasten die CPU, während sie auf Zeichen warten.

Sie sollten mindestens einen blockierenden Aufruf zum Lesen durchführen (1).

while True:
    if '\n' in buffer: 
        pass # skip if a line already in buffer
    else:
        buffer += ser.read(1)  # this will block until one more char or timeout
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars

... und mach das Split-Ding wie zuvor.

7
Rufus

Sie können ser.flushInput() verwenden, um alle seriellen Daten zu löschen, die sich aktuell im Puffer befinden.

Nachdem Sie die alten Daten gelöscht haben, können Sie ser.readline () verwenden, um die neuesten Daten vom seriellen Gerät abzurufen.

Ich denke, es ist etwas einfacher als die anderen hier vorgeschlagenen Lösungen. Hat für mich gearbeitet, hoffe es ist für Sie geeignet.

5
user3524946

Mit dieser Methode können Sie das Zeitlimit für das Erfassen aller Daten für jede Leitung und ein anderes Zeitlimit für das Warten auf zusätzliche Leitungen separat steuern.

# get the last line from serial port
lines = serial_com()
lines[-1]              

def serial_com():
    '''Serial communications: get a response'''

    # open serial port
    try:
        serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
    except serial.SerialException as e:
        print("could not open serial port '{}': {}".format(com_port, e))

    # read response from serial port
    lines = []
    while True:
        line = serial_port.readline()
        lines.append(line.decode('utf-8').rstrip())

        # wait for new data after each line
        timeout = time.time() + 0.1
        while not serial_port.inWaiting() and timeout > time.time():
            pass
        if not serial_port.inWaiting():
            break 

    #close the serial port
    serial_port.close()   
    return lines
3
fja0568

Die Verwendung von .inWaiting() in einer Endlosschleife kann problematisch sein. Abhängig von der Implementierung kann es die gesamte CPU belasten. Stattdessen würde ich empfehlen, eine bestimmte Datengröße zum Lesen zu verwenden. In diesem Fall sollten Sie zum Beispiel Folgendes tun: 

ser.read(1024)
2
Srinath

Sie benötigen eine Schleife, um alles Gesendete zu lesen, wobei der letzte Aufruf von readline () bis zum Timeout blockiert wird. So:

def readLastLine(ser):
    last_data=''
    while True:
        data=ser.readline()
        if data!='':
            last_data=data
        else:
            return last_data
2
quamrana

Leichte Änderung des Codes von mtasic & Vinay Sajip:

Während ich diesen Code für eine ähnliche Anwendung sehr hilfreich fand, benötigte ich all der Zeilen, die von einem seriellen Gerät zurückkamen, das regelmäßig Informationen sendete.

Ich entschied mich dafür, das erste Element von oben zu platzieren, aufzuzeichnen und die restlichen Elemente dann als neuen Puffer wieder zusammenzufügen und von dort fortzufahren.

Mir ist klar, dass dies nicht nach was Greg gefragt hat, aber ich dachte, es lohnt sich als Randnotiz.

def receiving(ser):
    global last_received

    buffer = ''
    while True:
        buffer = buffer + ser.read(ser.inWaiting())
        if '\n' in buffer:
            lines = buffer.split('\n')
            last_received = lines.pop(0)

            buffer = '\n'.join(lines)
2

Zu viele Komplikationen

Was ist der Grund, das Byte-Objekt durch Zeilenumbrüche oder andere Array-Manipulationen aufzuteilen? Ich schreibe die einfachste Methode, die Ihr Problem lösen wird:

import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
    last = ''
    for byte in s.read(s.inWaiting()): last += chr(byte)
    if len(last) > 0:
        # Do whatever you want with last
        print (bytes(last, "utf-8"))
        last = ''
2
LXSoft

Hier ein Beispiel mit einem Wrapper, mit dem Sie die letzte Zeile ohne 100% CPU lesen können

class ReadLine:
    """
    pyserial object wrapper for reading line
    source: https://github.com/pyserial/pyserial/issues/216
    """
    def __init__(self, s):
        self.buf = bytearray()
        self.s = s

    def readline(self):
        i = self.buf.find(b"\n")
        if i >= 0:
            r = self.buf[:i + 1]
            self.buf = self.buf[i + 1:]
            return r
        while True:
            i = max(1, min(2048, self.s.in_waiting))
            data = self.s.read(i)
            i = data.find(b"\n")
            if i >= 0:
                r = self.buf + data[:i + 1]
                self.buf[0:] = data[i + 1:]
                return r
            else:
                self.buf.extend(data)

s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
    print(device.readline())
0
bdoubleu