diff --git a/CHANGELOG.md b/CHANGELOG.md index f74f37bc7..01143913c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,10 @@ -## v1.0.0 [TBD] +## v1.1.0 [2022-05-26] + +### Features + +- Adds MySQL Adaptor via [#515](https://github.com/compose/transporter/pull/515), thanks @atomicules! + +## v1.0.0 [2021-12-01] This release does not introduce any new functionalities, but catches up transporter with modern golang. It also introduces a new standard way of running adaptor tests. diff --git a/README.md b/README.md index 815115ef1..8f62aac7e 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ Below is a list of each adaptor and its support of the feature: | file | | X | | mongodb | X | X | | postgresql | | X | +| mysql | | X | | rabbitmq | X | | | rethinkdb | | X | +---------------+-------------+----------------+ @@ -64,6 +65,7 @@ Each adaptor has its own README page with details on configuration and capabilit * [postgresql](./adaptor/postgres) * [rabbitmq](./adaptor/rabbitmq) * [rethinkdb](./adaptor/rethinkdb) +* [mysql](./adaptor/mysql) Native Functions ---------------- diff --git a/adaptor/all/all.go b/adaptor/all/all.go index 299f10599..1518c4636 100644 --- a/adaptor/all/all.go +++ b/adaptor/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/compose/transporter/adaptor/file" _ "github.com/compose/transporter/adaptor/mongodb" _ "github.com/compose/transporter/adaptor/postgres" + _ "github.com/compose/transporter/adaptor/mysql" _ "github.com/compose/transporter/adaptor/rabbitmq" _ "github.com/compose/transporter/adaptor/rethinkdb" ) diff --git a/adaptor/mysql/DEVELOPMENT_NOTES.md b/adaptor/mysql/DEVELOPMENT_NOTES.md new file mode 100644 index 000000000..9f3a0373d --- /dev/null +++ b/adaptor/mysql/DEVELOPMENT_NOTES.md @@ -0,0 +1,495 @@ +## Development notes + +Notes below were as written during development of the adaptor in a bit of a +effort to reduce the number of comments in the files although there are still a +lot of comments in some areas. + +--- + +This is being built using the Postgresql adaptor as a basis and using +[go-sql-driver/mysql](https://github.com/go-sql-driver/mysql). It's noted that +[go-mysql-org](https://github.com/go-mysql-org) and in particular +[canal](https://github.com/go-mysql-org/go-mysql#canal) look like a good +alternative though. **NOTE:** We switched to `go-mysql-org/go-mysql` for +replication/tailing. + +### Setup and testing on MacOS with Pkgsrc (other package managers are available) + +1. Install client and server + + sudo pkgin install mysql-client + sudo pkgin install mysql-server + +2. Edit `/opt/pkg/etc/my.cnf` and point `data-dir` somewhere (I opted +for `~/Database/mysql`). Add `secure_file_priv = "/tmp"` too. + +3. Run `mysql_install_db` + +4. Run it `cd /opt/pkg ; /opt/pkg/bin/mysqld_safe &` + +Alternatively (because *right now* only 5.6 is available via Pkgsrc), +obtain a [DMG of the community server](https://downloads.mysql.com/archives/community/) for MacOS. +Version `5.7.31 for macos10.14` is available and works on Monterey. + +You'll need to change the root password to empty/blank for the tests though: + +``` +SET PASSWORD FOR 'root'@'localhost' = PASSWORD(''); +``` +### Element types + +Postgresql has an ARRAY data type so for each array also pulls the [element +type](https://www.postgresql.org/docs/9.6/infoschema-element-types.html) within + +> When a table column [...] the respective information schema view only contains +> ARRAY in the column data_type. + +This happens under the `iterateTable` function. Note that here the `c` is a sql +variable; Not to be confused with the `c` variable outside of this; Yay for +naming. If we want to run these queries manually the only bits that change are +the `%v`. E.g: `...WHERE c.table_schema = 'public' AND c.table_name = 't_random'`. +The query will output something like this: + + column_name | data_type | element_type + -------------+-----------+-------------- + s | integer | + md5 | text | + (2 rows) + +### Data types + +Comparing differences from Postgresql using these sources: + +- +- + +There are three code areas that need changing: + +1. `colXXX` constants at top of adaptor\_test.go +2. `setupData` in adaptor\_test.go +3. `TestReadComplex` in reader\_test.go + +Some comments: + +- No ARRAY in MySQL +- [Timestamp assumes UTC](https://dev.mysql.com/doc/refman/8.0/en/datetime.html) +- The `--colbytea` bits are all just comments so it's easier to match things up +- On that note I'm re-ording things so it's consistent +- [Inserting binary can be done like this](https://stackoverflow.com/a/10283197/208793) +- No BIGSERIAL, etc +- Geometry is there, just a bit different +- No CIDR +- ENUM has to be done a bit differently, no need to CREATE TYPE + +I'm currently developing with a ye-olde 5.6 version so it doesn't like: + +- ENUM +- SET +- VARBINARY +- JSON + +### TestReadComplex Notes + +#### Text + +Remove newline for now for `text`: + +``` +--- FAIL: TestReadComplex (0.01s) +reader_test.go:117: Expected coltext of row to equal this is \n extremely important (string), but was this is + extremely important (string) +``` + +#### Float + +> Float MySQL also supports this optional precision specification, but the +> precision value in FLOAT(p) is used only to determine storage size. A precision +> from 0 to 23 results in a 4-byte single-precision FLOAT column. A precision from +> 24 to 53 results in an 8-byte double-precision DOUBLE column. + +#### Blob + +Tried using `go:embded` and inserting the blob data as a string, but couldn't +get it to work. I.e. + +``` +// For testing Blob +//go:embed logo-mysql-170x115.png +blobdata string +``` + +And then: + +``` +fmt.Sprintf(`INSERT INTO %s VALUES (... '%s');`, blobdata) +``` + +Tried with `'%s'` (didn't work at all) and `%q` which inserted, but didn't +extract correctly. + +In the end I used `LOAD_FILE` to insert (like you are probably meant to), but +would be nice to do directly from Go. + +Ultimately I'll probably remove this test. + +#### Spatial + +This is a handy package: https://github.com/paulmach/orb + +Need to think about how we want to handle spatial types: + +1. Decode from WKB in reader.go before we get to testing OR +2. Leave as WKB, decode for the test only OR +3. Leave as WKB, don't decode at all, instead encode the test data to match + +Another good option: https://github.com/twpayne/go-geom + +Struggling. I think I'd like to take the "raw" data and decode for the test. + +Another option: https://github.com/paulsmith/gogeos + +From here: https://dev.mysql.com/doc/refman/5.6/en/gis-data-formats.html + +> Internally, MySQL stores geometry values in a format that is not identical to +> either WKT or WKB format. (Internal format is like WKB but with an initial 4 +> bytes to indicate the SRID.) + +> For the WKB part, these MySQL-specific considerations apply: +> +> - The byte-order indicator byte is 1 because MySQL stores geometries as little-endian values. +> +> - MySQL supports geometry types of Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon, and GeometryCollection. Other geometry types are not supported. + +Maybe we should just strip the SRID? Then we'd be left with just wkb + +Getting ahead a bit, but need to think about how we transfer things MySQL to +MySQL and MySQL to X. + +I managed to get things working with go-geom and reading the MySQL data as hex. +go-geom has handy wkbhex functions that Orb doesn't. It's _possible_ we fell +foul of this with Orb: + +> Scanning directly from MySQL columns is supported. By default MySQL returns +> geometry data as WKB but prefixed with a 4 byte SRID. To support this, if the +> data is not valid WKB, the code will strip the first 4 bytes, the SRID, and try +> again. **This works for most use cases**. + +Emphasis mine. + +I've had to strip off the SRID to get things to work with go-geom. Going to Hex +allows us to do that. + +#### Bit + +TODO (write words here) + +#### Binary + +This is probably very similar to Blob. At the moment we store a Hex value and +for the purposes of testing and comparison we then convert that to a string +representation of the hex value on read. + +### Writer notes + +#### TestInsert + +Postgresql [uses this format](https://www.postgresql.org/docs/current/plpgsql-declarations.html#PLPGSQL-DECLARATION-PARAMETERS): + +``` +query := fmt.Sprintf("INSERT INTO %v (%v) VALUES (%v);", m.Namespace(), strings.Join(keys, ", "), strings.Join(placeholders, ", ")) +log.Infoln(query1) +// INSERT INTO writer_insert_test.simple_test_table (id, colvar, coltimestamp) VALUES ($1, $2, $3); +_, err := s.Exec(query, data...) +``` + +I.e. takes advantage of Postgresql parameters. MySQL... doesn't work the same. +Can't find it documented, but can do this for mysql: + +``` +INSERT INTO writer_insert_test.simple_test_table (id, colvar, coltimestamp) VALUES (?, ?, ?); +``` + +Maybe we can also use named values with a `:colon` prefix? But probably we don't need to. + +Seeing some odd switching around though: + +``` +INFO[0000] INSERT INTO writer_insert_test.simple_test_table (id, colvar, coltimestamp) VALUES (?, ?, ?); +INFO[0000] INSERT INTO writer_insert_test.simple_test_table (id, colvar, coltimestamp) VALUES (?, ?, ?); +INFO[0000] INSERT INTO writer_insert_test.simple_test_table (coltimestamp, id, colvar) VALUES (?, ?, ?); +``` + +Needs to be ordered? Maybe not, seems it adjusts the order of the data too: + +``` +INFO[0000] INSERT INTO writer_insert_test.simple_test_table (id, colvar, coltimestamp) VALUES (?, ?, ?); +INFO[0000] [7 hello world 2021-12-16 13:14:20.575528 +0000 UTC] +INFO[0000] INSERT INTO writer_insert_test.simple_test_table (coltimestamp, id, colvar) VALUES (?, ?, ?); +INFO[0000] [2021-12-16 13:14:20.57585 +0000 UTC 8 hello world] +``` + +It's inserting data fine: + +``` +mysql> select * from simple_test_table; ++----+-------------+---------------------+ +| id | colvar | coltimestamp | ++----+-------------+---------------------+ +| 0 | hello world | 2021-12-16 13:14:21 | +| 1 | hello world | 2021-12-16 13:14:21 | +| 2 | hello world | 2021-12-16 13:14:21 | +| 3 | hello world | 2021-12-16 13:14:21 | +| 4 | hello world | 2021-12-16 13:14:21 | +| 5 | hello world | 2021-12-16 13:14:21 | +| 6 | hello world | 2021-12-16 13:14:21 | +| 7 | hello world | 2021-12-16 13:14:21 | +| 8 | hello world | 2021-12-16 13:14:21 | +| 9 | hello world | 2021-12-16 13:14:21 | ++----+-------------+---------------------+ +10 rows in set (0.00 sec) +``` + +I was seeing this error: + +After sorting the parameter issue (`?`) I was then left with this failure: + +``` +--- FAIL: TestInsert (0.11s) +writer_test.go:93: Error on test query: sql: Scan error on column index 2, name "coltimestamp": unsupported Scan, storing driver.Value type []uint8 into type *time.Time +``` + +Reading more on +[go-sql-driver/mysql](https://github.com/go-sql-driver/mysql#timetime-support) I +found: + +> The default internal output type of MySQL `DATE` and `DATETIME` values is +> `[]byte` which allows you to scan the value into a `[]byte`, `string` or +> `sql.RawBytes` variable in your program. +> +> However, many want to scan MySQL `DATE` and `DATETIME` values into `time.Time` +> variables, which is the logical equivalent in Go to `DATE` and `DATETIME` in +> MySQL. You can do that by changing the internal output type from `[]byte` to +> `time.Time` with the DSN parameter `parseTime=true`. + +And so sticking that on in `TestInsert` was enough: + +``` +mysql://root@tcp(localhost)/%s?parseTime=true +``` + +#### TestComplexInsert + +I think we can assume the SRID is 0: + +https://dba.stackexchange.com/questions/182519/how-do-i-dump-spatial-types-like-point-with-their-srids-in-mysql + +Another rough note to self... do we need to look at using +[`interpolateParams=true`](https://github.com/go-sql-driver/mysql#interpolateparams)? + + +### Tailing + +We switched to +[go-mysql-org/go-mysql](https://github.com/go-mysql-org/go-mysql#canal) from +[go-sql-driver/mysql](https://github.com/go-sql-driver/mysql) because it has +replication support. There are two parts to it: + +- Replication +- Canal + +AFAICT Canal is more a higher level abstraction on the Replication stuff. It +uses that package. So possibly what we want to use is the Replication package as +the [brief example given](https://github.com/go-mysql-org/go-mysql#example) +looks close to what we want to do and what MySQL does. + +Can't run `SHOW MASTER STATUS;` on Compose to get what we need for replication. +Well, not as is anyway, will need additional grants. + +Can build a dummy/simple app to test out the replication package: + +```go +package main + +import ( + "github.com/go-mysql-org/go-mysql/replication" + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/client" + "net/url" + "os" + "context" + "fmt" + "strconv" +) + +var ( + dsn = "mysql://admin:[REDACTED]@aws-eu-west-1-portal.4.dblayer.com:15788/compose" +) + +func main() { + // Could add a dns.Parse to the driver + parsedDSN, _ := url.Parse(dsn) + host := parsedDSN.Hostname() + port := parsedDSN.Port() + portInt, _ := strconv.Atoi(port) + user := parsedDSN.User.Username() + // stupid password makes things harder + pass, _ := parsedDSN.User.Password() + path := parsedDSN.Path[1:] + scheme := parsedDSN.Scheme + + // Need to get the log and position. Use driver or client? I guess Transporter client properly, but + // for testing use package client directly? + conn, _ := client.Connect(fmt.Sprintf("%s:%s", host, port), user, pass, path) + + r, _ := conn.Execute("SHOW MASTER STATUS") + binFile, _ := r.GetString(0, 0) + binPosition, _ := r.GetInt(0, 1) + + cfg := replication.BinlogSyncerConfig { + ServerID: 100, + Flavor: scheme, + Host: host, + Port: uint16(portInt), + User: user, + Password: pass, + } + + syncer := replication.NewBinlogSyncer(cfg) + + streamer, _ := syncer.StartSync(mysql.Position{binFile, uint32(binPosition)}) + + // OR + //gtidSet, _ := mysql.ParseMysqlGTIDSet("a852989a-1894-4fcb-a060-a4aaaf06b9f0:1-36") + //streamer, _ := syncer.StartSyncGTID(gtidSet) + + for { + ev, _ := streamer.GetEvent(context.Background()) + ev.Dump(os.Stdout) + } + // Then need to start handling things here a bit differently. +} +``` + +Also, reading through the [Postgresql logical +decoding](https://www.postgresql.org/docs/9.4/logicaldecoding-example.html) so +can understand what the Postgresql Tailer is looking at versus what we get from +the binlog, etc. + +How does Postgresql get changes since last call? Magic inside Postgresql it +seems, you only get the changes once. + +#### MySQL setup for testing + +If using the Pkgsrc MySQL then need to edit `/opt/pkg/etc/my.cnf` and ensure: + +- `log_bin` is uncommented +- `server_id` is uncommented and has a value + +to test tailing. + +Need 5.7+ MySQL as 5.6 gives: + +``` +=== QueryEvent === +Date: 2022-02-22 15:40:24 +Log position: 138769 +Event size: 197 +Slave proxy ID: 1 +Execution time: 0 +Error code: 0 +Schema: test +Query: INSERT INTO recipes (recipe_id, recipe_name) VALUES (1,"Tacos"), (2,"Tomato Soup"), (3,"Grilled Cheese") +``` + +I.e. under a `QueryEvent` and not a `RowsEvent` + +If using community server install... + +``` +sudo mkdir /usr/local/mysql/etc +sudo vim /usr/local/mysql/etc/my.cnf +``` + +``` +[mysqld] +log_bin +server_id = 100 +secure_file_priv = "/tmp" +``` + +Need at least that in to run tailing tests, etc. + +#### Understanding update rows + +The binlog appears to have two entries a before vs after: + +``` +=== UpdateRowsEventV2 === +Date: 2022-02-22 19:49:19 +Log position: 2716787 +Event size: 71 +TableID: 299 +Flags: 3 +Column count: 3 +Values: +-- +0:11 +1:"Superwoman" +2:"2022-02-22 19:49:18" +-- +0:11 +1:"hello" +2:"2022-02-22 19:49:19" +``` + +``` +mysql> select * from recipes; ++-----------+----------------+---------------+ +| recipe_id | recipe_name | recipe_rating | ++-----------+----------------+---------------+ +| 1 | Tacos | NULL | +| 2 | Tomato Soup | NULL | +| 3 | Grilled Cheese | NULL | ++-----------+----------------+---------------+ +3 rows in set (0.00 sec) + +mysql> update recipes set recipe_name = 'Nachos' where recipe_id = 1; +Query OK, 1 row affected (0.02 sec) +Rows matched: 1 Changed: 1 Warnings: 0 + +mysql> select * from recipes; ++-----------+----------------+---------------+ +| recipe_id | recipe_name | recipe_rating | ++-----------+----------------+---------------+ +| 1 | Nachos | NULL | +| 2 | Tomato Soup | NULL | +| 3 | Grilled Cheese | NULL | ++-----------+----------------+---------------+ +3 rows in set (0.00 sec) +``` + +Results in: + +``` +[[1 Tacos ] [1 Nachos ]] +``` + +How does Transporter handle this? Well, what does Postgresql do? + +``` +compose=> update recipes set recipe_name = 'Nachos' where recipe_id = 1; +``` + +``` +compose=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); + lsn | xid | data +-----------+-----+------------------------------------------------------------------------------------------------------------------------ + 0/6000108 | 497 | BEGIN 497 + 0/6000108 | 497 | table public.recipes: UPDATE: recipe_id[integer]:1 recipe_name[character varying]:'Nachos' recipe_rating[integer]:null + 0/60002D0 | 497 | COMMIT 497 +(3 rows) +``` + +So just one row from Postgresql + +So for MySQL we need to skip the first row if it's an update. Gah. diff --git a/adaptor/mysql/README.md b/adaptor/mysql/README.md new file mode 100644 index 000000000..847715e08 --- /dev/null +++ b/adaptor/mysql/README.md @@ -0,0 +1,39 @@ +# MySQL adaptor + +## Using the adaptor + +You need to specify a sink and source like so: + +``` +var source = mysql({ + "uri": "mysql://user:pass@source.host.com:11111/database?ssl=custom", + "tail": true, + "cacert": "/path/to/source.crt", +}) + +var sink = mysql({ + "uri": "mysql://user:pass@sink.host.com:22222/database?ssl=custom", + "cacert": "/path/to/sink.crt", + "servername": "sink.host.com", +}) + +t.Source("source", source, "/.*/").Save("sink", sink, "/.*/") +``` + +- tailing is optional and only makes sense on the source +- For TLS you can use `ssl=true` which does unverified TLS or `ssl=custom` in +which case you need to supply the `cacert`. +- You don't need to supply the `servername`, but if you do the certificate will +be verified against it + +### Requirements + +- The source must allow the connecting user to query the binlog +- Per Postgresql you need to create the sink/destination table structure first + +### Limitations + +- Note that per the Postgresql adaptor this probably isn't very performant at +copying huge databases as there is no bulk option yet. +- Has only been developed and tested using MySQL as the sink and source. Unsure +how it will function when combined with other adaptors. diff --git a/adaptor/mysql/adaptor_test.go b/adaptor/mysql/adaptor_test.go new file mode 100644 index 000000000..47a7be0a2 --- /dev/null +++ b/adaptor/mysql/adaptor_test.go @@ -0,0 +1,178 @@ +package mysql + +import ( + "fmt" + "math/rand" + "os" + "os/exec" + "testing" + "time" + + "github.com/compose/transporter/log" +) + +// Order cols per: https://dev.mysql.com/doc/refman/5.7/en/data-types.html +const ( + basicSchema = "id INTEGER PRIMARY KEY, colvar VARCHAR(255), coltimestamp TIMESTAMP" + complexSchema = `id INTEGER AUTO_INCREMENT, + colinteger INTEGER, + colsmallint SMALLINT, + coltinyint TINYINT, + colmediumint MEDIUMINT, + colbigint BIGINT, + coldecimal DECIMAL(8,8), + colfloat FLOAT(23), + coldoubleprecision DOUBLE PRECISION, + colbit BIT(6), + coldate DATE, + coltime TIME, + coltimestamp TIMESTAMP, + colyear YEAR, + colchar CHAR, + colvar VARCHAR(255), + colbinary BINARY(10), + colblob BLOB, + coltext TEXT, + coljson JSON, + colpoint POINT, + collinestring LINESTRING, + colpolygon POLYGON, + colgeometrycollection GEOMETRYCOLLECTION, + PRIMARY KEY (id, colvar)` +) + +var ( + defaultTestClient = &Client{ + uri: DefaultURI, + } + defaultSession *Session + dbsToTest = []*TestData{ + readerTestData, + readerComplexTestData, + tailerTestData, + writerTestData, + writerComplexTestData, + writerUpdateTestData, + writerDeleteTestData, + writerComplexUpdateTestData, + writerComplexDeleteTestData, + writerComplexDeletePkTestData, + } + + randomHeros = []string{"Superwoman", "Wonder Woman", "Batman", "Superman", + "Thor", "Iron Man", "Spiderman", "Hulk", "Star-Lord", "Black Widow", + "Ant\nMan"} +) + +type TestData struct { + DB string + Table string + Schema string + InsertCount int +} + +func setup() { + log.Infoln("setting up tests") + rand.Seed(time.Now().Unix()) + + s, err := defaultTestClient.Connect() + if err != nil { + log.Errorf("unable to initialize connection to mysql, %s", err) + os.Exit(1) + } + defaultSession = s.(*Session) + for _, testData := range dbsToTest { + if _, err := defaultSession.mysqlSession.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s;", testData.DB)); err != nil { + log.Errorf("unable to drop database, could affect tests, %s", err) + } + if _, err := defaultSession.mysqlSession.Exec(fmt.Sprintf("CREATE DATABASE %s;", testData.DB)); err != nil { + log.Errorf("unable to create database, could affect tests, %s", err) + } + setupData(testData) + } +} + +func setupData(data *TestData) { + + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", data.DB))) + if err != nil { + log.Errorf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + log.Errorf("unable to obtain session to mysql, %s", err) + } + mysqlSession := s.(*Session).mysqlSession + + if _, err := mysqlSession.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s;", data.Table)); err != nil { + log.Errorf("unable to drop table, could affect tests, %s", err) + } + + _, err = mysqlSession.Exec(fmt.Sprintf("CREATE TABLE %s ( %s );", data.Table, data.Schema)) + if err != nil { + log.Errorf("unable to create table, could affect tests, %s", err) + } + + // cp file to tmp for blob test + cmd := exec.Command("cp", "logo-mysql-170x115.png", "/tmp/logo-mysql-170x115.png") + err = cmd.Run() + if err != nil { + log.Errorf("unable to copy blob image, could affect tests, %s", err) + } + for i := 0; i < data.InsertCount; i++ { + if data.Schema == complexSchema { + if _, err := mysqlSession.Exec(fmt.Sprintf(` + INSERT INTO %s VALUES ( + NULL, -- id + %d, -- colinteger INTEGER, + 32767, -- colsmallint SMALLINT, + 127, -- coltinyint TINYINT, + 8388607, -- colmediumint MEDIUMINT, + 21474836471, -- colbigint BIGINT, + 0.23509838, -- coldecimal DECIMAL(8,8), + 0.314259892323, -- colfloat FLOAT, + 0.314259892323, -- coldoubleprecision DOUBLE PRECISION, + b'101', -- colbit BIT, + '2021-12-10', -- coldate DATE, + '13:45:00', -- coltime TIME, + now(), -- coltimestamp TIMESTAMP, + '2021', -- colyear YEAR, + 'a', -- colchar CHAR, + '%s', -- colvar VARCHAR(255), + 0xDEADBEEF, -- colbinary BINARY, + LOAD_FILE('/tmp/logo-mysql-170x115.png'), -- colblob BLOB, + 'this is extremely important', -- coltext TEXT, + '{"name": "batman", "sidekick": "robin"}', -- coljson JSON, + ST_GeomFromText('POINT (15 15)'), -- colpoint POINT, + ST_GeomFromText('LINESTRING (0 0,1 1,2 2)'), -- collinestring LINESTRING, + ST_GeomFromText('POLYGON ((0 0,10 0,10 10,0 10,0 0),(5 5,7 5,7 7,5 7, 5 5))'), -- colpolygon POLYGON, + ST_GeomFromText('GEOMETRYCOLLECTION (POINT (1 1),LINESTRING (0 0,1 1,2 2,3 3,4 4))') -- colgeometrycollection GEOMETRYCOLLECTION, + ); + `, data.Table, i, randomHeros[i%len(randomHeros)])); err != nil { + log.Errorf("unexpected Insert error, %s\n", err) + } + } else if data.Schema == basicSchema { + if _, err := mysqlSession.Exec(fmt.Sprintf(`INSERT INTO %s VALUES ( + %d, -- id + '%s', -- colvar VARCHAR(255), + now() -- coltimestamp TIMESTAMP, + );`, data.Table, i, randomHeros[i%len(randomHeros)])); err != nil { + log.Errorf("unexpected Insert error, %s\n", err) + } + } + } +} + +func TestMain(m *testing.M) { + setup() + code := m.Run() + shutdown() + os.Exit(code) +} + +func shutdown() { + log.Infoln("shutting down tests") + defaultSession.mysqlSession.Close() + log.Infoln("tests shutdown complete") +} diff --git a/adaptor/mysql/client.go b/adaptor/mysql/client.go new file mode 100644 index 000000000..5412c91b5 --- /dev/null +++ b/adaptor/mysql/client.go @@ -0,0 +1,140 @@ +package mysql + +import ( + "database/sql" + "errors" + "io/ioutil" + "net/url" + "os" + "strings" + + "github.com/compose/transporter/client" + "github.com/compose/transporter/log" + + //_ "github.com/go-sql-driver/mysql" // import mysql driver + "github.com/go-mysql-org/go-mysql/driver" // full import of alternative mysql driver +) + +const ( + // DefaultURI is the default endpoint of MySQL on the local machine. + // Primarily used when initializing a new Client without a specific URI. + DefaultURI = "mysql://root@localhost:3306?" +) + +var ( + _ client.Client = &Client{} +) + +// ClientOptionFunc is a function that configures a Client. +// It is used in NewClient. +type ClientOptionFunc func(*Client) error + +// Client represents a client to the underlying File source. +type Client struct { + uri string + db string + mysqlSession *sql.DB +} + +// NewClient creates a default file client +func NewClient(options ...ClientOptionFunc) (*Client, error) { + // Set up the client + c := &Client{ + uri: DefaultURI, + db: "test", // TODO: Temporary change from `mysql`? The default local + // instance I have has `test`, but that was before I + // switched to connecting as root + } + + // Run the options on it + for _, option := range options { + if err := option(c); err != nil { + return nil, err + } + } + return c, nil +} + +// WithURI defines the full connection string for the MySQL connection +// Make this handle the different DSNs for these two? +// - https://github.com/go-sql-driver/mysql#dsn-data-source-name +// - https://github.com/go-mysql-org/go-mysql#driver +func WithURI(uri string) ClientOptionFunc { + return func(c *Client) error { + _, err := url.Parse(uri) + c.uri = uri + return err + } +} + +// WithCustomTLS configures the RootCAs for the underlying TLS connection +func WithCustomTLS(uri string, cert string, serverName string) ClientOptionFunc { + return func(c *Client) error { + if cert == "" { + // Then there are no TLS options to configure + return nil + } + if _, err := os.Stat(cert); err != nil { + return errors.New("Cert file not found") + } + + caPem, err := ioutil.ReadFile(cert) + if err != nil { + return err + } + + log.Debugf("Cert: %s", caPem) + // Pass through to the driver + // If serverName then don't do insecureSkipVerify + insecureSkipVerify := true + if serverName != "" { + insecureSkipVerify = false + } + driverErr := driver.SetCustomTLSConfig(uri, caPem, make([]byte, 0), make([]byte, 0), insecureSkipVerify, serverName) + if driverErr != nil { + return driverErr + } + return nil + } +} + +// Close implements necessary calls to cleanup the underlying *sql.DB +func (c *Client) Close() { + if c.mysqlSession != nil { + c.mysqlSession.Close() + } +} + +// Connect initializes the MySQL connection +func (c *Client) Connect() (client.Session, error) { + var err error + var dsn string + + if c.mysqlSession == nil { + // Previously it said here "there's really no way for this to error...", but that sounds + // like terrible advice when developing, especially, as it took me ages to figure out I + // was getting: + // + // > panic: invalid DSN: missing the slash separating the database name + // + // So let's do _something_ + // Also, let's strip prefix if it is there since we need a DSN + dsn = strings.Replace(c.uri, "mysql://", "", 1) + log.Debugln("DSN: " + dsn) + c.mysqlSession, err = sql.Open("mysql", dsn) + if err != nil { + panic(err.Error()) // TODO: Maybe not panic? + } + log.Debugln(c.uri) + // TODO: Error handling below? + uri, _ := url.Parse(c.uri) + if uri.Path != "" { + c.db = uri.Path[1:] + } + } + // We need to disable Foreign Key Checks for imports so also use that to check connection + // Ideally we don't want to send this _every_ time just once per session + // Previously we used `err = c.mysqlSession.Ping()` to check the connection + _, err = c.mysqlSession.Exec("SET FOREIGN_KEY_CHECKS=0;") + return &Session{c.mysqlSession, c.db}, err +} diff --git a/adaptor/mysql/client_test.go b/adaptor/mysql/client_test.go new file mode 100644 index 000000000..3c4c99e7c --- /dev/null +++ b/adaptor/mysql/client_test.go @@ -0,0 +1,84 @@ +package mysql + +import ( + "errors" + "reflect" + "testing" +) + +var ( + defaultClient = &Client{ + uri: DefaultURI, + db: "test", + } + + errBadClient = errors.New("bad client") + + clientTests = []struct { + name string + options []ClientOptionFunc // input + expected *Client // expected result + expectedErr error // expected error + }{ + { + "default_client", + make([]ClientOptionFunc, 0), + defaultClient, + nil, + }, + { + "with_err", + []ClientOptionFunc{WithErr()}, + defaultClient, + errBadClient, + }, + } +) + +func WithErr() ClientOptionFunc { + return func(c *Client) error { + return errBadClient + } +} + +func TestNewClient(t *testing.T) { + for _, ct := range clientTests { + actual, err := NewClient(ct.options...) + if err != ct.expectedErr { + t.Fatalf("[%s] unexpected NewClient error, expected %+v, got %+v\n", ct.name, ct.expectedErr, err) + } + if err == nil && !reflect.DeepEqual(ct.expected, actual) { + t.Errorf("[%s] Client mismatch\nexpected %+v\ngot %+v", ct.name, ct.expected, actual) + } + } +} + +var ( + connectTests = []struct { + name string + client *Client + expectedErr error + }{ + { + "default connect", + defaultClient, + nil, + }, + } +) + +func TestConnect(t *testing.T) { + if testing.Short() { + t.Skip("skipping Connect in short mode") + } + + for _, ct := range connectTests { + _, err := ct.client.Connect() + if err != ct.expectedErr { + t.Fatalf("[%s] unexpected Connect error, expected %+v, got %+v\n", ct.name, ct.expectedErr, err) + } + if err == nil { + ct.client.Close() + } + } +} diff --git a/adaptor/mysql/logo-mysql-170x115.png b/adaptor/mysql/logo-mysql-170x115.png new file mode 100644 index 000000000..73b55bd0d Binary files /dev/null and b/adaptor/mysql/logo-mysql-170x115.png differ diff --git a/adaptor/mysql/mysql.go b/adaptor/mysql/mysql.go new file mode 100644 index 000000000..f16c49871 --- /dev/null +++ b/adaptor/mysql/mysql.go @@ -0,0 +1,70 @@ +package mysql + +import ( + "sync" + + "github.com/compose/transporter/adaptor" + "github.com/compose/transporter/client" + + //_ "github.com/go-sql-driver/mysql" // import mysql driver + _ "github.com/go-mysql-org/go-mysql/driver" // import alternative mysql driver +) + +const ( + description = "a mysql adaptor that functions as both a source and a sink" + + sampleConfig = `{ + "uri": "${MYSQL_URI}", + // "tail": false, + // "cacert": "/path/to/cert.pem", + // "servername": "${MYSQL_DOMAIN}", +}` +) + +var ( + _ adaptor.Adaptor = &mysql{} +) + +// MySQL is an adaptor to read / write to mysql. +// it works as a source by copying files, and then optionally tailing the binlog +type mysql struct { + adaptor.BaseConfig + Tail bool `json:"tail" doc:"if tail is true, then the mysql source will tail the binlog after copying the namespace"` + CACert string `json:"cacert" doc:"path to CA cert"` + ServerName string `json:"servername" doc:"if a separate servername is needed to verify the certificate against. Requires cacert"` +} + +func init() { + adaptor.Add( + "mysql", + func() adaptor.Adaptor { + return &mysql{} + }, + ) +} + +func (m *mysql) Client() (client.Client, error) { + return NewClient(WithURI(m.URI), + WithCustomTLS(m.URI, m.CACert, m.ServerName)) +} + +func (m *mysql) Reader() (client.Reader, error) { + if m.Tail { + return newTailer(m.URI), nil + } + return newReader(), nil +} + +func (m *mysql) Writer(done chan struct{}, wg *sync.WaitGroup) (client.Writer, error) { + return newWriter(), nil +} + +// Description for mysql adaptor +func (m *mysql) Description() string { + return description +} + +// SampleConfig for mysql adaptor +func (m *mysql) SampleConfig() string { + return sampleConfig +} diff --git a/adaptor/mysql/mysql_test.go b/adaptor/mysql/mysql_test.go new file mode 100644 index 000000000..d12232a86 --- /dev/null +++ b/adaptor/mysql/mysql_test.go @@ -0,0 +1,44 @@ +package mysql + +import ( + "testing" + + "github.com/compose/transporter/adaptor" +) + +func TestDescription(t *testing.T) { + p := &mysql{} + if p.Description() != description { + t.Errorf("unexpected Description, expected %s, got %s\n", description, p.Description()) + } +} + +func TestSampleConfig(t *testing.T) { + p := &mysql{} + if p.SampleConfig() != sampleConfig { + t.Errorf("unexpected SampleConfig, expected %s, got %s\n", sampleConfig, p.SampleConfig()) + } +} + +var initTests = []map[string]interface{}{ + {"uri": DefaultURI}, + {"uri": DefaultURI, "tail": true}, +} + +func TestInit(t *testing.T) { + for _, it := range initTests { + a, err := adaptor.GetAdaptor("mysql", it) + if err != nil { + t.Fatalf("unexpected GetV2() error, %s", err) + } + if _, err := a.Client(); err != nil { + t.Errorf("unexpected Client() error, %s", err) + } + if _, err := a.Reader(); err != nil { + t.Errorf("unexpected Reader() error, %s", err) + } + if _, err := a.Writer(nil, nil); err != nil { + t.Errorf("unexpected Writer() error, %s", err) + } + } +} diff --git a/adaptor/mysql/reader.go b/adaptor/mysql/reader.go new file mode 100644 index 000000000..26eb8ee01 --- /dev/null +++ b/adaptor/mysql/reader.go @@ -0,0 +1,261 @@ +package mysql + +import ( + "encoding/hex" + "database/sql" + "fmt" + "regexp" + "strconv" + "strings" + "time" + + "github.com/compose/transporter/client" + "github.com/compose/transporter/log" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/data" + "github.com/compose/transporter/message/ops" + "github.com/twpayne/go-geom/encoding/wkbhex" + "github.com/twpayne/go-geom/encoding/wkt" +) + +var ( + _ client.Reader = &Reader{} +) + +// Reader implements the behaviour defined by client.Reader for interfacing with MySQL. +type Reader struct { +} + +func newReader() client.Reader { + return &Reader{} +} + +func (r *Reader) Read(resumeMap map[string]client.MessageSet, filterFn client.NsFilterFunc) client.MessageChanFunc { + return func(s client.Session, done chan struct{}) (chan client.MessageSet, error) { + out := make(chan client.MessageSet) + session := s.(*Session) + go func() { + defer close(out) + log.With("db", session.db).Infoln("starting Read func") + tables, err := r.listTables(session.db, session.mysqlSession, filterFn) + if err != nil { + log.With("db", session.db).Errorf("unable to list tables, %s", err) + return + } + results := r.iterateTable(session.db, session.mysqlSession, tables, done) + for { + select { + case <-done: + return + case result, ok := <-results: + if !ok { + log.With("db", session.db).Infoln("Read completed") + return + } + out <- client.MessageSet{ + Msg: message.From(ops.Insert, result.table, result.data), + } + } + } + }() + + return out, nil + } +} + +func (r *Reader) listTables(db string, session *sql.DB, filterFn func(name string) bool) (<-chan string, error) { + out := make(chan string) + tablesResult, err := session.Query("SELECT table_schema, table_name FROM INFORMATION_SCHEMA.TABLES") + if err != nil { + return nil, err + } + go func() { + defer close(out) + for tablesResult.Next() { + var schema string + var tname string + err = tablesResult.Scan(&schema, &tname) + if err != nil { + log.With("db", db).Infoln("error scanning table name...") + continue + } + name := fmt.Sprintf("%s.%s", schema, tname) + if filterFn(name) && matchFunc(name) { + log.With("db", db).With("table", name).Infoln("sending for iteration...") + out <- name + } else { + log.With("db", db).With("table", name).Debugln("skipping iteration...") + } + } + log.With("db", db).Infoln("done iterating collections") + }() + return out, nil +} + +func matchFunc(table string) bool { + if strings.HasPrefix(table, "information_schema.") || + strings.HasPrefix(table, "performance_schema.") || + strings.HasPrefix(table, "mysql.") || + strings.HasPrefix(table, "sys.") { + + return false + } + return true +} + +type doc struct { + table string + data data.Data +} + +func (r *Reader) iterateTable(db string, session *sql.DB, in <-chan string, done chan struct{}) <-chan doc { + out := make(chan doc) + go func() { + defer close(out) + for { + select { + case c, ok := <-in: + if !ok { + return + } + log.With("db", db).With("table", c).Infoln("iterating...") + schemaTable := strings.Split(c, ".") + columnsResult, err := session.Query(fmt.Sprintf(` + SELECT COLUMN_NAME AS column_name, DATA_TYPE as data_type, "" as element_type + FROM INFORMATION_SCHEMA.COLUMNS + WHERE + TABLE_SCHEMA = '%v' + AND TABLE_NAME = '%v' + ORDER BY ORDINAL_POSITION; + `, schemaTable[0], schemaTable[1])) + // No element_types in mysql since no ARRAY data type + // at the moment we add an empty column to get the same layout as Postgres + // TODO: Update this code so we don't need that empty column? + if err != nil { + log.With("db", db).With("table", c).Errorf("error getting columns %v", err) + continue + } + var columns [][]string + for columnsResult.Next() { + var columnName string + var columnType string + var columnArrayType sql.NullString // this value may be nil + + err = columnsResult.Scan(&columnName, &columnType, &columnArrayType) + recoveredRegex := regexp.MustCompile("recovered") + if err != nil && !recoveredRegex.MatchString(err.Error()) { + log.With("table", c).Errorf("error scanning columns %v", err) + continue + } + + column := []string{columnName, columnType} + columns = append(columns, column) + log.With("db", db).With("table", c).Debugln(columnName + ": " + columnType) + } + + // build docs for table + docsResult, _ := session.Query(fmt.Sprintf("SELECT * FROM %v", c)) + + for docsResult.Next() { + dest := make([]interface{}, len(columns)) + for i := range columns { + dest[i] = make([]byte, 30) + dest[i] = &dest[i] + } + + var docMap map[string]interface{} + err = docsResult.Scan(dest...) + if err != nil { + log.With("table", c).Errorf("error scanning row %v", err) + continue + } + + docMap = make(map[string]interface{}) + + for i, value := range dest { + xType := fmt.Sprintf("%T", value) + log.With("db", db).With("table", c).Debugf("value: %s, type: %s", value, xType) + switch value := value.(type) { + // Seems everything is []uint8 + case []uint8: + docMap[columns[i][0]] = casifyValue(string(value), columns[i][1]) + case string: + docMap[columns[i][0]] = casifyValue(string(value), columns[i][1]) + default: + arrayRegexp := regexp.MustCompile("[[]]$") + if arrayRegexp.MatchString(columns[i][1]) { + } else { + docMap[columns[i][0]] = value + } + } + } + out <- doc{table: c, data: docMap} + } + log.With("db", db).With("table", c).Infoln("iterating complete") + case <-done: + log.With("db", db).Infoln("iterating no more") + return + } + } + }() + return out +} + +// In Postgres this is in tailer.go, but since this is called even without tailing it seems like it should be here +func casifyValue(value string, valueType string) interface{} { + + switch { + case value == "null": + return nil + // NOTE: No need to do this for true binary data, but _if_ we wanted a string + // representation (`deadbeef000000000000`) of binary data (`0xDEADBEEF`) then this + // would be what we'd do + // + //case valueType == "binary" || valueType == "blob": + // b := hex.EncodeToString([]byte(value)) + // return b + case valueType == "bit": + bithexvalue := hex.EncodeToString([]byte(value)) + bitintvalue, err := strconv.ParseInt(bithexvalue, 10, 64) + if err != nil { + fmt.Printf("\nBit (%v) parse error: %v\n\n", value, err) + } + bitvalue := strconv.FormatInt(bitintvalue, 2) + return bitvalue + case valueType == "geometry" || valueType == "point" || valueType == "linestring" || valueType == "polygon" || valueType == "multipoint" || valueType == "multilinestring" || valueType == "multipolygon" || valueType == "geometrycollection": + geomhexvalue := hex.EncodeToString([]byte(value)) + // Strip SRID + geom, err := wkbhex.Decode(geomhexvalue[8:]) + if err != nil { + fmt.Printf("\nSpatial hex (%v) parse error: %v\n\n", value, err) + } + wktGeom, err := wkt.Marshal(geom) + if err != nil { + fmt.Printf("\nSpatial (%v) parse error: %v\n\n", value, err) + } + return wktGeom + case valueType == "int" || valueType == "smallint" || valueType == "tinyint" || valueType == "mediumint" || valueType == "bigint": + // NOTE: No error handling here because copied Postgresql. Not really an excuse, but there you go + i, _ := strconv.ParseInt(value, 10, 64) + return i + case valueType == "double" || valueType == "float" || valueType == "decimal": + // NOTE: No error handling here because copied Postgresql. Not really an excuse, but there you go + f, _ := strconv.ParseFloat(value, 64) + return f + case valueType == "timestamp": + // parse time like 2015-08-21 16:09:02.988058 + t, err := time.Parse("2006-01-02 15:04:05.9", value) + if err != nil { + fmt.Printf("\nTime (%v) parse error: %v\n\n", value, err) + } + return t + case valueType == "date": + t, err := time.Parse("2006-01-02", value) + if err != nil { + fmt.Printf("\nTime (%v) parse error: %v\n\n", value, err) + } + return t + } + + return value +} diff --git a/adaptor/mysql/reader_test.go b/adaptor/mysql/reader_test.go new file mode 100644 index 000000000..651287367 --- /dev/null +++ b/adaptor/mysql/reader_test.go @@ -0,0 +1,163 @@ +package mysql + +import ( + _ "embed" + "encoding/hex" + "fmt" + "strings" + "testing" + "time" + + "github.com/compose/transporter/client" + "github.com/compose/transporter/message" +) + +var ( + readerTestData = &TestData{"reader_test", "reader_test_table", basicSchema, 10} + + // For testing Blob + //go:embed logo-mysql-170x115.png + blobdata string +) + +func TestRead(t *testing.T) { + if testing.Short() { + t.Skip("skipping Read in short mode") + } + + reader := newReader() + readFunc := reader.Read(map[string]client.MessageSet{}, func(table string) bool { + if strings.HasPrefix(table, "information_schema.") || + strings.HasPrefix(table, "performance_schema.") || + strings.HasPrefix(table, "mysql.") || + strings.HasPrefix(table, "sys.") { + + return false + } + return table == readerTestData.DB+"."+readerTestData.Table + }) + done := make(chan struct{}) + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", readerTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + msgChan, err := readFunc(s, done) + if err != nil { + t.Fatalf("unexpected Read error, %s\n", err) + } + var numMsgs int + for range msgChan { + numMsgs++ + } + if numMsgs != readerTestData.InsertCount { + t.Errorf("bad message count, expected %d, got %d\n", readerTestData.InsertCount, numMsgs) + } + close(done) +} + +var ( + readerComplexTestData = &TestData{"reader_complex_test", "reader_complex_test_table", complexSchema, 10} +) + +func TestReadComplex(t *testing.T) { + if testing.Short() { + t.Skip("skipping Read in short mode") + } + + reader := newReader() + readFunc := reader.Read(map[string]client.MessageSet{}, func(table string) bool { + if strings.HasPrefix(table, "information_schema.") || + strings.HasPrefix(table, "performance_schema.") || + strings.HasPrefix(table, "mysql.") || + strings.HasPrefix(table, "sys.") { + + return false + } + return table == readerComplexTestData.DB+"."+readerComplexTestData.Table + }) + done := make(chan struct{}) + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", readerComplexTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + msgChan, err := readFunc(s, done) + if err != nil { + t.Fatalf("unexpected Read error, %s\n", err) + } + msgs := make([]message.Msg, 0) + for msg := range msgChan { + msgs = append(msgs, msg.Msg) + } + if len(msgs) != readerComplexTestData.InsertCount { + t.Errorf("bad message count, expected %d, got %d\n", readerComplexTestData.InsertCount, len(msgs)) + } + for i := 0; i < readerTestData.InsertCount; i++ { + for key, value := range map[string]interface{}{ + "id": int64(i) + 1, + "colinteger": int64(i), + "colsmallint": int64(32767), + "coltinyint": int64(127), + "colmediumint": int64(8388607), + "colbigint": int64(21474836471), + "coldecimal": 0.23509838, + "colfloat": 0.31426, + "coldoubleprecision": 0.314259892323, + "colbit": "101", + "coldate": time.Date(2021, 12, 10, 0, 0, 0, 0, time.UTC), + "coltime": "13:45:00", + "colyear": uint64(2021), + "colchar": "a", + "colvar": randomHeros[i%len(randomHeros)], + "colbinary": "deadbeef000000000000", + "colblob": blobdata, + "coltext": "this is extremely important", + "coljson": "{\"name\": \"batman\", \"sidekick\": \"robin\"}", + "colpoint": "POINT (15 15)", + "collinestring": "LINESTRING (0 0, 1 1, 2 2)", + "colpolygon": "POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (5 5, 7 5, 7 7, 5 7, 5 5))", + "colgeometrycollection": "GEOMETRYCOLLECTION (POINT (1 1), LINESTRING (0 0, 1 1, 2 2, 3 3, 4 4))", + } { + switch { + case key == "colbinary": + // NOTE: This is a "hack" for testing purposes. + // True binary data (colblob) works fine and no additional parsing is required + // (i.e. nothing in `casifyValue` for it and the blob comparison works) + // When we insert the Golang value of 0xDEADBEEF into MySQL and just read it we + // get a weird string. I.e. the actual binary data. But I cannot for the life + // of me figure out how in Golang to convert 0xDEADBEEF to the same form. I.e. + // like the blobdata. So... + // + // In a mysql shell you can get a human readable form from: + // + // mysql> select hex(colbinary) from reader_complex_test_table limit 1; + // +----------------------+ + // | hex(colbinary) | + // +----------------------+ + // | DEADBEEF000000000000 | + // +----------------------+ + // + // So that is what we do here just for the ease of testing + binvalue := hex.EncodeToString([]byte(msgs[i].Data().Get(key).(string))) + if binvalue != value { + t.Errorf("Expected %v of row to equal %v (%T), but was %v (%T)", key, value, value, binvalue, binvalue) + } + default: + if msgs[i].Data().Get(key) != value { + // Fatalf here hides other errors because it's a FailNow so use Error instead + t.Errorf("Expected %v of row to equal %v (%T), but was %v (%T)", key, value, value, msgs[i].Data().Get(key), msgs[i].Data().Get(key)) + } + } + } + } + close(done) +} diff --git a/adaptor/mysql/session.go b/adaptor/mysql/session.go new file mode 100644 index 000000000..1c97ba4fc --- /dev/null +++ b/adaptor/mysql/session.go @@ -0,0 +1,15 @@ +package mysql + +import ( + "database/sql" + + "github.com/compose/transporter/client" +) + +var _ client.Session = &Session{} + +// Session serves as a wrapper for the underlying *sql.DB +type Session struct { + mysqlSession *sql.DB + db string +} diff --git a/adaptor/mysql/tailer.go b/adaptor/mysql/tailer.go new file mode 100644 index 000000000..5d5236312 --- /dev/null +++ b/adaptor/mysql/tailer.go @@ -0,0 +1,400 @@ +package mysql + +import ( + "context" + "database/sql" + "fmt" + "net/url" + "os" + "regexp" + "strconv" + "time" + + "github.com/compose/transporter/client" + "github.com/compose/transporter/commitlog" + "github.com/compose/transporter/log" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/data" + "github.com/compose/transporter/message/ops" + + // Naming conflict with Transporter adaptor itself + gomysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" +) + +var ( + _ client.Reader = &Tailer{} +) + +// Tailer implements the behaviour defined by client.Tailer for interfacing with the MySQL binlog. +// We'll have to pass through the dsn so that we can use it to configure the sync client +type Tailer struct { + reader client.Reader + dsn string +} + +func newTailer(dsn string) client.Reader { + return &Tailer{newReader(), dsn} +} + +// Tail does the things +func (t *Tailer) Read(resumeMap map[string]client.MessageSet, filterFn client.NsFilterFunc) client.MessageChanFunc { + return func(s client.Session, done chan struct{}) (chan client.MessageSet, error) { + // How is resuming supposed to work? + readFunc := t.reader.Read(resumeMap, filterFn) + msgChan, err := readFunc(s, done) + if err != nil { + return nil, err + } + session := s.(*Session) + + // TODO: This could go in a separate function and return a cfg? + parsedDSN, err:= url.Parse(t.dsn) + if err != nil { + return nil, err + } + host := parsedDSN.Hostname() + port := parsedDSN.Port() + portInt, err:= strconv.Atoi(port) + if err != nil { + return nil, err + } + user := parsedDSN.User.Username() + pass, _ := parsedDSN.User.Password() + // Not needed? + //path := parsedDSN.Path[1:] + scheme := parsedDSN.Scheme + + // Find binlog info + var binFile string + var binPosition int + var _binBinlogDoDB string + var _binBinlogIgnoreDB string + var _binExecutedGtidSet string + result := session.mysqlSession.QueryRow("SHOW MASTER STATUS") + // We need to scan all columns... even though we don't care about them all. + // mysql> show master status; + // +-------------------+----------+--------------+------------------+-------------------------------------------+ + // | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | + // +-------------------+----------+--------------+------------------+-------------------------------------------+ + // | master-bin.000001 | 163739 | | | a852989a-1894-4fcb-a060-a4aaaf06b9f0:1-55 | + // +-------------------+----------+--------------+------------------+-------------------------------------------+ + // 1 row in set (0.04 sec) + // + scanErr := result.Scan(&binFile, &binPosition, &_binBinlogDoDB, &_binBinlogIgnoreDB, &_binExecutedGtidSet) + log.Debugf("binFile: %s, binPosition: %d", binFile, binPosition) + if scanErr != nil { + // Quit gracefully since can't tail? + log.Errorln("Can't find binFile or binPosition. Unable to tail") + os.Exit(1) + } + + // Find serverID + var serverID uint32 + result = session.mysqlSession.QueryRow("SELECT @@server_id as SERVER_ID") + scanErr = result.Scan(&serverID) + if scanErr != nil { + // Quit gracefully since can't tail? + log.Errorln("Can't find source server ID") + os.Exit(1) + } + + // Configure sync client + cfg := replication.BinlogSyncerConfig{ + ServerID: serverID, + Flavor: scheme, + Host: host, + Port: uint16(portInt), + User: user, + Password: pass, + } + + // Create syncer + syncer := replication.NewBinlogSyncer(cfg) + + // Start streamer + streamer, _ := syncer.StartSync(gomysql.Position{Name: binFile, Pos: uint32(binPosition)}) + // How to properly close this? + // There is no EndSync, but there is a close we can call on the `done` channel + + out := make(chan client.MessageSet) + // Will we have to pass things (such as streamer) into this function? + go func() { + defer close(out) + // read until reader done + for msg := range msgChan { + out <- msg + } + // start tailing/streaming + log.With("db", session.db).Infoln("Listening for changes...") + for { + // Use timeout context (for now at least) + // If we are using a timeout I think we can happily sit there for a bit + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + select { + // Notes to self on what this is doing... + // From reading around, e.g: https://golangbyexample.com/select-statement-golang/ + // I _think_ the blocking 1 sec sleep is there just to give the "done" channel a chance to + // execute otherwise there is no guarantee it would close because the "tailing" + // channel could also be executing and if both are ready it'll select one at random. + // For Postgresql this works because each call pulls all the logical decoding messages + // since the last call. + // For MySQL this isn't going to work correctly because we are pulling/streaming one + // event at a time. A 1 second sleep is no good. + // Historically, way back, channels weren't used: + // + // - https://github.com/compose/transporter/pull/281/files + // - https://github.com/compose/transporter/blob/7875ce0a2343fe94d7d6f9703e2e578cd6b77cba/pkg/adaptor/postgres/postgres.go#L305-L318 + // + // We need to stick with channels, but need to do this a bit differently + // Can we do outside of the select/case? + // Unless we can use DumpEvents instead of GetEvent? + // Or we use default? That way it doesn't block but should still close + case <-done: + log.With("db", session.db).Infoln("tailing stopping...") + syncer.Close() + // ?? this doesn't use the cancel above. Need to fix. + return + default: + // This blocks until an event is received which will still prevent the done channel from executing so use a timeout + event, ctxerr := streamer.GetEvent(ctx) + // Can't easily use below with `log.` so leaving commented out for debugging + //event.Dump(os.Stdout) + + // Do not really understand this next bit yet + // Cancels existing/current context? + cancel() + if ctxerr == context.DeadlineExceeded { + // Allow `done` to execute + continue + } + + msgSlice, skip, err := t.processEvent(s, event, filterFn) + if err != nil { + log.With("db", session.db).Errorf("error processing event from binlog %v", err) + continue + } + // send processed events to the channel + // What if there is an event we want to skip? Need a way to process that? + if skip { + log.With("db", session.db).Debugf("skipping event from binlog %v", msgSlice) + continue + } + for _, msg := range msgSlice { + out <- msg + } + } + } + }() + + return out, nil + } +} + +// For a statement like this: +// +// INSERT INTO recipes (recipe_id, recipe_name) VALUES (1,'Tacos'), (2,'Tomato Soup'), (3,'Grilled Cheese'); +// Postgresql has multiple events split per logical decoding rows: +// +// 0/500CEC8 | 496 | table public.recipes: INSERT: recipe_id[integer]:1 recipe_name[character varying]:'Tacos' recipe_rating[integer]:null +// 0/500D050 | 496 | table public.recipes: INSERT: recipe_id[integer]:2 recipe_name[character varying]:'Tomato Soup' recipe_rating[integer]:null +// 0/500D120 | 496 | table public.recipes: INSERT: recipe_id[integer]:3 recipe_name[character varying]:'Grilled Cheese' recipe_rating[integer]:null +// +// MySQL has one binlog event containing multiple updates ("Same, but different") +// +// [[1 Tacos] [2 Tomato Soup] [3 Grilled Cheese]] +// +// It seems we do not get the column names, instead we'll get `` if a column is skipped +// This is unfortunate for our use case as we'll have to fill in the column names +// +// For Postgresql, a string like this from logical decoding: +// +// "id[integer]:1 data[text]:'1'" +// +// Will end up like: +// +// map[data:1 id:1] +// +// So we need to get MySQL stuff in that format. +// +// Note: Canal has a lot of depth for MySQL sync that we (fortunately! For me!) don't need to handle in Transporter (which is more breadth than depth) +func (t *Tailer) processEvent(s client.Session, event *replication.BinlogEvent, filterFn client.NsFilterFunc) ([]client.MessageSet, bool, error) { + var ( + result []client.MessageSet + skip = false + err error + action ops.Op + schema, table string + ) + + // We are basically copying this from the following, but there's not really a different way to write these: + // + // - https://github.com/go-mysql-org/go-mysql/blob/d1666538b005e996414063695ca223994e9dc19d/canal/sync.go#L91-L172 + // - https://github.com/go-mysql-org/go-mysql/blob/b4f7136548f0758730685ebd78814eb3e5e4b0b0/canal/sync.go#L248-L272 + switch event.Event.(type) { + case *replication.RowsEvent: + // Need to cast + rowsEvent := event.Event.(*replication.RowsEvent) + log.Debugln("Logging rowsEvent:") + log.Debugln(rowsEvent) + // We only care about Insert / Update / Delete + // 1. Schema + schema = string(rowsEvent.Table.Schema) + // 2. Table + table = string(rowsEvent.Table.Table) + // Make sure we are getting changes on valid tables + schemaAndTable := fmt.Sprintf("%v.%v", schema, table) + if !filterFn(schemaAndTable) { + skip = true + // TODO: Do we need to configure an empty result? + return result, skip, fmt.Errorf("Error processing action from string: %v", rowsEvent.Rows) + } + // 3. Action (Insert / Update / Delete) + switch event.Header.EventType { + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + action = ops.Insert + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + action = ops.Delete + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + action = ops.Update + // For an update MySQL binlog returns a before vs after, but we just need the after + // I.e. this: + // + // mysql> update recipes set recipe_name = 'Nachos' where recipe_id = 1; + // + // results in: + // + // [[1 Tacos ] [1 Nachos ]] + // + default: + // TODO: Do we want to skip? Or just Error? + return result, skip, fmt.Errorf("Error processing action from string: %v", rowsEvent.Rows) + } + // Fetch column / data-type info before we can do 4. + + session := s.(*Session) + // Copied from reader.go `iterateTable` + // TODO: Use a common function for both + // TODO: Do we really want to do this _every_ time? Seems ultra inefficient + columnsResult, err := session.mysqlSession.Query(fmt.Sprintf(` + SELECT COLUMN_NAME AS column_name, DATA_TYPE as data_type, "" as element_type + FROM INFORMATION_SCHEMA.COLUMNS + WHERE + TABLE_SCHEMA = '%v' + AND TABLE_NAME = '%v' + ORDER BY ORDINAL_POSITION; + `, schema, table)) + // No element_types in mysql since no ARRAY data type + // at the moment we add an empty column to get the same layout as Postgres + // TODO: Update this code so we don't need that empty column? + if err != nil { + log.With("schema", schema).With("table", table).Errorf("error getting columns %v", err) + } + var columns [][]string + for columnsResult.Next() { + var columnName string + var columnType string + var columnArrayType sql.NullString // this value may be nil + + err = columnsResult.Scan(&columnName, &columnType, &columnArrayType) + recoveredRegex := regexp.MustCompile("recovered") + if err != nil && !recoveredRegex.MatchString(err.Error()) { + log.With("table", table).Errorf("error scanning columns %v", err) + continue + } + + column := []string{columnName, columnType} + columns = append(columns, column) + log.With("db", session.db).Debugln(columnName + ": " + columnType) + } + // 4. Remaining stuff / data + for i, row := range rowsEvent.Rows { + // This is the tricky bit! + + log.With("op", action).With("table", schemaAndTable).Debugln("received") + + // Skip first row for an update + if i == 0 && action == ops.Update { + continue + } + + // TODO: We might want to take advantage of `handleUnsigned`: + // + // https://github.com/go-mysql-org/go-mysql/blob/b4f7136548f0758730685ebd78814eb3e5e4b0b0/canal/rows.go#L46 + + docMap := parseEventRow(columns, row) + result = append(result, client.MessageSet{ + Msg: message.From(action, schemaAndTable, docMap), + Mode: commitlog.Sync, + }) + } + default: + skip = true + } + + return result, skip, err +} + +func parseEventRow(columns [][]string, d []interface{}) data.Data { + // The main issue with MySQL is that we don't get the column names!!! So we need to fill those in... + // We can use `TableMapEvent`s or Transporter itself since it has read the table. `iterateTable`? + + // See reader.go + // out <- doc{table: c, data: docMap} + // docMap[columns[i][0]] = value + + data := make(data.Data) + + // I think basically need to merge `iterateTable` with the data from the binlog. + + // row = [1 Tacos] + + // Might not need any of this dest stuff... + // Since that is for scanning into and we don't need to do that + //dest := make([]interface{}, len(columns)) + //for i := range columns { + // dest[i] = make([]byte, 30) + // dest[i] = &dest[i] + //} + + // Using data instead + //var docMap map[string]interface{} + + // We don't need to Scan, we have the data already + //err = docsResult.Scan(dest...) + //if err != nil { + // log.With("table", c).Errorf("error scanning row %v", err) + // continue + //} + + //Using data instead + //docMap = make(map[string]interface{}) + + for i, value := range d { + log.Debugln("Logging value from parseEventRow:") + log.Debugln(value) + xType := fmt.Sprintf("%T", value) + log.Debugln("Logging type from parseEventRow:") + log.Debugln(xType) + switch value := value.(type) { + // Seems everything is []uint8 + case []uint8: + data[columns[i][0]] = casifyValue(string(value), columns[i][1]) + case string: + data[columns[i][0]] = casifyValue(string(value), columns[i][1]) + default: + // TODO: This is probably a Postgresql thing and needs removing here and in reader.go + arrayRegexp := regexp.MustCompile("[[]]$") + if arrayRegexp.MatchString(columns[i][1]) { + } else { + data[columns[i][0]] = value + } + } + } + + // Any difference between docMap and data in this reader context? + // Data is `map[string]interface{}` + // So it's the same + return data +} diff --git a/adaptor/mysql/tailer_test.go b/adaptor/mysql/tailer_test.go new file mode 100644 index 000000000..c3b84b87c --- /dev/null +++ b/adaptor/mysql/tailer_test.go @@ -0,0 +1,144 @@ +package mysql + +import ( + "database/sql" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/compose/transporter/client" +) + +func checkBinLogReadable(s *sql.DB) error { + var File string + var Position int + var _BinlogDoDB string + var _BinlogIgnoreDB string + var _ExecutedGtidSet string + err := s.QueryRow(`SHOW MASTER STATUS;`).Scan(&File, &Position, &_BinlogDoDB, &_BinlogIgnoreDB, &_ExecutedGtidSet) + return err +} + +var ( + tailerTestData = &TestData{"tailer_test", "tailer_test_table", basicSchema, 10} +) + +func TestTailer(t *testing.T) { + if testing.Short() { + t.Skip("skipping Tailer in short mode") + } + dsn := "mysql://root@localhost:3306?%s" + c, err := NewClient(WithURI(fmt.Sprintf(dsn, tailerTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + + if err := checkBinLogReadable(s.(*Session).mysqlSession); err != nil { + t.Fatalf("unable to query binlog, %s", err) + } + time.Sleep(1 * time.Second) + + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Starting tailer...") + r := newTailer(dsn) + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Tailer running") + readFunc := r.Read(map[string]client.MessageSet{}, func(table string) bool { + if strings.HasPrefix(table, "information_schema.") || + strings.HasPrefix(table, "performance_schema.") || + strings.HasPrefix(table, "mysql.") || + strings.HasPrefix(table, "sys.") { + return false + } + return table == fmt.Sprintf("%s.%s", tailerTestData.DB, tailerTestData.Table) + }) + done := make(chan struct{}) + msgChan, err := readFunc(s, done) + if err != nil { + t.Fatalf("unexpected Read error, %s\n", err) + } + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Checking count for initial drain") + checkCount("initial drain", tailerTestData.InsertCount, msgChan, t) + + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Inserting some stuff") + for i := 10; i < 20; i++ { + // No error handling, this is testing + _, _ = s.(*Session).mysqlSession.Exec(fmt.Sprintf(`INSERT INTO %s VALUES ( + %d, -- id + '%s', -- colvar VARCHAR(255), + now() -- coltimestamp TIMESTAMP, + );`, tailerTestData.Table, i, randomHeros[i%len(randomHeros)])) + } + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Checking count for tailed data") + checkCount("tailed data", 10, msgChan, t) + + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Updating data") + for i := 10; i < 20; i++ { + // No error handling, this is testing + _, _ = s.(*Session).mysqlSession.Exec(fmt.Sprintf("UPDATE %s SET colvar = 'hello' WHERE id = %d;", tailerTestData.Table, i)) + } + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Checking count for updated data") + // Note: During developing found this was returning 20 messages + // This is because binlog returns a before and after for the update + // Handling this in processEvent + // See more comments about this in that function + checkCount("updated data", 10, msgChan, t) + + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Deleting data") + for i := 10; i < 20; i++ { + // No error handling, this is testing + _, _ = s.(*Session).mysqlSession.Exec(fmt.Sprintf(`DELETE FROM %v WHERE id = %d; `, tailerTestData.Table, i)) + } + + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Checking count for deleted data") + checkCount("deleted data", 10, msgChan, t) + + close(done) +} + +func checkCount(desc string, expected int, msgChan <-chan client.MessageSet, t *testing.T) { + // There is no t.Debug unfortunately so retaining below but commented out + //t.Log("DEBUG: Running checkCount") + var numMsgs int + var wg sync.WaitGroup + wg.Add(1) + go func(wg *sync.WaitGroup) { + for { + select { + case <-msgChan: + numMsgs++ + case <-time.After(1 * time.Second): + if numMsgs == expected { + wg.Done() + return + } + // The below isn't quitting things as quickly as intended + case <-time.After(20 * time.Second): + wg.Done() + return + } + // There is no t.Debug unfortunately so retaining below, but commented out + //t.Logf("DEBUG: %d messages so far", numMsgs) + } + }(&wg) + wg.Wait() + if numMsgs != expected { + t.Errorf("[%s] bad message count, expected %d, got %d\n", desc, expected, numMsgs) + } else { + t.Logf("[%s] message count ok", desc) + } +} diff --git a/adaptor/mysql/writer.go b/adaptor/mysql/writer.go new file mode 100644 index 000000000..0f3f930f2 --- /dev/null +++ b/adaptor/mysql/writer.go @@ -0,0 +1,315 @@ +package mysql + +import ( + "database/sql" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/compose/mejson" + "github.com/compose/transporter/client" + "github.com/compose/transporter/log" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/ops" + "github.com/twpayne/go-geom" + "github.com/twpayne/go-geom/encoding/wkt" +) + +var _ client.Writer = &Writer{} + +// Writer implements client.Writer for use with MySQL +type Writer struct { + writeMap map[ops.Op]func(message.Msg, *sql.DB) error +} + +func newWriter() *Writer { + w := &Writer{} + w.writeMap = map[ops.Op]func(message.Msg, *sql.DB) error{ + ops.Insert: insertMsg, + ops.Update: updateMsg, + ops.Delete: deleteMsg, + } + return w +} + +func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error) { + return func(s client.Session) (message.Msg, error) { + writeFunc, ok := w.writeMap[msg.OP()] + if !ok { + log.Infof("no function registered for operation, %s", msg.OP()) + if msg.Confirms() != nil { + msg.Confirms() <- struct{}{} + } + return msg, nil + } + if err := writeFunc(msg, s.(*Session).mysqlSession); err != nil { + return nil, err + } + if msg.Confirms() != nil { + msg.Confirms() <- struct{}{} + } + return msg, nil + } +} + +func insertMsg(m message.Msg, s *sql.DB) error { + log.With("table", m.Namespace()).Debugln("INSERT") + var ( + keys []string + placeholders []string + data []interface{} + err error + ) + + i := 1 + for key, value := range m.Data() { + keys = append(keys, key) + // Mysql uses "?, ?, ?" instead of "$1, $2, $3" + // Wrap placeholder for geometry types + // Overkill using switch/case for just geometry, + // but there might be other types we need to handle + placeholder := "?" + switch value.(type) { + case *geom.Point, *geom.LineString, *geom.Polygon, *geom.GeometryCollection: + // Wrap in ST_GeomFromText + // Supposedly not required in "later" MySQLs + // Although the format changes, e.g. `POINT (15,15)` vs WKT of `POINT (15 15)` + // So might as well stick with it. Possible performance impact? + // We could use binary `ST_GeomFromWKB` though + placeholder = "ST_GeomFromText(?)" + } + placeholders = append(placeholders, placeholder) + + log.Debugf("Type of value is %T", value) + switch t := value.(type) { + // Can add others here such as binary and bit, etc if needed + case *geom.Point, *geom.LineString, *geom.Polygon, *geom.GeometryCollection: + // Do not care about t, but working around golangci-lint + _ = t + value, err = wkt.Marshal(value.(geom.T)) + if err != nil { + return err + } + value = value.(string) + case time.Time: + // MySQL can write this format into DATE, DATETIME and TIMESTAMP + value = value.(time.Time).Format("2006-01-02 15:04:05.000000") + case map[string]interface{}, mejson.M, []map[string]interface{}, mejson.S: + // This is used so we can write values like the following to json fields: + // + // map[string]interface{}{"name": "batman"}, + // + // Keeping for compatibility with the Postgresql adaptor. + // With MySQL we can just write a json string. + value, err = json.Marshal(value) + if err != nil { + return err + } + } + data = append(data, value) + + i = i + 1 + } + + query := fmt.Sprintf("INSERT INTO %v (%v) VALUES (%v);", m.Namespace(), strings.Join(keys, ", "), strings.Join(placeholders, ", ")) + log.Debugf("query: %s", query) + log.Debugf("data: %s", data) + + // TODO: Figure out finding the log level so we only run this bit in debug + //if log.level == "debug" { + // for i := 0; i < len(data); i++ { + // log.With("table", m.Namespace()).Debugf("data: %s", data[i]) + // } + //} + // INSERT INTO writer_insert_test.simple_test_table (id, colvar, coltimestamp) VALUES ($1, $2, $3); + _, err = s.Exec(query, data...) + return err +} + +func deleteMsg(m message.Msg, s *sql.DB) error { + log.With("table", m.Namespace()).With("values", m.Data()).Debugln("DELETE") + var ( + ckeys []string + vals []interface{} + ) + pkeys, err := primaryKeys(m.Namespace(), s) + if err != nil { + return err + } + i := 1 + for key, value := range m.Data() { + if pkeys[key] { // key is primary key + ckeys = append(ckeys, fmt.Sprintf("%v = ?", key)) + } + switch value.(type) { + case map[string]interface{}, mejson.M, []map[string]interface{}, mejson.S: + // This is used so we can write values like the following to json fields: + // + // map[string]interface{}{"name": "batman"}, + // + // Keeping for compatibility with the Postgresql adaptor. + // With MySQL we can just write a json string. + value, err = json.Marshal(value) + if err != nil { + return err + } + } + vals = append(vals, value) + i = i + 1 + } + + if len(pkeys) != len(ckeys) { + return fmt.Errorf("All primary keys were not accounted for. Provided: %v; Required; %v", ckeys, pkeys) + } + + query := fmt.Sprintf("DELETE FROM %v WHERE %v;", m.Namespace(), strings.Join(ckeys, " AND ")) + log.Debugf("query: %s", query) + log.Debugf("vals: %s", vals) + _, err = s.Exec(query, vals...) + return err +} + +func updateMsg(m message.Msg, s *sql.DB) error { + log.With("table", m.Namespace()).Debugln("UPDATE") + var ( + ckeys []string + ukeys []string + cvals []interface{} + uvals []interface{} + vals []interface{} + ) + + pkeys, err := primaryKeys(m.Namespace(), s) + if err != nil { + return err + } + + i := 1 + for key, value := range m.Data() { + // Mysql uses "?, ?, ?" instead of "$1, $2, $3" + // Wrap placeholder for geometry types + // Overkill using switch/case for just geometry, + // but there might be other types we need to handle + placeholder := "?" + switch value.(type) { + case *geom.Point, *geom.LineString, *geom.Polygon, *geom.GeometryCollection: + // Wrap in ST_GeomFromText + // Supposedly not required in "later" MySQLs + // Although the format changes, e.g. `POINT (15,15)` vs WKT of `POINT (15 15)` + // So might as well stick with it. Possible performance impact? + // We could use binary `ST_GeomFromWKB` though + placeholder = "ST_GeomFromText(?)" + } + if pkeys[key] { // key is primary key + ckeys = append(ckeys, fmt.Sprintf("%v=%s", key, placeholder)) + } else { + ukeys = append(ukeys, fmt.Sprintf("%v=%s", key, placeholder)) + } + + switch t := value.(type) { + // Can add others here such as binary and bit, etc if needed + case *geom.Point, *geom.LineString, *geom.Polygon, *geom.GeometryCollection: + // Do not care about t, but working around golangci-lint + _ = t + value, err = wkt.Marshal(value.(geom.T)) + if err != nil { + return err + } + value = value.(string) + case time.Time: + // MySQL can write this format into DATE, DATETIME and TIMESTAMP + value = value.(time.Time).Format("2006-01-02 15:04:05.000000") + case map[string]interface{}, mejson.M, []map[string]interface{}, mejson.S: + // This is used so we can write values like the following to json fields: + // + // map[string]interface{}{"name": "batman"}, + // + // Keeping for compatibility with the Postgresql adaptor. + // With MySQL we can just write a json string. + value, err = json.Marshal(value) + if err != nil { + return err + } + } + // if it's a primary key it needs to go at the end of the vals list + // So perhaps easier to do cvals and uvals and then combine at end + if pkeys[key] { + cvals = append(cvals, value) + } else { + uvals = append(uvals, value) + } + i = i + 1 + } + + // Join vals + vals = append(uvals, cvals...) + + if len(pkeys) != len(ckeys) { + return fmt.Errorf("All primary keys were not accounted for. Provided: %v; Required; %v", ckeys, pkeys) + } + + query := fmt.Sprintf("UPDATE %v SET %v WHERE %v;", m.Namespace(), strings.Join(ukeys, ", "), strings.Join(ckeys, " AND ")) + // Note: For Postgresql this results in: + // + // UPDATE writer_update_test.update_test_table SET colvar=$2, coltimestamp=$3 WHERE id=$1; + // + // which is wrong for MySQL, need just `?` + // + log.Debugf("query: %s", query) + log.Debugf("vals: %s", vals) + _, err = s.Exec(query, vals...) + return err +} + +func primaryKeys(namespace string, db *sql.DB) (primaryKeys map[string]bool, err error) { + primaryKeys = map[string]bool{} + namespaceArray := strings.SplitN(namespace, ".", 2) + var ( + tableSchema string + tableName string + columnName string + ) + if namespaceArray[1] == "" { + tableSchema = "public" + tableName = namespaceArray[0] + } else { + tableSchema = namespaceArray[0] + tableName = namespaceArray[1] + } + + // Need to update this + // unexpected Update error, Error 1109: Unknown table 'constraint_column_usage' in information_schema + // + // This returns something like: + // + // column_name + // ------------- + // recipe_id + // recipe_rating + // (2 rows) + // + // Below from here: https://stackoverflow.com/a/12379241/208793 + tablesResult, err := db.Query(fmt.Sprintf(` + SELECT k.COLUMN_NAME + FROM information_schema.table_constraints t + LEFT JOIN information_schema.key_column_usage k + USING(constraint_name,table_schema,table_name) + WHERE t.constraint_type='PRIMARY KEY' + AND t.table_schema='%v' + AND t.table_name='%v' + `, tableSchema, tableName)) + if err != nil { + return primaryKeys, err + } + + for tablesResult.Next() { + err = tablesResult.Scan(&columnName) + if err != nil { + return primaryKeys, err + } + primaryKeys[columnName] = true + } + + return primaryKeys, err +} diff --git a/adaptor/mysql/writer_test.go b/adaptor/mysql/writer_test.go new file mode 100644 index 000000000..5613b317b --- /dev/null +++ b/adaptor/mysql/writer_test.go @@ -0,0 +1,494 @@ +package mysql + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/compose/transporter/adaptor" + "github.com/compose/transporter/message" + "github.com/compose/transporter/message/data" + "github.com/compose/transporter/message/ops" + "github.com/twpayne/go-geom" + "github.com/twpayne/go-geom/encoding/wkt" +) + +var optests = []struct { + op ops.Op + registered bool +}{ + {ops.Insert, true}, + {ops.Update, true}, + {ops.Delete, true}, + {ops.Command, false}, + {ops.Noop, false}, +} + +func TestOpFunc(t *testing.T) { + w := newWriter() + for _, ot := range optests { + if _, ok := w.writeMap[ot.op]; ok != ot.registered { + t.Errorf("op (%s) registration incorrect, expected %+v, got %+v\n", ot.op.String(), ot.registered, ok) + } + } +} + +var ( + writerTestData = &TestData{"writer_insert_test", "simple_test_table", basicSchema, 0} +) + +func TestInsert(t *testing.T) { + confirms, cleanup := adaptor.MockConfirmWrites() + defer adaptor.VerifyWriteConfirmed(cleanup, t) + w := newWriter() + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", writerTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + // See this for why the Format has that date: https://pkg.go.dev/time#pkg-constants + for i := 0; i < 10; i++ { + if _, err := w.Write( + message.WithConfirms( + confirms, + message.From( + ops.Insert, + fmt.Sprintf("%s.%s", writerTestData.DB, writerTestData.Table), + data.Data{"id": i, "colvar": "hello world", "coltimestamp": time.Now().Format("2006-01-02 15:04:05.000000")}), + ), + )(s); err != nil { + t.Errorf("unexpected Insert error, %s\n", err) + } + } + + if _, err := w.Write(message.WithConfirms( + confirms, + message.From( + ops.Command, + fmt.Sprintf("%s.%s", writerTestData.DB, writerTestData.Table), + map[string]interface{}{}, + )), + )(s); err != nil { + t.Errorf("unexpected Command error, %s", err) + } + + if _, err := w.Write(message.From( + ops.Command, + fmt.Sprintf("%s.%s", writerTestData.DB, writerTestData.Table), + map[string]interface{}{}, + ))(s); err != nil { + t.Errorf("unexpected Command error, %s", err) + } + + var ( + id int + stringValue string + timeByteValue []byte + timeValue time.Time + ) + if err := s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT id, colvar, coltimestamp FROM %s WHERE id = 4", writerTestData.Table)). + Scan(&id, &stringValue, &timeByteValue); err != nil { + t.Fatalf("Error on test query: %v", err) + } + // Parse timeValue + // There is no t.Debug unfortunately so retaining below but commented out + //t.Logf("DEBUG: %s", timeByteValue) + // For some reason we lose the fractional bit on the scan, perhaps because zeroes? + // But seems we can omit: + // > When parsing (only), the input may contain a fractional second field + // > immediately after the seconds field, even if the layout does not signify its + // > presence. + // + // NOTE: No error handling on time.Parse since this is a test file + timeValue, _ = time.Parse("2006-01-02 15:04:05", string(timeByteValue)) + + if id != 4 || stringValue != "hello world" || timeValue.Before(time.Now().Add(-30*time.Second).UTC()) { + t.Fatalf("Values were not what they were expected to be: %v, %v, %v", id, stringValue, timeValue) + } + + var count int + err = s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT COUNT(id) FROM %s;", writerTestData.Table)). + Scan(&count) + if err != nil { + t.Errorf("unable to count table, %s", err) + } + if count != 10 { + t.Errorf("wrong document count, expected 10, got %d", count) + } +} + +var ( + writerComplexTestData = &TestData{"writer_complex_insert_test", "complex_test_table", complexSchema, 0} +) + +func wktToGeom(wktForm string) geom.T { + // NOTE: No error handling on the below since this is a test file + geomForm, _ := wkt.Unmarshal(wktForm) + return geomForm +} + +func TestComplexInsert(t *testing.T) { + w := newWriter() + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", writerComplexTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + // These need to be Go native? + // What creates this table? Because we need to match... + // !! This has to match `complex_schema` in adaptor_test !! + for i := 0; i < 10; i++ { + msg := message.From(ops.Insert, fmt.Sprintf("%s.%s", writerComplexTestData.DB, writerComplexTestData.Table), data.Data{ + "id": i, + "colinteger": int64(3), + "colsmallint": int64(32767), + "coltinyint": int64(127), + "colmediumint": int64(8388607), + "colbigint": int64(21474836471), + "coldecimal": 0.23509838, + "colfloat": 0.31426, + "coldoubleprecision": 0.314259892323, + // I think we need to do what we did in reader_test, but in reverse? + // "b'101'" gets interpreted as a string + "colbit": 0b101, + "coldate": time.Date(2021, 12, 10, 0, 0, 0, 0, time.UTC).Format("2006-01-02"), + "coltime": "13:45:00", + "coltimestamp": time.Now().Format("2006-01-02 15:04:05.000000"), + "colyear": "2021", + "colchar": "a", + "colvar": randomHeros[i], + "colbinary": 0xDEADBEEF, + "colblob": 0xDEADBEEF, + "coltext": "this is extremely important", + "coljson": map[string]interface{}{"name": "batman"}, + //"coljson": "{\"name\": \"batman\", \"sidekick\": \"robin\"}", + // Maybe it makes sense to have geometry as a Go representation of geometry + // So go-geom since we are using that at the moment + // And then we can manipulate in writer.go to insert as required + "colpoint": wktToGeom("POINT (15 15)"), + "collinestring": wktToGeom("LINESTRING (0 0, 1 1, 2 2)"), + "colpolygon": wktToGeom("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0),(5 5, 7 5, 7 7, 5 7, 5 5))"), + "colgeometrycollection": wktToGeom("GEOMETRYCOLLECTION (POINT (1 1),LINESTRING (0 0, 1 1, 2 2, 3 3, 4 4))"), + }) + if _, err := w.Write(msg)(s); err != nil { + t.Errorf("unexpected Insert error, %s\n", err) + } + } + var ( + id int + stringValue string + timeByteValue []byte + timeValue time.Time + ) + if err := s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT id, colvar, coltimestamp FROM %s WHERE id = 4", writerComplexTestData.Table)). + Scan(&id, &stringValue, &timeByteValue); err != nil { + t.Fatalf("Error on test query: %v", err) + } + // NOTE: No error handling on time.Parse since this is a test file + timeValue, _ = time.Parse("2006-01-02 15:04:05", string(timeByteValue)) + if id != 4 || stringValue != randomHeros[4] || timeValue.Before(time.Now().Add(-30*time.Second).UTC()) { + t.Fatalf("Values were not what they were expected to be: %v, %v, %v", id, stringValue, timeValue) + } + + var count int + err = s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT COUNT(id) FROM %s;", writerComplexTestData.Table)). + Scan(&count) + if err != nil { + t.Errorf("unable to count table, %s", err) + } + if count != 10 { + t.Errorf("wrong document count, expected 10, got %d", count) + } +} + +var ( + writerUpdateTestData = &TestData{"writer_update_test", "update_test_table", basicSchema, 0} +) + +func TestUpdate(t *testing.T) { + w := newWriter() + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", writerUpdateTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + for i := 0; i < 10; i++ { + msg := message.From( + ops.Insert, + fmt.Sprintf("%s.%s", writerUpdateTestData.DB, writerUpdateTestData.Table), + data.Data{"id": i, "colvar": "hello world", "coltimestamp": time.Now().Format("2006-01-02 15:04:05.000000")}) + if _, err := w.Write(msg)(s); err != nil { + t.Errorf("unexpected Insert error, %s\n", err) + } + } + msg := message.From( + ops.Update, + fmt.Sprintf("%s.%s", writerUpdateTestData.DB, writerUpdateTestData.Table), + data.Data{"id": 1, "colvar": "robin", "coltimestamp": time.Now().Format("2006-01-02 15:04:05.000000")}) + if _, err := w.Write(msg)(s); err != nil { + t.Errorf("unexpected Update error, %s\n", err) + } + + var ( + id int + stringValue string + timeByteValue []byte + timeValue time.Time + ) + if err := s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT id, colvar, coltimestamp FROM %s WHERE id = 1", writerUpdateTestData.Table)). + Scan(&id, &stringValue, &timeByteValue); err != nil { + t.Fatalf("Error on test query: %v", err) + } + // NOTE: No error handling on time.Parse since this is a test file + timeValue, _ = time.Parse("2006-01-02 15:04:05", string(timeByteValue)) + if id != 1 || stringValue != "robin" || timeValue.Before(time.Now().Add(-30*time.Second).UTC()) { + t.Fatalf("Values were not what they were expected to be: %v, %v, %v", id, stringValue, timeValue) + } + + var count int + err = s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT COUNT(id) FROM %s;", writerUpdateTestData.Table)). + Scan(&count) + if err != nil { + t.Errorf("unable to count table, %s", err) + } + if count != 10 { + t.Errorf("wrong document count, expected 10, got %d", count) + } +} + +var ( + writerComplexUpdateTestData = &TestData{"writer_complex_update_test", "complex_update_test_table", complexSchema, 10} +) + +func TestComplexUpdate(t *testing.T) { + ranInt := rand.Intn(writerComplexUpdateTestData.InsertCount) + w := newWriter() + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", writerComplexUpdateTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + msg := message.From(ops.Update, fmt.Sprintf("%s.%s", writerComplexUpdateTestData.DB, writerComplexUpdateTestData.Table), data.Data{ + "id": ranInt + 1, + "colinteger": int64(4), + "colsmallint": int64(30000), + "coltinyint": int64(100), + "colmediumint": int64(8000000), + "colbigint": int64(4000001240125), + "coldecimal": 0.23509838, + "colfloat": 0.31426, + "coldoubleprecision": 0.314259892323, + "colbit": 0b101, + "coldate": time.Date(2021, 12, 10, 0, 0, 0, 0, time.UTC).Format("2006-01-02"), + "coltime": "14:45:00", + "coltimestamp": time.Now().Format("2006-01-02 15:04:05.000000"), + "colyear": "2022", + "colchar": "b", + "colvar": randomHeros[ranInt], + "colbinary": 0xCAFEBABE, + "colblob": 0xCAFEBABE, + "coltext": "this is extremely important", + "coljson": "{\"name\": \"batman\", \"sidekick\": \"robin\"}", + "colpoint": wktToGeom("POINT (20 20)"), + "collinestring": wktToGeom("LINESTRING (3 3, 4 4, 5 5)"), + "colpolygon": wktToGeom("POLYGON ((1 1, 11 1, 11 11, 1 11, 1 1),(6 6, 8 6, 8 8, 6 8, 6 6))"), + "colgeometrycollection": wktToGeom("GEOMETRYCOLLECTION (POINT (2 2),LINESTRING (5 5, 6 6, 7 7, 8 8, 9 9))"), + }) + if _, err := w.Write(msg)(s); err != nil { + t.Errorf("unexpected Update error, %s\n", err) + } + + var ( + id int + stringValue string + timeByteValue []byte + timeValue time.Time + bigint int64 + ) + if err := s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT id, colvar, coltimestamp, colbigint FROM %s WHERE id = %d", writerComplexUpdateTestData.Table, ranInt+1)). + Scan(&id, &stringValue, &timeByteValue, &bigint); err != nil { + t.Fatalf("Error on test query: %v", err) + } + // NOTE: No error handling on time.Parse since this is a test file + timeValue, _ = time.Parse("2006-01-02 15:04:05", string(timeByteValue)) + if id != ranInt+1 || stringValue != randomHeros[ranInt] || timeValue.Before(time.Now().Add(-30*time.Second).UTC()) || bigint != int64(4000001240125) { + t.Fatalf("Values were not what they were expected to be: %v, %v, %v, %v", id, stringValue, timeValue, bigint) + } + + var count int + err = s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT COUNT(id) FROM %s;", writerComplexUpdateTestData.Table)). + Scan(&count) + if err != nil { + t.Errorf("unable to count table, %s", err) + } + if count != writerComplexUpdateTestData.InsertCount { + t.Errorf("wrong document count, expected %d, got %d", writerComplexUpdateTestData.InsertCount, count) + } +} + +var ( + writerDeleteTestData = &TestData{"writer_delete_test", "delete_test_table", basicSchema, 0} +) + +func TestDelete(t *testing.T) { + w := newWriter() + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", writerDeleteTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + for i := 0; i < 10; i++ { + msg := message.From( + ops.Insert, + fmt.Sprintf("%s.%s", writerDeleteTestData.DB, writerDeleteTestData.Table), + data.Data{"id": i, "colvar": "hello world", "coltimestamp": time.Now().Format("2006-01-02 15:04:05.000000")}) + if _, err := w.Write(msg)(s); err != nil { + t.Errorf("unexpected Insert error, %s\n", err) + } + } + msg := message.From(ops.Delete, fmt.Sprintf("%s.%s", writerDeleteTestData.DB, writerDeleteTestData.Table), data.Data{"id": 1}) + if _, err := w.Write(msg)(s); err != nil { + t.Errorf("unexpected Update error, %s\n", err) + } + + var id int + if err := s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT id FROM %s WHERE id = 1", writerDeleteTestData.Table)). + Scan(&id); err == nil { + t.Fatalf("Values were found, but where not expected to be: %v", id) + } + + var count int + err = s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT COUNT(id) FROM %s;", writerDeleteTestData.Table)). + Scan(&count) + if err != nil { + t.Errorf("unable to count table, %s", err) + } + if count != 9 { + t.Errorf("wrong document count, expected 9, got %d", count) + } +} + +var ( + writerComplexDeleteTestData = &TestData{"writer_complex_delete_test", "complex_delete_test_table", complexSchema, 10} +) + +func TestComplexDelete(t *testing.T) { + ranInt := rand.Intn(writerComplexDeleteTestData.InsertCount) + w := newWriter() + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", writerComplexDeleteTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + msg := message.From( + ops.Delete, + fmt.Sprintf("%s.%s", writerComplexDeleteTestData.DB, writerComplexDeleteTestData.Table), + data.Data{"id": ranInt + 1, "colvar": randomHeros[ranInt]}) + if _, err := w.Write(msg)(s); err != nil { + t.Errorf("unexpected Delete error, %s\n", err) + } + + var id int + if err := s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT id FROM %s WHERE id = %d AND colvar = '%s'", writerComplexDeleteTestData.Table, ranInt+1, randomHeros[ranInt])). + Scan(&id); err == nil { + t.Fatalf("Values were found, but where not expected to be: %v", id) + } + // Add a row count check as well because if it picks the wrong row due to + // off-by-one then it'll fail to delete, but _also_ fail to find so will think it's + // passed + var count int + err = s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT COUNT(id) FROM %s;", writerComplexDeleteTestData.Table)). + Scan(&count) + if err != nil { + t.Errorf("unable to count table, %s", err) + } + if count != 9 { + t.Errorf("wrong document count, expected 9, got %d", count) + } +} + +var ( + writerComplexDeletePkTestData = &TestData{"writer_complex_pk_delete_test", "complex_pk_delete_test_table", complexSchema, 10} +) + +func TestComplexDeleteWithoutAllPrimarykeys(t *testing.T) { + // This checks for an expected failure. I.e. should not be possible to delete + // the row without all primary keys + ranInt := rand.Intn(writerComplexDeletePkTestData.InsertCount) + w := newWriter() + c, err := NewClient(WithURI(fmt.Sprintf("mysql://root@localhost:3306?%s", writerComplexDeletePkTestData.DB))) + if err != nil { + t.Fatalf("unable to initialize connection to mysql, %s", err) + } + defer c.Close() + s, err := c.Connect() + if err != nil { + t.Fatalf("unable to obtain session to mysql, %s", err) + } + msg := message.From( + ops.Delete, + fmt.Sprintf("%s.%s", writerComplexDeletePkTestData.DB, writerComplexDeletePkTestData.Table), + data.Data{"id": ranInt + 1}) + if _, err := w.Write(msg)(s); err == nil { + t.Fatalf("Did not receive anticipated error from mysql.writeMessage") + } else { + t.Logf("Received expected error: %s", err) + } + + var id int + if err := s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT id FROM %s WHERE id = %d AND colvar = '%s'", writerComplexDeletePkTestData.Table, + ranInt+1, + randomHeros[ranInt])). + Scan(&id); err != nil { + t.Fatalf("Expected to find values, but none were found: %v", err) + } + // Add a row count check as well + var count int + err = s.(*Session).mysqlSession. + QueryRow(fmt.Sprintf("SELECT COUNT(id) FROM %s;", writerComplexDeletePkTestData.Table)). + Scan(&count) + if err != nil { + t.Errorf("unable to count table, %s", err) + } + if count != 10 { + t.Errorf("wrong document count, expected 10, got %d", count) + } +} diff --git a/go.mod b/go.mod index df95e8067..21a481979 100644 --- a/go.mod +++ b/go.mod @@ -5,15 +5,17 @@ go 1.17 require ( github.com/compose/mejson v0.0.0-20150828131556-afcf51c7c640 github.com/dop251/goja v0.0.0-20170430194003-d382686fd20b + github.com/go-mysql-org/go-mysql v1.5.1-0.20220505091125-145f68457838 github.com/hashicorp/go-version v0.0.0-20161031182605-e96d38404026 - github.com/lib/pq v0.0.0-20170103192009-8df6253d1317 + github.com/lib/pq v1.8.0 github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d github.com/oklog/run v1.0.0 github.com/olekukonko/tablewriter v0.0.0-20170128050532-febf2d34b54a github.com/robertkrimen/otto v0.0.0-20171130103205-3b44b4dcb6c0 - github.com/sirupsen/logrus v1.0.4 + github.com/sirupsen/logrus v1.4.2 github.com/smartystreets/go-aws-auth v0.0.0-20160722044803-2043e6d0bb7e github.com/streadway/amqp v0.0.0-20150320153439-6a378341a305 + github.com/twpayne/go-geom v1.4.1 gopkg.in/gorethink/gorethink.v3 v3.0.5 gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 gopkg.in/olivere/elastic.v2 v2.0.1-0.20180214101641-ad2886760fe8 @@ -27,21 +29,27 @@ require ( github.com/cenkalti/backoff v0.0.0-20150522193654-6c45d6bc1e78 // indirect github.com/dlclark/regexp2 v1.1.6 // indirect github.com/fortytw2/leaktest v1.3.0 // indirect - github.com/golang/protobuf v0.0.0-20150526012109-34a5f244f1c0 // indirect + github.com/golang/protobuf v1.5.0 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/mailru/easyjson v0.0.0-20171120080333-32fa128f234d // indirect github.com/mattn/go-runewidth v0.0.2 // indirect - github.com/pkg/errors v0.8.0 // indirect + github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/satori/go.uuid v1.2.0 // indirect + github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect + github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect + github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect github.com/smartystreets/goconvey v1.6.6 // indirect - github.com/stretchr/testify v1.7.0 // indirect - golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 // indirect - golang.org/x/net v0.0.0-20190311183353-d8887717615a // indirect - golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a // indirect - golang.org/x/text v0.3.0 // indirect - gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect + go.uber.org/atomic v1.7.0 // indirect + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + golang.org/x/text v0.3.6 // indirect + google.golang.org/protobuf v1.26.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/fatih/pool.v2 v2.0.0-20160721145410-20a0a429c5f9 // indirect - gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect gopkg.in/sourcemap.v1 v1.0.3 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 0c36656f9..c9bba3184 100644 --- a/go.sum +++ b/go.sum @@ -1,88 +1,221 @@ +github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DATA-DOG/go-sqlmock v1.3.2 h1:2L2f5t3kKnCLxnClDD/PrDfExFFa1wjESgxHG/B1ibo= +github.com/DATA-DOG/go-sqlmock v1.3.2/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= +github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= +github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= +github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/atomicules/go-mysql v1.4.1-0.20220421104750-53c3b91d6803 h1:Ag+Cs4ZfGflCN/7FED1CFD/9XwwEyTYLbt74WeyyaC4= +github.com/atomicules/go-mysql v1.4.1-0.20220421104750-53c3b91d6803/go.mod h1:TRs381neMzw+J5+bobjUY2ZsIMgvp4wBCRBW274gc68= github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0= github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/cenkalti/backoff v0.0.0-20150522193654-6c45d6bc1e78 h1:KlvkioUMRhOdYA2dOfRJnaX2iRNozf84moIMFNJ7j64= github.com/cenkalti/backoff v0.0.0-20150522193654-6c45d6bc1e78/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/compose/mejson v0.0.0-20150828131556-afcf51c7c640 h1:PiVpAo3GfeLRW9LFAOVw92VuJPwhWS2d4cX0l0xfXUM= github.com/compose/mejson v0.0.0-20150828131556-afcf51c7c640/go.mod h1:L5A8Xp96L2Zy71V3VfdCSQDTIZ0fZjOC6wNaGE7JW8E= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= +github.com/cznic/golex v0.0.0-20181122101858-9c343928389c/go.mod h1:+bmmJDNmKlhWNG+gwWCkaBoTy39Fs+bzRxVBzoTQbIc= +github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= +github.com/cznic/parser v0.0.0-20160622100904-31edd927e5b1/go.mod h1:2B43mz36vGZNZEwkWi8ayRSSUXLfjL8OkbzwW4NcPMM= +github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ= +github.com/cznic/strutil v0.0.0-20171016134553-529a34b1c186/go.mod h1:AHHPPPXTw0h6pVabbcbyGRK1DckRn7r/STdZEeIDzZc= +github.com/cznic/y v0.0.0-20170802143616-045f81c6662a/go.mod h1:1rk5VM7oSnA4vjp+hrLQ3HWHa+Y4yPCa3/CsJrcNnvs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dlclark/regexp2 v1.1.6 h1:CqB4MjHw0MFCDj+PHHjiESmHX+N7t0tJzKvC6M97BRg= github.com/dlclark/regexp2 v1.1.6/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dop251/goja v0.0.0-20170430194003-d382686fd20b h1:lncLs5A33vr20e4bxw/BFeOBOr1n+a2BUJGZy0vCbdM= github.com/dop251/goja v0.0.0-20170430194003-d382686fd20b/go.mod h1:Mw6PkjjMXWbTj+nnj4s3QPXq1jaT0s5pC0iFD4+BOAA= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/golang/protobuf v0.0.0-20150526012109-34a5f244f1c0 h1:x+oiaqEWfBd1Ea/T8aN985crREcc7jZ9AEBMbhETmWA= -github.com/golang/protobuf v0.0.0-20150526012109-34a5f244f1c0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/go-mysql-org/go-mysql v1.5.1-0.20220505091125-145f68457838 h1:AZqwTcfXnYgPAlaH0YDdmDQoT8SQ6rQ6GYFO7s0fu4Q= +github.com/go-mysql-org/go-mysql v1.5.1-0.20220505091125-145f68457838/go.mod h1:GX0clmylJLdZEYAojPCDTCvwZxbTBrke93dV55715u0= +github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/go-version v0.0.0-20161031182605-e96d38404026 h1:qWx/DcC6l4ZzuS+JBAzI5XjtLFDCc08zYeZ0kLnaH2g= github.com/hashicorp/go-version v0.0.0-20161031182605-e96d38404026/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/huandu/xstrings v1.3.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= +github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v0.0.0-20170103192009-8df6253d1317 h1:QfWEYA9/r1gkhuMk6bLk8kPXek0/mxHCzLKcR/kkcL8= -github.com/lib/pq v0.0.0-20170103192009-8df6253d1317/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg= +github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.0.0-20171120080333-32fa128f234d h1:bM4HYnlVXPgUKmzl7o3drEaVfOk+sTBiADAQOWjU+8I= github.com/mailru/easyjson v0.0.0-20171120080333-32fa128f234d/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mattn/go-runewidth v0.0.2 h1:UnlwIPBGaTZfPQ6T1IGzPI0EkYAQmT9fAEJ/poFC63o= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= +github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d h1:VhgPp6v9qf9Agr/56bj7Y/xa04UccTW04VP0Qed4vnQ= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/olekukonko/tablewriter v0.0.0-20170128050532-febf2d34b54a h1:m6hB6GkmZ/suOSKZM7yx3Yt+7iZ9HNfzacCykJqgXA8= github.com/olekukonko/tablewriter v0.0.0-20170128050532-febf2d34b54a/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= -github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= +github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= +github.com/opencontainers/runc v1.0.0-rc9/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= +github.com/ory/dockertest/v3 v3.6.0/go.mod h1:4ZOpj8qBUmh8fcBSVzkH2bws2s91JdGvHUqan4GHEuQ= +github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg= +github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= +github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/errors v0.11.5-0.20201029093017-5a7df2af2ac7/go.mod h1:G7x87le1poQzLB/TqvTJI2ILrSgobnq4Ut7luOwvfvI= +github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 h1:LllgC9eGfqzkfubMgjKIDyZYaa609nNWAyNZtpy2B3M= +github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3/go.mod h1:G7x87le1poQzLB/TqvTJI2ILrSgobnq4Ut7luOwvfvI= +github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= +github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= +github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/robertkrimen/otto v0.0.0-20171130103205-3b44b4dcb6c0 h1:5RhOP2qFOTBwBzvlMm3ehEbaJaq32CH9pXfTMcvzV3s= github.com/robertkrimen/otto v0.0.0-20171130103205-3b44b4dcb6c0/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY= -github.com/sirupsen/logrus v1.0.4 h1:gzbtLsZC3Ic5PptoRG+kQj4L60qjK7H7XszrU163JNQ= -github.com/sirupsen/logrus v1.0.4/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= +github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= +github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q= +github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/go-aws-auth v0.0.0-20160722044803-2043e6d0bb7e h1:motZm4CkHelU517PDp9FbyaKmwZYB3ltnZUkHZA978A= github.com/smartystreets/go-aws-auth v0.0.0-20160722044803-2043e6d0bb7e/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= github.com/smartystreets/goconvey v1.6.6 h1:lH+Snxmzl92r1jww8/jYPqKkhs3C9AF4LunzU56ZZr4= github.com/smartystreets/goconvey v1.6.6/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/streadway/amqp v0.0.0-20150320153439-6a378341a305 h1:z3GclFCkhJNYMNjszntIS2ZrSQxFacOhd0GXdfHHoOY= github.com/streadway/amqp v0.0.0-20150320153439-6a378341a305/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +github.com/twpayne/go-geom v1.4.1 h1:LeivFqaGBRfyg0XJJ9pkudcptwhSSrYN9KZUW6HcgdA= +github.com/twpayne/go-geom v1.4.1/go.mod h1:k/zktXdL+qnA6OgKsdEGUTA17jbQ2ZPTUa3CCySuGpE= +github.com/twpayne/go-kml v1.5.2/go.mod h1:kz8jAiIz6FIdU2Zjce9qGlVtgFYES9vt7BTPBHf5jl4= +github.com/twpayne/go-polyline v1.0.0/go.mod h1:ICh24bcLYBX8CknfvNPKqoTbe+eg+MX1NPyJmSBo7pU= +github.com/twpayne/go-waypoint v0.0.0-20200706203930-b263a7f6e4e8/go.mod h1:qj5pHncxKhu9gxtZEYWypA/z097sxhFlbTyOyt9gcnU= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200121082415-34d275377bf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= -gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fatih/pool.v2 v2.0.0-20160721145410-20a0a429c5f9 h1:CT5RJOxO6mi7LZWAnePMMWVtrk3lCTo1b6t4OuNQUPQ= gopkg.in/fatih/pool.v2 v2.0.0-20160721145410-20a0a429c5f9/go.mod h1:8xVGeu1/2jr2wm5V9SPuMht2H5AEmf5aFMGSQixtjTY= -gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0= -gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/gorethink/gorethink.v3 v3.0.5 h1:e2Uc/Xe+hpcVQFsj6MuHlYog3r0JYpnTzwDj/y2O4MU= gopkg.in/gorethink/gorethink.v3 v3.0.5/go.mod h1:+3yIIHJUGMBK+wyPH+iN5TP+88ikFDfZdqTlK3Y9q8I= gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 h1:/saqWwm73dLmuzbNhe92F0QsZ/KiFND+esHco2v1hiY= gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/olivere/elastic.v2 v2.0.1-0.20180214101641-ad2886760fe8 h1:ucKSpZ/mWz53GdRs6YoM1m9wI4TzPzZsrnWl5FQKK8s= gopkg.in/olivere/elastic.v2 v2.0.1-0.20180214101641-ad2886760fe8/go.mod h1:CTVyl1gckiFw1aLZYxC00g3f9jnHmhoOKcWF7W3c6n4= gopkg.in/olivere/elastic.v3 v3.0.42-0.20180214101641-ad2886760fe8 h1:/94acWvfEezSev8IzQkUIQNsNfo7TX92kfr1KP7FnXo= @@ -92,7 +225,10 @@ gopkg.in/olivere/elastic.v5 v5.0.64/go.mod h1:FylZT6jQWtfHsicejzOm3jIMVPOAksa80i gopkg.in/sourcemap.v1 v1.0.3 h1:/cqLW94A7+xWBmK3hp6rv28C+2ervQ4nB52YIkGitvE= gopkg.in/sourcemap.v1 v1.0.3/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=