Skip to content

Commit

Permalink
fix: IPDatabase Concurrency (#49)
Browse files Browse the repository at this point in the history
* fix: add rwmutex

* fix: global ipdb

* fix: pass wg to transforms

* docs: comment on closure

* feat: add ignore_close

* refactor: globals, options

* docs: ipdb

* fix: undo breakage

* fix: undo breakage

* Update examples/process/ip_lookup/main.go

Co-authored-by: Daniel Stinson-Diess <[email protected]>

Co-authored-by: Daniel Stinson-Diess <[email protected]>
  • Loading branch information
jshlbrd and shellcromancer authored Dec 19, 2022
1 parent 2c9e524 commit f799a6f
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 74 deletions.
2 changes: 2 additions & 0 deletions build/config/process.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
ip_database(input,
output,
database_options,
ignore_close=false,
condition_operator='',
condition_inspectors=[]): {
type: 'ip_database',
Expand All @@ -242,6 +243,7 @@
condition: { operator: condition_operator, inspectors: condition_inspectors },
input_key: input,
output_key: output,
ignore_close: ignore_close,
},
},
lambda(input,
Expand Down
2 changes: 1 addition & 1 deletion cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (sub *substation) Transform(ctx context.Context, wg *sync.WaitGroup) error
}

log.WithField("transform", sub.config.Transform.Type).Debug("starting transformer")
if err := t.Transform(ctx, sub.channels.transform, sub.channels.sink); err != nil {
if err := t.Transform(ctx, wg, sub.channels.transform, sub.channels.sink); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions examples/process/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ Reads all lines from JSON file and applies a processor.
## encapsulation

Reads all lines from JSON file as encapsulated data and conditionally applies a processor.

## ip_lookup

Reads all lines from JSON file as encapsulated data and applies the IPDatabase processor using a MaxMind City database.
9 changes: 9 additions & 0 deletions examples/process/ip_lookup/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
local ipdb = import '../../../build/config/ip_database.libsonnet';
local process = import '../../../build/config/process.libsonnet';

// the MaxMind City database can be read from local disk, HTTP(S) URL, or AWS S3 URL
// other databases / providers can be used by changing the imported function (e.g., ipdb.maxmind_asn)
local mm_city = ipdb.maxmind_city('location://path/to/maxmind.mmdb');

// applies the IPDatabase processor using a MaxMind City database
process.ip_database(input='addr', output='geo', database_options=mm_city)
3 changes: 3 additions & 0 deletions examples/process/ip_lookup/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"addr":"8.8.8.8"}
{"addr":"9.9.9.9"}
{"addr":"208.67.222.222"}
57 changes: 57 additions & 0 deletions examples/process/ip_lookup/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// example of reading JSON from a file and applying the IPDatabase processor
package main

import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"

"github.com/brexhq/substation/config"
"github.com/brexhq/substation/process"
)

func main() {
// read lines from data file into encapsulated data
open, err := os.Open("./data.json")
if err != nil {
panic(err)
}

var capsules []config.Capsule
capsule := config.NewCapsule()

scanner := bufio.NewScanner(open)
for scanner.Scan() {
capsule.SetData(scanner.Bytes())
capsules = append(capsules, capsule)
}

// read config file and create a new batch processor
cfg, err := os.ReadFile("./config.json")
if err != nil {
panic(err)
}

var sub config.Config
if err := json.Unmarshal(cfg, &sub); err != nil {
panic(err)
}

proc, err := process.BatchApplicatorFactory(sub)
if err != nil {
panic(err)
}
defer proc.Close(context.TODO())

// apply batch processor to encapsulated data
capsules, err = process.ApplyBatch(context.TODO(), capsules, proc)
if err != nil {
panic(err)
}

for _, capsule := range capsules {
fmt.Println(string(capsule.Data()))
}
}
25 changes: 14 additions & 11 deletions internal/ip/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
// errInvalidFactoryInput is returned when an unsupported OpenCloser is referenced in Factory.
const errInvalidFactoryInput = errors.Error("invalid factory input")

// databases are global variables that can be accessed across the application by using the Factory function.
var (
ip2loc IP2Location
maxMindASN MaxMindASN
maxMindCity MaxMindCity
)

// OpenCloser provides tools for opening, closing, and getting values from IP address enrichment databases.
type OpenCloser interface {
ip.Getter
Expand All @@ -21,22 +28,18 @@ type OpenCloser interface {
IsEnabled() bool
}

// Factory returns an OpenCloser. The returned OpenCloser must be opened before it can be used.
// func Factory(db string) (OpenCloser, error) {
// Factory returns a pointer to an OpenCloser that is stored as a package level global variable. The OpenCloser must be opened before it can be used.
func Factory(cfg config.Config) (OpenCloser, error) {
switch t := cfg.Type; t {
case "ip2location":
var db IP2Location
_ = config.Decode(cfg.Settings, &db)
return &db, nil
_ = config.Decode(cfg.Settings, &ip2loc)
return &ip2loc, nil
case "maxmind_asn":
var db MaxMindASN
_ = config.Decode(cfg.Settings, &db)
return &db, nil
_ = config.Decode(cfg.Settings, &maxMindASN)
return &maxMindASN, nil
case "maxmind_city":
var db MaxMindCity
_ = config.Decode(cfg.Settings, &db)
return &db, nil
_ = config.Decode(cfg.Settings, &maxMindCity)
return &maxMindCity, nil
default:
return nil, fmt.Errorf("database %s: %v", t, errInvalidFactoryInput)
}
Expand Down
31 changes: 27 additions & 4 deletions internal/ip/database/ip2location.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package database
import (
"context"
"os"
"sync"

"github.com/brexhq/substation/internal/file"
"github.com/brexhq/substation/internal/ip"
"github.com/ip2location/ip2location-go/v9"
)

// IP2Location provides read access to an IP2Location binary database.
// IP2Location provides read access to an IP2Location binary database. The database is safe for concurrent access.
type IP2Location struct {
// Database contains the location of the IP2Location database. This can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL.
Database string `json:"database"`
mu sync.RWMutex
db *ip2location.DB
}

Expand All @@ -20,8 +23,16 @@ func (d *IP2Location) IsEnabled() bool {
return d.db != nil
}

// Open retrieves the database and opens it for querying. The location of the database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL.
// Open retrieves the database and opens it for querying.
func (d *IP2Location) Open(ctx context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()

// avoids unnecessary opening
if d.db != nil {
return nil
}

path, err := file.Get(ctx, d.Database)
defer os.Remove(path)

Expand All @@ -38,15 +49,27 @@ func (d *IP2Location) Open(ctx context.Context) error {

// Close closes the open database.
func (d *IP2Location) Close() error {
if d.IsEnabled() {
d.db.Close()
d.mu.Lock()
defer d.mu.Unlock()

// avoids unnecessary closing
if d.db == nil {
return nil
}

d.db.Close()

// db is made nil so that IsEnabled correctly
// returns the non-enabled state
d.db = nil
return nil
}

// Get queries the database and returns an aggregated database record containing enrichment information.
func (d *IP2Location) Get(addr string) (*ip.EnrichmentRecord, error) {
d.mu.RLock()
defer d.mu.RUnlock()

resp, err := d.db.Get_all(addr)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit f799a6f

Please sign in to comment.