From 2416d1c6c3236ae736a407f3c5c0c763baf1c0b8 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Thu, 20 May 2021 17:09:31 +0530 Subject: [PATCH 01/10] New masking feature: boolean keys Also refactors the code for extraColumn support --- MASKING.md | 14 ++ pkg/redshift/redshift.go | 2 + pkg/redshiftbatcher/batch_processor.go | 27 +-- pkg/redshiftloader/job.go | 185 +++++++++++++++------ pkg/redshiftloader/job_test.go | 2 + pkg/redshiftloader/load_processor.go | 1 + pkg/serializer/message.go | 5 +- pkg/serializer/serializer.go | 42 +++-- pkg/transformer/debezium/schema.go | 109 ++++++------ pkg/transformer/debezium/schema_test.go | 54 +++--- pkg/transformer/masker/database.yaml | 6 + pkg/transformer/masker/mask_config.go | 65 +++++++- pkg/transformer/masker/mask_config_test.go | 2 +- pkg/transformer/masker/masker.go | 84 ++++++---- pkg/transformer/masker/masker_test.go | 70 ++++++++ pkg/transformer/transformer.go | 4 +- 16 files changed, 476 insertions(+), 196 deletions(-) diff --git a/MASKING.md b/MASKING.md index a4fc78049..9ac42f6d7 100644 --- a/MASKING.md +++ b/MASKING.md @@ -102,3 +102,17 @@ include_tables: - customers - orders ``` + +### Regex Pattern Boolean Keys +Free text columns can contain PII so we do not unmask it, but we want the user to make aggregate analysis on the non pii data in it. So using this a user gets boolean column stating that the text/regex in the complete free text is present. + +For example: We add a boolean column `favourite_quote_has_philosphy`. +If value in column `favourite_quote` matches the regex `'life|time'` (case insensitive), then the value in extra column `favourite_quote_has_philosphy` is `true` else `false`. + +```yaml +regex_pattern_boolean_keys: + customers: + favourite_quote: + has_philosphy: 'life|time' + has_text_funny: 'funny' +``` diff --git a/pkg/redshift/redshift.go b/pkg/redshift/redshift.go index 9511dc9d7..b2e8cf945 100644 --- a/pkg/redshift/redshift.go +++ b/pkg/redshift/redshift.go @@ -19,6 +19,8 @@ import ( ) const ( + RedshiftBoolean = "boolean" + RedshiftString = "character varying" RedshiftStringMax = "character varying(65535)" RedshiftStringMaxLength = 65535 diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go index ef9a13af2..63133c0df 100644 --- a/pkg/redshiftbatcher/batch_processor.go +++ b/pkg/redshiftbatcher/batch_processor.go @@ -182,6 +182,7 @@ type response struct { endOffset int64 messagesProcessed int maskSchema map[string]serializer.MaskInfo + extraMaskSchema map[string]serializer.ExtraMaskInfo bytesProcessed int64 } @@ -270,6 +271,7 @@ func (b *batchProcessor) signalLoad(resp *response) error { resp.batchSchemaID, // schema of upstream topic's value b.schemaIDKey, // schema of upstream topic's key resp.maskSchema, + resp.extraMaskSchema, resp.skipMerge, resp.bytesProcessed, resp.createEvents, @@ -333,6 +335,7 @@ func (b *batchProcessor) processMessage( resp.batchSchemaID, b.schemaIDKey, resp.maskSchema, + resp.extraMaskSchema, ) if err != nil { return bytesProcessed, fmt.Errorf( @@ -390,6 +393,9 @@ func (b *batchProcessor) processMessage( if b.maskMessages && len(resp.maskSchema) == 0 { resp.maskSchema = message.MaskSchema } + if b.maskMessages && len(resp.extraMaskSchema) == 0 { + resp.extraMaskSchema = message.ExtraMaskSchema + } resp.skipMerge = false // deprecated klog.V(5).Infof( @@ -542,16 +548,17 @@ func (b *batchProcessor) Process( responses := []*response{} for i, msgBuf := range msgBufs { resp := &response{ - err: nil, - batchID: i + 1, - batchSchemaID: -1, - skipMerge: false, - createEvents: 0, - updateEvents: 0, - deleteEvents: 0, - s3Key: "", - bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)), - maskSchema: make(map[string]serializer.MaskInfo), + err: nil, + batchID: i + 1, + batchSchemaID: -1, + skipMerge: false, + createEvents: 0, + updateEvents: 0, + deleteEvents: 0, + s3Key: "", + bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)), + maskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), } wg.Add(1) go b.processBatch(wg, session, msgBuf, resp) diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go index 2367112c1..6031d7828 100644 --- a/pkg/redshiftloader/job.go +++ b/pkg/redshiftloader/job.go @@ -2,6 +2,7 @@ package redshiftloader import ( "fmt" + "github.com/practo/klog/v2" "github.com/practo/tipoca-stream/redshiftsink/pkg/serializer" "strings" ) @@ -18,6 +19,7 @@ var JobAvroSchema string = `{ {"name": "schemaId", "type": "int"}, {"name": "schemaIdKey", "type": "int", "default": -1}, {"name": "maskSchema", "type": "string"}, + {"name": "extraMaskSchema", "type": "string", default: ""}, {"name": "skipMerge", "type": "string", "default": ""}, {"name": "batchBytes", "type": "long", "default": 0}, {"name": "createEvents", "type": "long", "default": 0}, @@ -27,41 +29,45 @@ var JobAvroSchema string = `{ }` type Job struct { - UpstreamTopic string `json:"upstreamTopic"` // batcher topic - StartOffset int64 `json:"startOffset"` - EndOffset int64 `json:"endOffset"` - CsvDialect string `json:"csvDialect"` - S3Path string `json:"s3Path"` - SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic) - SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic) - MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"` - SkipMerge bool `json:"skipMerge"` // deprecated in favour of createEvents, updateEvents and deleteEvents - BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch - CreateEvents int64 `json:"createEvents"` // stores count of create events - UpdateEvents int64 `json:"updateEvents"` // stores count of update events - DeleteEvents int64 `json:"deleteEvents"` // stores count of delete events + UpstreamTopic string `json:"upstreamTopic"` // batcher topic + StartOffset int64 `json:"startOffset"` + EndOffset int64 `json:"endOffset"` + CsvDialect string `json:"csvDialect"` + S3Path string `json:"s3Path"` + SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic) + SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic) + MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"` + ExtraMaskSchema map[string]serializer.ExtraMaskInfo `json:"extraMaskSchema"` + SkipMerge bool `json:"skipMerge"` // deprecated in favour of createEvents, updateEvents and deleteEvents + BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch + CreateEvents int64 `json:"createEvents"` // stores count of create events + UpdateEvents int64 `json:"updateEvents"` // stores count of update events + DeleteEvents int64 `json:"deleteEvents"` // stores count of delete events } func NewJob( upstreamTopic string, startOffset int64, endOffset int64, csvDialect string, s3Path string, schemaId int, schemaIdKey int, - maskSchema map[string]serializer.MaskInfo, skipMerge bool, + maskSchema map[string]serializer.MaskInfo, + extraMaskSchema map[string]serializer.ExtraMaskInfo, + skipMerge bool, batchBytes, createEvents, updateEvents, deleteEvents int64) Job { return Job{ - UpstreamTopic: upstreamTopic, - StartOffset: startOffset, - EndOffset: endOffset, - CsvDialect: csvDialect, - S3Path: s3Path, - SchemaId: schemaId, - SchemaIdKey: schemaIdKey, - MaskSchema: maskSchema, - SkipMerge: skipMerge, // deprecated - BatchBytes: batchBytes, - CreateEvents: createEvents, - UpdateEvents: updateEvents, - DeleteEvents: deleteEvents, + UpstreamTopic: upstreamTopic, + StartOffset: startOffset, + EndOffset: endOffset, + CsvDialect: csvDialect, + S3Path: s3Path, + SchemaId: schemaId, + SchemaIdKey: schemaIdKey, + MaskSchema: maskSchema, + ExtraMaskSchema: extraMaskSchema, + SkipMerge: skipMerge, // deprecated + BatchBytes: batchBytes, + CreateEvents: createEvents, + UpdateEvents: updateEvents, + DeleteEvents: deleteEvents, } } @@ -115,9 +121,15 @@ func StringMapToJob(data map[string]interface{}) Job { case "maskSchema": schema := make(map[string]serializer.MaskInfo) if value, ok := v.(string); ok { - schema = ToSchemaMap(value) + schema = ToMaskSchemaMap(value) } job.MaskSchema = schema + case "extraMaskSchema": + if value, ok := v.(string); ok { + job.ExtraMaskSchema = ToExtraMaskSchemaMap(value) + } else { // backward compatibility + job.ExtraMaskSchema = nil + } case "batchBytes": if value, ok := v.(int64); ok { job.BatchBytes = value @@ -155,7 +167,7 @@ func StringMapToJob(data map[string]interface{}) Job { // TODO: hack, to release fast, found unwanted complications in // using map[string]interface in goavro(will revisit) -func ToSchemaMap(r string) map[string]serializer.MaskInfo { +func ToMaskSchemaMap(r string) map[string]serializer.MaskInfo { m := make(map[string]serializer.MaskInfo) columns := strings.Split(r, "|") @@ -170,7 +182,7 @@ func ToSchemaMap(r string) map[string]serializer.MaskInfo { info := strings.Split(col, ",") name := info[0] - var masked, sortCol, distCol, lengthCol, mobileCol, mappingPIICol, conditionalNonPIICol, dependentNonPIICol bool + var masked, sortCol, distCol, lengthCol, mobileCol, mappingPIICol, conditionalNonPIICol, dependentNonPIICol, regexPatternBooleanCol bool if info[1] == "true" { masked = true } @@ -196,21 +208,27 @@ func ToSchemaMap(r string) map[string]serializer.MaskInfo { conditionalNonPIICol = true } } - if len(info) == 9 { + if len(info) >= 9 { if info[8] == "true" { dependentNonPIICol = true } } + if len(info) >= 10 { + if info[9] == "true" { + regexPatternBooleanCol = true + } + } m[name] = serializer.MaskInfo{ - Masked: masked, - SortCol: sortCol, - DistCol: distCol, - LengthCol: lengthCol, - MobileCol: mobileCol, - MappingPIICol: mappingPIICol, - ConditionalNonPIICol: conditionalNonPIICol, - DependentNonPIICol: dependentNonPIICol, + Masked: masked, + SortCol: sortCol, + DistCol: distCol, + LengthCol: lengthCol, + MobileCol: mobileCol, + MappingPIICol: mappingPIICol, + ConditionalNonPIICol: conditionalNonPIICol, + DependentNonPIICol: dependentNonPIICol, + RegexPatternBooleanCol: regexPatternBooleanCol, } } @@ -218,13 +236,13 @@ func ToSchemaMap(r string) map[string]serializer.MaskInfo { } // TODO: hack, to release fast, found unwanted complications in -// using map[string]interface in goavro (will revisit) -func ToSchemaString(m map[string]serializer.MaskInfo) string { +// using map[string]interface in goavro (may revisit if required) +func ToMaskSchemaString(m map[string]serializer.MaskInfo) string { var r string for name, info := range m { col := fmt.Sprintf( - "%s,%t,%t,%t,%t,%t,%t,%t,%t", + "%s,%t,%t,%t,%t,%t,%t,%t,%t,%t", name, info.Masked, info.SortCol, @@ -234,6 +252,7 @@ func ToSchemaString(m map[string]serializer.MaskInfo) string { info.MappingPIICol, info.ConditionalNonPIICol, info.DependentNonPIICol, + info.RegexPatternBooleanCol, ) r = r + col + "|" } @@ -241,6 +260,61 @@ func ToSchemaString(m map[string]serializer.MaskInfo) string { return r } +// TODO: hack, to release fast, found unwanted complications in +// using map[string]interface in goavro (may revisit if required) +func ToExtraMaskSchemaString(m map[string]serializer.ExtraMaskInfo) string { + var r string + + for name, info := range m { + col := fmt.Sprintf( + "%s,%t,%s,%s", + name, + info.Masked, + info.ColumnType, + info.DefaultVal, + ) + r = r + col + "|" + } + + return r +} + +// TODO: hack, to release fast, found unwanted complications in +// using map[string]interface in goavro(will revisit) +func ToExtraMaskSchemaMap(r string) map[string]serializer.ExtraMaskInfo { + m := make(map[string]serializer.ExtraMaskInfo) + + columns := strings.Split(r, "|") + if len(columns) == 0 { + return m + } + + for _, col := range columns { + if col == "" { + continue + } + + info := strings.Split(col, ",") + + if len(info) != 3 { + klog.Fatalf("expecting extra mask schema to be length 3, got:%+v, schema:%v", len(info), r) + } + + var masked bool + if info[1] == "true" { + masked = true + } + + m[info[0]] = serializer.ExtraMaskInfo{ + Masked: masked, + ColumnType: info[1], + DefaultVal: info[2], + } + } + + return m +} + // ToStringMap returns a map representation of the Job func (c Job) ToStringMap() map[string]interface{} { skipMerge := "false" // deprecated not used anymore, backward compatibility @@ -248,18 +322,19 @@ func (c Job) ToStringMap() map[string]interface{} { skipMerge = "true" } return map[string]interface{}{ - "upstreamTopic": c.UpstreamTopic, - "startOffset": c.StartOffset, - "endOffset": c.EndOffset, - "csvDialect": c.CsvDialect, - "s3Path": c.S3Path, - "schemaId": c.SchemaId, - "schemaIdKey": c.SchemaIdKey, - "skipMerge": skipMerge, - "maskSchema": ToSchemaString(c.MaskSchema), - "batchBytes": c.BatchBytes, - "createEvents": c.CreateEvents, - "updateEvents": c.UpdateEvents, - "deleteEvents": c.DeleteEvents, + "upstreamTopic": c.UpstreamTopic, + "startOffset": c.StartOffset, + "endOffset": c.EndOffset, + "csvDialect": c.CsvDialect, + "s3Path": c.S3Path, + "schemaId": c.SchemaId, + "schemaIdKey": c.SchemaIdKey, + "skipMerge": skipMerge, + "maskSchema": ToMaskSchemaString(c.MaskSchema), + "extraMaskSchema": ToExtraMaskSchemaString(c.ExtraMaskSchema), + "batchBytes": c.BatchBytes, + "createEvents": c.CreateEvents, + "updateEvents": c.UpdateEvents, + "deleteEvents": c.DeleteEvents, } } diff --git a/pkg/redshiftloader/job_test.go b/pkg/redshiftloader/job_test.go index 84c3026a2..5130c8f29 100644 --- a/pkg/redshiftloader/job_test.go +++ b/pkg/redshiftloader/job_test.go @@ -12,6 +12,7 @@ func TestToStringMap(t *testing.T) { "kafkaoffset": serializer.MaskInfo{}, "id": serializer.MaskInfo{Masked: true}, } + extraMaskSchema := map[string]serializer.ExtraMaskInfo{} job := NewJob( "upstreamTopic", @@ -22,6 +23,7 @@ func TestToStringMap(t *testing.T) { 1, 2, maskSchema, + extraMaskSchema, false, 10, -1, diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go index f6bb30881..9d29e2e75 100644 --- a/pkg/redshiftloader/load_processor.go +++ b/pkg/redshiftloader/load_processor.go @@ -724,6 +724,7 @@ func (b *loadProcessor) processBatch( schemaId, schemaIdKey, job.MaskSchema, + job.ExtraMaskSchema, ) if err != nil { return bytesProcessed, fmt.Errorf( diff --git a/pkg/serializer/message.go b/pkg/serializer/message.go index 06f3e9a4b..f20a03259 100644 --- a/pkg/serializer/message.go +++ b/pkg/serializer/message.go @@ -31,8 +31,9 @@ type Message struct { Value interface{} Bytes int64 - Operation string - MaskSchema map[string]MaskInfo + Operation string + MaskSchema map[string]MaskInfo + ExtraMaskSchema map[string]ExtraMaskInfo } type MessageAsyncBatch struct { diff --git a/pkg/serializer/serializer.go b/pkg/serializer/serializer.go index 0dc5f170a..de6e1332c 100644 --- a/pkg/serializer/serializer.go +++ b/pkg/serializer/serializer.go @@ -14,14 +14,23 @@ const ( ) type MaskInfo struct { - Masked bool - SortCol bool - DistCol bool - LengthCol bool - MobileCol bool - MappingPIICol bool - ConditionalNonPIICol bool - DependentNonPIICol bool + Masked bool + + SortCol bool + DistCol bool + + LengthCol bool + MobileCol bool + MappingPIICol bool + ConditionalNonPIICol bool + DependentNonPIICol bool + RegexPatternBooleanCol bool +} + +type ExtraMaskInfo struct { + Masked bool + ColumnType string + DefaultVal string } type Serializer interface { @@ -61,13 +70,14 @@ func (c *avroSerializer) Deserialize( } return &Message{ - SchemaId: int(schemaId), - Topic: message.Topic, - Partition: message.Partition, - Offset: message.Offset, - Key: string(message.Key), - Value: native, - Bytes: int64(len(message.Value)), - MaskSchema: make(map[string]MaskInfo), + SchemaId: int(schemaId), + Topic: message.Topic, + Partition: message.Partition, + Offset: message.Offset, + Key: string(message.Key), + Value: native, + Bytes: int64(len(message.Value)), + MaskSchema: make(map[string]MaskInfo), + ExtraMaskSchema: make(map[string]ExtraMaskInfo), }, nil } diff --git a/pkg/transformer/debezium/schema.go b/pkg/transformer/debezium/schema.go index 3632d6df8..507b179d7 100644 --- a/pkg/transformer/debezium/schema.go +++ b/pkg/transformer/debezium/schema.go @@ -309,8 +309,12 @@ func (c *schemaTransformer) TransformValue( topic string, schemaId int, schemaIdKey int, - maskSchema map[string]serializer.MaskInfo) (interface{}, error) { - + maskSchema map[string]serializer.MaskInfo, + extraMaskSchema map[string]serializer.ExtraMaskInfo, +) ( + interface{}, + error, +) { s, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaId, 10) if err != nil { return nil, err @@ -330,33 +334,47 @@ func (c *schemaTransformer) TransformValue( s.Schema(), primaryKeys, maskSchema, + extraMaskSchema, ) } func (c *schemaTransformer) transformSchemaValue(jobSchema string, primaryKeys []string, - maskSchema map[string]serializer.MaskInfo) (interface{}, error) { + maskSchema map[string]serializer.MaskInfo, + extraMaskSchema map[string]serializer.ExtraMaskInfo, +) ( + interface{}, + error, +) { + var extraColumns []redshift.ColInfo + extraColumnsMap := make(map[string]bool) + if len(extraMaskSchema) != 0 { + for extraColumnName, emSchema := range extraMaskSchema { + extraColumnsMap[extraColumnName] = true + extraColumns = append(extraColumns, redshift.ColInfo{ + Name: extraColumnName, + Type: emSchema.ColumnType, + DefaultVal: emSchema.DefaultVal, + }) + } + } // remove nulls // TODO: this might be required, better if not // schema := strings.ReplaceAll(jobSchema, `"null",`, "") schema := jobSchema - var debeziumSchema Schema err := json.Unmarshal([]byte(schema), &debeziumSchema) if err != nil { return nil, err } - d := &schemaParser{ tableDelim: ".", schema: debeziumSchema, } - columns := d.columnsBefore() var redshiftColumns []redshift.ColInfo - var extraColumns []redshift.ColInfo for _, column := range columns { sortKey := false distKey := false @@ -368,55 +386,42 @@ func (c *schemaTransformer) transformSchemaValue(jobSchema string, sortKey = mschema.SortCol distKey = mschema.DistCol columnMasked = mschema.Masked - if mschema.LengthCol { - newColName := strings.ToLower( - column.Name) + transformer.LengthColumnSuffix - extraColumns = append(extraColumns, redshift.ColInfo{ - Name: newColName, - Type: redshift.RedshiftInteger, - DebeziumType: "", // not required - DefaultVal: "0", - NotNull: false, - PrimaryKey: false, - SortOrdinal: 0, - DistKey: false, - SourceType: redshift.SourceType{}, // not required - }) + if mschema.ConditionalNonPIICol || mschema.DependentNonPIICol { + useStringMax = true } - if mschema.MobileCol { - newColName := strings.ToLower( - column.Name) + transformer.MobileCoulmnSuffix - extraColumns = append(extraColumns, redshift.ColInfo{ - Name: newColName, - Type: redshift.RedshiftMobileColType, - DebeziumType: "", // not required - DefaultVal: "", - NotNull: false, - PrimaryKey: false, - SortOrdinal: 0, - DistKey: false, - SourceType: redshift.SourceType{}, // not required - }) + //deprecated below started -------------------------------------------------------------------- + if mschema.LengthCol && len(extraMaskSchema) == 0 { // deprecated in favour of extraMaskSchema + newColName := strings.ToLower(column.Name) + transformer.LengthColumnSuffix + _, ok := extraColumnsMap[newColName] + if !ok { + extraColumns = append(extraColumns, redshift.ColInfo{ + Name: newColName, + Type: redshift.RedshiftInteger, + DefaultVal: "0", + }) + } } - if mschema.MappingPIICol { - newColName := strings.ToLower( - transformer.MappingPIIColumnPrefix + column.Name, - ) - extraColumns = append(extraColumns, redshift.ColInfo{ - Name: newColName, - Type: redshift.RedshiftMaskedDataType, - DebeziumType: "", // not required - DefaultVal: "", - NotNull: false, - PrimaryKey: false, - SortOrdinal: 0, - DistKey: false, - SourceType: redshift.SourceType{}, // not required - }) + if mschema.MobileCol && len(extraMaskSchema) == 0 { // deprecated in favour of extraMaskSchema + newColName := strings.ToLower(column.Name) + transformer.MobileCoulmnSuffix + _, ok := extraColumnsMap[newColName] + if !ok { + extraColumns = append(extraColumns, redshift.ColInfo{ + Name: newColName, + Type: redshift.RedshiftMobileColType, + }) + } } - if mschema.ConditionalNonPIICol || mschema.DependentNonPIICol { - useStringMax = true + if mschema.MappingPIICol && len(extraMaskSchema) == 0 { // deprecated in favour of extraMaskSchema + newColName := strings.ToLower(transformer.MappingPIIColumnPrefix + column.Name) + _, ok := extraColumnsMap[newColName] + if !ok { + extraColumns = append(extraColumns, redshift.ColInfo{ + Name: newColName, + Type: redshift.RedshiftMaskedDataType, + }) + } } + //deprecated below ended -------------------------------------------------------------------------------------- } } diff --git a/pkg/transformer/debezium/schema_test.go b/pkg/transformer/debezium/schema_test.go index 8a4569a15..86ebbd2e6 100644 --- a/pkg/transformer/debezium/schema_test.go +++ b/pkg/transformer/debezium/schema_test.go @@ -10,21 +10,24 @@ func TestSchemaMysqlDataType(t *testing.T) { t.Parallel() tests := []struct { - name string - jobSchema string - maskSchema map[string]serializer.MaskInfo - cName string - cType string + name string + jobSchema string + maskSchema map[string]serializer.MaskInfo + extraMaskSchema map[string]serializer.ExtraMaskInfo + cName string + cType string }{ { - name: "test1: int type test", - jobSchema: `{"type":"record","name":"Envelope","namespace":"inventory.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":"string"},{"name":"last_name","type":["null","string"],"default":null},{"name":"email","type":"string"}],"connect.name":"inventory.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"inventory.inventory.customers.Envelope"}`, - maskSchema: map[string]serializer.MaskInfo{}, + name: "test1: int type test", + jobSchema: `{"type":"record","name":"Envelope","namespace":"inventory.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":"string"},{"name":"last_name","type":["null","string"],"default":null},{"name":"email","type":"string"}],"connect.name":"inventory.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"inventory.inventory.customers.Envelope"}`, + maskSchema: map[string]serializer.MaskInfo{}, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{}, }, { - name: "test2: conversion test", - jobSchema: `{"type":"record","name":"Envelope","namespace":"inventory.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":"string"},{"name":"last_name","type":["null","string"],"default":null},{"name":"email","type":"string"}],"connect.name":"inventory.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"inventory.inventory.customers.Envelope"}`, - maskSchema: map[string]serializer.MaskInfo{}, + name: "test2: conversion test", + jobSchema: `{"type":"record","name":"Envelope","namespace":"inventory.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":"string"},{"name":"last_name","type":["null","string"],"default":null},{"name":"email","type":"string"}],"connect.name":"inventory.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"inventory.inventory.customers.Envelope"}`, + maskSchema: map[string]serializer.MaskInfo{}, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{}, }, { name: "test3: length key test", @@ -39,22 +42,25 @@ func TestSchemaMysqlDataType(t *testing.T) { LengthCol: false, }, }, - cName: "email_length", - cType: "integer", + extraMaskSchema: map[string]serializer.ExtraMaskInfo{}, + cName: "email_length", + cType: "integer", }, { - name: "test4: source col length test", - jobSchema: `{"type":"record","name":"Envelope","namespace":"ts.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":{"type":"int","connect.parameters":{"__debezium.source.column.type":"INT","__debezium.source.column.length":"11"}}},{"name":"first_name","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"1100"}}],"default":null},{"name":"last_name","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"255"}}],"default":null},{"name":"email","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"255"}}},{"name":"dob","type":["null",{"type":"int","connect.version":1,"connect.parameters":{"__debezium.source.column.type":"DATE"},"connect.name":"io.debezium.time.Date"}],"default":null},{"name":"score","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"DECIMAL","__debezium.source.column.length":"10","__debezium.source.column.scale":"4"}}],"default":null}],"connect.name":"ts.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"ts.inventory.customers.Envelope"}`, - maskSchema: map[string]serializer.MaskInfo{}, - cName: "first_name", - cType: "character varying(4400)", + name: "test4: source col length test", + jobSchema: `{"type":"record","name":"Envelope","namespace":"ts.inventory.customers","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":{"type":"int","connect.parameters":{"__debezium.source.column.type":"INT","__debezium.source.column.length":"11"}}},{"name":"first_name","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"1100"}}],"default":null},{"name":"last_name","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"255"}}],"default":null},{"name":"email","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"255"}}},{"name":"dob","type":["null",{"type":"int","connect.version":1,"connect.parameters":{"__debezium.source.column.type":"DATE"},"connect.name":"io.debezium.time.Date"}],"default":null},{"name":"score","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"DECIMAL","__debezium.source.column.length":"10","__debezium.source.column.scale":"4"}}],"default":null}],"connect.name":"ts.inventory.customers.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"ts.inventory.customers.Envelope"}`, + maskSchema: map[string]serializer.MaskInfo{}, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{}, + cName: "first_name", + cType: "character varying(4400)", }, { - name: "test4: enum test", - jobSchema: `{"type":"record","name":"Envelope","namespace":"ts.inventory.subscription_heroes","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}},{"name":"subscription_id","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}},{"name":"uhid","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}},{"name":"account_id","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"INT","__debezium.source.column.length":"11"}}],"default":null},{"name":"name","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"120"}}],"default":null},{"name":"relation","type":{"type":"string","connect.version":1,"connect.parameters":{"allowed":"SELF,FATHER,MOTHER,SPOUSE,SON,DAUGHTER,OTHER","__debezium.source.column.type":"ENUM","__debezium.source.column.length":"1"},"connect.name":"io.debezium.data.Enum"}},{"name":"mobile","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"15"}}],"default":null},{"name":"email","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"60"}}],"default":null},{"name":"adult","type":{"type":"int","connect.parameters":{"__debezium.source.column.type":"TINYINT","__debezium.source.column.length":"1"},"connect.type":"int16"}},{"name":"age","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"INT","__debezium.source.column.length":"11"}}],"default":null},{"name":"gender","type":["null",{"type":"string","connect.version":1,"connect.parameters":{"allowed":"MALE,FEMALE,OTHER","__debezium.source.column.type":"ENUM","__debezium.source.column.length":"1"},"connect.name":"io.debezium.data.Enum"}],"default":null},{"name":"primary","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"TINYINT","__debezium.source.column.length":"1"},"connect.type":"int16"}],"default":null},{"name":"soft_deleted","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"TINYINT","__debezium.source.column.length":"1"},"connect.type":"int16"}],"default":null},{"name":"created_at","type":{"type":"long","connect.version":1,"connect.parameters":{"__debezium.source.column.type":"DATETIME"},"connect.name":"io.debezium.time.Timestamp"}},{"name":"modified_at","type":{"type":"long","connect.version":1,"connect.parameters":{"__debezium.source.column.type":"DATETIME"},"connect.name":"io.debezium.time.Timestamp"}},{"name":"created_by","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}],"default":null},{"name":"modified_by","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}],"default":null}],"connect.name":"ts.inventory.subscription_heroes.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"ts.inventory.subscription_heroes.Envelope"}`, - maskSchema: map[string]serializer.MaskInfo{}, - cName: "gender", - cType: "character varying(65535)", + name: "test4: enum test", + jobSchema: `{"type":"record","name":"Envelope","namespace":"ts.inventory.subscription_heroes","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}},{"name":"subscription_id","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}},{"name":"uhid","type":{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}},{"name":"account_id","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"INT","__debezium.source.column.length":"11"}}],"default":null},{"name":"name","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"120"}}],"default":null},{"name":"relation","type":{"type":"string","connect.version":1,"connect.parameters":{"allowed":"SELF,FATHER,MOTHER,SPOUSE,SON,DAUGHTER,OTHER","__debezium.source.column.type":"ENUM","__debezium.source.column.length":"1"},"connect.name":"io.debezium.data.Enum"}},{"name":"mobile","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"15"}}],"default":null},{"name":"email","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"60"}}],"default":null},{"name":"adult","type":{"type":"int","connect.parameters":{"__debezium.source.column.type":"TINYINT","__debezium.source.column.length":"1"},"connect.type":"int16"}},{"name":"age","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"INT","__debezium.source.column.length":"11"}}],"default":null},{"name":"gender","type":["null",{"type":"string","connect.version":1,"connect.parameters":{"allowed":"MALE,FEMALE,OTHER","__debezium.source.column.type":"ENUM","__debezium.source.column.length":"1"},"connect.name":"io.debezium.data.Enum"}],"default":null},{"name":"primary","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"TINYINT","__debezium.source.column.length":"1"},"connect.type":"int16"}],"default":null},{"name":"soft_deleted","type":["null",{"type":"int","connect.parameters":{"__debezium.source.column.type":"TINYINT","__debezium.source.column.length":"1"},"connect.type":"int16"}],"default":null},{"name":"created_at","type":{"type":"long","connect.version":1,"connect.parameters":{"__debezium.source.column.type":"DATETIME"},"connect.name":"io.debezium.time.Timestamp"}},{"name":"modified_at","type":{"type":"long","connect.version":1,"connect.parameters":{"__debezium.source.column.type":"DATETIME"},"connect.name":"io.debezium.time.Timestamp"}},{"name":"created_by","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}],"default":null},{"name":"modified_by","type":["null",{"type":"string","connect.parameters":{"__debezium.source.column.type":"VARCHAR","__debezium.source.column.length":"36"}}],"default":null}],"connect.name":"ts.inventory.subscription_heroes.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"table","type":["null","string"],"default":null},{"name":"server_id","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"thread","type":["null","long"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"ts.inventory.subscription_heroes.Envelope"}`, + maskSchema: map[string]serializer.MaskInfo{}, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{}, + cName: "gender", + cType: "character varying(65535)", }, } @@ -64,7 +70,7 @@ func TestSchemaMysqlDataType(t *testing.T) { c := &schemaTransformer{registry: nil} resp, err := c.transformSchemaValue( - tc.jobSchema, []string{"id"}, tc.maskSchema) + tc.jobSchema, []string{"id"}, tc.maskSchema, tc.extraMaskSchema) table := resp.(redshift.Table) if err != nil { t.Error(err) diff --git a/pkg/transformer/masker/database.yaml b/pkg/transformer/masker/database.yaml index 0aecf1882..026b67255 100644 --- a/pkg/transformer/masker/database.yaml +++ b/pkg/transformer/masker/database.yaml @@ -57,3 +57,9 @@ dist_keys: - email justifications: - source +regex_pattern_boolean_keys: + customers: + favourite_quote: + has_philosphy: 'life|time' + favourite_food: + has_pizza: 'pizza' diff --git a/pkg/transformer/masker/mask_config.go b/pkg/transformer/masker/mask_config.go index 17741d130..5230d285e 100644 --- a/pkg/transformer/masker/mask_config.go +++ b/pkg/transformer/masker/mask_config.go @@ -50,7 +50,11 @@ type MaskConfig struct { // IncludeTables restrict tables that are allowed to be sinked. IncludeTables *[]string `yaml:"include_tables,omitempty"` - // regexes cache is used to prevent regex Compile on everytime computations. + // RegexPatternBooleanKeys helps on free-text columns which need to be hashed to record + // whether a static regex pattern matches the unhashed text. + RegexPatternBooleanKeys map[string]interface{} `yaml:"regex_pattern_boolean_keys,omitempty"` + + // regexes cache is used to prevent regex Compile on every message mask run. regexes map[string]*regexp.Regexp } @@ -311,6 +315,65 @@ func (m MaskConfig) DependentNonPiiKey(table, cName string) bool { return false } +// BoolColumns returns the map of boolean column name and its value +func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string]string { + if cValue == nil { + return nil + } + + columnsToCheckRaw, ok := m.RegexPatternBooleanKeys[table] + if !ok { + return nil + } + + columnsToCheck, ok := columnsToCheckRaw.(map[interface{}]interface{}) + if !ok { + klog.Fatalf( + "Type assertion error! table: %s, cName: %s\n", table, cName) + } + + boolColumns := make(map[string]string) + for freeTextColumnNameRaw, regexesRaw := range columnsToCheck { + freeTextColumnName := freeTextColumnNameRaw.(string) + freeTextColumnName = strings.ToLower(freeTextColumnName) // favourite_quote + if freeTextColumnName != cName { + continue + } + + regexes, ok := regexesRaw.(map[interface{}]interface{}) + if !ok { + klog.Fatalf( + "Type assertion error! table: %s, cName: %s\n", + table, cName) + } + + for regexNameRaw, patternRaw := range regexes { + regexName := regexNameRaw.(string) + regexName = strings.ToLower(regexName) + pattern := strings.ToLower(patternRaw.(string)) + + var err error + regex, ok := m.regexes[pattern] + if !ok { + regex, err = regexp.Compile(pattern) + if err != nil { + klog.Fatalf( + "Regex: %s compile failed, err:%v\n", pattern, err) + } + m.regexes[pattern] = regex + } + + if regex.MatchString(strings.ToLower(*cValue)) { + boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "true" + } else { + boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "false" + } + } + } + + return boolColumns +} + // PerformUnMasking checks if unmasking should be done or not func (m MaskConfig) PerformUnMasking(table, cName string, cValue *string, allColumns map[string]*string) bool { diff --git a/pkg/transformer/masker/mask_config_test.go b/pkg/transformer/masker/mask_config_test.go index 26a628ade..cb0570edf 100644 --- a/pkg/transformer/masker/mask_config_test.go +++ b/pkg/transformer/masker/mask_config_test.go @@ -30,7 +30,7 @@ func testMasked(t *testing.T, topic, table, cName, cValue string, } func TestMaskConfig(t *testing.T) { - t.Parallel() + // t.Parallel() tests := []struct { name string diff --git a/pkg/transformer/masker/masker.go b/pkg/transformer/masker/masker.go index 2a9f7d081..5941851f8 100644 --- a/pkg/transformer/masker/masker.go +++ b/pkg/transformer/masker/masker.go @@ -77,9 +77,9 @@ func (m *masker) Transform( addMissingColumn(rawColumns, table.Columns) columns := make(map[string]*string) - extraColumns := make(map[string]*string) maskSchema := make(map[string]serializer.MaskInfo) - mappingPIIColumns := make(map[string]bool) + extraMaskSchema := make(map[string]serializer.ExtraMaskInfo) + extraColumnValue := make(map[string]*string) mappingPIIKeyTable := m.config.hasMappingPIIKey(m.table) for cName, cVal := range rawColumns { @@ -91,17 +91,24 @@ func (m *masker) Transform( mappingPIIKey := m.config.MappingPIIKey(m.table, cName) dependentNonPiiKey := m.config.DependentNonPiiKey(m.table, cName) conditionalNonPiiKey := m.config.ConditionalNonPiiKey(m.table, cName) + boolColumns := m.config.BoolColumns(m.table, cName, cVal) + // extraColumns store the mask info for extra columns + // extra columns are added for the following keys: + // LengthKey, MobileKey, MappingPIIKey and Boolean keys if lengthKey { var length int if cVal != nil { length = len(*cVal) } - extraColumns[cName+transformer.LengthColumnSuffix] = stringPtr( - strconv.Itoa(length), - ) + extraColumnName := strings.ToLower(cName + transformer.LengthColumnSuffix) + extraMaskSchema[extraColumnName] = serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: redshift.RedshiftInteger, + DefaultVal: "0", + } + extraColumnValue[extraColumnName] = stringPtr(strconv.Itoa(length)) } - if mobileKey { var tMobile *string if cVal == nil { @@ -116,9 +123,13 @@ func (m *masker) Transform( mobile[:exposedLength], ) } - extraColumns[cName+transformer.MobileCoulmnSuffix] = tMobile + extraColumnName := strings.ToLower(cName + transformer.MobileCoulmnSuffix) + extraMaskSchema[extraColumnName] = serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: redshift.RedshiftMobileColType, + } + extraColumnValue[extraColumnName] = tMobile } - if mappingPIIKey { var hashedValue *string if cVal == nil || strings.TrimSpace(*cVal) == "" { @@ -126,10 +137,24 @@ func (m *masker) Transform( } else { hashedValue = Mask(*cVal, m.salt) } - - extraColumns[transformer.MappingPIIColumnPrefix+cName] = hashedValue - mappingPIIColumns[transformer.MappingPIIColumnPrefix+cName] = true + extraColumnName := strings.ToLower(transformer.MappingPIIColumnPrefix + cName) + extraMaskSchema[extraColumnName] = serializer.ExtraMaskInfo{ + Masked: true, + ColumnType: redshift.RedshiftMaskedDataType, + } + extraColumnValue[extraColumnName] = hashedValue } + var boolColumnKey bool + if len(boolColumns) > 0 { + boolColumnKey = true + for boolCol, boolVal := range boolColumns { + extraMaskSchema[boolCol] = serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: redshift.RedshiftBoolean, + } + extraColumnValue[boolCol] = &boolVal + } + } // all extra columns handled // special case for mapping PII keys if mappingPIIKeyTable { @@ -152,40 +177,31 @@ func (m *masker) Transform( } maskSchema[cName] = serializer.MaskInfo{ - Masked: !unmasked, - SortCol: sortKey, - DistCol: distKey, - LengthCol: lengthKey, - MobileCol: mobileKey, - MappingPIICol: mappingPIIKey, - ConditionalNonPIICol: conditionalNonPiiKey, - DependentNonPIICol: dependentNonPiiKey, + Masked: !unmasked, + + SortCol: sortKey, + DistCol: distKey, + + LengthCol: lengthKey, + MobileCol: mobileKey, + MappingPIICol: mappingPIIKey, + ConditionalNonPIICol: conditionalNonPiiKey, + DependentNonPIICol: dependentNonPiiKey, + RegexPatternBooleanCol: boolColumnKey, } } - for cName, cVal := range extraColumns { + // append the extra columns so that the data reaches s3 + for cName, cVal := range extraColumnValue { // send value in json only when it is not nil, so that NULL takes effect if cVal != nil { columns[cName] = cVal } - - var maskedExtraColumn bool - _, ok := mappingPIIColumns[cName] - if ok { - maskedExtraColumn = true - } - - maskSchema[cName] = serializer.MaskInfo{ - Masked: maskedExtraColumn, - // extra length column, don't need more extras so below 3 - LengthCol: false, - MobileCol: false, - MappingPIICol: false, - } } message.Value = columns message.MaskSchema = maskSchema + message.ExtraMaskSchema = extraMaskSchema return nil } diff --git a/pkg/transformer/masker/masker_test.go b/pkg/transformer/masker/masker_test.go index 8c5918551..079b18064 100644 --- a/pkg/transformer/masker/masker_test.go +++ b/pkg/transformer/masker/masker_test.go @@ -436,6 +436,76 @@ func TestMasker(t *testing.T) { }, }, }, + { + name: "test18: boolean_keys regex matched, check value of main free text col", + topic: "dbserver.database.customers", + cName: "favourite_quote", + columns: map[string]*string{ + "favourite_quote": stringPtr("Life would be tragic if it weren't funny"), + }, + resultVal: stringPtr("3212da4cc0dc4c0023b912dcacab20f55feabb2e"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "favourite_quote": serializer.MaskInfo{Masked: true}, + "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + }, + redshiftTable: redshift.Table{}, + }, + { + name: "test19: boolean_keys regex match failed, check value of main free text col", + topic: "dbserver.database.customers", + cName: "favourite_quote", + columns: map[string]*string{ + "favourite_quote": stringPtr("Wife would be tragic if she wasn't funny"), + }, + resultVal: stringPtr("840ed39a7e650148cbdf0d516194d5c67c035e55"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "favourite_quote": serializer.MaskInfo{Masked: true}, + "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + }, + redshiftTable: redshift.Table{}, + }, + { + name: "test20: boolean_keys regex matched, check value of main free text col", + topic: "dbserver.database.customers", + cName: "favourite_quote_has_philosphy", + columns: map[string]*string{ + "favourite_quote": stringPtr("Life would be tragic if it weren't funny"), + }, + resultVal: stringPtr("true"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "favourite_quote": serializer.MaskInfo{Masked: true}, + "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + }, + redshiftTable: redshift.Table{}, + }, + { + name: "test21: boolean_keys regex match failed - check value of extra col", + topic: "dbserver.database.customers", + cName: "favourite_quote_has_philosphy", + columns: map[string]*string{ + "favourite_quote": stringPtr("Wife would be tragic if she wasn't funny"), + }, + resultVal: stringPtr("false"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "favourite_quote": serializer.MaskInfo{Masked: true}, + "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + }, + redshiftTable: redshift.Table{}, + }, + { + name: "test22 boolean_keys regex matched", + topic: "dbserver.database.customers", + cName: "favourite_food_has_pizza", + columns: map[string]*string{ + "favourite_food": stringPtr("pizza,pasta,burgers,kebabs"), + }, + resultVal: stringPtr("true"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "favourite_food": serializer.MaskInfo{Masked: true}, + "favourite_food_has_pizza": serializer.MaskInfo{Masked: false}, + }, + redshiftTable: redshift.Table{}, + }, } for _, tc := range tests { diff --git a/pkg/transformer/transformer.go b/pkg/transformer/transformer.go index a42d6ee34..24538947e 100644 --- a/pkg/transformer/transformer.go +++ b/pkg/transformer/transformer.go @@ -29,7 +29,9 @@ type SchemaTransformer interface { topic string, schemaId int, schemaIdKey int, - maskSchema map[string]serializer.MaskInfo) (interface{}, error) + maskSchema map[string]serializer.MaskInfo, + extraMaskSchema map[string]serializer.ExtraMaskInfo, + ) (interface{}, error) // Deprecated: // TransformKey transforms the topic schema into name of the primary From a83239c689279e23dcdf918f3e1582c2fe0c5392 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Thu, 20 May 2021 17:32:07 +0530 Subject: [PATCH 02/10] Fix and update tests for extraColumns --- pkg/transformer/masker/masker_test.go | 123 +++++++++++++++++++++----- 1 file changed, 102 insertions(+), 21 deletions(-) diff --git a/pkg/transformer/masker/masker_test.go b/pkg/transformer/masker/masker_test.go index 079b18064..fd4d3d76b 100644 --- a/pkg/transformer/masker/masker_test.go +++ b/pkg/transformer/masker/masker_test.go @@ -45,6 +45,7 @@ func TestSaltMask(t *testing.T) { func testMasker(t *testing.T, salt, topic, cName string, columns map[string]*string, result *string, resultMaskSchema map[string]serializer.MaskInfo, + extraMaskSchema map[string]serializer.ExtraMaskInfo, redshiftTable redshift.Table) { dir, err := os.Getwd() @@ -84,6 +85,7 @@ func testMasker(t *testing.T, salt, topic, cName string, t.Errorf("column=%+v, maskColumn=%+v missing\n", column, maskInfo) continue } + if maskColumn.Masked != maskInfo.Masked || maskColumn.SortCol != maskInfo.SortCol || maskColumn.DistCol != maskInfo.DistCol || @@ -96,6 +98,24 @@ func testMasker(t *testing.T, salt, topic, cName string, } } + if len(extraMaskSchema) > 0 { + for column, extraMaskInfo := range extraMaskSchema { + extraMaskColumn, ok := message.ExtraMaskSchema[column] + if !ok { + t.Errorf("column=%+v, maskColumn=%+v missing\n", column, extraMaskInfo) + continue + } + + if extraMaskColumn.Masked != extraMaskInfo.Masked || + extraMaskColumn.ColumnType != extraMaskInfo.ColumnType || + extraMaskColumn.DefaultVal != extraMaskInfo.DefaultVal { + t.Errorf( + "extraColumn=%v, extraMaskColumn=%+v does not match %+v\n", + column, extraMaskColumn, extraMaskInfo) + } + } + } + if maskedColumns[cName] == nil { if maskedColumns[cName] != result { t.Errorf( @@ -125,6 +145,7 @@ func TestMasker(t *testing.T) { columns map[string]*string resultVal *string resultMaskSchema map[string]serializer.MaskInfo + extraMaskSchema map[string]serializer.ExtraMaskInfo redshiftTable redshift.Table }{ { @@ -141,6 +162,7 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("1001"), resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -158,6 +180,7 @@ func TestMasker(t *testing.T) { resultVal: stringPtr( "9ba53e85b996f6278aa647d8da8f355aafd16149"), resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -174,6 +197,7 @@ func TestMasker(t *testing.T) { }, resultVal: nil, resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -190,6 +214,7 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("2020-09-20 20:56:45"), resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -206,6 +231,7 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("20"), resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -222,6 +248,7 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("d129eef03b45b9679db4d35922786281ee805877"), resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -238,6 +265,7 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("Batman"), resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -263,7 +291,8 @@ func TestMasker(t *testing.T) { "email": serializer.MaskInfo{ Masked: true, DistCol: true, LengthCol: true}, }, - redshiftTable: redshift.Table{}, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), + redshiftTable: redshift.Table{}, }, { name: "test9: mask schema test when field is not in config)", @@ -290,7 +319,8 @@ func TestMasker(t *testing.T) { Masked: true, DistCol: true, LengthCol: true}, "dob": serializer.MaskInfo{Masked: true}, }, - redshiftTable: redshift.Table{}, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), + redshiftTable: redshift.Table{}, }, { name: "test10: case insensitivity (sort keys, dist keys)", @@ -309,7 +339,8 @@ func TestMasker(t *testing.T) { "createdat": serializer.MaskInfo{SortCol: true}, "email": serializer.MaskInfo{Masked: true}, }, - redshiftTable: redshift.Table{}, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), + redshiftTable: redshift.Table{}, }, { name: "test11: case insensitivity1 (conditionalNonPii)", @@ -324,7 +355,8 @@ func TestMasker(t *testing.T) { "justice": serializer.MaskInfo{Masked: true}, "reason": serializer.MaskInfo{Masked: true}, }, - redshiftTable: redshift.Table{}, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), + redshiftTable: redshift.Table{}, }, { name: "test12: case insensitivity2 (conditionalNonPii)", @@ -339,7 +371,8 @@ func TestMasker(t *testing.T) { "justice": serializer.MaskInfo{Masked: true}, "reason": serializer.MaskInfo{Masked: true}, }, - redshiftTable: redshift.Table{}, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), + redshiftTable: redshift.Table{}, }, { name: "test13: case insensitivity (dependentNonPii)", @@ -354,7 +387,8 @@ func TestMasker(t *testing.T) { "justice": serializer.MaskInfo{Masked: true}, "reason": serializer.MaskInfo{Masked: true}, }, - redshiftTable: redshift.Table{}, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), + redshiftTable: redshift.Table{}, }, { name: "test14: mobile keys", @@ -365,6 +399,7 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("+9198"), resultMaskSchema: make(map[string]serializer.MaskInfo), + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{}, }, { @@ -376,8 +411,14 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("9b8297b23539abcda0344522bca05a99feecba10"), resultMaskSchema: map[string]serializer.MaskInfo{ - "id": serializer.MaskInfo{Masked: false}, - "hashed_id": serializer.MaskInfo{Masked: true}, + "id": serializer.MaskInfo{Masked: false}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "hashed_id": serializer.ExtraMaskInfo{ + Masked: true, + ColumnType: "character varying(50)", + DefaultVal: "", + }, }, redshiftTable: redshift.Table{}, }, @@ -390,8 +431,14 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("2011"), resultMaskSchema: map[string]serializer.MaskInfo{ - "id": serializer.MaskInfo{Masked: false}, - "hashed_id": serializer.MaskInfo{Masked: true}, + "id": serializer.MaskInfo{Masked: false}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "hashed_id": serializer.ExtraMaskInfo{ + Masked: true, + ColumnType: "character varying(50)", + DefaultVal: "", + }, }, redshiftTable: redshift.Table{}, }, @@ -406,6 +453,7 @@ func TestMasker(t *testing.T) { resultMaskSchema: map[string]serializer.MaskInfo{ "plan_enabled": serializer.MaskInfo{Masked: true}, }, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{ Columns: []redshift.ColInfo{ redshift.ColInfo{ @@ -428,6 +476,7 @@ func TestMasker(t *testing.T) { resultMaskSchema: map[string]serializer.MaskInfo{ "notes": serializer.MaskInfo{Masked: true}, }, + extraMaskSchema: make(map[string]serializer.ExtraMaskInfo), redshiftTable: redshift.Table{ Columns: []redshift.ColInfo{ redshift.ColInfo{ @@ -445,8 +494,14 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("3212da4cc0dc4c0023b912dcacab20f55feabb2e"), resultMaskSchema: map[string]serializer.MaskInfo{ - "favourite_quote": serializer.MaskInfo{Masked: true}, - "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + "favourite_quote": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "favourite_quote_has_philosphy": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, }, redshiftTable: redshift.Table{}, }, @@ -459,8 +514,14 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("840ed39a7e650148cbdf0d516194d5c67c035e55"), resultMaskSchema: map[string]serializer.MaskInfo{ - "favourite_quote": serializer.MaskInfo{Masked: true}, - "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + "favourite_quote": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "favourite_quote_has_philosphy": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, }, redshiftTable: redshift.Table{}, }, @@ -473,8 +534,14 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("true"), resultMaskSchema: map[string]serializer.MaskInfo{ - "favourite_quote": serializer.MaskInfo{Masked: true}, - "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + "favourite_quote": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "favourite_quote_has_philosphy": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, }, redshiftTable: redshift.Table{}, }, @@ -487,8 +554,14 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("false"), resultMaskSchema: map[string]serializer.MaskInfo{ - "favourite_quote": serializer.MaskInfo{Masked: true}, - "favourite_quote_has_philosphy": serializer.MaskInfo{Masked: false}, + "favourite_quote": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "favourite_quote_has_philosphy": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, }, redshiftTable: redshift.Table{}, }, @@ -501,8 +574,14 @@ func TestMasker(t *testing.T) { }, resultVal: stringPtr("true"), resultMaskSchema: map[string]serializer.MaskInfo{ - "favourite_food": serializer.MaskInfo{Masked: true}, - "favourite_food_has_pizza": serializer.MaskInfo{Masked: false}, + "favourite_food": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "favourite_food_has_pizza": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, }, redshiftTable: redshift.Table{}, }, @@ -514,7 +593,9 @@ func TestMasker(t *testing.T) { testMasker( t, salt, tc.topic, tc.cName, tc.columns, - tc.resultVal, tc.resultMaskSchema, + tc.resultVal, + tc.resultMaskSchema, + tc.extraMaskSchema, tc.redshiftTable, ) }) From 40c52b106796f738bf848c62c921f5c816ce95e2 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Thu, 20 May 2021 17:49:20 +0530 Subject: [PATCH 03/10] Rephrased it to sound simple --- MASKING.md | 2 +- pkg/transformer/masker/mask_config.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/MASKING.md b/MASKING.md index 9ac42f6d7..1b54345ef 100644 --- a/MASKING.md +++ b/MASKING.md @@ -104,7 +104,7 @@ include_tables: ``` ### Regex Pattern Boolean Keys -Free text columns can contain PII so we do not unmask it, but we want the user to make aggregate analysis on the non pii data in it. So using this a user gets boolean column stating that the text/regex in the complete free text is present. +Helps in keeping free text columns masked and adds a boolean column giving boolean info about the kind of value in the free text column. For example: We add a boolean column `favourite_quote_has_philosphy`. If value in column `favourite_quote` matches the regex `'life|time'` (case insensitive), then the value in extra column `favourite_quote_has_philosphy` is `true` else `false`. diff --git a/pkg/transformer/masker/mask_config.go b/pkg/transformer/masker/mask_config.go index 5230d285e..734f87b6b 100644 --- a/pkg/transformer/masker/mask_config.go +++ b/pkg/transformer/masker/mask_config.go @@ -50,8 +50,9 @@ type MaskConfig struct { // IncludeTables restrict tables that are allowed to be sinked. IncludeTables *[]string `yaml:"include_tables,omitempty"` - // RegexPatternBooleanKeys helps on free-text columns which need to be hashed to record - // whether a static regex pattern matches the unhashed text. + // RegexPatternBooleanKeys helps in keeping free text columns masked + // and adds boolean columns giving boolean info about the kind of + // value in the free text column. RegexPatternBooleanKeys map[string]interface{} `yaml:"regex_pattern_boolean_keys,omitempty"` // regexes cache is used to prevent regex Compile on every message mask run. From c1e6ace9fcab8e1e853b23e017cda4e57ec9f0db Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 21 May 2021 13:46:08 +0530 Subject: [PATCH 04/10] Review fixes --- pkg/redshiftloader/job.go | 12 ++++++------ pkg/transformer/debezium/schema.go | 6 ++++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go index 6031d7828..28ab70fc1 100644 --- a/pkg/redshiftloader/job.go +++ b/pkg/redshiftloader/job.go @@ -125,11 +125,11 @@ func StringMapToJob(data map[string]interface{}) Job { } job.MaskSchema = schema case "extraMaskSchema": + extraSchema := make(map[string]serializer.ExtraMaskInfo) if value, ok := v.(string); ok { - job.ExtraMaskSchema = ToExtraMaskSchemaMap(value) - } else { // backward compatibility - job.ExtraMaskSchema = nil + extraSchema = ToExtraMaskSchemaMap(value) } + job.ExtraMaskSchema = extraSchema case "batchBytes": if value, ok := v.(int64); ok { job.BatchBytes = value @@ -296,7 +296,7 @@ func ToExtraMaskSchemaMap(r string) map[string]serializer.ExtraMaskInfo { info := strings.Split(col, ",") - if len(info) != 3 { + if len(info) != 4 { klog.Fatalf("expecting extra mask schema to be length 3, got:%+v, schema:%v", len(info), r) } @@ -307,8 +307,8 @@ func ToExtraMaskSchemaMap(r string) map[string]serializer.ExtraMaskInfo { m[info[0]] = serializer.ExtraMaskInfo{ Masked: masked, - ColumnType: info[1], - DefaultVal: info[2], + ColumnType: info[2], + DefaultVal: info[3], } } diff --git a/pkg/transformer/debezium/schema.go b/pkg/transformer/debezium/schema.go index 507b179d7..1fccd4dcf 100644 --- a/pkg/transformer/debezium/schema.go +++ b/pkg/transformer/debezium/schema.go @@ -391,9 +391,11 @@ func (c *schemaTransformer) transformSchemaValue(jobSchema string, } //deprecated below started -------------------------------------------------------------------- if mschema.LengthCol && len(extraMaskSchema) == 0 { // deprecated in favour of extraMaskSchema + klog.Warningf("Running deprecated code, extraSchema: %+v unused for LengthCol", extraMaskSchema) newColName := strings.ToLower(column.Name) + transformer.LengthColumnSuffix _, ok := extraColumnsMap[newColName] if !ok { + klog.Warningf("(deprecated code) Adding extra column: %v", newColName) extraColumns = append(extraColumns, redshift.ColInfo{ Name: newColName, Type: redshift.RedshiftInteger, @@ -402,9 +404,11 @@ func (c *schemaTransformer) transformSchemaValue(jobSchema string, } } if mschema.MobileCol && len(extraMaskSchema) == 0 { // deprecated in favour of extraMaskSchema + klog.Warningf("Running deprecated code, extraSchema: %+v unused for MobileCol", extraMaskSchema) newColName := strings.ToLower(column.Name) + transformer.MobileCoulmnSuffix _, ok := extraColumnsMap[newColName] if !ok { + klog.Warningf("(deprecated code) Adding extra column: %v", newColName) extraColumns = append(extraColumns, redshift.ColInfo{ Name: newColName, Type: redshift.RedshiftMobileColType, @@ -412,9 +416,11 @@ func (c *schemaTransformer) transformSchemaValue(jobSchema string, } } if mschema.MappingPIICol && len(extraMaskSchema) == 0 { // deprecated in favour of extraMaskSchema + klog.Warningf("Running deprecated code, extraSchema: %+v unused for MappingPIICol", extraMaskSchema) newColName := strings.ToLower(transformer.MappingPIIColumnPrefix + column.Name) _, ok := extraColumnsMap[newColName] if !ok { + klog.Warningf("(deprecated code) Adding extra column: %v", newColName) extraColumns = append(extraColumns, redshift.ColInfo{ Name: newColName, Type: redshift.RedshiftMaskedDataType, From eae1f742d84340251db1a944b92d67652daaa090 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 21 May 2021 14:33:16 +0530 Subject: [PATCH 05/10] Review fixes; use case insensitivity pattern avoid string lower https://github.com/practo/tipoca-stream/pull/232#discussion_r636742962 https://stackoverflow.com/questions/15326421/how-do-i-do-a-case-insensitive-regular-expression-in-go/15326471#15326471 --- MASKING.md | 4 +++- pkg/transformer/masker/mask_config.go | 12 ++++++------ pkg/transformer/masker/masker_test.go | 20 ++++++++++++++++++++ 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/MASKING.md b/MASKING.md index 1b54345ef..5a9e30597 100644 --- a/MASKING.md +++ b/MASKING.md @@ -107,7 +107,9 @@ include_tables: Helps in keeping free text columns masked and adds a boolean column giving boolean info about the kind of value in the free text column. For example: We add a boolean column `favourite_quote_has_philosphy`. -If value in column `favourite_quote` matches the regex `'life|time'` (case insensitive), then the value in extra column `favourite_quote_has_philosphy` is `true` else `false`. +If value in column `favourite_quote` matches the regex `'life|time'`, then the value in extra column `favourite_quote_has_philosphy` is `true` else `false`. + +Regex match is case insensitive. ```yaml regex_pattern_boolean_keys: diff --git a/pkg/transformer/masker/mask_config.go b/pkg/transformer/masker/mask_config.go index 734f87b6b..c04344735 100644 --- a/pkg/transformer/masker/mask_config.go +++ b/pkg/transformer/masker/mask_config.go @@ -351,20 +351,20 @@ func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string] for regexNameRaw, patternRaw := range regexes { regexName := regexNameRaw.(string) regexName = strings.ToLower(regexName) - pattern := strings.ToLower(patternRaw.(string)) + caseInsensitivePattern := fmt.Sprintf("(?i)%s", patternRaw.(string)) var err error - regex, ok := m.regexes[pattern] + regex, ok := m.regexes[caseInsensitivePattern] if !ok { - regex, err = regexp.Compile(pattern) + regex, err = regexp.Compile(caseInsensitivePattern) if err != nil { klog.Fatalf( - "Regex: %s compile failed, err:%v\n", pattern, err) + "Regex: %s compile failed, err:%v\n", caseInsensitivePattern, err) } - m.regexes[pattern] = regex + m.regexes[caseInsensitivePattern] = regex } - if regex.MatchString(strings.ToLower(*cValue)) { + if regex.MatchString(*cValue) { boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "true" } else { boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "false" diff --git a/pkg/transformer/masker/masker_test.go b/pkg/transformer/masker/masker_test.go index fd4d3d76b..a5dd6a0bd 100644 --- a/pkg/transformer/masker/masker_test.go +++ b/pkg/transformer/masker/masker_test.go @@ -585,6 +585,26 @@ func TestMasker(t *testing.T) { }, redshiftTable: redshift.Table{}, }, + { + name: "test23 boolean_keys case insensitivity test", + topic: "dbserver.database.customers", + cName: "favourite_food_has_pizza", + columns: map[string]*string{ + "favourite_food": stringPtr("PIZza, i love it, i love it"), + }, + resultVal: stringPtr("true"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "favourite_food": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "favourite_food_has_pizza": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, + }, + redshiftTable: redshift.Table{}, + }, } for _, tc := range tests { From 9368d1abc6c8cdab9cab86733fab3adf7a509c60 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 21 May 2021 14:46:32 +0530 Subject: [PATCH 06/10] Fix schema syntax Either the input schema or one its references is invalid --- pkg/redshiftloader/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go index 28ab70fc1..1c0343656 100644 --- a/pkg/redshiftloader/job.go +++ b/pkg/redshiftloader/job.go @@ -19,7 +19,7 @@ var JobAvroSchema string = `{ {"name": "schemaId", "type": "int"}, {"name": "schemaIdKey", "type": "int", "default": -1}, {"name": "maskSchema", "type": "string"}, - {"name": "extraMaskSchema", "type": "string", default: ""}, + {"name": "extraMaskSchema", "type": "string", "default": ""}, {"name": "skipMerge", "type": "string", "default": ""}, {"name": "batchBytes", "type": "long", "default": 0}, {"name": "createEvents", "type": "long", "default": 0}, From bd663adf21e9ea6b88bababf49e769d2e8b87843 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 21 May 2021 15:02:19 +0530 Subject: [PATCH 07/10] Add the new key: RegexPatternBooleanKeys in the differ --- pkg/transformer/masker/mask_diff.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/transformer/masker/mask_diff.go b/pkg/transformer/masker/mask_diff.go index f2014ce82..9a30934ef 100644 --- a/pkg/transformer/masker/mask_diff.go +++ b/pkg/transformer/masker/mask_diff.go @@ -107,4 +107,6 @@ func (m *MaskDiffer) Diff() { m.current.ConditionalNonPiiKeys, m.desired.ConditionalNonPiiKeys) m.diffMapInterface( m.current.DependentNonPiiKeys, m.desired.DependentNonPiiKeys) + m.diffMapInterface( + m.current.RegexPatternBooleanKeys, m.desired.RegexPatternBooleanKeys) } From 5abca89370edcbf687ae7718ed4e63dc414b2f3a Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 21 May 2021 15:35:56 +0530 Subject: [PATCH 08/10] Fix https://github.com/practo/tipoca-stream/pull/232#issuecomment-845834142 Add test also for nil valued columns --- pkg/transformer/masker/mask_config.go | 10 ++++------ pkg/transformer/masker/masker_test.go | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/pkg/transformer/masker/mask_config.go b/pkg/transformer/masker/mask_config.go index c04344735..16123a7f9 100644 --- a/pkg/transformer/masker/mask_config.go +++ b/pkg/transformer/masker/mask_config.go @@ -316,12 +316,10 @@ func (m MaskConfig) DependentNonPiiKey(table, cName string) bool { return false } -// BoolColumns returns the map of boolean column name and its value +// BoolColumns returns extra boolean columns for the parent column(free text col) +// to make analysis on the data contained in parent column possible using the +// boolean columns func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string]string { - if cValue == nil { - return nil - } - columnsToCheckRaw, ok := m.RegexPatternBooleanKeys[table] if !ok { return nil @@ -364,7 +362,7 @@ func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string] m.regexes[caseInsensitivePattern] = regex } - if regex.MatchString(*cValue) { + if cValue != nil && regex.MatchString(*cValue) { boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "true" } else { boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "false" diff --git a/pkg/transformer/masker/masker_test.go b/pkg/transformer/masker/masker_test.go index a5dd6a0bd..326f38b8b 100644 --- a/pkg/transformer/masker/masker_test.go +++ b/pkg/transformer/masker/masker_test.go @@ -605,6 +605,26 @@ func TestMasker(t *testing.T) { }, redshiftTable: redshift.Table{}, }, + { + name: "test24 boolean_keys test for nil col", + topic: "dbserver.database.customers", + cName: "favourite_food_has_pizza", + columns: map[string]*string{ + "favourite_food": stringPtr(""), + }, + resultVal: stringPtr("false"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "favourite_food": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "favourite_food_has_pizza": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, + }, + redshiftTable: redshift.Table{}, + }, } for _, tc := range tests { From a7381967ba5aba641006d79b5cbc1e65715f2961 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 21 May 2021 17:27:46 +0530 Subject: [PATCH 09/10] Maintain extra cols order https://github.com/practo/tipoca-stream/pull/232#discussion_r636790381 --- pkg/transformer/debezium/schema.go | 13 ++++++ pkg/transformer/debezium/schema_test.go | 59 +++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/pkg/transformer/debezium/schema.go b/pkg/transformer/debezium/schema.go index 1fccd4dcf..09e241e24 100644 --- a/pkg/transformer/debezium/schema.go +++ b/pkg/transformer/debezium/schema.go @@ -9,6 +9,7 @@ import ( "github.com/practo/tipoca-stream/redshiftsink/pkg/serializer" "github.com/practo/tipoca-stream/redshiftsink/pkg/transformer" "github.com/practo/tipoca-stream/redshiftsink/pkg/transformer/masker" + "sort" "strings" ) @@ -338,6 +339,15 @@ func (c *schemaTransformer) TransformValue( ) } +func sortExtraColumns(extraColumns []redshift.ColInfo) { + sort.Slice( + extraColumns, + func(i, j int) bool { + return extraColumns[i].Name < extraColumns[j].Name + }, + ) +} + func (c *schemaTransformer) transformSchemaValue(jobSchema string, primaryKeys []string, maskSchema map[string]serializer.MaskInfo, @@ -431,6 +441,9 @@ func (c *schemaTransformer) transformSchemaValue(jobSchema string, } } + // sort to keep the order consistent for the redshift table schema + sortExtraColumns(extraColumns) + var redshiftDataType string if useStringMax { redshiftDataType = redshift.RedshiftStringMax diff --git a/pkg/transformer/debezium/schema_test.go b/pkg/transformer/debezium/schema_test.go index 86ebbd2e6..f0d96b60f 100644 --- a/pkg/transformer/debezium/schema_test.go +++ b/pkg/transformer/debezium/schema_test.go @@ -3,9 +3,68 @@ package debezium import ( "github.com/practo/tipoca-stream/redshiftsink/pkg/redshift" "github.com/practo/tipoca-stream/redshiftsink/pkg/serializer" + "reflect" "testing" ) +func TestExtraColumnSort(t *testing.T) { + var ec, rec []redshift.ColInfo + ec = append( + ec, + redshift.ColInfo{ + Name: "email_length", + Type: "email", + DefaultVal: "", + }, + redshift.ColInfo{ + Name: "has_covid", + Type: "string", + DefaultVal: "", + }, + redshift.ColInfo{ + Name: "a1988born", + Type: "string", + DefaultVal: "", + }, + redshift.ColInfo{ + Name: "b1986born", + Type: "string", + DefaultVal: "", + }, + ) + + sortExtraColumns(ec) + + rec = append( + rec, + redshift.ColInfo{ + Name: "a1988born", + Type: "string", + DefaultVal: "", + }, + redshift.ColInfo{ + Name: "b1986born", + Type: "string", + DefaultVal: "", + }, + redshift.ColInfo{ + Name: "email_length", + Type: "email", + DefaultVal: "", + }, + redshift.ColInfo{ + Name: "has_covid", + Type: "string", + DefaultVal: "", + }, + ) + + if !reflect.DeepEqual(rec, ec) { + t.Errorf("rec!=ec, got=%+v\n, expected=%+v", rec, ec) + } + +} + func TestSchemaMysqlDataType(t *testing.T) { t.Parallel() From d18f20a64df221ba8e5ff2cdd690ea5d2997c9fc Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 21 May 2021 18:32:38 +0530 Subject: [PATCH 10/10] Multi regex bug fixed --- pkg/transformer/masker/database.yaml | 3 +++ pkg/transformer/masker/mask_config.go | 9 +++++---- pkg/transformer/masker/masker.go | 2 +- pkg/transformer/masker/masker_test.go | 20 ++++++++++++++++++++ 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/pkg/transformer/masker/database.yaml b/pkg/transformer/masker/database.yaml index 026b67255..ce1c8f730 100644 --- a/pkg/transformer/masker/database.yaml +++ b/pkg/transformer/masker/database.yaml @@ -63,3 +63,6 @@ regex_pattern_boolean_keys: has_philosphy: 'life|time' favourite_food: has_pizza: 'pizza' + dob: + 1986born: '1986-.*' + 1988born: '1988-.*' diff --git a/pkg/transformer/masker/mask_config.go b/pkg/transformer/masker/mask_config.go index 16123a7f9..bd7ed929e 100644 --- a/pkg/transformer/masker/mask_config.go +++ b/pkg/transformer/masker/mask_config.go @@ -319,7 +319,7 @@ func (m MaskConfig) DependentNonPiiKey(table, cName string) bool { // BoolColumns returns extra boolean columns for the parent column(free text col) // to make analysis on the data contained in parent column possible using the // boolean columns -func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string]string { +func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string]*string { columnsToCheckRaw, ok := m.RegexPatternBooleanKeys[table] if !ok { return nil @@ -331,7 +331,7 @@ func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string] "Type assertion error! table: %s, cName: %s\n", table, cName) } - boolColumns := make(map[string]string) + boolColumns := make(map[string]*string) for freeTextColumnNameRaw, regexesRaw := range columnsToCheck { freeTextColumnName := freeTextColumnNameRaw.(string) freeTextColumnName = strings.ToLower(freeTextColumnName) // favourite_quote @@ -363,10 +363,11 @@ func (m MaskConfig) BoolColumns(table, cName string, cValue *string) map[string] } if cValue != nil && regex.MatchString(*cValue) { - boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "true" + boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = stringPtr("true") } else { - boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = "false" + boolColumns[fmt.Sprintf("%s_%s", freeTextColumnName, regexName)] = stringPtr("false") } + } } diff --git a/pkg/transformer/masker/masker.go b/pkg/transformer/masker/masker.go index 5941851f8..656e0d13e 100644 --- a/pkg/transformer/masker/masker.go +++ b/pkg/transformer/masker/masker.go @@ -152,7 +152,7 @@ func (m *masker) Transform( Masked: false, ColumnType: redshift.RedshiftBoolean, } - extraColumnValue[boolCol] = &boolVal + extraColumnValue[boolCol] = boolVal } } // all extra columns handled diff --git a/pkg/transformer/masker/masker_test.go b/pkg/transformer/masker/masker_test.go index 326f38b8b..9f3ec9e73 100644 --- a/pkg/transformer/masker/masker_test.go +++ b/pkg/transformer/masker/masker_test.go @@ -625,6 +625,26 @@ func TestMasker(t *testing.T) { }, redshiftTable: redshift.Table{}, }, + { + name: "test25 boolean_keys_two_regex", + topic: "dbserver.database.customers", + cName: "dob_1986born", + columns: map[string]*string{ + "dob": stringPtr("1988-09-21"), + }, + resultVal: stringPtr("false"), + resultMaskSchema: map[string]serializer.MaskInfo{ + "dob": serializer.MaskInfo{Masked: true}, + }, + extraMaskSchema: map[string]serializer.ExtraMaskInfo{ + "dob_1986born": serializer.ExtraMaskInfo{ + Masked: false, + ColumnType: "boolean", + DefaultVal: "", + }, + }, + redshiftTable: redshift.Table{}, + }, } for _, tc := range tests {