Merge pull request #232 from practo/masking-feature-regex_pattern_boolean_keys

Masking feature: regex pattern boolean keys
alok87 authored May 24, 2021
2 parents 691b77f + d18f20a commit adaea07
Showing 17 changed files with 713 additions and 207 deletions.
16 changes: 16 additions & 0 deletions MASKING.md
@@ -102,3 +102,19 @@ include_tables:
- customers
- orders
```

### Regex Pattern Boolean Keys
Keeps free-text columns masked while adding, for each configured pattern, a boolean column that records whether the free-text value matches that pattern.

For example, suppose we add a boolean column `favourite_quote_has_philosphy`. If the value in the column `favourite_quote` matches the regex `'life|time'`, the extra column `favourite_quote_has_philosphy` is `true`; otherwise it is `false`.

Regex matching is case-insensitive.

```yaml
regex_pattern_boolean_keys:
customers:
favourite_quote:
has_philosphy: 'life|time'
has_text_funny: 'funny'
```
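
A minimal Go sketch of the matching rule described above — `regexBool` is a hypothetical helper, not the masker's actual implementation; only the case-insensitive match semantics come from this document:

```go
package main

import (
	"fmt"
	"regexp"
)

// regexBool mirrors the behaviour described above: a case-insensitive
// regex match over a free-text value. Hypothetical helper, sketch only.
func regexBool(pattern, value string) (bool, error) {
	re, err := regexp.Compile("(?i)" + pattern) // (?i) = case-insensitive
	if err != nil {
		return false, err
	}
	return re.MatchString(value), nil
}

func main() {
	ok, err := regexBool("life|time", "Life is short")
	if err != nil {
		panic(err)
	}
	fmt.Println(ok) // true, so favourite_quote_has_philosphy would be true
}
```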
2 changes: 2 additions & 0 deletions pkg/redshift/redshift.go
@@ -19,6 +19,8 @@ import (
)

const (
RedshiftBoolean = "boolean"

RedshiftString = "character varying"
RedshiftStringMax = "character varying(65535)"
RedshiftStringMaxLength = 65535
27 changes: 17 additions & 10 deletions pkg/redshiftbatcher/batch_processor.go
@@ -182,6 +182,7 @@ type response struct {
endOffset int64
messagesProcessed int
maskSchema map[string]serializer.MaskInfo
extraMaskSchema map[string]serializer.ExtraMaskInfo
bytesProcessed int64
}

@@ -270,6 +271,7 @@ func (b *batchProcessor) signalLoad(resp *response) error {
resp.batchSchemaID, // schema of upstream topic's value
b.schemaIDKey, // schema of upstream topic's key
resp.maskSchema,
resp.extraMaskSchema,
resp.skipMerge,
resp.bytesProcessed,
resp.createEvents,
@@ -333,6 +335,7 @@ func (b *batchProcessor) processMessage(
resp.batchSchemaID,
b.schemaIDKey,
resp.maskSchema,
resp.extraMaskSchema,
)
if err != nil {
return bytesProcessed, fmt.Errorf(
@@ -390,6 +393,9 @@ func (b *batchProcessor) processMessage(
if b.maskMessages && len(resp.maskSchema) == 0 {
resp.maskSchema = message.MaskSchema
}
if b.maskMessages && len(resp.extraMaskSchema) == 0 {
resp.extraMaskSchema = message.ExtraMaskSchema
}

resp.skipMerge = false // deprecated
klog.V(5).Infof(
@@ -542,16 +548,17 @@ func (b *batchProcessor) Process(
responses := []*response{}
for i, msgBuf := range msgBufs {
resp := &response{
err: nil,
batchID: i + 1,
batchSchemaID: -1,
skipMerge: false,
createEvents: 0,
updateEvents: 0,
deleteEvents: 0,
s3Key: "",
bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)),
maskSchema: make(map[string]serializer.MaskInfo),
err: nil,
batchID: i + 1,
batchSchemaID: -1,
skipMerge: false,
createEvents: 0,
updateEvents: 0,
deleteEvents: 0,
s3Key: "",
bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)),
maskSchema: make(map[string]serializer.MaskInfo),
extraMaskSchema: make(map[string]serializer.ExtraMaskInfo),
}
wg.Add(1)
go b.processBatch(wg, session, msgBuf, resp)
185 changes: 130 additions & 55 deletions pkg/redshiftloader/job.go
@@ -2,6 +2,7 @@ package redshiftloader

import (
"fmt"
"github.com/practo/klog/v2"
"github.com/practo/tipoca-stream/redshiftsink/pkg/serializer"
"strings"
)
@@ -18,6 +19,7 @@ var JobAvroSchema string = `{
{"name": "schemaId", "type": "int"},
{"name": "schemaIdKey", "type": "int", "default": -1},
{"name": "maskSchema", "type": "string"},
{"name": "extraMaskSchema", "type": "string", "default": ""},
{"name": "skipMerge", "type": "string", "default": ""},
{"name": "batchBytes", "type": "long", "default": 0},
{"name": "createEvents", "type": "long", "default": 0},
@@ -27,41 +29,45 @@
}`
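
Worth noting: `extraMaskSchema` is added with an empty-string default, which should let loaders decode jobs serialized before this field existed; `ToExtraMaskSchemaMap` below returns an empty map for an empty string.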

type Job struct {
UpstreamTopic string `json:"upstreamTopic"` // batcher topic
StartOffset int64 `json:"startOffset"`
EndOffset int64 `json:"endOffset"`
CsvDialect string `json:"csvDialect"`
S3Path string `json:"s3Path"`
SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic)
SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic)
MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"`
SkipMerge bool `json:"skipMerge"` // deprecated in favour of createEvents, updateEvents and deleteEvents
BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
CreateEvents int64 `json:"createEvents"` // stores count of create events
UpdateEvents int64 `json:"updateEvents"` // stores count of update events
DeleteEvents int64 `json:"deleteEvents"` // stores count of delete events
UpstreamTopic string `json:"upstreamTopic"` // batcher topic
StartOffset int64 `json:"startOffset"`
EndOffset int64 `json:"endOffset"`
CsvDialect string `json:"csvDialect"`
S3Path string `json:"s3Path"`
SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic)
SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic)
MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"`
ExtraMaskSchema map[string]serializer.ExtraMaskInfo `json:"extraMaskSchema"`
SkipMerge bool `json:"skipMerge"` // deprecated in favour of createEvents, updateEvents and deleteEvents
BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
CreateEvents int64 `json:"createEvents"` // stores count of create events
UpdateEvents int64 `json:"updateEvents"` // stores count of update events
DeleteEvents int64 `json:"deleteEvents"` // stores count of delete events
}

func NewJob(
upstreamTopic string, startOffset int64, endOffset int64,
csvDialect string, s3Path string, schemaId int, schemaIdKey int,
maskSchema map[string]serializer.MaskInfo, skipMerge bool,
maskSchema map[string]serializer.MaskInfo,
extraMaskSchema map[string]serializer.ExtraMaskInfo,
skipMerge bool,
batchBytes, createEvents, updateEvents, deleteEvents int64) Job {

return Job{
UpstreamTopic: upstreamTopic,
StartOffset: startOffset,
EndOffset: endOffset,
CsvDialect: csvDialect,
S3Path: s3Path,
SchemaId: schemaId,
SchemaIdKey: schemaIdKey,
MaskSchema: maskSchema,
SkipMerge: skipMerge, // deprecated
BatchBytes: batchBytes,
CreateEvents: createEvents,
UpdateEvents: updateEvents,
DeleteEvents: deleteEvents,
UpstreamTopic: upstreamTopic,
StartOffset: startOffset,
EndOffset: endOffset,
CsvDialect: csvDialect,
S3Path: s3Path,
SchemaId: schemaId,
SchemaIdKey: schemaIdKey,
MaskSchema: maskSchema,
ExtraMaskSchema: extraMaskSchema,
SkipMerge: skipMerge, // deprecated
BatchBytes: batchBytes,
CreateEvents: createEvents,
UpdateEvents: updateEvents,
DeleteEvents: deleteEvents,
}
}

@@ -115,9 +121,15 @@ func StringMapToJob(data map[string]interface{}) Job {
case "maskSchema":
schema := make(map[string]serializer.MaskInfo)
if value, ok := v.(string); ok {
schema = ToSchemaMap(value)
schema = ToMaskSchemaMap(value)
}
job.MaskSchema = schema
case "extraMaskSchema":
extraSchema := make(map[string]serializer.ExtraMaskInfo)
if value, ok := v.(string); ok {
extraSchema = ToExtraMaskSchemaMap(value)
}
job.ExtraMaskSchema = extraSchema
case "batchBytes":
if value, ok := v.(int64); ok {
job.BatchBytes = value
@@ -155,7 +167,7 @@

// TODO: hack, to release fast, found unwanted complications in
// using map[string]interface in goavro(will revisit)
func ToSchemaMap(r string) map[string]serializer.MaskInfo {
func ToMaskSchemaMap(r string) map[string]serializer.MaskInfo {
m := make(map[string]serializer.MaskInfo)

columns := strings.Split(r, "|")
@@ -170,7 +182,7 @@ func ToSchemaMap(r string) map[string]serializer.MaskInfo {

info := strings.Split(col, ",")
name := info[0]
var masked, sortCol, distCol, lengthCol, mobileCol, mappingPIICol, conditionalNonPIICol, dependentNonPIICol bool
var masked, sortCol, distCol, lengthCol, mobileCol, mappingPIICol, conditionalNonPIICol, dependentNonPIICol, regexPatternBooleanCol bool
if info[1] == "true" {
masked = true
}
@@ -196,35 +208,41 @@
conditionalNonPIICol = true
}
}
if len(info) == 9 {
if len(info) >= 9 {
if info[8] == "true" {
dependentNonPIICol = true
}
}
if len(info) >= 10 {
if info[9] == "true" {
regexPatternBooleanCol = true
}
}

m[name] = serializer.MaskInfo{
Masked: masked,
SortCol: sortCol,
DistCol: distCol,
LengthCol: lengthCol,
MobileCol: mobileCol,
MappingPIICol: mappingPIICol,
ConditionalNonPIICol: conditionalNonPIICol,
DependentNonPIICol: dependentNonPIICol,
Masked: masked,
SortCol: sortCol,
DistCol: distCol,
LengthCol: lengthCol,
MobileCol: mobileCol,
MappingPIICol: mappingPIICol,
ConditionalNonPIICol: conditionalNonPIICol,
DependentNonPIICol: dependentNonPIICol,
RegexPatternBooleanCol: regexPatternBooleanCol,
}
}

return m
}

// TODO: hack, to release fast, found unwanted complications in
// using map[string]interface in goavro (will revisit)
func ToSchemaString(m map[string]serializer.MaskInfo) string {
// using map[string]interface in goavro (may revisit if required)
func ToMaskSchemaString(m map[string]serializer.MaskInfo) string {
var r string

for name, info := range m {
col := fmt.Sprintf(
"%s,%t,%t,%t,%t,%t,%t,%t,%t",
"%s,%t,%t,%t,%t,%t,%t,%t,%t,%t",
name,
info.Masked,
info.SortCol,
@@ -234,32 +252,89 @@
info.MappingPIICol,
info.ConditionalNonPIICol,
info.DependentNonPIICol,
info.RegexPatternBooleanCol,
)
r = r + col + "|"
}

return r
}
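
For reference, a sketch of the pipe/comma wire format these helpers produce and parse — the field order follows `ToMaskSchemaString` above, and the single-entry map sidesteps Go's randomized map iteration order:

```go
s := ToMaskSchemaString(map[string]serializer.MaskInfo{
	"favourite_quote": {Masked: true, RegexPatternBooleanCol: true},
})
// s == "favourite_quote,true,false,false,false,false,false,false,false,true|"
m := ToMaskSchemaMap(s)
// m["favourite_quote"].RegexPatternBooleanCol == true
```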

// TODO: hack, to release fast, found unwanted complications in
// using map[string]interface in goavro (may revisit if required)
func ToExtraMaskSchemaString(m map[string]serializer.ExtraMaskInfo) string {
var r string

for name, info := range m {
col := fmt.Sprintf(
"%s,%t,%s,%s",
name,
info.Masked,
info.ColumnType,
info.DefaultVal,
)
r = r + col + "|"
}

return r
}

// TODO: hack, to release fast, found unwanted complications in
// using map[string]interface in goavro(will revisit)
func ToExtraMaskSchemaMap(r string) map[string]serializer.ExtraMaskInfo {
m := make(map[string]serializer.ExtraMaskInfo)

columns := strings.Split(r, "|")
if len(columns) == 0 {
return m
}

for _, col := range columns {
if col == "" {
continue
}

info := strings.Split(col, ",")

if len(info) != 4 {
klog.Fatalf("expecting extra mask schema to be length 4, got:%+v, schema:%v", len(info), r)
}

var masked bool
if info[1] == "true" {
masked = true
}

m[info[0]] = serializer.ExtraMaskInfo{
Masked: masked,
ColumnType: info[2],
DefaultVal: info[3],
}
}

return m
}
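
And the analogous extra-schema round trip — a sketch, where the column name is the example boolean column from MASKING.md and the `boolean`/`false` values are assumed settings for such a column:

```go
s := ToExtraMaskSchemaString(map[string]serializer.ExtraMaskInfo{
	"favourite_quote_has_philosphy": {Masked: false, ColumnType: "boolean", DefaultVal: "false"},
})
// s == "favourite_quote_has_philosphy,false,boolean,false|"
m := ToExtraMaskSchemaMap(s) // parses back into the same map
```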

// ToStringMap returns a map representation of the Job
func (c Job) ToStringMap() map[string]interface{} {
skipMerge := "false" // deprecated not used anymore, backward compatibility
if c.SkipMerge {
skipMerge = "true"
}
return map[string]interface{}{
"upstreamTopic": c.UpstreamTopic,
"startOffset": c.StartOffset,
"endOffset": c.EndOffset,
"csvDialect": c.CsvDialect,
"s3Path": c.S3Path,
"schemaId": c.SchemaId,
"schemaIdKey": c.SchemaIdKey,
"skipMerge": skipMerge,
"maskSchema": ToSchemaString(c.MaskSchema),
"batchBytes": c.BatchBytes,
"createEvents": c.CreateEvents,
"updateEvents": c.UpdateEvents,
"deleteEvents": c.DeleteEvents,
"upstreamTopic": c.UpstreamTopic,
"startOffset": c.StartOffset,
"endOffset": c.EndOffset,
"csvDialect": c.CsvDialect,
"s3Path": c.S3Path,
"schemaId": c.SchemaId,
"schemaIdKey": c.SchemaIdKey,
"skipMerge": skipMerge,
"maskSchema": ToMaskSchemaString(c.MaskSchema),
"extraMaskSchema": ToExtraMaskSchemaString(c.ExtraMaskSchema),
"batchBytes": c.BatchBytes,
"createEvents": c.CreateEvents,
"updateEvents": c.UpdateEvents,
"deleteEvents": c.DeleteEvents,
}
}
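
Taken together, a `Job` should survive the map round trip used for Avro transport. A sketch under the signatures in this diff (argument values are arbitrary):

```go
job := NewJob("upstreamTopic", 0, 100, "\"", "s3path", 1, 2,
	map[string]serializer.MaskInfo{"id": {Masked: true}},
	map[string]serializer.ExtraMaskInfo{},
	false, 10, 1, 0, 0)
roundTripped := StringMapToJob(job.ToStringMap())
// roundTripped.ExtraMaskSchema is an empty map, matching job.ExtraMaskSchema
```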
2 changes: 2 additions & 0 deletions pkg/redshiftloader/job_test.go
@@ -12,6 +12,7 @@ func TestToStringMap(t *testing.T) {
"kafkaoffset": serializer.MaskInfo{},
"id": serializer.MaskInfo{Masked: true},
}
extraMaskSchema := map[string]serializer.ExtraMaskInfo{}

job := NewJob(
"upstreamTopic",
@@ -22,6 +23,7 @@
1,
2,
maskSchema,
extraMaskSchema,
false,
10,
-1,
1 change: 1 addition & 0 deletions pkg/redshiftloader/load_processor.go
@@ -724,6 +724,7 @@ func (b *loadProcessor) processBatch(
schemaId,
schemaIdKey,
job.MaskSchema,
job.ExtraMaskSchema,
)
if err != nil {
return bytesProcessed, fmt.Errorf(