wake-up-neo.com

Planen Sie ein wiederkehrendes Ereignis in Python 3

Ich versuche, ein wiederkehrendes Ereignis so zu planen, dass es in Python 3 jede Minute ausgeführt wird.

Ich habe Klasse sched.scheduler gesehen, aber ich frage mich, ob es einen anderen Weg gibt, dies zu tun. Ich habe gehört, dass ich mehrere Threads dafür verwenden könnte, was mir aber nichts ausmacht.

Im Grunde fordere ich etwas JSON an und parse es dann; sein Wert ändert sich mit der Zeit.

Um sched.scheduler verwenden zu können, muss eine Schleife erstellt werden, um den Even für eine Stunde einzuplanen:

scheduler = sched.scheduler(time.time, time.sleep)

# Schedule the event. THIS IS UGLY!
for i in range(60):
    scheduler.enter(3600 * i, 1, query_rate_limit, ())

scheduler.run()

Welche anderen Möglichkeiten gibt es?

27
Humphrey Bogart

Sie könnten threading.Timer verwenden, dies plant jedoch auch ein einmaliges Ereignis, ähnlich der .enter-Methode von Scheduler-Objekten.

Das normale Muster (in einer beliebigen Sprache) zum Umwandeln eines einmaligen Schedulers in einen periodischen Scheduler besteht darin, dass jedes Ereignis im angegebenen Intervall selbst neu geplant wird. Zum Beispiel würde ich mit sched keine Schleife wie Sie verwenden, sondern eher etwas wie:

def periodic(scheduler, interval, action, actionargs=()):
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)

und initiieren Sie den gesamten "für immer periodischen Zeitplan" mit einem Anruf

periodic(scheduler, 3600, query_rate_limit)

Oder ich könnte threading.Timer anstelle von scheduler.enter verwenden, aber das Muster ist ziemlich ähnlich.

Wenn Sie eine verfeinerte Variante benötigen (z. B. die periodische Neuplanung zu einem bestimmten Zeitpunkt oder unter bestimmten Bedingungen anhalten), ist dies nicht zu schwer, um einige zusätzliche Parameter zu berücksichtigen.

36
Alex Martelli

Meine bescheidene Sicht auf das Thema:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.function   = function
        self.interval   = interval
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Verwendungszweck:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Eigenschaften:

  • Nur Standardbibliothek, keine externen Abhängigkeiten
  • Verwendet das von Alex Martnelli vorgeschlagene Muster
  • start() und stop() können mehrmals sicher aufgerufen werden, auch wenn der Timer bereits gestartet/gestoppt wurde
  • die aufzurufende Funktion kann positionelle und benannte Argumente haben
  • Sie können interval jederzeit ändern, sie wird nach dem nächsten Lauf wirksam. Gleiches für args, kwargs und sogar function!
18
MestreLion

Sie könnten schedule verwenden. Es funktioniert auf Python 2.7 und 3.3 und ist ziemlich leicht:

import schedule
import time

def job():
   print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)

while 1:
   schedule.run_pending()
   time.sleep(1)
13
dbader

Basierend auf der Antwort von MestreLion wird ein kleines Problem mit Multithreading gelöst: 

from threading import Timer, Lock


class Periodic(object):
    """
    A periodic task running in threading.Timers
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._lock = Lock()
        self._timer = None
        self.function = function
        self.interval = interval
        self.args = args
        self.kwargs = kwargs
        self._stopped = True
        if kwargs.pop('autostart', True):
            self.start()

    def start(self, from_run=False):
        self._lock.acquire()
        if from_run or self._stopped:
            self._stopped = False
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self._lock.release()

    def _run(self):
        self.start(from_run=True)
        self.function(*self.args, **self.kwargs)

    def stop(self):
        self._lock.acquire()
        self._stopped = True
        self._timer.cancel()
        self._lock.release()
6
fdb

Sie können den Advanced Python Scheduler verwenden. Es hat sogar eine cronartige Schnittstelle.

6
jordixou

Verwenden Sie Sellerie .

from celery.task import PeriodicTask
from datetime import timedelta


class ProcessClicksTask(PeriodicTask):
    run_every = timedelta(minutes=30)

    def run(self, **kwargs):
        #do something
5
user

Hier ist eine schnelle und schmutzige, nicht blockierende Schleife mit Thread:

#!/usr/bin/env python3
import threading,time

def worker():
    print(time.time())
    time.sleep(5)
    t = threading.Thread(target=worker)
    t.start()


threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")

Es ist nichts Besonderes, die worker erzeugt mit Verzögerung einen neuen Thread von sich. Vielleicht nicht besonders effizient, aber einfach genug. Antwort von northtree wäre der Weg zu gehen, wenn Sie eine anspruchsvollere Lösung benötigen.

Und basierend auf this können wir dasselbe tun, nur mit Timer:

#!/usr/bin/env python3
import threading,time

def hello():
    t = threading.Timer(10.0, hello)
    t.start()
    print( "hello, world",time.time() )

t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())
1

Basierend auf der Antwort von Alex Martelli habe ich decorator version implementiert, die einfacher zu integrieren ist.

import sched
import time
import datetime
from functools import wraps
from threading import Thread


def async(func):
    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl
    return async_func


def schedule(interval):
    def decorator(func):
        def periodic(scheduler, interval, action, actionargs=()):
            scheduler.enter(interval, 1, periodic,
                            (scheduler, interval, action, actionargs))
            action(*actionargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            scheduler = sched.scheduler(time.time, time.sleep)
            periodic(scheduler, interval, func)
            scheduler.run()
        return wrap
    return decorator


@async
@schedule(1)
def periodic_event():
    print(datetime.datetime.now())


if __== '__main__':
    print('start')
    periodic_event()
    print('end')
1
northtree

Siehe mein Beispiel

import sched, time

def myTask(m,n):
  print n+' '+m

def periodic_queue(interval,func,args=(),priority=1):
  s = sched.scheduler(time.time, time.sleep)
  periodic_task(s,interval,func,args,priority)
  s.run()

def periodic_task(scheduler,interval,func,args,priority):
  func(*args)
  scheduler.enter(interval,priority,periodic_task,
                   (scheduler,interval,func,args,priority))

periodic_queue(1,myTask,('world','hello'))
0
Vladimir Avdeev