任务重试
重试装饰器(使用该装饰器后,任务触发对应报错后会尝试重新运行多次)¶
retry_on_error(exceptions: Union[Type[Exception], Tuple[Type[Exception], ...], None], max_attempts: int, delay: Union[float, int]) -> Any
- 参数说明:
exceptions: 当什么错误类型发生才开始重试
max_attempts: 最大尝试次数
delay: 每次重试的间隔时间
- 使用示例:
import time
from task_scheduling.construct import retry_on_error
@retry_on_error(exceptions=TypeError, max_attempts=3, delay=1.0)
def linear_task(input_info):
while True:
print(input_info)
time.sleep(input_info)
from task_scheduling.common import set_log_level
set_log_level("DEBUG")
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation
from task_scheduling.manager import task_scheduler
from task_scheduling.variable import *
task_creation(
None, None, FUNCTION_TYPE_CPU, True, "task1",
linear_task, priority_low, "test"
)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
task_scheduler.shutdown_scheduler()
def linear_task(input_info):
import time
from task_scheduling.construct import retry_on_error
@retry_on_error(exceptions=Exception, max_attempts=3, delay=1.0)
def main():
while True:
print(input_info)
time.sleep(input_info)
main()
if __name__ == "__main__":
from task_scheduling.client import submit_task
from task_scheduling.variable import *
submit_task(
None, None, FUNCTION_TYPE_CPU, True, "task1",
linear_task, priority_low, "test"
)