119 lines
3.3 KiB
Go
119 lines
3.3 KiB
Go
package server
|
|
|
|
import (
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
// this function schedules the tasks and will be called periodically, see server.Start()
|
|
func (s *Server) interval() {
|
|
|
|
// read scheduled task list from stateDB
|
|
// check for next executable task:
|
|
// - if there is one or more tasks ready for execution then select one of them.
|
|
// - if there is a selected task, update its next execution field and execute the task
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// fmt.Println("Recovered from panic in worker:", r)
|
|
log.Printf("recovered from panic in taskengine: %v ", r)
|
|
}
|
|
}()
|
|
|
|
tasks, err := s.StateDB.ReadRecords("SELECT * FROM tasks ORDER by next_execution limit 1;")
|
|
|
|
if err != nil {
|
|
log.Printf("error in taskengine: %s ", err)
|
|
return
|
|
}
|
|
|
|
if len(tasks) < 1 {
|
|
log.Printf("error in taskengine: %s ", "found no task for execution")
|
|
return
|
|
}
|
|
|
|
task := tasks[0] // pick the one task with the smallest next execution time, see previous sql statement
|
|
|
|
task_name, haveTask := task["task_name"].(string)
|
|
if !haveTask {
|
|
log.Printf("error in taskengine: task %s is of wrong type", task["task_name"])
|
|
return
|
|
}
|
|
|
|
nextExecution := task["next_execution"].(int64)
|
|
startTime := task["start_time"].(int64)
|
|
execInterval := task["interval"].(int64)
|
|
nowSeconds := time.Now().Unix()
|
|
|
|
if nowSeconds < nextExecution { // task execution is not yet due
|
|
return
|
|
}
|
|
|
|
// calculate next execution time
|
|
for nextExecution = startTime; nowSeconds > nextExecution; nextExecution += execInterval {
|
|
// add as many intervals to the starttime until the next execution lies in the future
|
|
}
|
|
|
|
task["start_time"] = task["next_execution"]
|
|
task["next_execution"] = nextExecution
|
|
|
|
/*
|
|
no_executions INTEGER, -- how often executed
|
|
duration INTEGER, -- duration of the last exec in ms
|
|
no_errors INTEGER, -- error count
|
|
last_error_text TEXT,
|
|
|
|
*/
|
|
|
|
if count, ok := task["no_executions"].(int64); ok {
|
|
task["no_executions"] = count + 1
|
|
}
|
|
|
|
// update next_execution in state database
|
|
_, err = s.StateDB.UpsertRecord("tasks", "task_id", task)
|
|
if err != nil {
|
|
log.Printf("error in taskengine: cannot update task record - before execution %s ", err)
|
|
return
|
|
}
|
|
|
|
task_func, haveTask := s.Tasks[task_name] // select the function with the matching name
|
|
|
|
if !haveTask {
|
|
log.Printf("error in taskengine: task %s is not defined", task_name)
|
|
}
|
|
|
|
if haveTask {
|
|
|
|
start := time.Now()
|
|
// if !s.Production {
|
|
// fmt.Println("Taskengine: executing task:", task_name, start)
|
|
// }
|
|
err = task_func(s) // finally execute the task; attention: a task that panics will kill the server!
|
|
|
|
task["duration"] = int(time.Since(start).Milliseconds())
|
|
|
|
if err != nil {
|
|
log.Printf("taskengine: execution task: %s failed with error: %s ", task_name, err)
|
|
task["last_error_text"] = err.Error()
|
|
if count, ok := task["no_errors"].(int64); ok {
|
|
task["no_errors"] = count + 1
|
|
}
|
|
// if !s.Production {
|
|
// fmt.Println("Taskengine: failed task:", task_name, err)
|
|
// }
|
|
} else {
|
|
// if !s.Production {
|
|
// fmt.Println("Taskengine: successfully completed task:", task_name, time.Now())
|
|
// }
|
|
}
|
|
|
|
_, err = s.StateDB.UpsertRecord("tasks", "task_id", task)
|
|
if err != nil {
|
|
log.Printf("error in taskengine: cannot update task record - after execution %s ", err)
|
|
return
|
|
}
|
|
|
|
}
|
|
|
|
}
|