From ddd7188f88119955aae41dc5ca9c87d9a93c74e7 Mon Sep 17 00:00:00 2001 From: Emin Date: Tue, 10 Oct 2023 23:41:14 +0300 Subject: [PATCH] Queue task dispatcher logic --- .gitignore | 1 + internal/logger.go | 6 ++--- internal/polling.go | 2 ++ internal/queue.go | 26 ++++++++++++------ internal/search.go | 8 +++--- internal/structs.go | 2 +- internal/sync.go | 4 ++- internal/sync_loop.go | 63 ++++++++++++++----------------------------- templates/tbody.html | 4 +++ 9 files changed, 56 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index 5269d6a..5eabe36 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ tmp imap-sync LOG_imapsync db.db +.vscode diff --git a/internal/logger.go b/internal/logger.go index 7a614b0..8ff426d 100644 --- a/internal/logger.go +++ b/internal/logger.go @@ -13,7 +13,7 @@ var log = Log func SetupLogger() { - log.SetFormatter(&logrus.TextFormatter{ + Log.SetFormatter(&logrus.TextFormatter{ ForceColors: true, // Enable colors in the console output FullTimestamp: true, // Show full timestamp with date and time TimestampFormat: "2006-01-02 15:04:05", @@ -25,6 +25,6 @@ func SetupLogger() { }, }) - log.SetReportCaller(true) - log.SetLevel(logrus.TraceLevel) + Log.SetReportCaller(true) + Log.SetLevel(logrus.TraceLevel) } diff --git a/internal/polling.go b/internal/polling.go index 0f272a6..ccd6db0 100644 --- a/internal/polling.go +++ b/internal/polling.go @@ -12,5 +12,7 @@ func GetPollingData(index int) PageData { Tasks: tasks, } + // addOneTask() + return data } diff --git a/internal/queue.go b/internal/queue.go index a645f01..509f2a6 100644 --- a/internal/queue.go +++ b/internal/queue.go @@ -11,6 +11,7 @@ type Credentials struct { } var queue *list.List +var taskChan chan Task const PageSize = 20 @@ -29,41 +30,46 @@ func GetQueueData(index int) PageData { return data } -func getPageByIndex(index int) []Task { - var tasks []Task +func getPageByIndex(index int) []*Task { + var tasks []*Task start := (index - 1) * PageSize end := start + PageSize for i, e := 0, queue.Front(); i < end && e != nil; i, e = i+1, e.Next() { if i >= start { - tasks = append(tasks, e.Value.(Task)) + tasks = append(tasks, e.Value.(*Task)) } } - return tasks } func InitQueue() { + SetupLogger() queue = list.New() + taskChan = make(chan Task) for i := 0; i < 10; i++ { addOneTask() } + go processPendingTasks() } func addOneTask() { - task := Task{ + task := &Task{ ID: queue.Len() + 1, SourceAccount: "jomo", SourceServer: "imap.gmail.com", DestinationAccount: "emin", DestinationServer: "imap.yandex.com", - Status: "In progress", + Status: "Pending", } queue.PushFront(task) + go func() { + taskChan <- *task + }() } func AddTask(sourceDetails, destinationDetails Credentials) { - task := Task{ + task := &Task{ ID: queue.Len() + 1, SourceAccount: sourceDetails.Account, SourceServer: sourceDetails.Server, @@ -71,7 +77,11 @@ func AddTask(sourceDetails, destinationDetails Credentials) { DestinationAccount: destinationDetails.Account, DestinationServer: destinationDetails.Server, DestinationPassword: destinationDetails.Password, - Status: "In progress", + Status: "Pending", } + queue.PushFront(task) + go func() { + taskChan <- *task + }() } diff --git a/internal/search.go b/internal/search.go index 2bc7f99..97d586a 100644 --- a/internal/search.go +++ b/internal/search.go @@ -17,11 +17,11 @@ func GetSearchData(searchQuery string) PageData { return data } -func searchInQueue(searchQuery string) []Task { - var results []Task +func searchInQueue(searchQuery string) []*Task { + var results []*Task chunkSize := 150 // number of tasks to process in each chunk numChunks := (queue.Len() + chunkSize - 1) / chunkSize // round up division - chunkResults := make([][]Task, numChunks) + chunkResults := make([][]*Task, numChunks) var wg sync.WaitGroup for i := 0; i < numChunks; i++ { wg.Add(1) @@ -34,7 +34,7 @@ func searchInQueue(searchQuery string) []Task { } for j, e := 0, queue.Front(); j < end && e != nil; j, e = j+1, e.Next() { if j >= start { - task := e.Value.(Task) + task := e.Value.(*Task) if fuzzy.Match(searchQuery, strconv.Itoa(task.ID)) || fuzzy.Match(searchQuery, task.SourceAccount) || fuzzy.Match(searchQuery, task.SourceServer) || diff --git a/internal/structs.go b/internal/structs.go index ba83292..89a83c4 100644 --- a/internal/structs.go +++ b/internal/structs.go @@ -13,5 +13,5 @@ type Task struct { type PageData struct { Index int - Tasks []Task + Tasks []*Task } diff --git a/internal/sync.go b/internal/sync.go index d54084d..43daa3e 100644 --- a/internal/sync.go +++ b/internal/sync.go @@ -7,7 +7,7 @@ import ( "os/exec" ) -func syncIMAP(details Task) error { +func syncIMAP(details *Task) error { cmd := exec.Command("imapsync", "--host1", details.SourceServer, "--user1", details.SourceAccount, "--password1", details.SourcePassword, "--host2", details.DestinationServer, "--user2", details.DestinationAccount, "--password2", details.DestinationPassword) var stdBuffer bytes.Buffer @@ -23,5 +23,7 @@ func syncIMAP(details Task) error { // Command output realtime // log.Println(stdBuffer.String()) + details.Status = "Done" + return nil } diff --git a/internal/sync_loop.go b/internal/sync_loop.go index 408a4db..67df49e 100644 --- a/internal/sync_loop.go +++ b/internal/sync_loop.go @@ -2,59 +2,36 @@ package internal import "time" -func getFirstInProgressTaskIndex() int { - // Do it from the back of the queue - for i, e := queue.Back(), 0; i != nil; i, e = i.Prev(), e+1 { - if i.Value.(Task).Status == "In Progress" { - return queue.Len() - e - } - } - return -1 -} - -// Infinite loop waiting for in progress tasks to appear in the queue and then calling the sync function on them and setting their status to "Done" when they are finished. -func syncLoop() { +func processPendingTasks() { for { - // Wait for a task to appear in the queue - for queue.Len() == 0 { - time.Sleep(1 * time.Second) - } + // Get the index of the first pending task + task := getFirstPendingTask() - // Get the index of the first task in the queue that is in progress - index := getFirstInProgressTaskIndex() - if index == -1 { + // If there are no pending tasks, wait for a new task to be added + if task == nil { + task := <-taskChan + simulateSyncIMAP(&task) continue } - // Get the task from the queue - task := getTaskByIndex(index) - - // Set the task status to "In Progress" - task.Status = "In Progress" - setTaskByIndex(index, task) - - // Sync the task - syncIMAP(task) - - // Set the task status to "Done" - task.Status = "Done" - setTaskByIndex(index, task) + simulateSyncIMAP(task) + time.Sleep(2000 * time.Millisecond) } } -func getTaskByIndex(index int) Task { - for i, e := 0, queue.Front(); i < index && e != nil; i, e = i+1, e.Next() { - if i == index { - return e.Value.(Task) +// return pointer to first pending task from back of queue +func getFirstPendingTask() *Task { + for e := queue.Back(); e != nil; e = e.Prev() { + if task, ok := e.Value.(*Task); ok && task.Status == "Pending" { + return task } } - return Task{} + return nil } -func setTaskByIndex(index int, task Task) { - for i, e := 0, queue.Front(); i < index && e != nil; i, e = i+1, e.Next() { - if i == index { - e.Value = task - } - } +func simulateSyncIMAP(details *Task) error { + details.Status = "In Progress" + time.Sleep(5000 * time.Millisecond) + details.Status = "Done" + return nil } diff --git a/templates/tbody.html b/templates/tbody.html index dcd1fdc..4eb482f 100644 --- a/templates/tbody.html +++ b/templates/tbody.html @@ -6,7 +6,11 @@ {{.DestinationServer}} {{.DestinationAccount}} + {{if eq .Status "Done"}} + {{.Status}} + {{else}} {{.Status}} + {{end}} {{end}} \ No newline at end of file