Skip to content

Commit

Permalink
Implement composite type support.
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrofranceschi committed Apr 28, 2016
1 parent 3c2e42a commit 2da240f
Show file tree
Hide file tree
Showing 15 changed files with 588 additions and 25 deletions.
63 changes: 63 additions & 0 deletions action/alterattribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package action

import (
"encoding/gob"
"fmt"
)

type AlterAttribute struct {
SchemaName string
TypeName string
Column Column
NewColumn Column
}

// Register type for gob
func init() {
gob.Register(&AlterAttribute{})
}

func (a *AlterAttribute) Execute(c *Context) error {
if a.Column.Name != a.NewColumn.Name {
_, err := c.Tx.Exec(
fmt.Sprintf(
"ALTER TYPE \"%s\".\"%s\" RENAME ATTRIBUTE \"%s\" TO \"%s\";",
a.SchemaName,
a.TypeName,
a.Column.Name,
a.NewColumn.Name,
),
)

if err != nil {
return err
}
}

if a.Column.Type != a.NewColumn.Type {
_, err := c.Tx.Exec(
fmt.Sprintf(
"ALTER TYPE \"%s\".\"%s\" ALTER ATTRIBUTE \"%s\" TYPE %s\"%s\";",
a.SchemaName,
a.TypeName,
a.NewColumn.Name,
a.NewColumn.GetTypeSchemaStr(a.SchemaName),
a.NewColumn.Type,
),
)

if err != nil {
return err
}
}

return nil
}

func (a *AlterAttribute) Filter(targetExpression string) bool {
return true
}

func (a *AlterAttribute) NeedsSeparatedBatch() bool {
return false
}
40 changes: 40 additions & 0 deletions action/createattribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package action

import (
"encoding/gob"
"fmt"
)

type CreateAttribute struct {
SchemaName string
TypeName string
Column Column
}

// Register type for gob
func init() {
gob.Register(&CreateAttribute{})
}

func (a *CreateAttribute) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
"ALTER TYPE \"%s\".\"%s\" ADD ATTRIBUTE \"%s\" %s\"%s\";",
a.SchemaName,
a.TypeName,
a.Column.Name,
a.Column.GetTypeSchemaStr(a.SchemaName),
a.Column.Type,
),
)

return err
}

func (a *CreateAttribute) Filter(targetExpression string) bool {
return true
}

func (a *CreateAttribute) NeedsSeparatedBatch() bool {
return false
}
35 changes: 26 additions & 9 deletions action/createtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
type CreateType struct {
SchemaName string
TypeName string
TypeType string
}

// Register type for gob
Expand All @@ -16,15 +17,31 @@ func init() {
}

func (a *CreateType) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
"CREATE TYPE \"%s\".\"%s\" AS ENUM ();",
a.SchemaName,
a.TypeName,
),
)

return err
if a.TypeType == "c" {
// Composite type
_, err := c.Tx.Exec(
fmt.Sprintf(
"CREATE TYPE \"%s\".\"%s\" AS ();",
a.SchemaName,
a.TypeName,
),
)

return err
} else if a.TypeType == "e" {
// Enum
_, err := c.Tx.Exec(
fmt.Sprintf(
"CREATE TYPE \"%s\".\"%s\" AS ENUM ();",
a.SchemaName,
a.TypeName,
),
)

return err
}

return nil
}

func (a *CreateType) Filter(targetExpression string) bool {
Expand Down
33 changes: 33 additions & 0 deletions action/dropattribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package action

import (
"encoding/gob"
"fmt"
)

type DropAttribute struct {
SchemaName string
TypeName string
Column Column
}

// Register type for gob
func init() {
gob.Register(&DropAttribute{})
}

func (a *DropAttribute) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("ALTER TYPE \"%s\".\"%s\" DROP ATTRIBUTE \"%s\";", a.SchemaName, a.TypeName, a.Column.Name),
)

return err
}

func (a *DropAttribute) Filter(targetExpression string) bool {
return true
}

func (a *DropAttribute) NeedsSeparatedBatch() bool {
return false
}
41 changes: 37 additions & 4 deletions data/sql/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ CREATE TABLE IF NOT EXISTS teleport.batch_events (

-- Returns current schema of all tables in all schemas as a JSON
-- JSON array containing each column's definition.
CREATE OR REPLACE FUNCTION get_schema() RETURNS text AS $$
CREATE OR REPLACE FUNCTION teleport_get_schema() RETURNS text AS $$
BEGIN
RETURN (
SELECT json_agg(row_to_json(data)) FROM (
Expand Down Expand Up @@ -132,7 +132,7 @@ BEGIN
-- Ordinary columns are numbered from 1 up.
-- System columns have negative numbers.
attr.attr_num > 0
) AS attributes,
) AS columns,
(
-- The view pg_indexes provides access to useful information about each index
-- in the database.
Expand Down Expand Up @@ -181,6 +181,7 @@ BEGIN
SELECT
pgtype.oid AS oid,
pgtype.typnamespace AS namespace_oid,
pgtype.typtype AS type_type,
pgtype.typname AS type_name,
(
-- The catalog pg_attribute stores information about table columns. There will be
Expand All @@ -197,16 +198,48 @@ BEGIN
) enum
WHERE
enum.type_oid = pgtype.oid
) AS enums
) AS enums,
(
-- The catalog pg_attribute stores information about table columns. There will be
-- exactly one pg_attribute row for every column in every table in the database.
-- (There will also be attribute entries for indexes, and indeed all objects that
-- have pg_class entries.)
SELECT array_to_json(array_agg(row_to_json(attr)))
FROM (
SELECT
a.attrelid AS class_oid,
a.attname AS attr_name,
a.attnum AS attr_num,
t.typname AS type_name,
(
SELECT n.nspname
FROM pg_namespace n
WHERE n.oid = t.typnamespace
) AS type_schema,
t.oid AS type_oid
FROM pg_attribute a
INNER JOIN pg_type t
ON a.atttypid = t.oid
) attr
WHERE
attr.class_oid = pgtype.typrelid AND
-- Ordinary columns are numbered from 1 up.
-- System columns have negative numbers.
attr.attr_num > 0
) AS attributes
FROM pg_type pgtype
LEFT JOIN pg_class class
ON class.oid = pgtype.typrelid
-- typtype is:
-- b for a base type
-- c for a composite type (e.g., a table's row type)
-- d for a domain
-- e for an enum type
-- p for a pseudo-type
-- r for a range type
WHERE typtype = 'e'
WHERE
(pgtype.typtype = 'e') OR
(pgtype.typtype = 'c' AND class.relkind = 'c')
) pgtype
WHERE pgtype.namespace_oid = namespace.oid
) AS types,
Expand Down
4 changes: 2 additions & 2 deletions data/sql/source_trigger.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ BEGIN
WITH all_json_key_value AS (
SELECT 'pre' AS key, data::json AS value FROM teleport.event WHERE id = event_row.id
UNION ALL
SELECT 'post' AS key, get_schema()::json AS value
SELECT 'post' AS key, teleport_get_schema()::json AS value
)
UPDATE teleport.event
SET status = 'waiting_batch',
Expand All @@ -40,7 +40,7 @@ BEGIN

INSERT INTO teleport.event (data, kind, trigger_tag, trigger_event, transaction_id, status) VALUES
(
get_schema()::text,
teleport_get_schema()::text,
'ddl',
'ddl_command_start',
'',
Expand Down
88 changes: 88 additions & 0 deletions database/attribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package database

import (
"github.com/pagarme/teleport/action"
"github.com/pagarme/teleport/batcher/ddldiff"
)

// Define a class column
type Attribute struct {
Name string `json:"attr_name"`
Num int `json:"attr_num"`
TypeName string `json:"type_name"`
TypeSchema string `json:"type_schema"`
TypeOid string `json:"type_oid"`
Type *Type
}

func (a *Attribute) IsNativeType() bool {
return a.TypeSchema == "pg_catalog"
}

// Implements Diffable
func (post *Attribute) Diff(other ddldiff.Diffable, context ddldiff.Context) []action.Action {
actions := make([]action.Action, 0)

if other == nil {
actions = append(actions, &action.CreateAttribute{
context.Schema,
post.Type.Name,
action.Column{
post.Name,
post.TypeName,
post.IsNativeType(),
},
})
} else {
pre := other.(*Attribute)

if pre.Name != post.Name || pre.TypeOid != post.TypeOid {
actions = append(actions, &action.AlterAttribute{
context.Schema,
post.Type.Name,
action.Column{
pre.Name,
pre.TypeName,
pre.IsNativeType(),
},
action.Column{
post.Name,
post.TypeName,
post.IsNativeType(),
},
})
}
}

return actions
}

func (a *Attribute) Children() []ddldiff.Diffable {
return []ddldiff.Diffable{}
}

func (a *Attribute) Drop(context ddldiff.Context) []action.Action {
return []action.Action{
&action.DropAttribute{
context.Schema,
a.Type.Name,
action.Column{
a.Name,
a.TypeName,
a.IsNativeType(),
},
},
}
}

func (a *Attribute) IsEqual(other ddldiff.Diffable) bool {
if other == nil {
return false
}

if otherAttr, ok := other.(*Attribute); ok {
return (a.Num == otherAttr.Num)
}

return false
}
Loading

0 comments on commit 2da240f

Please sign in to comment.