线程任务管理
线程级任务(可以让在调度器中的任务在此提交任务的功能,并且不用管理生命循环)¶
- 警告:
!!!该功能只支持CPU密集型线性任务!!!
- 功能说明:
main_task中前三位接受参数必须为share_info, _sharedtaskdict, task_signal_transmission(
如果开启了该功能,正常任务也可以使用,只需要不传入前面所说的三个参数)
@wait_branch_thread_ended必须放在main_task上面,防止主线程结束,分支线程还没运行完导致错误
other_task为需要运行的分支线程,上面必须添加@branch_thread_control装饰器来控制和监视
@branch_thread_control装饰器接收参数share_info, _sharedtaskdict, timeout_processing, task_name
task_name必须是唯一不重复的,用于获取其他分支线程的task_id(使用_sharedtaskdict.read(task_name)
获取task_id去终止,暂停或恢复)名字将按照main_task_name|task_name显示
使用threading.Thread语句必须添加daemon=True将线程设置为守护线程(
没有添加会让关闭操作时间增加,反正主线程结束,会强制终止所有分支线程)
所有的分支线程都可以在网页端查看到运行状态(开启网页端请使用start_task_status_ui())
- 控制方式
这里提供两个控制函数:
在主线程内使用task_signal_transmission[_sharedtaskdict.read(task_name)] = ["action"] action可以填写为kill, pause,
resume, 也可以按顺序填写几个操作
在主线程外部可以使用网页控制端
- 使用示例:
import threading
import time
from task_scheduling.construct import wait_branch_thread_ended, branch_thread_control
@wait_branch_thread_ended
def main_task(share_info, sharedtaskdict, task_signal_transmission, input_info):
task_name = "other_task"
timeout_processing = True
@branch_thread_control(share_info, sharedtaskdict, timeout_processing, task_name)
def other_task(input_info):
while True:
time.sleep(1)
print(input_info)
threading.Thread(target=other_task, args=(input_info,), daemon=True).start()
# Use this statement to terminate the branch thread
# time.sleep(4)
# task_signal_transmission[sharedtaskdict.read(task_name)] = ["kill"]
if __name__ == "__main__":
from task_scheduling.task_creation import task_creation
from task_scheduling.manager import task_scheduler
from task_scheduling.server_webui import start_task_status_ui
from task_scheduling.variable import *
start_task_status_ui()
task_id1 = task_creation(
None, None, FUNCTION_TYPE_CPU, True, "linear_task",
main_task, priority_low, "test")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
task_scheduler.shutdown_scheduler()
def main_task(share_info, sharedtaskdict, task_signal_transmission, input_info):
task_name = "other_task"
timeout_processing = True
import time
import threading
from task_scheduling.construct import wait_branch_thread_ended, branch_thread_control
@wait_branch_thread_ended
def main():
@branch_thread_control(share_info, sharedtaskdict, timeout_processing, task_name)
def other_task(input_info):
while True:
time.sleep(1)
print(input_info)
threading.Thread(target=other_task, args=(input_info,), daemon=True).start()
# Use this statement to terminate the branch thread
# time.sleep(4)
# task_signal_transmission[sharedtaskdict.read(task_name)] = ["kill"]
main()
if __name__ == "__main__":
from task_scheduling.client import submit_task
from task_scheduling.variable import *
task_id1 = submit_task(
None, None, FUNCTION_TYPE_CPU, True, "linear_task",
main_task, priority_low, "test")