package server import ( "log" "time" ) // this function schedules the tasks and will be called periodically, see server.Start() func (s *Server) interval() { // read scheduled task list from stateDB // check for next executable task: // - if there is one or more tasks ready for execution then select one of them. // - if there is a selected task, update its next execution field and execute the task defer func() { if r := recover(); r != nil { // fmt.Println("Recovered from panic in worker:", r) log.Printf("recovered from panic in taskengine: %v ", r) } }() tasks, err := s.StateDB.ReadRecords("SELECT * FROM tasks ORDER by next_execution limit 1;") if err != nil { log.Printf("error in taskengine: %s ", err) return } if len(tasks) < 1 { log.Printf("error in taskengine: %s ", "found no task for execution") return } task := tasks[0] // pick the one task with the smallest next execution time, see previous sql statement task_name, haveTask := task["task_name"].(string) if !haveTask { log.Printf("error in taskengine: task %s is of wrong type", task["task_name"]) return } nextExecution := task["next_execution"].(int64) startTime := task["start_time"].(int64) execInterval := task["interval"].(int64) nowSeconds := time.Now().Unix() if nowSeconds < nextExecution { // task execution is not yet due return } // calculate next execution time for nextExecution = startTime; nowSeconds > nextExecution; nextExecution += execInterval { // add as many intervals to the starttime until the next execution lies in the future } task["start_time"] = task["next_execution"] task["next_execution"] = nextExecution /* no_executions INTEGER, -- how often executed duration INTEGER, -- duration of the last exec in ms no_errors INTEGER, -- error count last_error_text TEXT, */ if count, ok := task["no_executions"].(int64); ok { task["no_executions"] = count + 1 } // update next_execution in state database _, err = s.StateDB.UpsertRecord("tasks", "task_id", task) if err != nil { log.Printf("error in taskengine: cannot update task record - before execution %s ", err) return } task_func, haveTask := s.Tasks[task_name] // select the function with the matching name if !haveTask { log.Printf("error in taskengine: task %s is not defined", task_name) } if haveTask { start := time.Now() // if !s.Production { // fmt.Println("Taskengine: executing task:", task_name, start) // } err = task_func(s) // finally execute the task; attention: a task that panics will kill the server! task["duration"] = int(time.Since(start).Milliseconds()) if err != nil { log.Printf("taskengine: execution task: %s failed with error: %s ", task_name, err) task["last_error_text"] = err.Error() if count, ok := task["no_errors"].(int64); ok { task["no_errors"] = count + 1 } // if !s.Production { // fmt.Println("Taskengine: failed task:", task_name, err) // } } else { // if !s.Production { // fmt.Println("Taskengine: successfully completed task:", task_name, time.Now()) // } } _, err = s.StateDB.UpsertRecord("tasks", "task_id", task) if err != nil { log.Printf("error in taskengine: cannot update task record - after execution %s ", err) return } } }