forked from knadh/listmonk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
import.go
124 lines (106 loc) · 3.74 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"github.com/knadh/listmonk/internal/subimporter"
"github.com/labstack/echo"
)
// reqImport represents file upload import params.
type reqImport struct {
Mode string `json:"mode"`
Overwrite bool `json:"overwrite"`
Delim string `json:"delim"`
ListIDs []int `json:"lists"`
}
// handleImportSubscribers handles the uploading and bulk importing of
// a ZIP file of one or more CSV files.
func handleImportSubscribers(c echo.Context) error {
app := c.Get("app").(*App)
// Is an import already running?
if app.importer.GetStats().Status == subimporter.StatusImporting {
return echo.NewHTTPError(http.StatusBadRequest,
"An import is already running. Wait for it to finish or stop it before trying again.")
}
// Unmarsal the JSON params.
var r reqImport
if err := json.Unmarshal([]byte(c.FormValue("params")), &r); err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Invalid `params` field: %v", err))
}
if r.Mode != subimporter.ModeSubscribe && r.Mode != subimporter.ModeBlacklist {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid `mode`")
}
if len(r.Delim) != 1 {
return echo.NewHTTPError(http.StatusBadRequest,
"`delim` should be a single character")
}
file, err := c.FormFile("file")
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Invalid `file`: %v", err))
}
src, err := file.Open()
if err != nil {
return err
}
defer src.Close()
out, err := ioutil.TempFile("", "listmonk")
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error copying uploaded file: %v", err))
}
defer out.Close()
if _, err = io.Copy(out, src); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error copying uploaded file: %v", err))
}
// Start the importer session.
impSess, err := app.importer.NewSession(file.Filename, r.Mode, r.Overwrite, r.ListIDs)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error starting import session: %v", err))
}
go impSess.Start()
if strings.HasSuffix(strings.ToLower(file.Filename), ".csv") {
go impSess.LoadCSV(out.Name(), rune(r.Delim[0]))
} else {
// Only 1 CSV from the ZIP is considered. If multiple files have
// to be processed, counting the net number of lines (to track progress),
// keeping the global import state (failed / successful) etc. across
// multiple files becomes complex. Instead, it's just easier for the
// end user to concat multiple CSVs (if there are multiple in the first)
// place and uploada as one in the first place.
dir, files, err := impSess.ExtractZIP(out.Name(), 1)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError,
fmt.Sprintf("Error processing ZIP file: %v", err))
}
go impSess.LoadCSV(dir+"/"+files[0], rune(r.Delim[0]))
}
return c.JSON(http.StatusOK, okResp{app.importer.GetStats()})
}
// handleGetImportSubscribers returns import statistics.
func handleGetImportSubscribers(c echo.Context) error {
var (
app = c.Get("app").(*App)
s = app.importer.GetStats()
)
return c.JSON(http.StatusOK, okResp{s})
}
// handleGetImportSubscriberStats returns import statistics.
func handleGetImportSubscriberStats(c echo.Context) error {
app := c.Get("app").(*App)
return c.JSON(http.StatusOK, okResp{string(app.importer.GetLogs())})
}
// handleStopImportSubscribers sends a stop signal to the importer.
// If there's an ongoing import, it'll be stopped, and if an import
// is finished, it's state is cleared.
func handleStopImportSubscribers(c echo.Context) error {
app := c.Get("app").(*App)
app.importer.Stop()
return c.JSON(http.StatusOK, okResp{app.importer.GetStats()})
}