An attempt towards a followthemoney query dsl.
This library provides methods to query and filter entities formatted as followthemoney data, either from a json file/stream or using a SQL backend via followthemoney-store.
It also provides a Query class that can be used in other libs to work with SQL queries or api queries.
Minimum Python version: 3.11
pip install ftmq
ftmq accepts either a line-based input stream or an argument with a file uri. (For integration with followthemoney-store, see below.)
Input stream:
cat entities.ftm.json | ftmq <filter expression> > output.ftm.json
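For orientation, "line-based" here means one JSON entity per line. The following is a minimal sketch in plain Python of reading such a stream, keeping some entities and writing them back out. It is not how ftmq is implemented; the helper names `filter_stream` and `write_stream` and the sample entities are made up for illustration.

```python
import io
import json
from typing import Callable, Iterable, Iterator, TextIO


def filter_stream(lines: Iterable[str], keep: Callable[[dict], bool]) -> Iterator[dict]:
    """Parse one JSON entity per line and yield those accepted by `keep`."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in the stream
        entity = json.loads(line)
        if keep(entity):
            yield entity


def write_stream(entities: Iterable[dict], out: TextIO) -> int:
    """Write entities as one JSON object per line; return how many were written."""
    count = 0
    for entity in entities:
        out.write(json.dumps(entity) + "\n")
        count += 1
    return count


# Hypothetical sample data standing in for entities.ftm.json
source = io.StringIO(
    '{"id": "a", "schema": "Person", "properties": {"name": ["Jane Doe"]}}\n'
    '{"id": "b", "schema": "Company", "properties": {"name": ["ACME"]}}\n'
)
target = io.StringIO()
written = write_stream(
    filter_stream(source, lambda e: e["schema"] == "Person"), target
)
```

Swapping `source` and `target` for `sys.stdin` and `sys.stdout` would give the same pipe-friendly behaviour as the shell example.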
URI argument:
Under the hood, ftmq uses smart_open to be able to interpret arbitrary file uris as argument -i:
ftmq <filter expression> -i ~/Data/entities.ftm.json
ftmq <filter expression> -i https://example.org/data.json.gz
ftmq <filter expression> -i s3://data-bucket/entities.ftm.json
ftmq <filter expression> -i webhdfs://host:port/path/file
Of course, the same is possible for output -o:
cat data.json | ftmq <filter expression> -o s3://data-bucket/output.json
cat entities.ftm.json | ftmq -d ec_meetings
cat entities.ftm.json | ftmq -s Person
Filter for a schema and all its descendants or ancestors:
cat entities.ftm.json | ftmq -s LegalEntity --schema-include-descendants
cat entities.ftm.json | ftmq -s LegalEntity --schema-include-ancestors
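The descendant and ancestor options can be pictured as a walk over the schema inheritance tree. The sketch below uses a small hand-written excerpt of a hierarchy rather than the real followthemoney model, and the function names (`ancestors`, `descendants`, `schema_matches`) are hypothetical, so treat it as an illustration of the idea only.

```python
# Simplified, hand-written excerpt of a schema hierarchy (child -> parents).
# The real followthemoney model is much larger; this is only illustrative.
PARENTS: dict[str, list[str]] = {
    "Thing": [],
    "LegalEntity": ["Thing"],
    "Person": ["LegalEntity"],
    "Organization": ["LegalEntity"],
    "Company": ["Organization"],
}


def ancestors(schema: str) -> set[str]:
    """All schemata that `schema` directly or indirectly extends."""
    found: set[str] = set()
    stack = list(PARENTS[schema])
    while stack:
        parent = stack.pop()
        if parent not in found:
            found.add(parent)
            stack.extend(PARENTS[parent])
    return found


def descendants(schema: str) -> set[str]:
    """All schemata that directly or indirectly extend `schema`."""
    return {name for name in PARENTS if schema in ancestors(name)}


def schema_matches(
    entity_schema: str,
    wanted: str,
    include_descendants: bool = False,
    include_ancestors: bool = False,
) -> bool:
    """Check an entity's schema against the wanted schema and its relatives."""
    allowed = {wanted}
    if include_descendants:
        allowed |= descendants(wanted)
    if include_ancestors:
        allowed |= ancestors(wanted)
    return entity_schema in allowed
```

With this toy hierarchy, a plain filter for LegalEntity would skip a Company entity, while the same filter with descendants included would keep it.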
Properties are passed as options via --<prop>=<value>:
cat entities.ftm.json | ftmq -s Company --country=de
cat entities.ftm.json | ftmq -s Company --incorporationDate__gte=2020 --address__ilike=berlin
Possible lookups:
gt - greater than
lt - lower than
gte - greater or equal
lte - lower or equal
like - SQLish LIKE (use % placeholders)
ilike - SQLish ILIKE, case-insensitive (use % placeholders)
[] - usage: prop[]=foo evaluates if foo is member of array prop
"Uplevel" an entity input stream to nomenklatura.entity.CompositeEntity
and optionally apply a dataset.
ftmq apply -i ./entities.ftm.json -d <additional_dataset>
Overwrite datasets:
ftmq apply -i ./entities.ftm.json -d <additional_dataset> --replace-dataset
Often in ftm scripting, we are iterating through all the proxies (e.g. during aggregation). Why not use this to collect statistics on the way? There is a context manager for this, which turns into the Coverage model:
Print coverage to stdout (and filtered entities to nowhere):
cat entities.ftm.json | ftmq -s Event -o /dev/null --coverage-uri -
Within code:
from ftmq.coverage import Collector

fragments = [...]
buffer = {}
c = Collector()
for proxy in fragments:
    if proxy.id in buffer:
        buffer[proxy.id].merge(proxy)
    else:
        buffer[proxy.id] = proxy
    # here collect stats:
    c.collect(proxy)
coverage = c.export()
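The idea of collecting statistics while iterating can be sketched without ftmq at all. The toy class below only mirrors the `collect` / `export` call pattern from the snippet above. The name `ToyCollector`, the dict-based entities and the exported fields are assumptions for illustration; what the actual Coverage model contains is not specified here.

```python
from collections import Counter


class ToyCollector:
    """Hypothetical stand-in for a stats collector; not ftmq's Collector."""

    def __init__(self) -> None:
        self.entities = 0
        self.schemata: Counter[str] = Counter()
        self.countries: Counter[str] = Counter()

    def collect(self, entity: dict) -> None:
        # Update the running counts for a single entity dict.
        self.entities += 1
        self.schemata[entity["schema"]] += 1
        for country in entity.get("properties", {}).get("country", []):
            self.countries[country] += 1

    def export(self) -> dict:
        # Return a plain-dict summary of everything seen so far.
        return {
            "entities": self.entities,
            "schemata": dict(self.schemata),
            "countries": dict(self.countries),
        }


# Made-up sample entities
sample = [
    {"id": "a", "schema": "Company", "properties": {"country": ["de"]}},
    {"id": "b", "schema": "Company", "properties": {"country": ["de", "fr"]}},
    {"id": "c", "schema": "Person", "properties": {}},
]
collector = ToyCollector()
for entity in sample:
    collector.collect(entity)
summary = collector.export()
```

Because the counters are updated inside the existing loop, the statistics come at the cost of one extra call per entity rather than a second pass over the data.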
NOT IMPLEMENTED YET
The same cli logic applies:
ftmq store iterate -d ec_meetings -s Event --date__gte=2019 --date__lte=2020
from ftmq import Query

q = Query() \
    .where(dataset="ec_meetings", date__lte=2020) \
    .where(schema="Event") \
    .order_by("date", ascending=False)
assert q.apply(proxy)
This project is part of investigraph
Media Tech Lab Bayern batch #3