-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpostgres.go
184 lines (161 loc) · 5.24 KB
/
postgres.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package main
import (
"database/sql"
"fmt"
"log"
"strings"
"sync"
"github.com/lib/pq"
)
// ExtractPostgres is the main entry point for frankenbot.
// It takes in a Config and kicks in concurrent workers
// for the entire process, starting with MakeTable for stores
// and then again for each name in config.TargetTables.
// parentGroup is marked Done when all workers have finished.
func ExtractPostgres(config Config, parentGroup *sync.WaitGroup) {
	defer parentGroup.Done()

	// Build DSNs from the config maps. Only the source sets
	// connect_timeout, matching the original behavior.
	pSourceConnStr := fmt.Sprintf("dbname=%s user=%s password=%s host=%s port=%s connect_timeout=10",
		config.PSource["name"], config.PSource["username"], config.PSource["password"],
		config.PSource["host"], config.PSource["port"])
	pDestinationConnStr := fmt.Sprintf("dbname=%s user=%s password=%s host=%s port=%s",
		config.PDestination["name"], config.PDestination["username"], config.PDestination["password"],
		config.PDestination["host"], config.PDestination["port"])

	// Check the error before deferring Close. sql.Open only validates its
	// arguments, so Ping is used to surface connectivity/credential
	// problems here instead of inside the first worker goroutine.
	pSourceDB, err := sql.Open("postgres", pSourceConnStr)
	if err != nil {
		log.Fatal("ExtractPostgres: open source: ", err)
	}
	defer pSourceDB.Close()
	if err = pSourceDB.Ping(); err != nil {
		log.Fatal("ExtractPostgres: ping source: ", err)
	}

	pDestinationDB, err := sql.Open("postgres", pDestinationConnStr)
	if err != nil {
		log.Fatal("ExtractPostgres: open destination: ", err)
	}
	defer pDestinationDB.Close()
	if err = pDestinationDB.Ping(); err != nil {
		log.Fatal("ExtractPostgres: ping destination: ", err)
	}

	// Fan out one MakeTable worker per table; each worker fans out
	// further per store id, adding its children to the same wg.
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go MakeTable("stores", config, pSourceDB, pDestinationDB, wg)
	for _, name := range config.TargetTables {
		wg.Add(1)
		go MakeTable(name, config, pSourceDB, pDestinationDB, wg)
	}
	wg.Wait()
	log.Println("*** Finished extracting Postgres ***")
}
// MakeTable creates the destination table (plus its id sequence and the
// source's indexes) and uses the fan out pattern to kick off extraction
// for that table for each store in config.MatchingIds.
// wg must have been Add(1)'d by the caller; children are added here.
func MakeTable(name string, config Config, source, destination *sql.DB, wg *sync.WaitGroup) {
	defer wg.Done()

	// NOTE(review): table names are interpolated directly into SQL. They
	// come from config, not end users, but should be validated upstream.
	// Query a single row only to discover column names and database types.
	query := fmt.Sprintf("SELECT * FROM %s LIMIT 1", name)
	rows, err := source.Query(query)
	if err != nil {
		log.Fatal("MakeTable: schema query: ", err)
	}
	columns, err := rows.Columns()
	if err != nil {
		rows.Close()
		log.Fatal("MakeTable: columns: ", err)
	}
	cTypes, err := rows.ColumnTypes()
	rows.Close() // close promptly; the original leaked the second result set by reusing this variable
	if err != nil {
		log.Fatal("MakeTable: column types: ", err)
	}
	columnTypes := make([]string, len(cTypes))
	for i, cType := range cTypes {
		columnTypes[i] = cType.DatabaseTypeName()
	}
	table := &Table{Name: name, Columns: columns, DatabaseTypes: columnTypes, Extracted: false}

	// The destination table's id column depends on this sequence, so
	// create it before the table itself.
	query = fmt.Sprintf("CREATE SEQUENCE IF NOT EXISTS %s_id_seq", name)
	if _, err = destination.Exec(query); err != nil {
		log.Fatal("MakeTable: create sequence: ", err)
	}
	query = table.FormatCreate()
	log.Printf("Making table %s", name)
	if _, err = destination.Exec(query); err != nil {
		log.Fatal("MakeTable: create table: ", err)
	}

	// Copy the source table's index definitions, rewritten to be
	// idempotent so re-runs don't fail on already-created indexes.
	query = fmt.Sprintf(`SELECT pg_get_indexdef(idx.oid) || ';' as script
FROM pg_index ind
JOIN pg_class idx ON idx.oid = ind.indexrelid
JOIN pg_class tbl on tbl.oid = ind.indrelid
LEFT JOIN pg_namespace ns on ns.oid = tbl.relnamespace
WHERE tbl.relname = '%s'`, name)
	idxRows, err := source.Query(query)
	if err != nil {
		// Index copying is best-effort, matching the original's Println.
		log.Println("MakeTable: index query: ", err)
	} else {
		defer idxRows.Close()
		for idxRows.Next() {
			var index string
			if err = idxRows.Scan(&index); err != nil {
				log.Println("MakeTable: index scan: ", err)
				continue
			}
			// Turn "CREATE [UNIQUE] INDEX name ..." into
			// "CREATE [UNIQUE] INDEX IF NOT EXISTS name ...".
			// Guard the split: the original indexed [1] unconditionally
			// and would panic on a definition without "INDEX".
			parts := strings.SplitN(index, "INDEX", 2)
			if len(parts) != 2 {
				log.Println("MakeTable: unexpected index definition: ", index)
				continue
			}
			index = parts[0] + "INDEX IF NOT EXISTS" + parts[1]
			if _, err = destination.Exec(index); err != nil {
				log.Println("MakeTable: create index: ", err)
			}
		}
		if err = idxRows.Err(); err != nil {
			log.Println("MakeTable: index rows: ", err)
		}
	}

	// Fan out: one ExtractTable goroutine per matching store id.
	for _, id := range config.MatchingIds {
		wg.Add(1)
		go ExtractTable(id, table, config.Filters, config.Since, source, destination, wg)
	}
}
// ExtractTable does the heavy lifting of actually extracting data from
// one database into another for a single store. It counts the matching
// rows for logging, then streams them into the destination with a
// single COPY inside one transaction.
func ExtractTable(id int, table *Table, filters map[string]map[string]string, since map[string]map[string]string, source, destination *sql.DB, wg *sync.WaitGroup) {
	defer wg.Done()

	// The stores table is keyed by its own id; every other table
	// references the store via store_id.
	key := "store_id"
	if table.Name == "stores" {
		key = "id"
	}

	// appendSince appends any configured per-column date lower bounds
	// for this table, so count and select stay in agreement.
	appendSince := func(q string) string {
		if sinceMap, ok := since[table.Name]; ok {
			for col, date := range sinceMap {
				q += fmt.Sprintf(" AND %s >= DATE '%s'", col, date)
			}
		}
		return q
	}

	var count int
	countQuery := appendSince(fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s = %v", table.Name, key, id))
	if err := source.QueryRow(countQuery).Scan(&count); err != nil {
		log.Println("ExtractTable: count: ", err)
		return
	}
	log.Printf("Extracting %s for store %v, %v rows found", table.Name, id, count)

	query := appendSince(fmt.Sprintf("SELECT * FROM %s WHERE %s = %v", table.Name, key, id))
	rows, err := source.Query(query)
	if err != nil {
		// Return instead of continuing: the original logged and then
		// dereferenced a nil rows, which panics.
		log.Println("ExtractTable: select: ", err)
		return
	}
	defer rows.Close()

	txn, err := destination.Begin()
	if err != nil {
		log.Println("ExtractTable: begin: ", err)
		return
	}
	// Roll back on any early exit. Rollback after a successful Commit is
	// a harmless no-op (it returns sql.ErrTxDone).
	defer txn.Rollback()

	stmt, err := txn.Prepare(pq.CopyIn(table.Name, table.Columns...))
	if err != nil {
		log.Println("ExtractTable: prepare copy: ", err)
		return
	}
	defer stmt.Close()

	scanner := newSliceScan(table.Columns)
	for rows.Next() {
		if err = scanner.Update(rows); err != nil {
			log.Println("ExtractTable: scan: ", err)
		}
		t := table.FormatInsert(scanner.Get(), filters)
		if _, err = stmt.Exec(t...); err != nil {
			log.Println("ExtractTable: copy row: ", err)
		}
	}
	// Surface iteration errors the original silently dropped.
	if err = rows.Err(); err != nil {
		log.Println("ExtractTable: rows: ", err)
	}

	// A final Exec with no arguments flushes the COPY buffer.
	if _, err = stmt.Exec(); err != nil {
		log.Println("ExtractTable: flush copy: ", err)
	}
	if err = txn.Commit(); err != nil {
		log.Println("ExtractTable: commit: ", err)
	}
	log.Printf("*** Finished extracting %s for store %v ***", table.Name, id)
}