Ingest tool #30

Merged
merged 26 commits into from Mar 16, 2023

Changes from 13 commits

Commits (26)
63270e7
WIP ingest tool.
juagargi Jan 19, 2023
672dd3e
Use a processor structure instead of plain functions.
juagargi Jan 19, 2023
5fc9152
rename MapReduce to processor
juagargi Jan 19, 2023
b1d25b6
Processor handles data parsing and map-reduce ops.
juagargi Jan 19, 2023
cad5ecc
Ignore file containing the root hash data
juagargi Jan 19, 2023
df67299
Package cert data into batches.
juagargi Jan 20, 2023
5954e37
Add a BatchProcessor.
juagargi Jan 20, 2023
0a80fab
WIP updating DB with certs
juagargi Jan 20, 2023
4d0b924
Store changes in DB.
juagargi Jan 20, 2023
b9a75c7
Preparing to do SMT updates.
juagargi Jan 23, 2023
d6efeb0
Add modified domains to updates table.
juagargi Jan 24, 2023
4915199
Update SMT.
juagargi Jan 24, 2023
6e9f9f6
Check clashes between batches also using SANs.
juagargi Jan 25, 2023
ef62c4e
If a deadlock is found when inserting the pairs, retry.
juagargi Feb 23, 2023
626c4e1
Find all .gz and .csv files under incoming dir.
juagargi Feb 23, 2023
e376410
Fix 2nd deadlock (#34)
cyrill-k Feb 24, 2023
2103802
Rein in memory consumption by limiting DB workers.
juagargi Feb 24, 2023
5cb7954
Bugfix: some batches are too big.
juagargi Feb 24, 2023
fd8b146
Add a DB function to db.
juagargi Feb 27, 2023
9c34e2a
XXX Don't use fpki as database. This commit must be rolled back after…
juagargi Feb 27, 2023
a52fa04
Hack: when leaf is > 1Gb, insert it via CSV.
juagargi Feb 28, 2023
8f951dc
Better modifiable create DB script.
juagargi Feb 28, 2023
5712d13
Print out domains that yield huge leaves.
juagargi Feb 28, 2023
2b02705
Skip expired certs on ingest.
juagargi Mar 1, 2023
68570d7
Revert "XXX Don't use fpki as database.
juagargi Mar 1, 2023
b81f149
print root value after ingestion
cyrill-k Mar 2, 2023
2 changes: 1 addition & 1 deletion .gitignore
@@ -14,4 +14,4 @@ bin/*
.DS_Store

# root
-./root
+root
206 changes: 206 additions & 0 deletions cmd/ingest/batch.go
@@ -0,0 +1,206 @@
package main

import (
"context"
"fmt"
"sync"
"sync/atomic"

ctx509 "github.com/google/certificate-transparency-go/x509"
"github.com/netsec-ethz/fpki/pkg/common"
"github.com/netsec-ethz/fpki/pkg/db"
mcommon "github.com/netsec-ethz/fpki/pkg/mapserver/common"
"github.com/netsec-ethz/fpki/pkg/mapserver/updater"
)

const BatchSize = 10000

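// Batch groups up to BatchSize certificates together with their chains and
// all the names they cover (the CN plus the deduplicated SANs), so that
// concurrently running batches can be checked for name clashes.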
type Batch struct {
Certs []*ctx509.Certificate
Chains [][]*ctx509.Certificate
names []*string // CN and SANs of each certificate
}

func NewBatch() *Batch {
return &Batch{
Certs: make([]*ctx509.Certificate, 0, BatchSize),
Chains: make([][]*ctx509.Certificate, 0, BatchSize),
names: make([]*string, 0, BatchSize),
}
}

// AddData pushes the cert data into the batch.
func (b *Batch) AddData(d *CertData) {
b.Certs = append(b.Certs, d.Cert)
b.Chains = append(b.Chains, d.CertChain)
// Add common name and SANs:
seenNames := make(map[string]struct{})
b.names = append(b.names, &d.Cert.Subject.CommonName)
seenNames[d.Cert.Subject.CommonName] = struct{}{}
for i, name := range d.Cert.DNSNames {
if _, ok := seenNames[name]; ok {
continue
}
b.names = append(b.names, &d.Cert.DNSNames[i])
seenNames[name] = struct{}{}
}
}

func (b *Batch) Full() bool {
return len(b.Certs) == BatchSize
}

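// BatchProcessor writes batches into the DB concurrently, while guaranteeing
// that no two batches sharing a domain name are processed at the same time.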
type BatchProcessor struct {
conn db.Conn

incomingCh chan *Batch
incomingWg sync.WaitGroup
doneCh chan struct{}

runningBatches map[string]*Batch
runningBatchesMu sync.Mutex
reschedules atomic.Int64
}

func NewBatchProcessor(conn db.Conn) *BatchProcessor {
p := &BatchProcessor{
conn: conn,
incomingCh: make(chan *Batch),
doneCh: make(chan struct{}),

runningBatches: make(map[string]*Batch),
}
p.start()
return p
}

func (p *BatchProcessor) start() {
go func() {
for batch := range p.incomingCh {
go p.wrapBatch(batch)
}
p.doneCh <- struct{}{}
}()

}

// Wait blocks until all pending batches have been processed.
func (p *BatchProcessor) Wait() {
p.incomingWg.Wait()
close(p.incomingCh)
<-p.doneCh
fmt.Printf("# reschedules: %d\n", p.reschedules.Load())
}

// Process processes a Batch into the DB.
func (p *BatchProcessor) Process(b *Batch) {
p.incomingWg.Add(1) // one more batch to process
go func() {
p.incomingCh <- b
}()
}

// wrapBatch protects the processing of a batch.
func (p *BatchProcessor) wrapBatch(batch *Batch) {
if err := p.checkIfBatchClashes(batch); err != nil {
// At least one name in this batch is already being processed by a different
// batch, and batches with a non-empty name intersection must not run
// concurrently. Reschedule this batch, in the hope that it will eventually
// be picked up once the active batches no longer clash with it:
p.reschedules.Add(1)
p.incomingCh <- batch
return
}

p.addBatchAsActive(batch)
defer p.removeBatchFromActive(batch)
defer p.incomingWg.Done() // one less batch to process

p.processBatch(batch)
fmt.Println("batch processed")
}

func (p *BatchProcessor) processBatch(batch *Batch) {
// Compute which domains could be affected:
affectedDomainsMap, domainCertMap, domainCertChainMap := updater.GetAffectedDomainAndCertMap(
batch.Certs, batch.Chains)
if len(affectedDomainsMap) == 0 {
return
}

// Get all affected entries already present in the DB:
affectedDomainHashes := make([]common.SHA256Output, 0, len(affectedDomainsMap))
for k := range affectedDomainsMap {
affectedDomainHashes = append(affectedDomainHashes, k)
}
domainEntries, err := p.conn.RetrieveDomainEntries(context.Background(), affectedDomainHashes)
if err != nil {
panic(err)
}

// Obtain a map from SHAs to certificates:
shaToCerts := make(map[common.SHA256Output]*mcommon.DomainEntry)
for _, kv := range domainEntries {
entry, err := mcommon.DeserializeDomainEntry(kv.Value)
if err != nil {
panic(err)
}
shaToCerts[kv.Key] = entry
}

// Update Domain Entries in DB:
updatedDomains, err := updater.UpdateDomainEntries(shaToCerts, domainCertMap, domainCertChainMap)
if err != nil {
panic(err)
}
shaToCerts, err = updater.GetDomainEntriesToWrite(updatedDomains, shaToCerts)
if err != nil {
panic(err)
}
domainEntries, err = updater.SerializeUpdatedDomainEntries(shaToCerts)
if err != nil {
panic(err)
}
_, err = p.conn.UpdateDomainEntries(context.Background(), domainEntries)
if err != nil {
panic(err)
}

// Add entries to the `updates` table for all the affected domains:
if _, err = p.conn.AddUpdatedDomains(context.Background(), affectedDomainHashes); err != nil {
panic(err)
}
}

func (p *BatchProcessor) checkIfBatchClashes(b *Batch) error {
p.runningBatchesMu.Lock()
defer p.runningBatchesMu.Unlock()

for _, n := range b.names {
if other, ok := p.runningBatches[*n]; ok && other != b {
return fmt.Errorf("same name (CN or SAN) in different batches, pointers: %p, %p. Name: %s",
other, b, *n)
}
}
return nil
}

func (p *BatchProcessor) addBatchAsActive(b *Batch) {
p.runningBatchesMu.Lock()
defer p.runningBatchesMu.Unlock()

for _, n := range b.names {
p.runningBatches[*n] = b
}
}

func (p *BatchProcessor) removeBatchFromActive(b *Batch) {
p.runningBatchesMu.Lock()
defer p.runningBatchesMu.Unlock()

for _, n := range b.names {
delete(p.runningBatches, *n)
}
}
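
For orientation, the following sketch (not part of the diff; the certs channel and ingestExample are hypothetical) shows how a caller could drive the types above: fill a Batch until Full, hand it to the BatchProcessor, and wait for everything to be flushed.

// Hypothetical driver, for illustration only: feed parsed certificates into
// batches and hand each full batch over to the processor.
func ingestExample(conn db.Conn, certs <-chan *CertData) {
	proc := NewBatchProcessor(conn)
	batch := NewBatch()
	for d := range certs {
		batch.AddData(d)
		if batch.Full() {
			proc.Process(batch) // processed asynchronously
			batch = NewBatch()
		}
	}
	if len(batch.Certs) > 0 {
		proc.Process(batch) // flush the last, partially filled batch
	}
	proc.Wait() // block until every batch has been written to the DB
}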
76 changes: 76 additions & 0 deletions cmd/ingest/csv_files.go
@@ -0,0 +1,76 @@
package main

import (
"compress/gzip"
"io"
"os"
)

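// File abstracts an input file of the ingest tool, so that gzip-compressed
// and plain CSV files can be opened, read, and closed uniformly.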
type File interface {
WithFile(string) File
Filename() string
Open() (io.Reader, error)
Close() error
}

type baseFile struct {
FileName string
reader *os.File
}

func (f *baseFile) Filename() string {
return f.FileName
}

func (f *baseFile) Close() error {
return f.reader.Close()
}

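// GzFile is a gzip-compressed CSV file.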
type GzFile struct {
baseFile

gzReader *gzip.Reader
}

func (f *GzFile) WithFile(fn string) File {
f.FileName = fn
return f
}

func (f *GzFile) Open() (io.Reader, error) {
var err error
f.reader, err = os.Open(f.FileName)
if err != nil {
return nil, err
}
f.gzReader, err = gzip.NewReader(f.reader)
if err != nil {
return nil, err
}
return f.gzReader, nil
}

func (f *GzFile) Close() error {
if err := f.gzReader.Close(); err != nil {
return err
}
return f.reader.Close()
}

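// CsvFile is a plain, uncompressed CSV file.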
type CsvFile struct {
baseFile
}

func (f *CsvFile) WithFile(fn string) File {
f.FileName = fn
return f
}

func (f *CsvFile) Open() (io.Reader, error) {
var err error
f.reader, err = os.Open(f.FileName)
if err != nil {
return nil, err
}
return f.reader, nil
}
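
As a usage illustration (assumed, not from this PR: countRecords and the file names are invented, and encoding/csv is taken to be imported), both implementations can be consumed through the File interface alone:

// Hypothetical helper showing the File interface in use: open any supported
// file, parse it as CSV, and report the number of records.
func countRecords(f File) (int, error) {
	r, err := f.Open()
	if err != nil {
		return 0, err
	}
	defer f.Close()
	records, err := csv.NewReader(r).ReadAll()
	if err != nil {
		return 0, err
	}
	return len(records), nil
}

// For example (file names invented):
//   countRecords((&GzFile{}).WithFile("bundled/certs-000.gz"))
//   countRecords((&CsvFile{}).WithFile("certs-000.csv"))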
81 changes: 81 additions & 0 deletions cmd/ingest/main.go
@@ -0,0 +1,81 @@
package main

import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"

"github.com/netsec-ethz/fpki/pkg/db"
)

const (
CertificateColumn = 3
CertChainColumn = 4
)

func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage:\n%s directory\n", os.Args[0])
}
flag.Parse()
if flag.NArg() != 1 {
flag.Usage()
os.Exit(2)
}

conn, err := db.Connect(nil)
exitIfError(err)

gzFiles, csvFiles := listOurFiles(flag.Arg(0))
fmt.Printf("# gzFiles: %d, # csvFiles: %d\n", len(gzFiles), len(csvFiles))

// Truncate DB.
exitIfError(conn.TruncateAllTables())
// Disable indices in DB.
exitIfError(conn.DisableIndexing("domainEntries"))
exitIfError(conn.DisableIndexing("updates"))

// Update certificates and chains.
proc := NewProcessor(conn)
proc.AddGzFiles(gzFiles)
proc.AddCsvFiles(csvFiles)
exitIfError(proc.Wait())

// Re-enable indices in DB.
exitIfError(conn.EnableIndexing("updates"))
exitIfError(conn.EnableIndexing("domainEntries"))
// Close DB and check errors.
err = conn.Close()
exitIfError(err)
}

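// listOurFiles recursively collects the files to ingest under dir: all *.gz
// files inside any directory named "bundled", plus the *.csv files located
// next to that directory.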
func listOurFiles(dir string) (gzFiles, csvFiles []string) {
entries, err := ioutil.ReadDir(dir)
exitIfError(err)
for _, e := range entries {
if !e.IsDir() {
continue
}
if e.Name() == "bundled" {
// Use all *.gz in this directory.
d := filepath.Join(dir, e.Name())
gzFiles, err = filepath.Glob(fmt.Sprintf("%s/*.gz", d))
exitIfError(err)
csvFiles, err = filepath.Glob(fmt.Sprintf("%s/*.csv", dir))
exitIfError(err)
} else {
gzs, csvs := listOurFiles(filepath.Join(dir, e.Name()))
gzFiles = append(gzFiles, gzs...)
csvFiles = append(csvFiles, csvs...)
}
}
return
}

func exitIfError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s\n", err)
os.Exit(1)
}
}
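
To make the expected layout concrete, here is a hedged sketch (directory and file names invented) of an input tree that listOurFiles would pick up, together with a minimal check:

// Hypothetical input layout (names invented for illustration):
//
//   incoming/
//     batch1/
//       bundled/
//         certs-000.gz   // found by the *.gz glob
//       certs-000.csv    // found by the *.csv glob next to bundled/
func exampleListing() {
	gzFiles, csvFiles := listOurFiles("incoming")
	fmt.Printf("gz=%d csv=%d\n", len(gzFiles), len(csvFiles)) // gz=1 csv=1
}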