Ich versuche, ein wiederkehrendes Ereignis so zu planen, dass es in Python 3 jede Minute ausgeführt wird.
Ich habe Klasse sched.scheduler
gesehen, aber ich frage mich, ob es einen anderen Weg gibt, dies zu tun. Ich habe gehört, dass ich mehrere Threads dafür verwenden könnte, was mir aber nichts ausmacht.
Im Grunde fordere ich etwas JSON an und parse es dann; sein Wert ändert sich mit der Zeit.
Um sched.scheduler
verwenden zu können, muss eine Schleife erstellt werden, um den Even für eine Stunde einzuplanen:
scheduler = sched.scheduler(time.time, time.sleep)
# Schedule the event. THIS IS UGLY!
for i in range(60):
scheduler.enter(3600 * i, 1, query_rate_limit, ())
scheduler.run()
Welche anderen Möglichkeiten gibt es?
Sie könnten threading.Timer
verwenden, dies plant jedoch auch ein einmaliges Ereignis, ähnlich der .enter
-Methode von Scheduler-Objekten.
Das normale Muster (in einer beliebigen Sprache) zum Umwandeln eines einmaligen Schedulers in einen periodischen Scheduler besteht darin, dass jedes Ereignis im angegebenen Intervall selbst neu geplant wird. Zum Beispiel würde ich mit sched
keine Schleife wie Sie verwenden, sondern eher etwas wie:
def periodic(scheduler, interval, action, actionargs=()):
scheduler.enter(interval, 1, periodic,
(scheduler, interval, action, actionargs))
action(*actionargs)
und initiieren Sie den gesamten "für immer periodischen Zeitplan" mit einem Anruf
periodic(scheduler, 3600, query_rate_limit)
Oder ich könnte threading.Timer
anstelle von scheduler.enter
verwenden, aber das Muster ist ziemlich ähnlich.
Wenn Sie eine verfeinerte Variante benötigen (z. B. die periodische Neuplanung zu einem bestimmten Zeitpunkt oder unter bestimmten Bedingungen anhalten), ist dies nicht zu schwer, um einige zusätzliche Parameter zu berücksichtigen.
Meine bescheidene Sicht auf das Thema:
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.function = function
self.interval = interval
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Verwendungszweck:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
Eigenschaften:
start()
und stop()
können mehrmals sicher aufgerufen werden, auch wenn der Timer bereits gestartet/gestoppt wurdeinterval
jederzeit ändern, sie wird nach dem nächsten Lauf wirksam. Gleiches für args
, kwargs
und sogar function
!Sie könnten schedule verwenden. Es funktioniert auf Python 2.7 und 3.3 und ist ziemlich leicht:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
while 1:
schedule.run_pending()
time.sleep(1)
Basierend auf der Antwort von MestreLion wird ein kleines Problem mit Multithreading gelöst:
from threading import Timer, Lock
class Periodic(object):
"""
A periodic task running in threading.Timers
"""
def __init__(self, interval, function, *args, **kwargs):
self._lock = Lock()
self._timer = None
self.function = function
self.interval = interval
self.args = args
self.kwargs = kwargs
self._stopped = True
if kwargs.pop('autostart', True):
self.start()
def start(self, from_run=False):
self._lock.acquire()
if from_run or self._stopped:
self._stopped = False
self._timer = Timer(self.interval, self._run)
self._timer.start()
self._lock.release()
def _run(self):
self.start(from_run=True)
self.function(*self.args, **self.kwargs)
def stop(self):
self._lock.acquire()
self._stopped = True
self._timer.cancel()
self._lock.release()
Sie können den Advanced Python Scheduler verwenden. Es hat sogar eine cronartige Schnittstelle.
Verwenden Sie Sellerie .
from celery.task import PeriodicTask
from datetime import timedelta
class ProcessClicksTask(PeriodicTask):
run_every = timedelta(minutes=30)
def run(self, **kwargs):
#do something
Hier ist eine schnelle und schmutzige, nicht blockierende Schleife mit Thread
:
#!/usr/bin/env python3
import threading,time
def worker():
print(time.time())
time.sleep(5)
t = threading.Thread(target=worker)
t.start()
threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()
time.sleep(7)
print("Hello World")
Es ist nichts Besonderes, die worker
erzeugt mit Verzögerung einen neuen Thread von sich. Vielleicht nicht besonders effizient, aber einfach genug. Antwort von northtree wäre der Weg zu gehen, wenn Sie eine anspruchsvollere Lösung benötigen.
Und basierend auf this können wir dasselbe tun, nur mit Timer
:
#!/usr/bin/env python3
import threading,time
def hello():
t = threading.Timer(10.0, hello)
t.start()
print( "hello, world",time.time() )
t = threading.Timer(10.0, hello)
t.start()
time.sleep(12)
print("Oh,hai",time.time())
time.sleep(4)
print("How's it going?",time.time())
Basierend auf der Antwort von Alex Martelli habe ich decorator version implementiert, die einfacher zu integrieren ist.
import sched
import time
import datetime
from functools import wraps
from threading import Thread
def async(func):
@wraps(func)
def async_func(*args, **kwargs):
func_hl = Thread(target=func, args=args, kwargs=kwargs)
func_hl.start()
return func_hl
return async_func
def schedule(interval):
def decorator(func):
def periodic(scheduler, interval, action, actionargs=()):
scheduler.enter(interval, 1, periodic,
(scheduler, interval, action, actionargs))
action(*actionargs)
@wraps(func)
def wrap(*args, **kwargs):
scheduler = sched.scheduler(time.time, time.sleep)
periodic(scheduler, interval, func)
scheduler.run()
return wrap
return decorator
@async
@schedule(1)
def periodic_event():
print(datetime.datetime.now())
if __== '__main__':
print('start')
periodic_event()
print('end')
Siehe mein Beispiel
import sched, time
def myTask(m,n):
print n+' '+m
def periodic_queue(interval,func,args=(),priority=1):
s = sched.scheduler(time.time, time.sleep)
periodic_task(s,interval,func,args,priority)
s.run()
def periodic_task(scheduler,interval,func,args,priority):
func(*args)
scheduler.enter(interval,priority,periodic_task,
(scheduler,interval,func,args,priority))
periodic_queue(1,myTask,('world','hello'))