Skip to content

Commit

Permalink
update cron lite to 1.1, bugfix and support timezone
Browse files Browse the repository at this point in the history
  • Loading branch information
BaozhenChen committed Jul 25, 2023
1 parent a748fbf commit 4ee6421
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 38 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# cron_lite
A very light library to run python functions like cron jobs do. (support cron expressions, decorator style, spawn running and graceful exit. Runs in python service like Apscheduler, no effect of system config)
A very light library to run python functions like cron jobs do. (support cron expressions with timezone, decorator style, spawn running and graceful exit. Runs in python service like Apscheduler, no effect of system config)


### Example

```python
from cron_lite import cron_task, start_all, stop_all
from cron_lite import cron_task, start_all, stop_all, set_time_zone
import time

set_time_zone("Asia/Shanghai")


@cron_task("* * * * * 0/2")
def event1():
Expand Down
87 changes: 55 additions & 32 deletions cron_lite.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
#!/usr/bin/env python3.6
# coding: utf-8
from functools import wraps
from croniter import croniter
from typing import Optional, Dict, Callable
from datetime import datetime
import sched
import threading
import time
import traceback
from typing import Optional, Dict, Callable

from croniter import croniter
import threading
import pytz

scheduler_map: Dict[Callable, sched.scheduler] = {}
_switch = False
_error_log_handler = print
_info_log_handler = print
_error_handler = print
_info_handler = print
_time_zone: Optional[pytz.BaseTzInfo] = None


def set_time_zone(time_zone_name: str):
global _time_zone
_time_zone = pytz.timezone(time_zone_name)


def _reigster_next(base_func, cron_expr, till_time_stamp):
next_time = int(croniter(cron_expr).get_next())
def _register_next(base_func, cron_expr, till_time_stamp):
cron_obj = croniter(cron_expr)
if _time_zone:
cron_obj.set_current(datetime.now(tz=_time_zone))
next_time = int(cron_obj.get_next())
if scheduler_map.get(base_func) is None:
scheduler_map[base_func] = sched.scheduler(time.time, time.sleep)
if till_time_stamp is None or next_time <= till_time_stamp:
Expand All @@ -30,65 +41,77 @@ def _run_sched(scheduler: sched.scheduler):
t = scheduler.run(False)
if t is None:
return
time.sleep(t)
st = time.time()
while time.time() - st < t:
if not _switch:
scheduler.empty()
return
time.sleep(1)


def _start():
global _switch
_info_log_handler("cron started")
_info_handler("cron started")
tl = []
for scheduler in scheduler_map.values():
t = threading.Thread(target=_run_sched, args=(scheduler,))
for base_func, scheduler in scheduler_map.items():
print("Registering Job:", base_func.__name__)
t = threading.Thread(target=_run_sched, args=(scheduler, ), daemon=True)
# 有些task非常耗时,会影响退出。目前设计改为退出时不保证task完成
t.start()
tl.append(t)

for t in tl:
t.join()
_info_log_handler("cron finished")
_info_handler("cron finished")
_switch = False # ensure close when there are no more tasks with switch open
scheduler_map.clear()


def cron_task(cron_expr: str, till_time_stamp: int = None):
"""
cron_task decorator to register a function as crontab task
:param cron_expr: the croniter accepted cron_expression
:param cron_expr: the croniter accepted cron_expression. NOTICE: the default timezone is UTC and can be changed by
`set_time_zone`. The format is `min hour day month weekday [sec]`
:param till_time_stamp: run this jog till when. None means forever
:return: the real decorator
"""
assert len(cron_expr.split(" ")) in (5, 6), \
"only supported <min hour day year weekday> and <min hour day year weekday sec>"
"only supported <min hour day month weekday> and <min hour day month weekday sec>"

def deco(func):
@wraps(func)
def inner():
try:
func()
except:
_error_log_handler(f"run {func.__name__} failed\n" + traceback.format_exc())
_reigster_next(inner, cron_expr, till_time_stamp)

_reigster_next(inner, cron_expr, till_time_stamp)
except Exception:
try:
_error_handler(f"run {func.__name__} failed\n" + traceback.format_exc())
except Exception:
_error_handler(f"run {func.__name__} failed\n")
_register_next(inner, cron_expr, till_time_stamp)

_register_next(inner, cron_expr, till_time_stamp)
return inner

return deco


def start_all(spawn: bool = True, info_log_handler=None, error_log_handler=None) -> Optional[threading.Thread]:
def start_all(spawn: bool = True, info_handler=None, error_handler=None) -> Optional[threading.Thread]:
"""
start_all starts all cron tasks registered before.
:param spawn: whether to start a new thread for scheduler. If not, the action will block the current thread
:param info_log_handler: handle info output (scheduler start / stop), default = print, can use logging.info
:param error_log_handler: handle error output (task execute exception), default = print, can use logging.error
:param info_handler: handle info output (scheduler start / stop), default = print, can use logging.info
:param error_handler: handle error output (task execute exception), default = print, can use logging.error
:raise RuntimeError: if the tasks are already started and still running we cannot start again. The feature is not
concurrency-safe
concurrent-safe
:return: the new thread if spawn = True
"""
global _switch, _info_log_handler, _error_log_handler
global _switch, _info_handler, _error_handler
if _switch:
raise RuntimeError("the crontab was already started")
if info_log_handler:
_info_log_handler = info_log_handler
if error_log_handler:
_error_log_handler = error_log_handler
if info_handler:
_info_handler = info_handler
if error_handler:
_error_handler = error_handler

_switch = True
if spawn:
Expand All @@ -103,8 +126,8 @@ def start_all(spawn: bool = True, info_log_handler=None, error_log_handler=None)
def stop_all(wait_thread: Optional[threading.Thread] = None):
"""
stop_all turns off the switch to stop the scheduler. Running jobs will be wait till finished.
:param wait_thread: join() the spawned scheduler thread (if you started it as spawn and you wish) to ensure all jobs
to finish
:param wait_thread: join() the spawned scheduler thread (if you started it as spawn and you want) to ensure all jobs
to finish
:return:
"""
global _switch
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

if "upload" in argv:
print("running test")
assert system("python test_cron_lite.py") == 0
assert system("python3.6 test_cron_lite.py") == 0

this_directory = path.abspath(path.dirname(__file__))

Expand All @@ -18,13 +18,13 @@

setup(
name='cron-lite',
version='1.0',
version='1.1',
description='A very light library to run python functions like cron jobs do.',
author='Rainy Chan',
author_email='[email protected]',
url='https://github.com/rainydew/cron_lite',
py_modules=["cron_lite"],
install_requires=['croniter>=1.3.4'],
install_requires=['croniter>=1.3.4', 'pytz>=2022.7'],
keywords='cron task decorator schedule',
long_description=long_description,
python_requires=">=3.6"
Expand Down
2 changes: 1 addition & 1 deletion test_cron_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def event2():

th = start_all(spawn=True)
print("start")
time.sleep(60)
time.sleep(30)
print("stop")
stop_all(th)
print("done")
Expand Down

0 comments on commit 4ee6421

Please sign in to comment.