diff --git a/arki/dataset.cc b/arki/dataset.cc index cfb89f72..a15c274d 100644 --- a/arki/dataset.cc +++ b/arki/dataset.cc @@ -137,6 +137,11 @@ CheckerConfig::CheckerConfig(std::shared_ptr reporter, bool r } +void Checker::remove(const metadata::Collection& mds) +{ + throw std::runtime_error(dataset().name() + ": dataset does not support removing items"); +} + void Checker::check_issue51(CheckerConfig& opts) { throw std::runtime_error(dataset().name() + ": check_issue51 not implemented for this dataset"); diff --git a/arki/dataset.h b/arki/dataset.h index 85108680..08d7d256 100644 --- a/arki/dataset.h +++ b/arki/dataset.h @@ -300,12 +300,6 @@ class Writer : public dataset::Base */ virtual void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) = 0; - /** - * Mark the data corresponding to the given metadata as removed from the - * database. - */ - virtual void remove(const metadata::Collection& mds) = 0; - /** * Flush pending changes to disk */ @@ -368,6 +362,12 @@ class Checker : public dataset::Base /// Check the dataset for errors virtual void check(CheckerConfig& opts) = 0; + /** + * Mark the data corresponding to the given metadata as removed from the + * database. 
+ */ + virtual void remove(const metadata::Collection& mds); + /// Remove data from the dataset that is older than `delete age` virtual void remove_old(CheckerConfig& opts) = 0; diff --git a/arki/dataset/empty.h b/arki/dataset/empty.h index af50f00c..9c59f4a3 100644 --- a/arki/dataset/empty.h +++ b/arki/dataset/empty.h @@ -56,12 +56,6 @@ class Writer : public DatasetAccess WriterAcquireResult acquire(Metadata& md, const AcquireConfig& cfg=AcquireConfig()) override; void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) override; - void remove(const metadata::Collection&) override - { - // Of course, after this method is called, the metadata cannot be found - // in the dataset - } - static void test_acquire(std::shared_ptr session, const core::cfg::Section& cfg, WriterBatch& batch); }; @@ -76,6 +70,7 @@ struct Checker : public DatasetAccess std::string type() const override { return "empty"; } + void remove(const metadata::Collection& mds) override {} void remove_old(CheckerConfig& opts) override {} void remove_all(CheckerConfig& opts) override {} void tar(CheckerConfig&) override {} diff --git a/arki/dataset/iseg-test.cc b/arki/dataset/iseg-test.cc index 6c1fe5a4..67719a69 100644 --- a/arki/dataset/iseg-test.cc +++ b/arki/dataset/iseg-test.cc @@ -1,9 +1,11 @@ #include "tests.h" #include "iseg/writer.h" +#include "iseg/checker.h" #include "arki/scan.h" #include "arki/types/source/blob.h" #include "arki/dataset/query.h" #include "arki/matcher/parser.h" +#include "arki/utils/sys.h" namespace { using namespace std; @@ -126,6 +128,34 @@ add_method("acquire_replace_usn", [](Fixture& f) { } }); +add_method("delete_missing", [](Fixture& f) { + metadata::TestCollection mdc("inbound/test.grib1"); + + // Import once + { + auto writer = f.makeIsegWriter(); + wassert(actual(writer->acquire(mdc[0])) == dataset::ACQ_OK); + writer->flush(); + } + + // Remove imported segments + sys::rmtree("testds/2007"); + + // Try deleting + { + auto checker = 
f.makeIsegChecker(); + metadata::Collection to_remove; + to_remove.push_back(mdc[0].clone()); + checker->remove(to_remove); + } + + // Ensure that this did not accidentally create a segment + wassert(actual_file("testds/2007/07-08.grib").not_exists()); + wassert(actual_file("testds/2007/07-08.grib.lock").not_exists()); + wassert(actual_file("testds/2007/07-08.grib.index").not_exists()); + wassert(actual_file("testds/2007").not_exists()); +}); + } } diff --git a/arki/dataset/iseg/checker.cc b/arki/dataset/iseg/checker.cc index a9bdf9d8..a8dc0f6e 100644 --- a/arki/dataset/iseg/checker.cc +++ b/arki/dataset/iseg/checker.cc @@ -5,6 +5,8 @@ #include "arki/dataset/reporter.h" #include "arki/dataset/lock.h" #include "arki/dataset/archive.h" +#include "arki/dataset/index/summarycache.h" +#include "arki/types/reftime.h" #include "arki/types/source/blob.h" #include "arki/metadata.h" #include "arki/matcher.h" @@ -284,6 +286,21 @@ class CheckerSegment : public segmented::CheckerSegment return 0; } + void remove_data(const std::vector& offsets) + { + if (!segment->exists_on_disk()) + return; + + // Delete from DB, and get file name + Pending p_del = idx().begin_transaction(); + + for (const auto& offset: offsets) + idx().remove(offset); + + // Commit delete from DB + p_del.commit(); + } + size_t repack(unsigned test_flags=0) override { // Lock away writes and reads @@ -613,6 +630,49 @@ void Checker::check_issue51(CheckerConfig& opts) return segmented::Checker::check_issue51(opts); } +void Checker::remove(const metadata::Collection& mds) +{ + // Group mds by segment + std::unordered_map> by_segment; + // Take note of months to invalidate in summary cache + std::set> months; + + // Build a todo-list of entries to delete for each segment + for (const auto& md: mds) + { + const types::source::Blob* source = md->has_source_blob(); + if (!source) + throw std::runtime_error("cannot remove metadata from dataset, because it has no Blob source"); + + if (source->basedir != 
dataset().path) + throw std::runtime_error("cannot remove metadata from dataset: its basedir is " + source->basedir + " but this dataset is at " + dataset().path); + + Time time = md->get()->get_Position(); + std::string relpath = dataset().step()(time) + "." + dataset().format; + + if (!Segment::is_segment(str::joinpath(dataset().path, relpath))) + continue; + + by_segment[relpath].push_back(source->offset); + months.insert(std::make_pair(time.ye, time.mo)); + } + + for (const auto& i: by_segment) + { + segment::WriterConfig writer_config; + writer_config.drop_cached_data_on_commit = false; + writer_config.eatmydata = dataset().eatmydata; + + auto seg = segment(i.first); + seg->remove_data(i.second); + arki::nag::verbose("%s: %s: %zd data marked as deleted", name().c_str(), i.first.c_str(), i.second.size()); + } + + index::SummaryCache scache(dataset().summary_cache_pathname); + for (const auto& i: months) + scache.invalidate(i.first, i.second); +} + size_t Checker::vacuum(dataset::Reporter& reporter) { return 0; diff --git a/arki/dataset/iseg/checker.h b/arki/dataset/iseg/checker.h index 5aab2db8..daef5531 100644 --- a/arki/dataset/iseg/checker.h +++ b/arki/dataset/iseg/checker.h @@ -22,6 +22,7 @@ class Checker : public DatasetAccess std::string type() const override; + void remove(const metadata::Collection& mds) override; std::unique_ptr segment(const std::string& relpath) override; std::unique_ptr segment_prelocked(const std::string& relpath, std::shared_ptr lock) override; void segments_tracked(std::function) override; diff --git a/arki/dataset/iseg/writer.cc b/arki/dataset/iseg/writer.cc index 31fd4564..d11c3e4a 100644 --- a/arki/dataset/iseg/writer.cc +++ b/arki/dataset/iseg/writer.cc @@ -355,54 +355,6 @@ void Writer::acquire_batch(WriterBatch& batch, const AcquireConfig& cfg) } } -void Writer::remove(const metadata::Collection& mds) -{ - // Group mds by segment - std::unordered_map> by_segment; - // Take note of months to invalidate in summary cache - 
std::set> months; - - for (const auto& md: mds) - { - const types::source::Blob* source = md->has_source_blob(); - if (!source) - throw std::runtime_error("cannot remove metadata from dataset, because it has no Blob source"); - - if (source->basedir != dataset().path) - throw std::runtime_error("cannot remove metadata from dataset: its basedir is " + source->basedir + " but this dataset is at " + dataset().path); - - by_segment[get_relpath(*md)].push_back(source->offset); - - core::Time time = md->get()->get_Position(); - months.insert(std::make_pair(time.ye, time.mo)); - } - - for (const auto& i: by_segment) - { - segment::WriterConfig writer_config; - writer_config.drop_cached_data_on_commit = false; - writer_config.eatmydata = dataset().eatmydata; - - auto segment = file(writer_config, i.first); - - // TODO: refuse if md is in the archive - - // Delete from DB, and get file name - Pending p_del = segment->idx.begin_transaction(); - - for (const auto& offset: i.second) - segment->idx.remove(offset); - - // Commit delete from DB - p_del.commit(); - - arki::nag::verbose("%s: %s: %zd data marked as deleted", name().c_str(), i.first.c_str(), i.second.size()); - } - - for (const auto& i: months) - scache.invalidate(i.first, i.second); -} - void Writer::test_acquire(std::shared_ptr session, const core::cfg::Section& cfg, WriterBatch& batch) { std::shared_ptr dataset(new iseg::Dataset(session, cfg)); diff --git a/arki/dataset/iseg/writer.h b/arki/dataset/iseg/writer.h index f8679da2..d646f9c0 100644 --- a/arki/dataset/iseg/writer.h +++ b/arki/dataset/iseg/writer.h @@ -35,7 +35,6 @@ class Writer : public DatasetAccess WriterAcquireResult acquire(Metadata& md, const AcquireConfig& cfg=AcquireConfig()) override; void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) override; - void remove(const metadata::Collection& mds) override; static void test_acquire(std::shared_ptr session, const core::cfg::Section& cfg, WriterBatch& batch); }; diff --git 
a/arki/dataset/maintenance-test.cc b/arki/dataset/maintenance-test.cc index 1a22780f..28b5379f 100644 --- a/arki/dataset/maintenance-test.cc +++ b/arki/dataset/maintenance-test.cc @@ -158,8 +158,8 @@ void Fixture::delete_one_in_segment() metadata::Collection mds(*config().create_reader(), Matcher()); metadata::Collection to_delete; to_delete.push_back(mds.get(0)); - auto writer = config().create_writer(); - writer->remove(to_delete); + auto checker = config().create_checker(); + checker->remove(to_delete); } void Fixture::delete_all_in_segment() @@ -168,8 +168,8 @@ void Fixture::delete_all_in_segment() metadata::Collection to_delete; to_delete.push_back(mds.get(0)); to_delete.push_back(mds.get(1)); - auto writer = config().create_writer(); - writer->remove(to_delete); + auto checker = config().create_checker(); + checker->remove(to_delete); } void Fixture::reset_seqfile() diff --git a/arki/dataset/outbound.cc b/arki/dataset/outbound.cc index df0d4223..52ba967e 100644 --- a/arki/dataset/outbound.cc +++ b/arki/dataset/outbound.cc @@ -90,11 +90,6 @@ void Writer::acquire_batch(WriterBatch& batch, const AcquireConfig& cfg) } } -void Writer::remove(const metadata::Collection&) -{ - throw std::runtime_error("cannot remove data from outbound dataset: dataset does not support removing items"); -} - void Writer::test_acquire(std::shared_ptr session, const core::cfg::Section& cfg, WriterBatch& batch) { std::shared_ptr config(new outbound::Dataset(session, cfg)); diff --git a/arki/dataset/outbound.h b/arki/dataset/outbound.h index 9778187b..b60df6ea 100644 --- a/arki/dataset/outbound.h +++ b/arki/dataset/outbound.h @@ -48,8 +48,6 @@ class Writer : public DatasetAccess WriterAcquireResult acquire(Metadata& md, const AcquireConfig& cfg=AcquireConfig()) override; void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) override; - void remove(const metadata::Collection& mds) override; - static void test_acquire(std::shared_ptr session, const 
core::cfg::Section& cfg, WriterBatch& batch); }; diff --git a/arki/dataset/pool.cc b/arki/dataset/pool.cc index cb881706..136258e2 100644 --- a/arki/dataset/pool.cc +++ b/arki/dataset/pool.cc @@ -240,7 +240,41 @@ std::shared_ptr DispatchPool::get_duplicates() return get_error(); } -void DispatchPool::remove(const metadata::Collection& todolist, bool simulate) +void DispatchPool::flush() +{ + for (auto& i: cache) + i.second->flush(); +} + + +/* + * CheckPool + */ + +CheckPool::CheckPool(std::shared_ptr pool) + : pool(pool) +{ +} + +CheckPool::~CheckPool() +{ +} + +std::shared_ptr CheckPool::get(const std::string& name) +{ + auto ci = cache.find(name); + if (ci == cache.end()) + { + auto ds = pool->dataset(name); + auto checker(ds->create_checker()); + cache.insert(make_pair(name, checker)); + return checker; + } else { + return ci->second; + } +} + +void CheckPool::remove(const metadata::Collection& todolist, bool simulate) { // Group metadata by dataset std::unordered_map by_dataset; @@ -291,11 +325,5 @@ void DispatchPool::remove(const metadata::Collection& todolist, bool simulate) } } -void DispatchPool::flush() -{ - for (auto& i: cache) - i.second->flush(); -} - } } diff --git a/arki/dataset/pool.h b/arki/dataset/pool.h index f4253086..2c8060e5 100644 --- a/arki/dataset/pool.h +++ b/arki/dataset/pool.h @@ -126,14 +126,36 @@ class DispatchPool std::shared_ptr get_duplicates(); /** - * Mark the given data as deleted + * Flush all dataset data to disk */ - void remove(const arki::metadata::Collection& todolist, bool simulate); + void flush(); +}; + + +class CheckPool +{ +protected: + std::shared_ptr pool; + + // Dataset cache + std::map> cache; + +public: + CheckPool(std::shared_ptr pool); + ~CheckPool(); /** - * Flush all dataset data to disk + * Get a dataset, either from the cache or instantiating it. 
+ * + * If \a name does not correspond to any dataset in the configuration, + * returns 0 */ - void flush(); + std::shared_ptr get(const std::string& name); + + /** + * Mark the given data as deleted + */ + void remove(const arki::metadata::Collection& todolist, bool simulate); }; } diff --git a/arki/dataset/segmented.cc b/arki/dataset/segmented.cc index b3c426c8..c79405f8 100644 --- a/arki/dataset/segmented.cc +++ b/arki/dataset/segmented.cc @@ -228,6 +228,11 @@ void CheckerSegment::unarchive() index(move(mdc)); } +void CheckerSegment::remove_data(const std::vector& offsets) +{ + throw std::runtime_error(dataset().name() + ": dataset segment does not support removing items"); +} + Checker::~Checker() { diff --git a/arki/dataset/segmented.h b/arki/dataset/segmented.h index a59c5406..0a3a81d3 100644 --- a/arki/dataset/segmented.h +++ b/arki/dataset/segmented.h @@ -151,6 +151,14 @@ class CheckerSegment virtual SegmentState scan(dataset::Reporter& reporter, bool quick=true) = 0; + /** + * Remove entries from this segment, indicated by their starting offsets. + * + * This method returns nothing.
The space may not be freed right + * away, and may need to be reclaimed by a repack operation + */ + virtual void remove_data(const std::vector& offsets); + /** * Optimise the contents of a data file * diff --git a/arki/dataset/simple/writer.cc b/arki/dataset/simple/writer.cc index 60033483..134bbf6a 100644 --- a/arki/dataset/simple/writer.cc +++ b/arki/dataset/simple/writer.cc @@ -192,12 +192,6 @@ void Writer::acquire_batch(WriterBatch& batch, const AcquireConfig& cfg) } } -void Writer::remove(const metadata::Collection&) -{ - // Nothing to do - throw std::runtime_error("cannot remove data from simple dataset: dataset does not support removing items"); -} - void Writer::test_acquire(std::shared_ptr session, const core::cfg::Section& cfg, WriterBatch& batch) { std::shared_ptr dataset(new simple::Dataset(session, cfg)); diff --git a/arki/dataset/simple/writer.h b/arki/dataset/simple/writer.h index 69da2775..72276c0f 100644 --- a/arki/dataset/simple/writer.h +++ b/arki/dataset/simple/writer.h @@ -30,7 +30,6 @@ class Writer : public DatasetAccess WriterAcquireResult acquire(Metadata& md, const AcquireConfig& cfg=AcquireConfig()) override; void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) override; - void remove(const metadata::Collection&) override; static void test_acquire(std::shared_ptr session, const core::cfg::Section& cfg, WriterBatch& batch); }; diff --git a/arki/dispatcher-test.cc b/arki/dispatcher-test.cc index 15fe8991..b18c0094 100644 --- a/arki/dispatcher-test.cc +++ b/arki/dispatcher-test.cc @@ -75,7 +75,7 @@ add_method("simple", [] { auto session = std::make_shared(); auto pool = std::make_shared(session); auto cfg = setup1("grib"); - for (const auto i: *cfg) + for (const auto& i: *cfg) pool->add_dataset(*i.second); plain_data_read_count.reset(); diff --git a/arki/segment.cc b/arki/segment.cc index e0bdd2b8..b2ec3e48 100644 --- a/arki/segment.cc +++ b/arki/segment.cc @@ -39,7 +39,7 @@ std::string Segment::basename(const 
std::string& pathname) return pathname; } -bool Segment::is_segment(std::string& abspath) +bool Segment::is_segment(const std::string& abspath) { std::unique_ptr st = sys::stat(abspath); if (!st.get()) diff --git a/arki/segment.h b/arki/segment.h index 278e59cc..45ee0b11 100644 --- a/arki/segment.h +++ b/arki/segment.h @@ -146,7 +146,7 @@ class Segment static std::string basename(const std::string& pathname); /// Check if the given file or directory is a segment - static bool is_segment(std::string& abspath); + static bool is_segment(const std::string& abspath); /// Instantiate the right Reader implementation for a segment that already exists static std::shared_ptr detect_reader(const std::string& format, const std::string& root, const std::string& relpath, const std::string& abspath, std::shared_ptr lock); diff --git a/python/arki-check.cc b/python/arki-check.cc index 29e5025e..c8288a0d 100644 --- a/python/arki-check.cc +++ b/python/arki-check.cc @@ -67,7 +67,7 @@ struct remove : public MethKwargs { ReleaseGIL rg; - arki::dataset::DispatchPool pool(self->pool); + arki::dataset::CheckPool pool(self->pool); // Read all metadata from the file specified in --remove arki::metadata::Collection todolist; diff --git a/python/arkimet.cc b/python/arkimet.cc index 1231f05e..dfb2063f 100644 --- a/python/arkimet.cc +++ b/python/arkimet.cc @@ -115,7 +115,7 @@ struct make_qmacro_dataset : public MethKwargs auto session = std::make_shared(); auto pool = std::make_shared(session); auto cfg = sections_from_python(arg_datasets); - for (const auto si: *cfg) + for (const auto& si: *cfg) pool->add_dataset(*si.second); auto ds = pool->querymacro(name, query);