Skip to content

Commit

Permalink
Merge pull request #320 from ARPA-SIMC/issue317
Browse files Browse the repository at this point in the history
Improved dealing with missing values in GRIB2 metadata. refs: #317
  • Loading branch information
brancomat authored Dec 4, 2023
2 parents 12edfa5 + 44b0de3 commit cbd6fda
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 4 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* JPEG scanner doesn't raise an exception if datetime tag is missing
* Do not install vm2/source.lua (#310)
* Documented `match-alias.conf` syntax (#318)
* Improved dealing with missing values in GRIB2 metadata (#317)

# New in version 1.47

Expand Down
84 changes: 83 additions & 1 deletion python/arkimet/cmdline/maint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import sys
from collections import defaultdict
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple

import arkimet
from arkimet.cmdline.base import ArkimetCommand, Exit, Fail
Expand Down Expand Up @@ -45,6 +45,83 @@ def run(self):
arki_check.unarchive(pathname=segment)


class ScanTest(ArkimetCommand):
"""
Run a query on a dataset, rescan the resulting data, and show metadata changes
"""
@classmethod
def make_subparser(cls, subparsers: "argparse._SubParsersAction[Any]") -> argparse.ArgumentParser:
parser = super().make_subparser(subparsers)
parser.add_argument("query", metavar="query", action="store",
help="query to run")
parser.add_argument("dataset", metavar="path", nargs="+", action="store",
help="dataset(s) to query")
return parser

TypesDict = Dict[str, List[Dict[str, Any]]]

def index(self, md: arkimet.Metadata) -> Tuple[Dict[str, Any], TypesDict]:
source = None
res = defaultdict(list)
for item in md.to_python()["items"]:
if item["type"] == "source":
source = item
else:
res[item["type"]].append(item)

return source, res

def compare(self, orig: TypesDict, new: arkimet.Metadata) -> Tuple[TypesDict, TypesDict]:
added = defaultdict(list)
for item in new.to_python()["items"]:
if item["type"] == "source":
continue

try:
orig[item["type"]].remove(item)
except ValueError:
added[item["type"]].append(item)

# Remove empty lists from orig
# FIXME: things can be implemented so that this step is unnecessary,
# but at the moment this code tries to focus on clarity rather
# than speed
for name, vals in list(orig.items()):
if not vals:
del orig[name]

return added, orig

def run(self):
super().run()
ok = 0
failed = 0
with arkimet.dataset.Session() as session:
query = session.matcher(self.args.query)
for path in self.args.dataset:
cfg = arkimet.dataset.read_config(path)
with session.dataset_reader(cfg=cfg) as ds:
for orig in ds.query_data(query, with_data=True):
source, orig_data = self.index(orig)
scanner = arkimet.scan.Scanner.get_scanner(source["format"])
new = scanner.scan_data(orig.data)

added, removed = self.compare(orig_data, new)

if added or removed:
print("Mismatch in", source)
for name, values in added.items():
for value in values:
print(" added:", name, value)
for name, values in removed.items():
for value in values:
print(" removed:", name, value)
failed += 1
else:
ok += 1
print(f"{ok} elements matched, {failed} elements had changes")


class Maint(ArkimetCommand):
"""
Perform maintenance operations on arkimet datasets
Expand All @@ -56,13 +133,18 @@ def make_parser(cls) -> argparse.ArgumentParser:
parser = super().make_parser()
subparsers = parser.add_subparsers(help="sub-command help", dest="command")
Unarchive.make_subparser(subparsers)
ScanTest.make_subparser(subparsers)
return parser

@classmethod
def main(cls, args=None):
parser = cls.make_parser()
args = parser.parse_args(args=args)

if args.command is None:
parser.print_help()
return

try:
with args.command(parser, args) as cmd:
return cmd.run()
Expand Down
5 changes: 4 additions & 1 deletion python/arkimet/scan/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import _arkimet
from . import grib
from . import bufr
from . import vm2
from . import odimh5

__all__ = ("grib", "bufr", "vm2", "odimh5")
Scanner = _arkimet.scan.Scanner

__all__ = ("Scanner", "grib", "bufr", "vm2", "odimh5")
132 changes: 131 additions & 1 deletion python/scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ extern "C" {
PyTypeObject* arkipy_scan_Grib_Type = nullptr;
PyTypeObject* arkipy_scan_BufrMessage_Type = nullptr;

PyTypeObject* arkipy_scan_Scanner_Type = nullptr;

}

namespace {
Expand Down Expand Up @@ -215,6 +217,8 @@ struct get_long : public MethKwargs<get_long, arkipy_scan_Grib>
int res = grib_get_long(self->gh, key, &val);
if (res == GRIB_NOT_FOUND)
Py_RETURN_NONE;
if (val == GRIB_MISSING_LONG)
Py_RETURN_NONE;

check_grib_error(res, "cannot read long value from grib");

Expand Down Expand Up @@ -291,11 +295,15 @@ Access grib message contents
case GRIB_TYPE_LONG: {
long val;
check_grib_lookup_error(grib_get_long(self->gh, key.c_str(), &val), key.c_str(), "cannot read reading long value");
if (val == GRIB_MISSING_LONG)
Py_RETURN_NONE;
return to_python(val);
}
case GRIB_TYPE_DOUBLE: {
double val;
check_grib_lookup_error(grib_get_double(self->gh, key.c_str(), &val), key.c_str(), "cannot read double value");
if (val == GRIB_MISSING_DOUBLE)
Py_RETURN_NONE;
return to_python(val);
}
case GRIB_TYPE_STRING: {
Expand Down Expand Up @@ -624,6 +632,8 @@ struct vm2_get_variable : public MethKwargs<vm2_get_variable, PyObject>
}
};



Methods<> scan_methods;
Methods<> scanners_methods;
Methods<> grib_methods;
Expand All @@ -633,6 +643,111 @@ Methods<> odimh5_methods;
Methods<> nc_methods;
Methods<> jpeg_methods;


struct get_scanner : public ClassMethKwargs<get_scanner>
{
constexpr static const char* name = "get_scanner";
constexpr static const char* signature = "format: str";
constexpr static const char* returns = "arkimet.scan.Scanner";
constexpr static const char* summary = "Return a Scanner for the given data format";

static PyObject* run(PyTypeObject* cls, PyObject* args, PyObject* kw)
{
static const char* kwlist[] = { "format", nullptr };
const char* py_format = nullptr;
Py_ssize_t py_format_len;
if (!PyArg_ParseTupleAndKeywords(args, kw, "z#",
const_cast<char**>(kwlist), &py_format, &py_format_len))
return nullptr;

try {
auto scanner = arki::scan::Scanner::get_scanner(std::string(py_format, py_format_len));
return (PyObject*)arki::python::scan::scanner_create(scanner);
} ARKI_CATCH_RETURN_PYO
}
};

struct scan_data : public MethKwargs<scan_data, arkipy_scan_Scanner>
{
constexpr static const char* name = "scan_data";
constexpr static const char* signature = "data: bytes";
constexpr static const char* returns = "arkimet.Metadata";
constexpr static const char* summary = "Scan a memory buffer";
constexpr static const char* doc = R"(
Returns a Metadata with inline source.
)";
static PyObject* run(Impl* self, PyObject* args, PyObject* kw)
{
static const char* kwlist[] = { "data", nullptr };
PyObject* arg_data = nullptr;

if (!PyArg_ParseTupleAndKeywords(args, kw, "O", (char**)kwlist, &arg_data))
return nullptr;

try {
char* buffer;
Py_ssize_t length;
if (PyBytes_Check(arg_data))
{
if (PyBytes_AsStringAndSize(arg_data, &buffer, &length) == -1)
throw PythonException();
} else {
PyErr_Format(PyExc_TypeError, "data has type %R instead of bytes", arg_data);
return nullptr;
}

// FIXME: memory copy, seems unavoidable at the moment
std::vector<uint8_t> data(buffer, buffer+length);
auto md = self->scanner->scan_data(data);
return (PyObject*)metadata_create(md);
} ARKI_CATCH_RETURN_PYO
}
};

struct ScannerDef : public Type<ScannerDef, arkipy_scan_Scanner>
{
constexpr static const char* name = "Scanner";
constexpr static const char* qual_name = "arkimet.scan.Scanner";
constexpr static const char* doc = R"(
Scanner for binary data.
)";
GetSetters<> getsetters;
Methods<get_scanner, scan_data> methods;

static void _dealloc(Impl* self)
{
self->scanner.~shared_ptr();
Py_TYPE(self)->tp_free(self);
}

static PyObject* _str(Impl* self)
{
std::string str = "scanner:" + self->scanner->name();
return PyUnicode_FromStringAndSize(str.data(), str.size());
}

static PyObject* _repr(Impl* self)
{
std::string str = "scanner:" + self->scanner->name();
return PyUnicode_FromStringAndSize(str.data(), str.size());
}

// static int _init(Impl* self, PyObject* args, PyObject* kw)
// {
// static const char* kwlist[] = { nullptr };
// if (!PyArg_ParseTupleAndKeywords(args, kw, "", const_cast<char**>(kwlist)))
// return -1;

// try {
// new(&(self->scanner)) std::shared_ptr<arki::scan::Scanner>(make_shared<arki:scan::Scanner>());
// } ARKI_CATCH_RETURN_INT

// return 0;
// }
};

ScannerDef* scanner_def = nullptr;

}

extern "C" {
Expand Down Expand Up @@ -738,13 +853,25 @@ static PyModuleDef vm2_module = {
namespace arki {
namespace python {

namespace scan {

arkipy_scan_Scanner* scanner_create(std::shared_ptr<arki::scan::Scanner> scanner)
{
arkipy_scan_Scanner* result = PyObject_New(arkipy_scan_Scanner, arkipy_scan_Scanner_Type);
if (!result) throw PythonException();
new (&(result->scanner)) std::shared_ptr<arki::scan::Scanner>(scanner);
return result;
}

}


void register_scan(PyObject* m)
{
#ifdef HAVE_DBALLE
wreport_api.import();
dballe_api.import();
#endif

pyo_unique_ptr grib = throw_ifnull(PyModule_Create(&grib_module));
grib_def = new GribDef;
grib_def->define(arkipy_scan_Grib_Type, grib);
Expand All @@ -760,6 +887,9 @@ void register_scan(PyObject* m)
module_arkimet = m;
module_scanners = scanners;

scanner_def = new ScannerDef;
scanner_def->define(arkipy_scan_Scanner_Type, scan);

if (PyModule_AddObject(scan, "grib", grib.release()) == -1)
throw PythonException();

Expand Down
14 changes: 14 additions & 0 deletions python/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ extern PyTypeObject* arkipy_scan_Grib_Type;
(Py_TYPE(ob) == arkipy_scan_Grib_Type || \
PyType_IsSubtype(Py_TYPE(ob), arkipy_scan_Grib_Type))


typedef struct {
PyObject_HEAD
std::shared_ptr<arki::scan::Scanner> scanner;
} arkipy_scan_Scanner;

extern PyTypeObject* arkipy_scan_Scanner_Type;

#define arkipy_scan_Scanner_Check(ob) \
(Py_TYPE(ob) == arkipy_scan_Scanner_Type || \
PyType_IsSubtype(Py_TYPE(ob), arkipy_scan_Scanner_Type))


}

namespace arki {
Expand All @@ -34,6 +47,7 @@ void register_scan(PyObject* m);

namespace scan {
void init();
arkipy_scan_Scanner* scanner_create(std::shared_ptr<arki::scan::Scanner> scanner);
}

}
Expand Down
Loading

0 comments on commit cbd6fda

Please sign in to comment.