diff --git a/src/main/java/no/priv/garshol/duke/ConfigLoader.java b/src/main/java/no/priv/garshol/duke/ConfigLoader.java index ee57f579..ee260e14 100644 --- a/src/main/java/no/priv/garshol/duke/ConfigLoader.java +++ b/src/main/java/no/priv/garshol/duke/ConfigLoader.java @@ -1,283 +1,308 @@ - -package no.priv.garshol.duke; - -import java.util.Set; -import java.util.Map; -import java.util.List; -import java.util.HashSet; -import java.util.HashMap; -import java.util.ArrayList; -import java.util.Collection; -import java.io.File; -import java.io.InputStream; -import java.io.IOException; -import org.xml.sax.XMLReader; -import org.xml.sax.Attributes; -import org.xml.sax.InputSource; -import org.xml.sax.SAXException; -import org.xml.sax.helpers.XMLReaderFactory; -import org.xml.sax.helpers.DefaultHandler; - -import no.priv.garshol.duke.DukeConfigException; -import no.priv.garshol.duke.utils.StringUtils; -import no.priv.garshol.duke.utils.ObjectUtils; -import no.priv.garshol.duke.cleaners.ChainedCleaner; -import no.priv.garshol.duke.comparators.ExactComparator; -import no.priv.garshol.duke.datasources.Column; -import no.priv.garshol.duke.datasources.CSVDataSource; -import no.priv.garshol.duke.datasources.ColumnarDataSource; -import no.priv.garshol.duke.datasources.JDBCDataSource; -import no.priv.garshol.duke.datasources.JNDIDataSource; -import no.priv.garshol.duke.datasources.NTriplesDataSource; -import no.priv.garshol.duke.datasources.SparqlDataSource; - -/** - * Can read XML configuration files and return a fully set up configuration. - */ -public class ConfigLoader { - - /** - * Note that if file starts with 'classpath:' the resource is looked - * up on the classpath instead. - */ - public static Configuration load(String file) - throws IOException, SAXException { - ConfigurationImpl cfg = new ConfigurationImpl(); - - XMLReader parser = XMLReaderFactory.createXMLReader(); - parser.setContentHandler(new ConfigHandler(cfg, file)); - if (file.startsWith("classpath:")) { - String resource = file.substring("classpath:".length()); - ClassLoader cloader = Thread.currentThread().getContextClassLoader(); - InputStream istream = cloader.getResourceAsStream(resource); - parser.parse(new InputSource(istream)); - } else - parser.parse(file); - - return cfg; - } - - private static class ConfigHandler extends DefaultHandler { - private ConfigurationImpl config; - private List properties; - private File path; // location of config file - - private double low; - private double high; - private String name; - private boolean idprop; - private boolean ignore_prop; - private Comparator comparator; - private Property.Lookup lookup; - - private Set keepers; - private int groupno; // counts datasource groups - private Map objects; // configured Java beans for reuse - private DataSource datasource; - private Object currentobj; // Java bean currently being configured by - private Database database; - - private boolean keep; - private StringBuffer content; - - private ConfigHandler(ConfigurationImpl config, String path) { - this.config = config; - this.properties = new ArrayList(); - if (!path.startsWith("classpath:")) - this.path = new File(path).getParentFile(); - - this.objects = new HashMap(); - this.keepers = new HashSet(); - this.content = new StringBuffer(); - - keepers.add("threshold"); - keepers.add("maybe-threshold"); - keepers.add("name"); - keepers.add("low"); - keepers.add("high"); - keepers.add("comparator"); - } - - public void startElement(String uri, String localName, String qName, - Attributes 
attributes) { - if (keepers.contains(localName)) { - keep = true; - content.setLength(0); // clear - } else if (localName.equals("property")) { - String type = attributes.getValue("type"); - idprop = type != null && type.equals("id"); - ignore_prop = type != null && type.equals("ignore"); - low = 0.5; - high = 0.5; - comparator = null; - lookup = Property.Lookup.DEFAULT; - if (attributes.getValue("lookup") != null) - lookup = (Property.Lookup) ObjectUtils.getEnumConstantByName( - Property.Lookup.class, - attributes.getValue("lookup").toUpperCase()); - } else if (localName.equals("csv")) { - datasource = new CSVDataSource(); - currentobj = datasource; - } else if (localName.equals("jdbc")) { - datasource = new JDBCDataSource(); - currentobj = datasource; - } else if (localName.equals("jndi")) { - datasource = new JNDIDataSource(); - currentobj = datasource; - } else if (localName.equals("sparql")) { - datasource = new SparqlDataSource(); - currentobj = datasource; - } else if (localName.equals("ntriples")) { - datasource = new NTriplesDataSource(); - currentobj = datasource; - } else if (localName.equals("data-source")) { - datasource = (DataSource) instantiate(attributes.getValue("class")); - currentobj = datasource; - } else if (localName.equals("column")) { - if (!(datasource instanceof ColumnarDataSource)) - throw new DukeConfigException("Column inside data source which " + - "does not support it: " + datasource); - - String name = attributes.getValue("name"); - if (name == null) - throw new DukeConfigException("Column with no name"); - String property = attributes.getValue("property"); - String prefix = attributes.getValue("prefix"); - String cleanername = attributes.getValue("cleaner"); - Cleaner cleaner = makeCleaner(cleanername); - - Column c = new Column(name, property, prefix, cleaner); - String spliton = attributes.getValue("split-on"); - if (spliton != null) - c.setSplitOn(spliton); - - ((ColumnarDataSource) datasource).addColumn(c); - } else if (localName.equals("param")) { - String param = attributes.getValue("name"); - String value = attributes.getValue("value"); - - if (currentobj == null) - throw new DukeConfigException("Trying to set parameter " + - param + " but no current object"); - - // we resolve file references relative to the config file location - if (param.equals("input-file") && path != null) - value = new File(path, value).getAbsolutePath(); - - ObjectUtils.setBeanProperty(currentobj, param, value, objects); - } else if (localName.equals("group")) { - groupno++; - // FIXME: now possible to have data sources between the two - // groups. need to check for that, too. ideally XML - // validation should take care of all this for us. 
- if (groupno == 1 && !config.getDataSources().isEmpty()) - throw new DukeConfigException("Cannot have groups in deduplication mode"); - else if (groupno == 3) - throw new DukeConfigException("Record linkage mode only supports " + - "two groups"); - - } else if (localName.equals("object")) { - String klass = attributes.getValue("class"); - String name = attributes.getValue("name"); - currentobj = instantiate(klass); - objects.put(name, currentobj); - } else if (localName.equals("database")) { - String klass = attributes.getValue("class"); - if (klass == null) - klass = "no.priv.garshol.duke.LuceneDatabase"; // default - database = (Database) instantiate(klass); - currentobj = database; - } - } - - public void characters(char[] ch, int start, int length) { - if (keep) - content.append(ch, start, length); - } - - public void endElement(String uri, String localName, String qName) { - if (localName.equals("threshold")) - config.setThreshold(Double.parseDouble(content.toString())); - else if (localName.equals("maybe-threshold")) - config.setMaybeThreshold(Double.parseDouble(content.toString())); - else if (localName.equals("name")) - name = content.toString(); - else if (localName.equals("property")) { - if (idprop) - properties.add(new PropertyImpl(name)); - else { - Property p = new PropertyImpl(name, comparator, low, high); - if (ignore_prop) - p.setIgnoreProperty(true); - p.setLookupBehaviour(lookup); - properties.add(p); - } - } else if (localName.equals("low")) - low = Double.parseDouble(content.toString()); - else if (localName.equals("high")) - high = Double.parseDouble(content.toString()); - else if (localName.equals("comparator")) { - comparator = (Comparator) objects.get(content.toString()); - if (comparator == null) // wasn't a configured bean - comparator = (Comparator) instantiate(content.toString()); - } else if (localName.equals("csv") || - localName.equals("jdbc") || - localName.equals("jndi") || - localName.equals("ntriples") || - localName.equals("sparql") || - localName.equals("data-source")) { - config.addDataSource(groupno, datasource); - datasource = null; - currentobj = null; - } else if (localName.equals("object")) - currentobj = null; - else if (localName.equals("database")) - config.setDatabase(database); - - if (keepers.contains(localName)) - keep = false; - - else if (localName.equals("duke")) { - if (groupno > 0 && groupno != 2) - throw new DukeConfigException("Record linkage mode requires exactly 2 groups; should you be using deduplication mode?"); - } - } - - public void endDocument() { - config.setProperties(properties); - } - - private Cleaner makeCleaner(String value) { - if (value == null) - return null; - - String[] names = StringUtils.split(value); - Cleaner[] cleaners = new Cleaner[names.length]; - for (int ix = 0; ix < cleaners.length; ix++) - cleaners[ix] = _makeCleaner(names[ix]); - - if (cleaners.length == 1) - return cleaners[0]; - else - return new ChainedCleaner(cleaners); - } - - private Cleaner _makeCleaner(String name) { - Cleaner cleaner = (Cleaner) objects.get(name); - if (cleaner == null) // wasn't a configured bean - cleaner = (Cleaner) instantiate(name); - return cleaner; - } - } - - private static Object instantiate(String classname) { - try { - Class klass = Class.forName(classname); - return klass.newInstance(); - } - catch (Exception e) { - throw new DukeConfigException("Couldn't instantiate class " + classname + - ": " + e); - } - } -} + +package no.priv.garshol.duke; + +import java.util.Set; +import java.util.Map; +import 
java.util.List; +import java.util.HashSet; +import java.util.HashMap; +import java.util.ArrayList; +import java.io.File; +import java.io.InputStream; +import java.io.IOException; +import org.xml.sax.XMLReader; +import org.xml.sax.Attributes; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; +import org.xml.sax.helpers.XMLReaderFactory; +import org.xml.sax.helpers.DefaultHandler; + +import no.priv.garshol.duke.DukeConfigException; +import no.priv.garshol.duke.utils.StringUtils; +import no.priv.garshol.duke.utils.ObjectUtils; +import no.priv.garshol.duke.cleaners.ChainedCleaner; +import no.priv.garshol.duke.datasources.Column; +import no.priv.garshol.duke.datasources.CSVDataSource; +import no.priv.garshol.duke.datasources.ColumnarDataSource; +import no.priv.garshol.duke.datasources.JDBCDataSource; +import no.priv.garshol.duke.datasources.JNDIDataSource; +import no.priv.garshol.duke.datasources.NTriplesDataSource; +import no.priv.garshol.duke.datasources.SparqlDataSource; + +import no.priv.garshol.duke.transforms.TransformDataSource; +import no.priv.garshol.duke.transforms.TransformOperation; + +/** + * Can read XML configuration files and return a fully set up configuration. + */ +public class ConfigLoader { + + /** + * Note that if file starts with 'classpath:' the resource is looked + * up on the classpath instead. + */ + public static Configuration load(String file) + throws IOException, SAXException { + ConfigurationImpl cfg = new ConfigurationImpl(); + + XMLReader parser = XMLReaderFactory.createXMLReader(); + parser.setContentHandler(new ConfigHandler(cfg, file)); + if (file.startsWith("classpath:")) { + String resource = file.substring("classpath:".length()); + ClassLoader cloader = Thread.currentThread().getContextClassLoader(); + InputStream istream = cloader.getResourceAsStream(resource); + parser.parse(new InputSource(istream)); + } else + parser.parse(file); + + return cfg; + } + + private static class ConfigHandler extends DefaultHandler { + private ConfigurationImpl config; + private List properties; + private File path; // location of config file + + private double low; + private double high; + private String name; + private boolean idprop; + private boolean ignore_prop; + private Comparator comparator; + private Property.Lookup lookup; + + private Set keepers; + private int groupno; // counts datasource groups + private Map objects; // configured Java beans for reuse + private DataSource datasource; + private Object currentobj; // Java bean currently being configured by + private Database database; + + private boolean keep; + private StringBuffer content; + + private ConfigHandler(ConfigurationImpl config, String path) { + this.config = config; + this.properties = new ArrayList(); + if (!path.startsWith("classpath:")) + this.path = new File(path).getParentFile(); + + this.objects = new HashMap(); + this.keepers = new HashSet(); + this.content = new StringBuffer(); + + keepers.add("threshold"); + keepers.add("maybe-threshold"); + keepers.add("name"); + keepers.add("low"); + keepers.add("high"); + keepers.add("comparator"); + } + + public void startElement(String uri, String localName, String qName, + Attributes attributes) { + if (keepers.contains(localName)) { + keep = true; + content.setLength(0); // clear + } else if (localName.equals("property")) { + String type = attributes.getValue("type"); + idprop = type != null && type.equals("id"); + ignore_prop = type != null && type.equals("ignore"); + low = 0.5; + high = 0.5; + comparator = null; + lookup = 
Property.Lookup.DEFAULT; + if (attributes.getValue("lookup") != null) + lookup = (Property.Lookup) ObjectUtils.getEnumConstantByName( + Property.Lookup.class, + attributes.getValue("lookup").toUpperCase()); + } else if (localName.equals("csv")) { + datasource = new CSVDataSource(); + currentobj = datasource; + } else if (localName.equals("jdbc")) { + datasource = new JDBCDataSource(); + currentobj = datasource; + } else if (localName.equals("jndi")) { + datasource = new JNDIDataSource(); + currentobj = datasource; + } else if (localName.equals("sparql")) { + datasource = new SparqlDataSource(); + currentobj = datasource; + } else if (localName.equals("ntriples")) { + datasource = new NTriplesDataSource(); + currentobj = datasource; + } else if (localName.equals("data-source")) { + datasource = (DataSource) instantiate(attributes.getValue("class")); + currentobj = datasource; + + //add a way to transform a datasource and records + } else if (localName.equals("transform")) { + if (!(currentobj instanceof DataSource)) + throw new DukeConfigException("Transform must be inside a datasource"); + datasource = new TransformDataSource(datasource); + currentobj = datasource; + // add operations + } else if (localName.equals("operation")) { + if (!(currentobj instanceof TransformDataSource)) + throw new DukeConfigException("Operation must be inside a transform element"); + String opclass = attributes.getValue("class"); + TransformOperation operation = (TransformOperation) instantiate(opclass); + ((TransformDataSource) currentobj).addOperation(operation); + currentobj = operation; + + } else if (localName.equals("column")) { + if (!(datasource instanceof ColumnarDataSource)) + throw new DukeConfigException("Column inside data source which " + + "does not support it: " + datasource); + + String name = attributes.getValue("name"); + if (name == null) + throw new DukeConfigException("Column with no name"); + String property = attributes.getValue("property"); + String prefix = attributes.getValue("prefix"); + String cleanername = attributes.getValue("cleaner"); + Cleaner cleaner = makeCleaner(cleanername); + + Column c = new Column(name, property, prefix, cleaner); + String spliton = attributes.getValue("split-on"); + if (spliton != null) + c.setSplitOn(spliton); + + ((ColumnarDataSource) datasource).addColumn(c); + } else if (localName.equals("param")) { + String param = attributes.getValue("name"); + String value = attributes.getValue("value"); + + if (currentobj == null) + throw new DukeConfigException("Trying to set parameter " + + param + " but no current object"); + + // we resolve file references relative to the config file location + if (param.equals("input-file") && path != null) + value = new File(path, value).getAbsolutePath(); + + ObjectUtils.setBeanProperty(currentobj, param, value, objects); + } else if (localName.equals("group")) { + groupno++; + // FIXME: now possible to have data sources between the two + // groups. need to check for that, too. ideally XML + // validation should take care of all this for us. 
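
The new `<transform>` element wraps whichever data source it is nested in with a `TransformDataSource`, and each `<operation>` child is instantiated by class name and added to that wrapper. A minimal sketch of a configuration exercising this, inferred from the parsing logic above and not part of the patch itself; the operation class name is invented and would have to be a real `TransformOperation` implementation on the classpath for the load to succeed:

```java
import java.io.File;
import java.io.FileWriter;
import no.priv.garshol.duke.ConfigLoader;
import no.priv.garshol.duke.Configuration;

public class TransformConfigSketch {
  public static void main(String[] args) throws Exception {
    // <transform> must sit inside a data source element, <operation> inside <transform>.
    // "com.example.duke.TrimOperation" is hypothetical.
    String xml =
        "<duke>\n"
      + "  <csv>\n"
      + "    <param name=\"input-file\" value=\"people.csv\"/>\n"
      + "    <column name=\"name\" property=\"NAME\"/>\n"
      + "    <transform>\n"
      + "      <operation class=\"com.example.duke.TrimOperation\"/>\n"
      + "    </transform>\n"
      + "  </csv>\n"
      + "</duke>\n";

    File f = File.createTempFile("duke-transform", ".xml");
    try (FileWriter w = new FileWriter(f)) {
      w.write(xml);
    }

    // At </csv> it is the TransformDataSource wrapping the CSV source that gets
    // registered on the configuration, not the raw CSVDataSource.
    Configuration config = ConfigLoader.load(f.getAbsolutePath());
    System.out.println(config.getDataSources());
  }
}
```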
+ if (groupno == 1 && !config.getDataSources().isEmpty()) + throw new DukeConfigException("Cannot have groups in deduplication mode"); + else if (groupno == 3) + throw new DukeConfigException("Record linkage mode only supports " + + "two groups"); + + } else if (localName.equals("object")) { + String klass = attributes.getValue("class"); + String name = attributes.getValue("name"); + currentobj = instantiate(klass); + objects.put(name, currentobj); + } else if (localName.equals("database")) { + String klass = attributes.getValue("class"); + if (klass == null) + klass = "no.priv.garshol.duke.LuceneDatabase"; // default + database = (Database) instantiate(klass); + currentobj = database; + } + } + + public void characters(char[] ch, int start, int length) { + if (keep) + content.append(ch, start, length); + } + + public void endElement(String uri, String localName, String qName) { + if (localName.equals("threshold")) + config.setThreshold(Double.parseDouble(content.toString())); + else if (localName.equals("maybe-threshold")) + config.setMaybeThreshold(Double.parseDouble(content.toString())); + else if (localName.equals("name")) + name = content.toString(); + else if (localName.equals("property")) { + if (idprop) + properties.add(new PropertyImpl(name)); + else { + Property p = new PropertyImpl(name, comparator, low, high); + if (ignore_prop) + p.setIgnoreProperty(true); + p.setLookupBehaviour(lookup); + properties.add(p); + } + } else if (localName.equals("low")) + low = Double.parseDouble(content.toString()); + else if (localName.equals("high")) + high = Double.parseDouble(content.toString()); + else if (localName.equals("comparator")) { + comparator = (Comparator) objects.get(content.toString()); + if (comparator == null) // wasn't a configured bean + comparator = (Comparator) instantiate(content.toString()); + } else if (localName.equals("csv") || + localName.equals("jdbc") || + localName.equals("jndi") || + localName.equals("ntriples") || + localName.equals("sparql") || + localName.equals("data-source")) { + config.addDataSource(groupno, datasource); + datasource = null; + currentobj = null; + + } else if (localName.equals("transform")) { + // unstack datasource + currentobj = ((TransformDataSource) datasource).getTransformedDataSource(); + } else if (localName.equals("operation")) { + // unstack datasource + currentobj = datasource; + + } else if (localName.equals("object")) + currentobj = null; + else if (localName.equals("database")) + config.setDatabase(database); + + if (keepers.contains(localName)) + keep = false; + + else if (localName.equals("duke")) { + if (groupno > 0 && groupno != 2) + throw new DukeConfigException("Record linkage mode requires exactly 2 groups; should you be using deduplication mode?"); + } + } + + public void endDocument() { + config.setProperties(properties); + } + + private Cleaner makeCleaner(String value) { + if (value == null) + return null; + + String[] names = StringUtils.split(value); + Cleaner[] cleaners = new Cleaner[names.length]; + for (int ix = 0; ix < cleaners.length; ix++) + cleaners[ix] = _makeCleaner(names[ix]); + + if (cleaners.length == 1) + return cleaners[0]; + else + return new ChainedCleaner(cleaners); + } + + private Cleaner _makeCleaner(String name) { + Cleaner cleaner = (Cleaner) objects.get(name); + if (cleaner == null) // wasn't a configured bean + cleaner = (Cleaner) instantiate(name); + return cleaner; + } + } + + private static Object instantiate(String classname) { + try { + Class klass = Class.forName(classname); + return 
klass.newInstance(); + } + catch (Exception e) { + throw new DukeConfigException("Couldn't instantiate class " + classname + + ": " + e); + } + } +} diff --git a/src/main/java/no/priv/garshol/duke/Processor.java b/src/main/java/no/priv/garshol/duke/Processor.java index bf3d6a52..163560ad 100644 --- a/src/main/java/no/priv/garshol/duke/Processor.java +++ b/src/main/java/no/priv/garshol/duke/Processor.java @@ -1,747 +1,689 @@ - -package no.priv.garshol.duke; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.io.Writer; -import java.io.PrintWriter; - -import no.priv.garshol.duke.matchers.MatchListener; -import no.priv.garshol.duke.matchers.PrintMatchListener; -import no.priv.garshol.duke.matchers.AbstractMatchListener; -import no.priv.garshol.duke.utils.Utils; - -/** - * The class that implements the actual deduplication and record - * linkage logic. - */ -public class Processor { - private Configuration config; - protected Database database; - private Collection listeners; - private Logger logger; - private List proporder; - private double[] accprob; - private int threads; - private final static int DEFAULT_BATCH_SIZE = 40000; - - // performance statistics - private long comparisons; // number of records compared - private long srcread; // ms spent reading from data sources - private long indexing; // ms spent indexing records - private long searching; // ms spent searching for records - private long comparing; // ms spent comparing records - private long callbacks; // ms spent in callbacks - private Profiler profiler; - - /** - * Creates a new processor, overwriting the existing Lucene index. - */ - public Processor(Configuration config) { - this(config, true); - } - - /** - * Creates a new processor. - * @param overwrite If true, make new Lucene index. If false, leave - * existing data. - */ - public Processor(Configuration config, boolean overwrite) { - this(config, config.getDatabase(overwrite)); - } - - /** - * Creates a new processor, bound to the given database. - */ - public Processor(Configuration config, Database database) { - this.config = config; - this.database = database; - // using this List implementation so that listeners can be removed - // while Duke is running (see issue 117) - this.listeners = new CopyOnWriteArrayList(); - this.logger = new DummyLogger(); - this.threads = 1; - - // precomputing for later optimizations - this.proporder = new ArrayList(); - for (Property p : config.getProperties()) - if (!p.isIdProperty()) - proporder.add(p); - Collections.sort(proporder, new PropertyComparator()); - - // still precomputing - double prob = 0.5; - accprob = new double[proporder.size()]; - for (int ix = proporder.size() - 1; ix >= 0; ix--) { - prob = Utils.computeBayes(prob, proporder.get(ix).getHighProbability()); - accprob[ix] = prob; - } - } - - /** - * Sets the logger to report to. - */ - public void setLogger(Logger logger) { - this.logger = logger; - } - - /** - * Sets the number of threads to use for processing. The default is - * 1. - */ - public void setThreads(int threads) { - this.threads = threads; - } - - /** - * Returns the number of threads. - */ - public int getThreads() { - return threads; - } - - /** - * Adds a listener to be notified of processing events. 
- */ - public void addMatchListener(MatchListener listener) { - listeners.add(listener); - } - - /** - * Removes a listener from being notified of the processing events. - * @since 1.1 - */ - public boolean removeMatchListener(MatchListener listener) { - if (listener != null) - return listeners.remove(listener); - return true; - } - - /** - * Returns all registered listeners. - */ - public Collection getListeners() { - return listeners; - } - - /** - * Returns the actual Lucene index being used. - */ - public Database getDatabase() { - return database; - } - - /** - * Used to turn performance profiling on and off. - * @since 1.1 - */ - public void setPerformanceProfiling(boolean profile) { - if (profile) { - if (profiler != null) - return; // we're already profiling - - this.profiler = new Profiler(); - addMatchListener(profiler); - - } else { - // turn off profiling - if (profiler == null) - return; // we're not profiling, so nothing to do - - removeMatchListener(profiler); - profiler = null; - } - } - - /** - * Returns the performance profiler, if any. - * @since 1.1 - */ - public Profiler getProfiler() { - return profiler; - } - - /** - * Reads all available records from the data sources and processes - * them in batches, notifying the listeners throughout. - */ - public void deduplicate() { - deduplicate(config.getDataSources(), DEFAULT_BATCH_SIZE); - } - - /** - * Reads all available records from the data sources and processes - * them in batches, notifying the listeners throughout. - */ - public void deduplicate(int batch_size) { - deduplicate(config.getDataSources(), batch_size); - } - - /** - * Reads all available records from the data sources and processes - * them in batches, notifying the listeners throughout. - */ - public void deduplicate(Collection sources, int batch_size) { - int count = 0; - startProcessing(); - - Iterator it = sources.iterator(); - while (it.hasNext()) { - DataSource source = it.next(); - source.setLogger(logger); - - RecordIterator it2 = source.getRecords(); - try { - Collection batch = new ArrayList(); - long start = System.currentTimeMillis(); - while (it2.hasNext()) { - Record record = it2.next(); - batch.add(record); - count++; - if (count % batch_size == 0) { - srcread += (System.currentTimeMillis() - start); - deduplicate(batch); - it2.batchProcessed(); - batch = new ArrayList(); - start = System.currentTimeMillis(); - } - } - - if (!batch.isEmpty()) { - deduplicate(batch); - it2.batchProcessed(); - } - } finally { - it2.close(); - } - } - - endProcessing(); - } - - /** - * Deduplicates a newly arrived batch of records. The records may - * have been seen before. 
- */ - public void deduplicate(Collection records) { - logger.info("Deduplicating batch of " + records.size() + " records"); - batchReady(records.size()); - - // prepare - long start = System.currentTimeMillis(); - for (Record record : records) - database.index(record); - - database.commit(); - indexing += System.currentTimeMillis() - start; - - // then match - match(records, true); - - batchDone(); - } - - private void match(Collection records, boolean matchall) { - if (threads == 1) - for (Record record : records) - match(record, matchall); - else - threadedmatch(records, matchall); - } - - private void threadedmatch(Collection records, boolean matchall) { - // split batch into n smaller batches - MatchThread[] threads = new MatchThread[this.threads]; - for (int ix = 0; ix < threads.length; ix++) - threads[ix] = new MatchThread(ix, records.size() / threads.length, - matchall); - int ix = 0; - for (Record record : records) - threads[ix++ % threads.length].addRecord(record); - - // kick off threads - for (ix = 0; ix < threads.length; ix++) - threads[ix].start(); - - // wait for threads to finish - try { - for (ix = 0; ix < threads.length; ix++) - threads[ix].join(); - } catch (InterruptedException e) { - // argh - } - } - - /** - * Does record linkage across the two groups, but does not link - * records within each group. - */ - public void link() { - link(config.getDataSources(1), config.getDataSources(2), - DEFAULT_BATCH_SIZE); - } - - // FIXME: what about the general case, where there are more than 2 groups? - /** - * Does record linkage across the two groups, but does not link - * records within each group. With this method, all matches - * above threshold are passed on. - */ - public void link(Collection sources1, - Collection sources2, - int batch_size) { - link(sources1, sources2, true, batch_size); - } - - /** - * Does record linkage across the two groups, but does not link - * records within each group. - * @param matchall If true, all matching records are accepted. If false, - * only the single best match for each record is accepted. - * @param batch_size The batch size to use. - * @since 1.1 - */ - public void link(Collection sources1, - Collection sources2, - boolean matchall, - int batch_size) { - startProcessing(); - - // first, index up group 1 - index(sources1, batch_size); - - // second, traverse group 2 to look for matches with group 1 - linkRecords(sources2, matchall, batch_size); - } - - /** - * Retrieve new records from data sources, and match them to - * previously indexed records. This method does not index - * the new records. With this method, all matches above - * threshold are passed on. - * @since 0.4 - */ - public void linkRecords(Collection sources) { - linkRecords(sources, true); - } - - /** - * Retrieve new records from data sources, and match them to - * previously indexed records. This method does not index - * the new records. - * @param matchall If true, all matching records are accepted. If false, - * only the single best match for each record is accepted. - * @since 0.5 - */ - public void linkRecords(Collection sources, boolean matchall) { - linkRecords(sources, matchall, DEFAULT_BATCH_SIZE); - } - - /** - * Retrieve new records from data sources, and match them to - * previously indexed records. This method does not index - * the new records. - * @param matchall If true, all matching records are accepted. If false, - * only the single best match for each record is accepted. - * @param batch_size The batch size to use. 
- * @since 1.0 - */ - public void linkRecords(Collection sources, boolean matchall, - int batch_size) { - for (DataSource source : sources) { - source.setLogger(logger); - - Collection batch = new ArrayList(batch_size); - RecordIterator it = source.getRecords(); - while (it.hasNext()) { - batch.add(it.next()); - if (batch.size() == batch_size) { - linkBatch(batch, matchall); - batch.clear(); - } - } - it.close(); - - if (!batch.isEmpty()) - linkBatch(batch, matchall); - } - - endProcessing(); - } - - private void linkBatch(Collection batch, boolean matchall) { - batchReady(batch.size()); - match(batch, matchall); - batchDone(); - } - - /** - * Index all new records from the given data sources. This method - * does not do any matching. - * @since 0.4 - */ - public void index(Collection sources, int batch_size) { - int count = 0; - for (DataSource source : sources) { - source.setLogger(logger); - - RecordIterator it2 = source.getRecords(); - while (it2.hasNext()) { - Record record = it2.next(); - database.index(record); - count++; - if (count % batch_size == 0) - batchReady(batch_size); - } - it2.close(); - } - if (count % batch_size == 0) - batchReady(count % batch_size); - database.commit(); - } - - /** - * Returns the number of records that have been compared. - */ - public long getComparisonCount() { - return comparisons; - } - - private void match(Record record, boolean matchall) { - long start = System.currentTimeMillis(); - Collection candidates = database.findCandidateMatches(record); - searching += System.currentTimeMillis() - start; - if (logger.isDebugEnabled()) - logger.debug("Matching record " + - PrintMatchListener.toString(record, config.getProperties()) + - " found " + candidates.size() + " candidates"); - - start = System.currentTimeMillis(); - if (matchall) - compareCandidatesSimple(record, candidates); - else - compareCandidatesBest(record, candidates); - comparing += System.currentTimeMillis() - start; - } - - // ===== RECORD LINKAGE STRATEGIES - // the following two methods implement different record matching - // strategies. the first is used for deduplication, where we simply - // want all matches above the thresholds. the second is used for - // record linkage, to implement a simple greedy matching algorithm - // where we choose the best alternative above the threshold for each - // record. - - // other, more advanced possibilities exist for record linkage, but - // they are not implemented yet. see the links below for more - // information. - - // http://code.google.com/p/duke/issues/detail?id=55 - // http://research.microsoft.com/pubs/153478/msr-report-1to1.pdf - - /** - * Passes on all matches found. - */ - protected void compareCandidatesSimple(Record record, - Collection candidates) { - boolean found = false; - for (Record candidate : candidates) { - if (isSameAs(record, candidate)) - continue; - - double prob = compare(record, candidate); - if (prob > config.getThreshold()) { - found = true; - registerMatch(record, candidate, prob); - } else if (config.getMaybeThreshold() != 0.0 && - prob > config.getMaybeThreshold()) { - found = true; // I guess? - registerMatchPerhaps(record, candidate, prob); - } - } - if (!found) - registerNoMatchFor(record); - } - - /** - * Passes on only the best match for each record. 
- */ - protected void compareCandidatesBest(Record record, - Collection candidates) { - double max = 0.0; - Record best = null; - - // go through all candidates, and find the best - for (Record candidate : candidates) { - if (isSameAs(record, candidate)) - continue; - - double prob = compare(record, candidate); - if (prob > max) { - max = prob; - best = candidate; - } - } - - // pass on the best match, if any - if (max > config.getThreshold()) - registerMatch(record, best, max); - else if (config.getMaybeThreshold() != 0.0 && - max > config.getMaybeThreshold()) - registerMatchPerhaps(record, best, max); - else - registerNoMatchFor(record); - } - - /** - * Compares two records and returns the probability that they - * represent the same real-world entity. - */ - public double compare(Record r1, Record r2) { - comparisons++; - double prob = 0.5; - for (String propname : r1.getProperties()) { - Property prop = config.getPropertyByName(propname); - if (prop == null) - continue; // means the property is unknown - if (prop.isIdProperty() || prop.isIgnoreProperty()) - continue; - - Collection vs1 = r1.getValues(propname); - Collection vs2 = r2.getValues(propname); - if (vs1 == null || vs1.isEmpty() || vs2 == null || vs2.isEmpty()) - continue; // no values to compare, so skip - - double high = 0.0; - for (String v1 : vs1) { - if (v1.equals("")) // FIXME: these values shouldn't be here at all - continue; - - for (String v2 : vs2) { - if (v2.equals("")) // FIXME: these values shouldn't be here at all - continue; - - try { - double p = prop.compare(v1, v2); - high = Math.max(high, p); - } catch (Exception e) { - throw new DukeException("Comparison of values '" + v1 + "' and "+ - "'" + v2 + "' with " + - prop.getComparator() + " failed", e); - } - } - } - - prob = Utils.computeBayes(prob, high); - } - return prob; - } - - /** - * Commits all state to disk and frees up resources. - */ - public void close() { - database.close(); - } - - // ===== INTERNALS - - private boolean isSameAs(Record r1, Record r2) { - for (Property idp : config.getIdentityProperties()) { - Collection vs2 = r2.getValues(idp.getName()); - Collection vs1 = r1.getValues(idp.getName()); - if (vs1 == null) - continue; - for (String v1 : vs1) - if (vs2.contains(v1)) - return true; - } - return false; - } - - private void startProcessing() { - long start = System.currentTimeMillis(); - for (MatchListener listener : listeners) - listener.startProcessing(); - callbacks += (System.currentTimeMillis() - start); - } - - private void endProcessing() { - long start = System.currentTimeMillis(); - for (MatchListener listener : listeners) - listener.endProcessing(); - callbacks += (System.currentTimeMillis() - start); - } - - private void batchReady(int size) { - long start = System.currentTimeMillis(); - for (MatchListener listener : listeners) - listener.batchReady(size); - callbacks += (System.currentTimeMillis() - start); - } - - private void batchDone() { - long start = System.currentTimeMillis(); - for (MatchListener listener : listeners) - listener.batchDone(); - callbacks += (System.currentTimeMillis() - start); - } - - /** - * Records the statement that the two records match. - */ - private void registerMatch(Record r1, Record r2, double confidence) { - long start = System.currentTimeMillis(); - for (MatchListener listener : listeners) - listener.matches(r1, r2, confidence); - callbacks += (System.currentTimeMillis() - start); - } - - /** - * Records the statement that the two records may match. 
- */ - private void registerMatchPerhaps(Record r1, Record r2, double confidence) { - long start = System.currentTimeMillis(); - for (MatchListener listener : listeners) - listener.matchesPerhaps(r1, r2, confidence); - callbacks += (System.currentTimeMillis() - start); - } - - /** - * Notifies listeners that we found no matches for this record. - */ - private void registerNoMatchFor(Record current) { - long start = System.currentTimeMillis(); - for (MatchListener listener : listeners) - listener.noMatchFor(current); - callbacks += (System.currentTimeMillis() - start); - } - - /** - * Sorts properties so that the properties with the lowest low - * probabilities come first. - */ - static class PropertyComparator implements Comparator { - public int compare(Property p1, Property p2) { - double diff = p1.getLowProbability() - p2.getLowProbability(); - if (diff < 0) - return -1; - else if (diff > 0) - return 1; - else - return 0; - } - } - - // ===== THREADS - - /** - * The thread that actually runs parallell matching. It holds the - * thread's share of the current batch. - */ - class MatchThread extends Thread { - private Collection records; - private boolean matchall; - - public MatchThread(int threadno, int recordcount, boolean matchall) { - super("MatchThread " + threadno); - this.records = new ArrayList(recordcount); - this.matchall = matchall; - } - - public void run() { - for (Record record : records) - match(record, matchall); - } - - public void addRecord(Record record) { - records.add(record); - } - } - - // ===== PERFORMANCE PROFILING - - public class Profiler extends AbstractMatchListener { - private long processing_start; - private long batch_start; - private int batch_size; - private int records; - private PrintWriter out; - - public Profiler() { - this.out = new PrintWriter(System.out); - } - - /** - * Sets Writer to receive performance statistics. Defaults to - * System.out. 
- */ - public void setOutput(Writer outw) { - this.out = new PrintWriter(outw); - } - - public void startProcessing() { - processing_start = System.currentTimeMillis(); - System.out.println("Duke version " + Duke.getVersionString()); - System.out.println(getDatabase()); - System.out.println("Threads: " + getThreads()); - } - - public void batchReady(int size) { - batch_start = System.currentTimeMillis(); - batch_size = size; - } - - public void batchDone() { - records += batch_size; - int rs = (int) ((1000.0 * batch_size) / - (System.currentTimeMillis() - batch_start)); - System.out.println("" + records + " processed, " + rs + - " records/second; comparisons: " + - getComparisonCount()); - } - - public void endProcessing() { - long end = System.currentTimeMillis(); - double rs = (1000.0 * records) / (end - processing_start); - System.out.println("Run completed, " + (int) rs + " records/second"); - System.out.println("" + records + " records total in " + - ((end - processing_start) / 1000) + " seconds"); - - long total = srcread + indexing + searching + comparing + callbacks; - System.out.println("Reading from source: " + - seconds(srcread) + " (" + - percent(srcread, total) + "%)"); - System.out.println("Indexing: " + - seconds(indexing) + " (" + - percent(indexing, total) + "%)"); - System.out.println("Searching: " + - seconds(searching) + " (" + - percent(searching, total) + "%)"); - System.out.println("Comparing: " + - seconds(comparing) + " (" + - percent(comparing, total) + "%)"); - System.out.println("Callbacks: " + - seconds(callbacks) + " (" + - percent(callbacks, total) + "%)"); - System.out.println(); - Runtime r = Runtime.getRuntime(); - System.out.println("Total memory: " + r.totalMemory() + ", " + - "free memory: " + r.freeMemory() + ", " + - "used memory: " + (r.totalMemory() - r.freeMemory())); - } - - private String seconds(long ms) { - return "" + (int) (ms / 1000); - } - - private String percent(long ms, long total) { - return "" + (int) ((double) (ms * 100) / (double) total); - } - } + +package no.priv.garshol.duke; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.io.Writer; +import java.io.PrintWriter; + +import no.priv.garshol.duke.matchers.AbstractMatchListener; +import no.priv.garshol.duke.matchers.MatchListener; +import no.priv.garshol.duke.matchers.PrintMatchListener; +import no.priv.garshol.duke.recordlinkage.RecordLinkageBestStrategy; +import no.priv.garshol.duke.recordlinkage.RecordLinkageSimpleStrategy; +import no.priv.garshol.duke.utils.Utils; + +/** + * The class that implements the actual deduplication and record + * linkage logic. 
+ */ +public class Processor { + private Configuration config; + protected Database database; + private Collection listeners; + private Logger logger; + private List proporder; + private double[] accprob; + private int threads; + private final static int DEFAULT_BATCH_SIZE = 40000; + + // performance statistics + private long comparisons; // number of records compared + private long srcread; // ms spent reading from data sources + private long indexing; // ms spent indexing records + private long searching; // ms spent searching for records + private long comparing; // ms spent comparing records + private long callbacks; // ms spent in callbacks + private Profiler profiler; + + private RecordLinkageStrategy strategy; + + /** + * Creates a new processor, overwriting the existing Lucene index. + */ + public Processor(Configuration config) { + this(config, true); + } + + /** + * Creates a new processor. + * @param overwrite If true, make new Lucene index. If false, leave + * existing data. + */ + public Processor(Configuration config, boolean overwrite) { + this(config, config.getDatabase(overwrite)); + } + + /** + * Creates a new processor, bound to the given database. + */ + public Processor(Configuration config, Database database) { + this.config = config; + this.database = database; + // using this List implementation so that listeners can be removed + // while Duke is running (see issue 117) + this.listeners = new CopyOnWriteArrayList(); + this.logger = new DummyLogger(); + this.threads = 1; + + // precomputing for later optimizations + this.proporder = new ArrayList(); + for (Property p : config.getProperties()) + if (!p.isIdProperty()) + proporder.add(p); + Collections.sort(proporder, new PropertyComparator()); + + // still precomputing + double prob = 0.5; + accprob = new double[proporder.size()]; + for (int ix = proporder.size() - 1; ix >= 0; ix--) { + prob = Utils.computeBayes(prob, proporder.get(ix).getHighProbability()); + accprob[ix] = prob; + } + } + + /** + * Sets the logger to report to. + */ + public void setLogger(Logger logger) { + this.logger = logger; + } + + /** + * @param strategy the strategy to set + */ + public void setRecordLinkageStrategy(RecordLinkageStrategy strategy) { + this.strategy = strategy; + } + + /** + * Sets the number of threads to use for processing. The default is + * 1. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Returns the number of threads. + */ + public int getThreads() { + return threads; + } + + /** + * Adds a listener to be notified of processing events. + */ + public void addMatchListener(MatchListener listener) { + listeners.add(listener); + } + + /** + * Removes a listener from being notified of the processing events. + * @since 1.1 + */ + public boolean removeMatchListener(MatchListener listener) { + if (listener != null) + return listeners.remove(listener); + return true; + } + + /** + * Returns all registered listeners. + */ + public Collection getListeners() { + return listeners; + } + + /** + * Returns the actual Lucene index being used. + */ + public Database getDatabase() { + return database; + } + + /** + * Used to turn performance profiling on and off. 
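
The `strategy` field and `setRecordLinkageStrategy` above let callers swap the matching policy without subclassing `Processor`; when no strategy is set, the deduplication path further down falls back to `RecordLinkageSimpleStrategy`, which mirrors the old report-every-match-above-the-thresholds behaviour. A minimal usage sketch (the config path is hypothetical):

```java
import no.priv.garshol.duke.ConfigLoader;
import no.priv.garshol.duke.Configuration;
import no.priv.garshol.duke.Processor;
import no.priv.garshol.duke.recordlinkage.RecordLinkageBestStrategy;

public class StrategySketch {
  public static void main(String[] args) throws Exception {
    Configuration config = ConfigLoader.load("config.xml");  // hypothetical path

    Processor proc = new Processor(config);
    // Keep only the single best candidate above the threshold per record,
    // instead of the default simple strategy.
    proc.setRecordLinkageStrategy(new RecordLinkageBestStrategy());

    proc.deduplicate();  // the explicit strategy is honoured here too
    proc.close();
  }
}
```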
+ * @since 1.1 + */ + public void setPerformanceProfiling(boolean profile) { + if (profile) { + if (profiler != null) + return; // we're already profiling + + this.profiler = new Profiler(); + addMatchListener(profiler); + + } else { + // turn off profiling + if (profiler == null) + return; // we're not profiling, so nothing to do + + removeMatchListener(profiler); + profiler = null; + } + } + + /** + * Returns the performance profiler, if any. + * @since 1.1 + */ + public Profiler getProfiler() { + return profiler; + } + + /** + * Reads all available records from the data sources and processes + * them in batches, notifying the listeners throughout. + */ + public void deduplicate() { + deduplicate(config.getDataSources(), DEFAULT_BATCH_SIZE); + } + + /** + * Reads all available records from the data sources and processes + * them in batches, notifying the listeners throughout. + */ + public void deduplicate(int batch_size) { + deduplicate(config.getDataSources(), batch_size); + } + + /** + * Reads all available records from the data sources and processes + * them in batches, notifying the listeners throughout. + */ + public void deduplicate(Collection sources, int batch_size) { + int count = 0; + startProcessing(); + + Iterator it = sources.iterator(); + while (it.hasNext()) { + DataSource source = it.next(); + source.setLogger(logger); + + RecordIterator it2 = source.getRecords(); + try { + Collection batch = new ArrayList(); + long start = System.currentTimeMillis(); + while (it2.hasNext()) { + Record record = it2.next(); + batch.add(record); + count++; + if (count % batch_size == 0) { + srcread += (System.currentTimeMillis() - start); + deduplicate(batch); + it2.batchProcessed(); + batch = new ArrayList(); + start = System.currentTimeMillis(); + } + } + + if (!batch.isEmpty()) { + deduplicate(batch); + it2.batchProcessed(); + } + } finally { + it2.close(); + } + } + + endProcessing(); + } + + /** + * Deduplicates a newly arrived batch of records. The records may + * have been seen before. + */ + public void deduplicate(Collection records) { + logger.info("Deduplicating batch of " + records.size() + " records"); + batchReady(records.size()); + + // prepare + long start = System.currentTimeMillis(); + for (Record record : records) + database.index(record); + + database.commit(); + indexing += System.currentTimeMillis() - start; + + // then match + match(records, this.strategy==null? new RecordLinkageSimpleStrategy(): this.strategy); + + batchDone(); + } + + private void match(Collection records, RecordLinkageStrategy strategy) { + if (threads == 1) + for (Record record : records) + match(record, strategy); + else + threadedmatch(records, strategy); + } + + private void threadedmatch(Collection records, RecordLinkageStrategy strategy) { + // split batch into n smaller batches + MatchThread[] threads = new MatchThread[this.threads]; + for (int ix = 0; ix < threads.length; ix++) + threads[ix] = new MatchThread(ix, records.size() / threads.length, strategy); + int ix = 0; + for (Record record : records) + threads[ix++ % threads.length].addRecord(record); + + // kick off threads + for (ix = 0; ix < threads.length; ix++) + threads[ix].start(); + + // wait for threads to finish + try { + for (ix = 0; ix < threads.length; ix++) + threads[ix].join(); + } catch (InterruptedException e) { + // argh + } + } + + /** + * Does record linkage across the two groups, but does not link + * records within each group. 
+ */ + public void link() { + link(config.getDataSources(1), config.getDataSources(2), + DEFAULT_BATCH_SIZE); + } + + // FIXME: what about the general case, where there are more than 2 groups? + /** + * Does record linkage across the two groups, but does not link + * records within each group. With this method, all matches + * above threshold are passed on. + */ + public void link(Collection sources1, + Collection sources2, + int batch_size) { + link(sources1, sources2, true, batch_size); + } + + /** + * Does record linkage across the two groups, but does not link + * records within each group. + * @param matchall If true, all matching records are accepted. If false, + * only the single best match for each record is accepted. + * @param batch_size The batch size to use. + * @since 1.1 + */ + public void link(Collection sources1, + Collection sources2, + boolean matchall, + int batch_size) { + startProcessing(); + + // first, index up group 1 + index(sources1, batch_size); + + // second, traverse group 2 to look for matches with group 1 + linkRecords(sources2, matchall, batch_size); + } + + /** + * Retrieve new records from data sources, and match them to + * previously indexed records. This method does not index + * the new records. With this method, all matches above + * threshold are passed on. + * @since 0.4 + */ + public void linkRecords(Collection sources) { + linkRecords(sources, true); + } + + /** + * Retrieve new records from data sources, and match them to + * previously indexed records. This method does not index + * the new records. + * @param matchall If true, all matching records are accepted. If false, + * only the single best match for each record is accepted. + * @since 0.5 + */ + public void linkRecords(Collection sources, boolean matchall) { + linkRecords(sources, matchall, DEFAULT_BATCH_SIZE); + } + + /** + * Retrieve new records from data sources, and match them to + * previously indexed records. This method does not index + * the new records. + * @param matchall If true, all matching records are accepted. If false, + * only the single best match for each record is accepted. + * Used if strategy is not set, else default RecordLinkageStrategy is used + * @param batch_size The batch size to use. + * @since 1.0 + */ + public void linkRecords(Collection sources, boolean matchall, int batch_size) { + + RecordLinkageStrategy strategy = (this.strategy!=null? this.strategy: + matchall? new RecordLinkageSimpleStrategy(): new RecordLinkageBestStrategy()); + + for (DataSource source : sources) { + source.setLogger(logger); + + Collection batch = new ArrayList(batch_size); + RecordIterator it = source.getRecords(); + while (it.hasNext()) { + batch.add(it.next()); + if (batch.size() == batch_size) { + linkBatch(batch, strategy); + batch.clear(); + } + } + it.close(); + + if (!batch.isEmpty()) + linkBatch(batch, strategy); + } + + endProcessing(); + } + + private void linkBatch(Collection batch, RecordLinkageStrategy strategy) { + batchReady(batch.size()); + match(batch, strategy); + batchDone(); + } + + /** + * Index all new records from the given data sources. This method + * does not do any matching. 
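
Note the precedence in the new linkRecords above: an explicitly configured strategy takes priority, and the matchall flag is only consulted as a fallback (true selects `RecordLinkageSimpleStrategy`, false selects `RecordLinkageBestStrategy`). A short sketch of that precedence, API only rather than a complete pipeline:

```java
import java.util.Collection;
import no.priv.garshol.duke.Configuration;
import no.priv.garshol.duke.DataSource;
import no.priv.garshol.duke.Processor;
import no.priv.garshol.duke.recordlinkage.RecordLinkageSimpleStrategy;

class LinkStrategyPrecedence {
  // Illustration only; a real run would index group 1 first (see link()).
  static void run(Configuration config) {
    Processor proc = new Processor(config);
    Collection<DataSource> group2 = config.getDataSources(2);

    // No strategy set: matchall=false falls back to RecordLinkageBestStrategy.
    proc.linkRecords(group2, false);

    // Explicit strategy set: the matchall argument no longer matters.
    proc.setRecordLinkageStrategy(new RecordLinkageSimpleStrategy());
    proc.linkRecords(group2, false);  // still uses the simple strategy
  }
}
```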
+ * @since 0.4 + */ + public void index(Collection sources, int batch_size) { + int count = 0; + for (DataSource source : sources) { + source.setLogger(logger); + + RecordIterator it2 = source.getRecords(); + while (it2.hasNext()) { + Record record = it2.next(); + database.index(record); + count++; + if (count % batch_size == 0) + batchReady(batch_size); + } + it2.close(); + } + if (count % batch_size == 0) + batchReady(count % batch_size); + database.commit(); + } + + /** + * Returns the number of records that have been compared. + */ + public long getComparisonCount() { + return comparisons; + } + + private void match(Record record, RecordLinkageStrategy strategy) { + long start = System.currentTimeMillis(); + Collection candidates = database.findCandidateMatches(record); + searching += System.currentTimeMillis() - start; + if (logger.isDebugEnabled()) + logger.debug("Matching record " + + PrintMatchListener.toString(record, config.getProperties()) + + " found " + candidates.size() + " candidates"); + + start = System.currentTimeMillis(); + strategy.compare(this, config, record, candidates); + comparing += System.currentTimeMillis() - start; + } + + /** + * Compares two records and returns the probability that they + * represent the same real-world entity. + */ + public double compare(Record r1, Record r2) { + comparisons++; + double prob = 0.5; + for (String propname : r1.getProperties()) { + Property prop = config.getPropertyByName(propname); + if (prop == null) + continue; // means the property is unknown + if (prop.isIdProperty() || prop.isIgnoreProperty()) + continue; + + Collection vs1 = r1.getValues(propname); + Collection vs2 = r2.getValues(propname); + if (vs1 == null || vs1.isEmpty() || vs2 == null || vs2.isEmpty()) + continue; // no values to compare, so skip + + double high = 0.0; + for (String v1 : vs1) { + if (v1.equals("")) // FIXME: these values shouldn't be here at all + continue; + + for (String v2 : vs2) { + if (v2.equals("")) // FIXME: these values shouldn't be here at all + continue; + + try { + double p = prop.compare(v1, v2); + high = Math.max(high, p); + } catch (Exception e) { + throw new DukeException("Comparison of values '" + v1 + "' and "+ + "'" + v2 + "' with " + + prop.getComparator() + " failed", e); + } + } + } + + prob = Utils.computeBayes(prob, high); + } + return prob; + } + + /** + * Commits all state to disk and frees up resources. 
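
To make the probability folding in compare() above concrete: each property contributes its best pairwise comparator score, and those scores are folded into a running probability that starts at 0.5. A small worked sketch, assuming Utils.computeBayes implements the usual pairwise Bayesian update p1*p2 / (p1*p2 + (1-p1)*(1-p2)):

```java
public class BayesFoldSketch {
  public static void main(String[] args) {
    double prob = 0.5;  // the prior used by Processor.compare()

    // Best per-property comparator scores for two properties (made-up numbers).
    double[] propertyScores = {0.9, 0.7};

    for (double high : propertyScores)
      // assumed body of Utils.computeBayes, see the note above
      prob = (prob * high) / (prob * high + (1.0 - prob) * (1.0 - high));

    System.out.println(prob);  // ~0.9545: two moderately strong scores reinforce each other
  }
}
```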
+ */ + public void close() { + database.close(); + } + + // ===== INTERNALS + + public boolean isSameAs(Record r1, Record r2) { + for (Property idp : config.getIdentityProperties()) { + Collection vs2 = r2.getValues(idp.getName()); + Collection vs1 = r1.getValues(idp.getName()); + if (vs1 == null) + continue; + for (String v1 : vs1) + if (vs2.contains(v1)) + return true; + } + return false; + } + + private void startProcessing() { + long start = System.currentTimeMillis(); + for (MatchListener listener : listeners) + listener.startProcessing(); + callbacks += (System.currentTimeMillis() - start); + } + + private void endProcessing() { + long start = System.currentTimeMillis(); + for (MatchListener listener : listeners) + listener.endProcessing(); + callbacks += (System.currentTimeMillis() - start); + } + + private void batchReady(int size) { + long start = System.currentTimeMillis(); + for (MatchListener listener : listeners) + listener.batchReady(size); + callbacks += (System.currentTimeMillis() - start); + } + + private void batchDone() { + long start = System.currentTimeMillis(); + for (MatchListener listener : listeners) + listener.batchDone(); + callbacks += (System.currentTimeMillis() - start); + } + + /** + * Records the statement that the two records match. + */ + public void registerMatch(Record r1, Record r2, double confidence) { + long start = System.currentTimeMillis(); + for (MatchListener listener : listeners) + listener.matches(r1, r2, confidence); + callbacks += (System.currentTimeMillis() - start); + } + + /** + * Records the statement that the two records may match. + */ + public void registerMatchPerhaps(Record r1, Record r2, double confidence) { + long start = System.currentTimeMillis(); + for (MatchListener listener : listeners) + listener.matchesPerhaps(r1, r2, confidence); + callbacks += (System.currentTimeMillis() - start); + } + + /** + * Notifies listeners that we found no matches for this record. + */ + public void registerNoMatchFor(Record current) { + long start = System.currentTimeMillis(); + for (MatchListener listener : listeners) + listener.noMatchFor(current); + callbacks += (System.currentTimeMillis() - start); + } + + /** + * Sorts properties so that the properties with the lowest low + * probabilities come first. + */ + static class PropertyComparator implements Comparator { + public int compare(Property p1, Property p2) { + double diff = p1.getLowProbability() - p2.getLowProbability(); + if (diff < 0) + return -1; + else if (diff > 0) + return 1; + else + return 0; + } + } + + // ===== THREADS + + /** + * The thread that actually runs parallell matching. It holds the + * thread's share of the current batch. + */ + class MatchThread extends Thread { + private Collection records; + private RecordLinkageStrategy strategy; + + public MatchThread(int threadno, int recordcount, RecordLinkageStrategy strategy) { + super("MatchThread " + threadno); + this.records = new ArrayList(recordcount); + this.strategy = strategy; + } + + public void run() { + for (Record record : records) + match(record, strategy); + } + + public void addRecord(Record record) { + records.add(record); + } + } + + // ===== PERFORMANCE PROFILING + + public class Profiler extends AbstractMatchListener { + private long processing_start; + private long batch_start; + private int batch_size; + private int records; + private PrintWriter out; + + public Profiler() { + this.out = new PrintWriter(System.out); + } + + /** + * Sets Writer to receive performance statistics. 
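
The MatchThread pool above works unchanged with strategies: each thread simply calls match(record, strategy) for its share of the batch, and the Profiler observes the run through the listener callbacks. A brief usage sketch combining both (config path hypothetical):

```java
import no.priv.garshol.duke.ConfigLoader;
import no.priv.garshol.duke.Configuration;
import no.priv.garshol.duke.Processor;

public class ProfiledRunSketch {
  public static void main(String[] args) throws Exception {
    Configuration config = ConfigLoader.load("config.xml");  // hypothetical path

    Processor proc = new Processor(config);
    proc.setThreads(4);                  // each batch is split across 4 MatchThreads
    proc.setPerformanceProfiling(true);  // registers the Profiler listener

    proc.deduplicate();                  // the profiler reports throughput and the time
    proc.close();                        // spent reading/indexing/searching/comparing
  }
}
```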
+     * Defaults to System.out.
+     */
+    public void setOutput(Writer outw) {
+      this.out = new PrintWriter(outw);
+    }
+
+    public void startProcessing() {
+      processing_start = System.currentTimeMillis();
+      out.println("Duke version " + Duke.getVersionString());
+      out.println(getDatabase());
+      out.println("Threads: " + getThreads());
+    }
+
+    public void batchReady(int size) {
+      batch_start = System.currentTimeMillis();
+      batch_size = size;
+    }
+
+    public void batchDone() {
+      records += batch_size;
+      int rs = (int) ((1000.0 * batch_size) /
+                      (System.currentTimeMillis() - batch_start));
+      out.println("" + records + " processed, " + rs +
+                  " records/second; comparisons: " +
+                  getComparisonCount());
+    }
+
+    public void endProcessing() {
+      long end = System.currentTimeMillis();
+      double rs = (1000.0 * records) / (end - processing_start);
+      out.println("Run completed, " + (int) rs + " records/second");
+      out.println("" + records + " records total in " +
+                  ((end - processing_start) / 1000) + " seconds");
+
+      long total = srcread + indexing + searching + comparing + callbacks;
+      out.println("Reading from source: " +
+                  seconds(srcread) + " (" +
+                  percent(srcread, total) + "%)");
+      out.println("Indexing: " +
+                  seconds(indexing) + " (" +
+                  percent(indexing, total) + "%)");
+      out.println("Searching: " +
+                  seconds(searching) + " (" +
+                  percent(searching, total) + "%)");
+      out.println("Comparing: " +
+                  seconds(comparing) + " (" +
+                  percent(comparing, total) + "%)");
+      out.println("Callbacks: " +
+                  seconds(callbacks) + " (" +
+                  percent(callbacks, total) + "%)");
+      out.println();
+      Runtime r = Runtime.getRuntime();
+      out.println("Total memory: " + r.totalMemory() + ", " +
+                  "free memory: " + r.freeMemory() + ", " +
+                  "used memory: " + (r.totalMemory() - r.freeMemory()));
+    }
+
+    private String seconds(long ms) {
+      return "" + (int) (ms / 1000);
+    }
+
+    private String percent(long ms, long total) {
+      return "" + (int) ((double) (ms * 100) / (double) total);
+    }
+  }
}
\ No newline at end of file
diff --git a/src/main/java/no/priv/garshol/duke/RecordLinkageStrategy.java b/src/main/java/no/priv/garshol/duke/RecordLinkageStrategy.java
new file mode 100644
index 00000000..c11a9526
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/RecordLinkageStrategy.java
@@ -0,0 +1,27 @@
+package no.priv.garshol.duke;
+
+import java.util.Collection;
+
+import no.priv.garshol.duke.matchers.MatchListener;
+
+/**
+ * This interface allows the record linkage strategy to be customized.
+ * More advanced approaches to record linkage exist, but are not
+ * implemented yet; see the links below for more information.
+ *
+ * http://code.google.com/p/duke/issues/detail?id=55
+ * http://research.microsoft.com/pubs/153478/msr-report-1to1.pdf
+ */
+public interface RecordLinkageStrategy {
+
+  /**
+   * Compares the record against the candidates and notifies the
+   * processor of any matches found.
+   * @param processor The processor whose {@link MatchListener}s should be notified
+   * @param config The {@link Configuration}
+   * @param record The record to match
+   * @param candidates The possible candidates
+   */
+  void compare(Processor processor, Configuration config, Record record, Collection<Record> candidates);
+
+}
diff --git a/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageBestStrategy.java b/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageBestStrategy.java
new file mode 100644
index 00000000..310cb661
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageBestStrategy.java
@@ -0,0 +1,46 @@
+package no.priv.garshol.duke.recordlinkage;
+
+import java.util.Collection;
+
+import no.priv.garshol.duke.Configuration;
+import no.priv.garshol.duke.Processor;
+import no.priv.garshol.duke.Record;
+import no.priv.garshol.duke.RecordLinkageStrategy;
+
+/**
+ * Strategy used for record linkage, to implement a simple greedy matching
+ * algorithm where we choose the best alternative above the threshold for each
+ * record.
+ */
+public class RecordLinkageBestStrategy implements RecordLinkageStrategy {
+
+  @Override
+  public void compare(Processor processor, Configuration config, Record record,
+                      Collection<Record> candidates) {
+
+    double max = 0.0;
+    Record best = null;
+
+    // go through all candidates, and find the best
+    for (Record candidate : candidates) {
+      if (processor.isSameAs(record, candidate))
+        continue;
+
+      double prob = processor.compare(record, candidate);
+      if (prob > max) {
+        max = prob;
+        best = candidate;
+      }
+    }
+
+    // pass on the best match, if any
+    if (max > config.getThreshold())
+      processor.registerMatch(record, best, max);
+    else if (config.getMaybeThreshold() != 0.0 && max > config.getMaybeThreshold())
+      processor.registerMatchPerhaps(record, best, max);
+    else
+      processor.registerNoMatchFor(record);
+
+  }
+
+}
diff --git a/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageMixStrategy.java b/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageMixStrategy.java
new file mode 100644
index 00000000..092dc5ec
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageMixStrategy.java
@@ -0,0 +1,62 @@
+package no.priv.garshol.duke.recordlinkage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import no.priv.garshol.duke.Configuration;
+import no.priv.garshol.duke.Processor;
+import no.priv.garshol.duke.Record;
+import no.priv.garshol.duke.RecordLinkageStrategy;
+
+/**
+ * Strategy used for record linkage, where we register the best candidate
+ * above the threshold if there is one, and otherwise register all candidates
+ * above the maybe-threshold as possible matches.
+ */
+public class RecordLinkageMixStrategy implements RecordLinkageStrategy {
+
+  @Override
+  public void compare(Processor processor, Configuration config, Record record,
+                      Collection<Record> candidates) {
+
+    double max = 0.0;
+    Record best = null;
+
+    List<Record> maybe = new ArrayList<Record>();
+    List<Double> maybeScores = new ArrayList<Double>();
+
+    // go through all candidates, and find the best
+    for (Record candidate : candidates) {
+      if (processor.isSameAs(record, candidate))
+        continue;
+
+      double prob = processor.compare(record, candidate);
+
+      if (prob > config.getThreshold()) {
+        if (prob > max) {
+          max = prob;
+          best = candidate;
+        }
+      } else if (config.getMaybeThreshold() != 0.0 && prob > config.getMaybeThreshold()) {
+        maybe.add(candidate);
+        maybeScores.add(prob);
+      }
+    }
+
+    // notify MatchListeners
+    if (best != null) {
+      processor.registerMatch(record, best, max);
+    } else if (maybe.size() > 0) {
+      for (int i = 0; i < maybe.size(); i++) {
+        processor.registerMatchPerhaps(record, maybe.get(i), maybeScores.get(i));
+      }
+    } else {
+      processor.registerNoMatchFor(record);
+    }
+
+  }
+
+}
diff --git a/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageSimpleStrategy.java b/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageSimpleStrategy.java
new file mode 100644
index 00000000..bb9bfff2
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/recordlinkage/RecordLinkageSimpleStrategy.java
@@ -0,0 +1,40 @@
+package no.priv.garshol.duke.recordlinkage;
+
+import java.util.Collection;
+
+import no.priv.garshol.duke.Configuration;
+import no.priv.garshol.duke.RecordLinkageStrategy;
+import no.priv.garshol.duke.Processor;
+import no.priv.garshol.duke.Record;
+
+/**
+ * Simple strategy which, as in deduplication, simply registers all
+ * matches above the thresholds.
+ */
+public class RecordLinkageSimpleStrategy implements RecordLinkageStrategy {
+
+  @Override
+  public void compare(Processor processor, Configuration config, Record record,
+                      Collection<Record> candidates) {
+
+    boolean found = false;
+    for (Record candidate : candidates) {
+      if (processor.isSameAs(record, candidate))
+        continue;
+
+      double prob = processor.compare(record, candidate);
+      if (prob > config.getThreshold()) {
+        found = true;
+        processor.registerMatch(record, candidate, prob);
+      } else if (config.getMaybeThreshold() != 0.0 && prob > config.getMaybeThreshold()) {
+        found = true; // a maybe-match still counts, so we don't report a no-match below
+        processor.registerMatchPerhaps(record, candidate, prob);
+      }
+    }
+    if (!found) {
+      processor.registerNoMatchFor(record);
+    }
+
+  }
+
+}
diff --git a/src/main/java/no/priv/garshol/duke/transforms/TransformDataSource.java b/src/main/java/no/priv/garshol/duke/transforms/TransformDataSource.java
new file mode 100644
index 00000000..43b12019
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/transforms/TransformDataSource.java
@@ -0,0 +1,85 @@
+/*
+ * Duke@TransformDataSource.java
+ */
+package no.priv.garshol.duke.transforms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import no.priv.garshol.duke.DataSource;
+import no.priv.garshol.duke.Logger;
+import no.priv.garshol.duke.Record;
+import no.priv.garshol.duke.RecordIterator;
+
+/**
+ * A wrapper around a DataSource that transforms its records via a list
+ * of operations.
+ */
+public class TransformDataSource implements DataSource {
+
+  /** The DataSource to transform */
+  protected DataSource transformedDataSource;
+
+  /** The operations to apply to each Record */
+  protected List<TransformOperation> operations = new ArrayList<TransformOperation>();
+
+  /**
+   * Constructor.
+   * @param source The DataSource to be transformed
+   */
+  public TransformDataSource(DataSource source) {
+    this.transformedDataSource = source;
+  }
+
+  @Override
+  public RecordIterator getRecords() {
+    final RecordIterator srciter = transformedDataSource.getRecords();
+    return new RecordIterator() {
+      @Override
+      public Record next() {
+        Record r = srciter.next();
+        if (r != null) {
+          for (TransformOperation op : operations) {
+            r = op.transform(r);
+          }
+        }
+        return r;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return srciter.hasNext();
+      }
+    };
+  }
+
+  /**
+   * Simply cascades the logger to the wrapped source.
+   */
+  @Override
+  public void setLogger(Logger logger) {
+    transformedDataSource.setLogger(logger);
+  }
+
+  /**
+   * Adds an operation.
+   * @param oper The TransformOperation
+   */
+  public void addOperation(TransformOperation oper) {
+    operations.add(oper);
+  }
+
+  /**
+   * @return the transformedDataSource
+   */
+  public DataSource getTransformedDataSource() {
+    return transformedDataSource;
+  }
+
+  /**
+   * @return the operations
+   */
+  public List<TransformOperation> getOperations() {
+    return operations;
+  }
+
+}
diff --git a/src/main/java/no/priv/garshol/duke/transforms/TransformOperation.java b/src/main/java/no/priv/garshol/duke/transforms/TransformOperation.java
new file mode 100644
index 00000000..47bdad43
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/transforms/TransformOperation.java
@@ -0,0 +1,20 @@
+/*
+ * Duke@TransformOperation.java
+ */
+package no.priv.garshol.duke.transforms;
+
+import no.priv.garshol.duke.Record;
+
+/**
+ * An operation applied to a Record in order to transform it.
+ */
+public interface TransformOperation {
+
+  /**
+   * Transforms the record.
+   * @param r The record to modify
+   * @return The transformed record
+   */
+  Record transform(Record r);
+
+}
diff --git a/src/main/java/no/priv/garshol/duke/transforms/TransformOperationJoin.java b/src/main/java/no/priv/garshol/duke/transforms/TransformOperationJoin.java
new file mode 100644
index 00000000..628de80a
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/transforms/TransformOperationJoin.java
@@ -0,0 +1,64 @@
+/*
+ * Duke@TransformOperationJoin.java
+ */
+package no.priv.garshol.duke.transforms;
+
+import no.priv.garshol.duke.Record;
+import no.priv.garshol.duke.utils.StringUtils;
+
+/**
+ * A TransformOperation that adds to the record an additional property
+ * whose value is the result of joining other properties.
+ */
+public class TransformOperationJoin implements TransformOperation {
+
+  protected String resultingProperty;
+
+  protected String[] properties;
+
+  protected String joiner = " ";
+
+  /**
+   * @see no.priv.garshol.duke.transforms.TransformOperation#transform(no.priv.garshol.duke.Record)
+   */
+  @Override
+  public Record transform(Record record) {
+
+    StringBuilder tmp = new StringBuilder();
+    boolean first = true;
+    for (int i = 0; i < properties.length; i++) {
+      String v = record.getValue(properties[i]);
+      if (v != null && !v.equals("")) {
+        if (!first) {
+          tmp.append(joiner);
+        }
+        first = false;
+        tmp.append(v);
+      }
+    }
+    return new TransformedRecord(record, resultingProperty, tmp.toString());
+  }
+
+  //--------------------------------- configuration --
+
+  /**
+   * @param resultingProperty the name of the property holding the joined value
+   */
+  public void setResultingProperty(String resultingProperty) {
+    this.resultingProperty = resultingProperty;
+  }
+
+  /**
+   * @param props the space-separated list of properties to join
+   */
+  public void setProperties(String props) {
+    this.properties = StringUtils.split(props);
+  }
+
+  /**
+   * @param joiner the string inserted between joined values
+   */
+  public void setJoiner(String joiner) {
+    this.joiner = joiner;
+  }
+
+}
diff --git a/src/main/java/no/priv/garshol/duke/transforms/TransformedRecord.java b/src/main/java/no/priv/garshol/duke/transforms/TransformedRecord.java
new file mode 100644
index 00000000..da180b2d
--- /dev/null
+++ b/src/main/java/no/priv/garshol/duke/transforms/TransformedRecord.java
@@ -0,0 +1,65 @@
+/*
+ * Duke@TransformedRecord.java
+ */
+package no.priv.garshol.duke.transforms;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import no.priv.garshol.duke.Record;
+
+/**
+ * A record wrapper that adds a virtual property to the wrapped record.
+ */
+public class TransformedRecord implements Record {
+
+  /** The name of the virtual property */
+  protected String resultingProperty;
+  /** The value of the virtual property */
+  protected String resultingValue;
+
+  /** The record that is extended */
+  protected Record record;
+
+  /** The extended list of properties, including the virtual one */
+  protected Collection<String> props;
+
+  public TransformedRecord(Record r, String resultingColumn, String resultingValue) {
+    this.record = r;
+    this.resultingProperty = resultingColumn;
+    this.resultingValue = resultingValue;
+
+    this.props = new ArrayList<String>(r.getProperties());
+    this.props.add(resultingColumn);
+  }
+
+  @Override
+  public Collection<String> getProperties() {
+    return this.props;
+  }
+
+  @Override
+  public Collection<String> getValues(String prop) {
+    if (prop.equals(resultingProperty)) {
+      return Collections.singleton(resultingValue);
+    } else {
+      return record.getValues(prop);
+    }
+  }
+
+  @Override
+  public String getValue(String prop) {
+    if (prop.equals(resultingProperty)) {
+      return resultingValue;
+    } else {
+      return record.getValue(prop);
+    }
+  }
+
+  @Override
+  public void merge(Record other) {
+    record.merge(other);
+  }
+
+}
diff --git a/src/test/java/no/priv/garshol/duke/test/TransformDataSourceTest.java b/src/test/java/no/priv/garshol/duke/test/TransformDataSourceTest.java
new file mode 100644
index 00000000..eca5e4b2
--- /dev/null
+++ b/src/test/java/no/priv/garshol/duke/test/TransformDataSourceTest.java
@@ -0,0 +1,71 @@
+package no.priv.garshol.duke.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.StringReader;
+
+import no.priv.garshol.duke.ConfigLoader;
+import no.priv.garshol.duke.Configuration;
+import no.priv.garshol.duke.DataSource;
+import
no.priv.garshol.duke.Record; +import no.priv.garshol.duke.RecordIterator; +import no.priv.garshol.duke.datasources.CSVDataSource; +import no.priv.garshol.duke.datasources.Column; +import no.priv.garshol.duke.transforms.TransformDataSource; +import no.priv.garshol.duke.transforms.TransformOperation; +import no.priv.garshol.duke.transforms.TransformOperationJoin; + +import org.junit.Assert; +import org.junit.Test; + +public class TransformDataSourceTest { + + @Test + public void testTransformation() { + + CSVDataSource source = new CSVDataSource(); + source.addColumn(new Column("NAME", null, null, null)); + source.addColumn(new Column("ADDR1", null, null, null)); + source.addColumn(new Column("ADDR2", null, null, null)); + source.addColumn(new Column("CITY", null, null, null)); + + String csvdata = "NAME,ADDR1,ADDR2,CITY\nJohn Smith,10 rue des Iris,Bat.4,Paris\nMartin Dupont,7 av Jean Moulin,,Lyon"; + source.setReader(new StringReader(csvdata)); + + TransformOperationJoin join = new TransformOperationJoin(); + join.setProperties("ADDR1 ADDR2"); + join.setResultingProperty("ADDRESS"); + join.setJoiner(" "); + + TransformDataSource tds = new TransformDataSource(source); + tds.addOperation(join); + + RecordIterator iter = tds.getRecords(); + + Record john = iter.next(); + assertTrue(john.getProperties().contains("ADDRESS")); + Assert.assertEquals("10 rue des Iris Bat.4", john.getValue("ADDRESS")); + + Record martin = iter.next(); + Assert.assertEquals("7 av Jean Moulin", martin.getValue("ADDRESS")); + + Record end = iter.next(); + Assert.assertNull(end); + } + + @Test + public void testConfiguration() throws Exception { + Configuration config = ConfigLoader.load("classpath:config-transform.xml"); + + DataSource ds = (DataSource) config.getDataSources().toArray()[0]; + Assert.assertNotNull(ds); + Assert.assertTrue(ds instanceof TransformDataSource); + + TransformDataSource tds = (TransformDataSource) ds; + Assert.assertEquals(1, tds.getOperations().size()); + + TransformOperation operation = tds.getOperations().get(0); + Assert.assertTrue(operation instanceof TransformOperationJoin); + } + +} diff --git a/src/test/resources/config-transform.xml b/src/test/resources/config-transform.xml new file mode 100644 index 00000000..a78b3397 --- /dev/null +++ b/src/test/resources/config-transform.xml @@ -0,0 +1,40 @@ + + + + 0.7 + + + NAME + no.priv.garshol.duke.comparators.Levenshtein + + + ADRRESS + no.priv.garshol.duke.comparators.Levenshtein + + + CITY + no.priv.garshol.duke.comparators.Levenshtein + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file
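A note on how Processor.compare() arrives at its probability: each property contributes the best score found among its value pairs, and those scores are folded into a running probability with Utils.computeBayes(). That utility is not part of this diff; the sketch below assumes it implements the standard pairwise Bayesian combination, and the class name and numbers are purely illustrative.

    public class BayesCombinationSketch {

      // Assumed formula behind Utils.computeBayes(): combine two independent
      // probability estimates into a single one.
      static double computeBayes(double a, double b) {
        return (a * b) / (a * b + (1.0 - a) * (1.0 - b));
      }

      public static void main(String[] args) {
        double prob = 0.5; // the neutral prior used by Processor.compare()
        double[] bestScorePerProperty = {0.9, 0.8, 0.45};
        for (double high : bestScorePerProperty)
          prob = computeBayes(prob, high);
        System.out.println(prob); // roughly 0.97: two strong properties outweigh one weak one
      }
    }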
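The point of extracting RecordLinkageStrategy is that more elaborate linkage policies, such as those discussed in the linked report, can be plugged in without touching Processor. The sketch below is a hypothetical example, not part of the patch: it accepts the best candidate only when it beats the runner-up by a clear margin, and it relies solely on the Processor and Configuration methods already used by the strategies above.

    package no.priv.garshol.duke.recordlinkage;

    import java.util.Collection;

    import no.priv.garshol.duke.Configuration;
    import no.priv.garshol.duke.Processor;
    import no.priv.garshol.duke.Record;
    import no.priv.garshol.duke.RecordLinkageStrategy;

    /**
     * Hypothetical strategy: registers the best candidate only if it scores
     * clearly higher than the second-best candidate.
     */
    public class RecordLinkageMarginStrategy implements RecordLinkageStrategy {
      private double margin = 0.1; // required gap between best and runner-up

      @Override
      public void compare(Processor processor, Configuration config, Record record,
                          Collection<Record> candidates) {
        double best = 0.0, secondBest = 0.0;
        Record bestRecord = null;

        for (Record candidate : candidates) {
          if (processor.isSameAs(record, candidate))
            continue;

          double prob = processor.compare(record, candidate);
          if (prob > best) {
            secondBest = best;
            best = prob;
            bestRecord = candidate;
          } else if (prob > secondBest)
            secondBest = prob;
        }

        if (bestRecord != null && best > config.getThreshold() &&
            (best - secondBest) >= margin)
          processor.registerMatch(record, bestRecord, best);
        else
          processor.registerNoMatchFor(record);
      }
    }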
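Everything a strategy decides ends up in the registerMatch/registerMatchPerhaps/registerNoMatchFor calls, which fan out to the registered MatchListeners. As a minimal sketch, assuming AbstractMatchListener lives in no.priv.garshol.duke.matchers and provides no-op defaults (as the Profiler above suggests), a hypothetical listener that just tallies the outcomes could look like this.

    import no.priv.garshol.duke.Record;
    import no.priv.garshol.duke.matchers.AbstractMatchListener;

    /**
     * Hypothetical listener that counts what the strategy reports.
     */
    public class CountingMatchListener extends AbstractMatchListener {
      private int matches;
      private int maybes;
      private int misses;

      public void matches(Record r1, Record r2, double confidence) {
        matches++; // invoked via Processor.registerMatch()
      }

      public void matchesPerhaps(Record r1, Record r2, double confidence) {
        maybes++; // invoked via Processor.registerMatchPerhaps()
      }

      public void noMatchFor(Record record) {
        misses++; // invoked via Processor.registerNoMatchFor()
      }

      public String toString() {
        return matches + " matches, " + maybes + " maybe-matches, " +
               misses + " records without a match";
      }
    }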
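TransformOperation implementations are not limited to joining columns: any derived property can be added the same way, because TransformedRecord simply overlays one extra name/value pair on the wrapped record. Below is a hypothetical operation, not part of the patch, that exposes a lower-cased copy of an existing property; it uses only the classes introduced here, and would be wired up the same way as TransformOperationJoin in the test above, via tds.addOperation(...).

    package no.priv.garshol.duke.transforms;

    import no.priv.garshol.duke.Record;

    /**
     * Hypothetical operation: exposes a lower-cased copy of one property
     * under a new property name.
     */
    public class TransformOperationLowercase implements TransformOperation {
      protected String property;           // property to read
      protected String resultingProperty;  // virtual property to add

      @Override
      public Record transform(Record record) {
        String value = record.getValue(property);
        if (value == null)
          value = ""; // keep the virtual property present even when the source is empty
        return new TransformedRecord(record, resultingProperty, value.toLowerCase());
      }

      public void setProperty(String property) {
        this.property = property;
      }

      public void setResultingProperty(String resultingProperty) {
        this.resultingProperty = resultingProperty;
      }
    }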