Skip to content

Commit

Permalink
Queue task dispatcher logic
Browse files Browse the repository at this point in the history
  • Loading branch information
xeome committed Oct 10, 2023
1 parent 3c53999 commit ddd7188
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 60 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ tmp
imap-sync
LOG_imapsync
db.db
.vscode
6 changes: 3 additions & 3 deletions internal/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var log = Log

func SetupLogger() {

log.SetFormatter(&logrus.TextFormatter{
Log.SetFormatter(&logrus.TextFormatter{
ForceColors: true, // Enable colors in the console output
FullTimestamp: true, // Show full timestamp with date and time
TimestampFormat: "2006-01-02 15:04:05",
Expand All @@ -25,6 +25,6 @@ func SetupLogger() {
},
})

log.SetReportCaller(true)
log.SetLevel(logrus.TraceLevel)
Log.SetReportCaller(true)
Log.SetLevel(logrus.TraceLevel)
}
2 changes: 2 additions & 0 deletions internal/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ func GetPollingData(index int) PageData {
Tasks: tasks,
}

// addOneTask()

return data
}
26 changes: 18 additions & 8 deletions internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Credentials struct {
}

var queue *list.List
var taskChan chan Task

const PageSize = 20

Expand All @@ -29,49 +30,58 @@ func GetQueueData(index int) PageData {
return data
}

func getPageByIndex(index int) []Task {
var tasks []Task
func getPageByIndex(index int) []*Task {
var tasks []*Task
start := (index - 1) * PageSize
end := start + PageSize

for i, e := 0, queue.Front(); i < end && e != nil; i, e = i+1, e.Next() {
if i >= start {
tasks = append(tasks, e.Value.(Task))
tasks = append(tasks, e.Value.(*Task))
}
}

return tasks
}

func InitQueue() {
SetupLogger()
queue = list.New()
taskChan = make(chan Task)
for i := 0; i < 10; i++ {
addOneTask()
}
go processPendingTasks()
}

func addOneTask() {
task := Task{
task := &Task{
ID: queue.Len() + 1,
SourceAccount: "jomo",
SourceServer: "imap.gmail.com",
DestinationAccount: "emin",
DestinationServer: "imap.yandex.com",
Status: "In progress",
Status: "Pending",
}
queue.PushFront(task)
go func() {
taskChan <- *task
}()
}

func AddTask(sourceDetails, destinationDetails Credentials) {
task := Task{
task := &Task{
ID: queue.Len() + 1,
SourceAccount: sourceDetails.Account,
SourceServer: sourceDetails.Server,
SourcePassword: sourceDetails.Password,
DestinationAccount: destinationDetails.Account,
DestinationServer: destinationDetails.Server,
DestinationPassword: destinationDetails.Password,
Status: "In progress",
Status: "Pending",
}

queue.PushFront(task)
go func() {
taskChan <- *task
}()
}
8 changes: 4 additions & 4 deletions internal/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ func GetSearchData(searchQuery string) PageData {
return data
}

func searchInQueue(searchQuery string) []Task {
var results []Task
func searchInQueue(searchQuery string) []*Task {
var results []*Task
chunkSize := 150 // number of tasks to process in each chunk
numChunks := (queue.Len() + chunkSize - 1) / chunkSize // round up division
chunkResults := make([][]Task, numChunks)
chunkResults := make([][]*Task, numChunks)
var wg sync.WaitGroup
for i := 0; i < numChunks; i++ {
wg.Add(1)
Expand All @@ -34,7 +34,7 @@ func searchInQueue(searchQuery string) []Task {
}
for j, e := 0, queue.Front(); j < end && e != nil; j, e = j+1, e.Next() {
if j >= start {
task := e.Value.(Task)
task := e.Value.(*Task)
if fuzzy.Match(searchQuery, strconv.Itoa(task.ID)) ||
fuzzy.Match(searchQuery, task.SourceAccount) ||
fuzzy.Match(searchQuery, task.SourceServer) ||
Expand Down
2 changes: 1 addition & 1 deletion internal/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ type Task struct {

type PageData struct {
Index int
Tasks []Task
Tasks []*Task
}
4 changes: 3 additions & 1 deletion internal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os/exec"
)

func syncIMAP(details Task) error {
func syncIMAP(details *Task) error {

cmd := exec.Command("imapsync", "--host1", details.SourceServer, "--user1", details.SourceAccount, "--password1", details.SourcePassword, "--host2", details.DestinationServer, "--user2", details.DestinationAccount, "--password2", details.DestinationPassword)
var stdBuffer bytes.Buffer
Expand All @@ -23,5 +23,7 @@ func syncIMAP(details Task) error {
// Command output realtime
// log.Println(stdBuffer.String())

details.Status = "Done"

return nil
}
63 changes: 20 additions & 43 deletions internal/sync_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,36 @@ package internal

import "time"

func getFirstInProgressTaskIndex() int {
// Do it from the back of the queue
for i, e := queue.Back(), 0; i != nil; i, e = i.Prev(), e+1 {
if i.Value.(Task).Status == "In Progress" {
return queue.Len() - e
}
}
return -1
}

// Infinite loop waiting for in progress tasks to appear in the queue and then calling the sync function on them and setting their status to "Done" when they are finished.
func syncLoop() {
func processPendingTasks() {
for {
// Wait for a task to appear in the queue
for queue.Len() == 0 {
time.Sleep(1 * time.Second)
}
// Get the index of the first pending task
task := getFirstPendingTask()

// Get the index of the first task in the queue that is in progress
index := getFirstInProgressTaskIndex()
if index == -1 {
// If there are no pending tasks, wait for a new task to be added
if task == nil {
task := <-taskChan
simulateSyncIMAP(&task)
continue
}

// Get the task from the queue
task := getTaskByIndex(index)

// Set the task status to "In Progress"
task.Status = "In Progress"
setTaskByIndex(index, task)

// Sync the task
syncIMAP(task)

// Set the task status to "Done"
task.Status = "Done"
setTaskByIndex(index, task)
simulateSyncIMAP(task)
time.Sleep(2000 * time.Millisecond)
}
}

func getTaskByIndex(index int) Task {
for i, e := 0, queue.Front(); i < index && e != nil; i, e = i+1, e.Next() {
if i == index {
return e.Value.(Task)
// return pointer to first pending task from back of queue
func getFirstPendingTask() *Task {
for e := queue.Back(); e != nil; e = e.Prev() {
if task, ok := e.Value.(*Task); ok && task.Status == "Pending" {
return task
}
}
return Task{}
return nil
}

func setTaskByIndex(index int, task Task) {
for i, e := 0, queue.Front(); i < index && e != nil; i, e = i+1, e.Next() {
if i == index {
e.Value = task
}
}
func simulateSyncIMAP(details *Task) error {
details.Status = "In Progress"
time.Sleep(5000 * time.Millisecond)
details.Status = "Done"
return nil
}
4 changes: 4 additions & 0 deletions templates/tbody.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
<td>{{.DestinationServer}}</td>
<td>{{.DestinationAccount}}</td>
<td>
{{if eq .Status "Done"}}
<span class="badge badge-outline-success">{{.Status}}</span>
{{else}}
<span class="badge badge-outline-warning">{{.Status}}</span>
{{end}}
</td>
</tr>
{{end}}

0 comments on commit ddd7188

Please sign in to comment.