Datastar_Meetup/lcars/server/taskengine.go
2025-09-30 12:47:59 +02:00

119 lines
3.3 KiB
Go

package server
import (
"log"
"time"
)
// this function schedules the tasks and will be called periodically, see server.Start()
func (s *Server) interval() {
// read scheduled task list from stateDB
// check for next executable task:
// - if there is one or more tasks ready for execution then select one of them.
// - if there is a selected task, update its next execution field and execute the task
defer func() {
if r := recover(); r != nil {
// fmt.Println("Recovered from panic in worker:", r)
log.Printf("recovered from panic in taskengine: %v ", r)
}
}()
tasks, err := s.StateDB.ReadRecords("SELECT * FROM tasks ORDER by next_execution limit 1;")
if err != nil {
log.Printf("error in taskengine: %s ", err)
return
}
if len(tasks) < 1 {
log.Printf("error in taskengine: %s ", "found no task for execution")
return
}
task := tasks[0] // pick the one task with the smallest next execution time, see previous sql statement
task_name, haveTask := task["task_name"].(string)
if !haveTask {
log.Printf("error in taskengine: task %s is of wrong type", task["task_name"])
return
}
nextExecution := task["next_execution"].(int64)
startTime := task["start_time"].(int64)
execInterval := task["interval"].(int64)
nowSeconds := time.Now().Unix()
if nowSeconds < nextExecution { // task execution is not yet due
return
}
// calculate next execution time
for nextExecution = startTime; nowSeconds > nextExecution; nextExecution += execInterval {
// add as many intervals to the starttime until the next execution lies in the future
}
task["start_time"] = task["next_execution"]
task["next_execution"] = nextExecution
/*
no_executions INTEGER, -- how often executed
duration INTEGER, -- duration of the last exec in ms
no_errors INTEGER, -- error count
last_error_text TEXT,
*/
if count, ok := task["no_executions"].(int64); ok {
task["no_executions"] = count + 1
}
// update next_execution in state database
_, err = s.StateDB.UpsertRecord("tasks", "task_id", task)
if err != nil {
log.Printf("error in taskengine: cannot update task record - before execution %s ", err)
return
}
task_func, haveTask := s.Tasks[task_name] // select the function with the matching name
if !haveTask {
log.Printf("error in taskengine: task %s is not defined", task_name)
}
if haveTask {
start := time.Now()
// if !s.Production {
// fmt.Println("Taskengine: executing task:", task_name, start)
// }
err = task_func(s) // finally execute the task; attention: a task that panics will kill the server!
task["duration"] = int(time.Since(start).Milliseconds())
if err != nil {
log.Printf("taskengine: execution task: %s failed with error: %s ", task_name, err)
task["last_error_text"] = err.Error()
if count, ok := task["no_errors"].(int64); ok {
task["no_errors"] = count + 1
}
// if !s.Production {
// fmt.Println("Taskengine: failed task:", task_name, err)
// }
} else {
// if !s.Production {
// fmt.Println("Taskengine: successfully completed task:", task_name, time.Now())
// }
}
_, err = s.StateDB.UpsertRecord("tasks", "task_id", task)
if err != nil {
log.Printf("error in taskengine: cannot update task record - after execution %s ", err)
return
}
}
}