Skip to content

Commit

Permalink
Do not accidentally create missing segments when trying to remove data from them. Fixes: #296
Browse files Browse the repository at this point in the history
  • Loading branch information
spanezz committed Jan 25, 2023
2 parents 98f82e6 + f21c01b commit 7c9ee57
Show file tree
Hide file tree
Showing 22 changed files with 186 additions and 95 deletions.
5 changes: 5 additions & 0 deletions arki/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ CheckerConfig::CheckerConfig(std::shared_ptr<dataset::Reporter> reporter, bool r
}


// Default implementation for datasets that do not support removing
// individual data items: report the failure to the caller.
// Subclasses that do support removal (e.g. the iseg checker) override this.
void Checker::remove(const metadata::Collection& mds)
{
throw std::runtime_error(dataset().name() + ": dataset does not support removing items");
}

void Checker::check_issue51(CheckerConfig& opts)
{
throw std::runtime_error(dataset().name() + ": check_issue51 not implemented for this dataset");
Expand Down
12 changes: 6 additions & 6 deletions arki/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,6 @@ class Writer : public dataset::Base
*/
virtual void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) = 0;

/**
* Mark the data corresponding to the given metadata as removed from the
* database.
*/
virtual void remove(const metadata::Collection& mds) = 0;

/**
* Flush pending changes to disk
*/
Expand Down Expand Up @@ -368,6 +362,12 @@ class Checker : public dataset::Base
/// Check the dataset for errors
virtual void check(CheckerConfig& opts) = 0;

/**
* Mark the data corresponding to the given metadata as removed from the
* database.
*/
virtual void remove(const metadata::Collection& mds);

/// Remove data from the dataset that is older than `delete age`
virtual void remove_old(CheckerConfig& opts) = 0;

Expand Down
7 changes: 1 addition & 6 deletions arki/dataset/empty.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ class Writer : public DatasetAccess<dataset::Dataset, dataset::Writer>
WriterAcquireResult acquire(Metadata& md, const AcquireConfig& cfg=AcquireConfig()) override;
void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) override;

void remove(const metadata::Collection&) override
{
// Of course, after this method is called, the metadata cannot be found
// in the dataset
}

static void test_acquire(std::shared_ptr<Session> session, const core::cfg::Section& cfg, WriterBatch& batch);
};

Expand All @@ -76,6 +70,7 @@ struct Checker : public DatasetAccess<dataset::Dataset, dataset::Checker>

std::string type() const override { return "empty"; }

void remove(const metadata::Collection& mds) override {}
void remove_old(CheckerConfig& opts) override {}
void remove_all(CheckerConfig& opts) override {}
void tar(CheckerConfig&) override {}
Expand Down
30 changes: 30 additions & 0 deletions arki/dataset/iseg-test.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "tests.h"
#include "iseg/writer.h"
#include "iseg/checker.h"
#include "arki/scan.h"
#include "arki/types/source/blob.h"
#include "arki/dataset/query.h"
#include "arki/matcher/parser.h"
#include "arki/utils/sys.h"

namespace {
using namespace std;
Expand Down Expand Up @@ -126,6 +128,34 @@ add_method("acquire_replace_usn", [](Fixture& f) {
}
});

// Regression test for #296: asking the checker to remove data whose
// segment has already disappeared from disk must not recreate the
// segment, its lock file, its index, or the containing directory.
add_method("delete_missing", [](Fixture& f) {
    metadata::TestCollection mdc("inbound/test.grib1");

    // Import once, so the metadata acquires a Blob source in testds
    {
        auto writer = f.makeIsegWriter();
        wassert(actual(writer->acquire(mdc[0])) == dataset::ACQ_OK);
        writer->flush();
    }

    // Remove imported segments behind the dataset's back
    sys::rmtree("testds/2007");

    // Try deleting the previously imported data
    {
        auto checker = f.makeIsegChecker();
        metadata::Collection to_remove;
        to_remove.push_back(mdc[0].clone());
        checker->remove(to_remove);
    }

    // Ensure that this did not accidentally create a segment
    wassert(actual_file("testds/2007/07-08.grib").not_exists());
    wassert(actual_file("testds/2007/07-08.grib.lock").not_exists());
    wassert(actual_file("testds/2007/07-08.grib.index").not_exists());
    wassert(actual_file("testds/2007").not_exists());
});

}

}
Expand Down
60 changes: 60 additions & 0 deletions arki/dataset/iseg/checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "arki/dataset/reporter.h"
#include "arki/dataset/lock.h"
#include "arki/dataset/archive.h"
#include "arki/dataset/index/summarycache.h"
#include "arki/types/reftime.h"
#include "arki/types/source/blob.h"
#include "arki/metadata.h"
#include "arki/matcher.h"
Expand Down Expand Up @@ -284,6 +286,21 @@ class CheckerSegment : public segmented::CheckerSegment
return 0;
}

// Remove the data items at the given offsets from this segment's index.
//
// If the segment does not exist on disk, do nothing: opening its index
// here would accidentally (re)create a missing segment (see #296).
void remove_data(const std::vector<uint64_t>& offsets)
{
if (!segment->exists_on_disk())
return;

// Delete all the given offsets from the index in a single transaction
Pending p_del = idx().begin_transaction();

for (const auto& offset: offsets)
idx().remove(offset);

// Commit delete from DB
p_del.commit();
}

size_t repack(unsigned test_flags=0) override
{
// Lock away writes and reads
Expand Down Expand Up @@ -613,6 +630,49 @@ void Checker::check_issue51(CheckerConfig& opts)
return segmented::Checker::check_issue51(opts);
}

// Mark the data corresponding to the given metadata as removed from the
// dataset.
//
// Metadata whose segment no longer exists on disk is skipped, so the
// removal cannot accidentally recreate missing segments (#296). Throws
// std::runtime_error if an item has no Blob source or belongs to a
// different dataset.
void Checker::remove(const metadata::Collection& mds)
{
    // Group mds by segment
    std::unordered_map<std::string, std::vector<uint64_t>> by_segment;
    // Take note of months to invalidate in summary cache
    std::set<std::pair<int, int>> months;

    // Build a todo-list of entries to delete for each segment
    for (const auto& md: mds)
    {
        const types::source::Blob* source = md->has_source_blob();
        if (!source)
            throw std::runtime_error("cannot remove metadata from dataset, because it has no Blob source");

        if (source->basedir != dataset().path)
            throw std::runtime_error("cannot remove metadata from dataset: its basedir is " + source->basedir + " but this dataset is at " + dataset().path);

        Time time = md->get<types::reftime::Position>()->get_Position();
        std::string relpath = dataset().step()(time) + "." + dataset().format;

        // Skip items whose segment does not exist on disk: instantiating
        // a checker segment for them could create the segment (#296)
        if (!Segment::is_segment(str::joinpath(dataset().path, relpath)))
            continue;

        by_segment[relpath].push_back(source->offset);
        months.insert(std::make_pair(time.ye, time.mo));
    }

    // Delete the selected offsets, segment by segment.
    // (the unused segment::WriterConfig left over from the old
    // Writer::remove implementation has been dropped)
    for (const auto& i: by_segment)
    {
        auto seg = segment(i.first);
        seg->remove_data(i.second);
        arki::nag::verbose("%s: %s: %zd data marked as deleted", name().c_str(), i.first.c_str(), i.second.size());
    }

    // Invalidate the summary cache for every month that lost data
    index::SummaryCache scache(dataset().summary_cache_pathname);
    for (const auto& i: months)
        scache.invalidate(i.first, i.second);
}

size_t Checker::vacuum(dataset::Reporter& reporter)
{
return 0;
Expand Down
1 change: 1 addition & 0 deletions arki/dataset/iseg/checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Checker : public DatasetAccess<iseg::Dataset, segmented::Checker>

std::string type() const override;

void remove(const metadata::Collection& mds) override;
std::unique_ptr<segmented::CheckerSegment> segment(const std::string& relpath) override;
std::unique_ptr<segmented::CheckerSegment> segment_prelocked(const std::string& relpath, std::shared_ptr<dataset::CheckLock> lock) override;
void segments_tracked(std::function<void(segmented::CheckerSegment& segment)>) override;
Expand Down
48 changes: 0 additions & 48 deletions arki/dataset/iseg/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,54 +355,6 @@ void Writer::acquire_batch(WriterBatch& batch, const AcquireConfig& cfg)
}
}

// Mark the given data as deleted in the dataset.
//
// NOTE(review): removed in this commit. Unlike the replacement in the
// checker, this always opens the per-segment index via file(), which can
// (re)create a segment that no longer exists on disk (#296).
void Writer::remove(const metadata::Collection& mds)
{
// Group mds by segment
std::unordered_map<std::string, std::vector<uint64_t>> by_segment;
// Take note of months to invalidate in summary cache
std::set<std::pair<int, int>> months;

for (const auto& md: mds)
{
const types::source::Blob* source = md->has_source_blob();
if (!source)
throw std::runtime_error("cannot remove metadata from dataset, because it has no Blob source");

if (source->basedir != dataset().path)
throw std::runtime_error("cannot remove metadata from dataset: its basedir is " + source->basedir + " but this dataset is at " + dataset().path);

by_segment[get_relpath(*md)].push_back(source->offset);

core::Time time = md->get<types::reftime::Position>()->get_Position();
months.insert(std::make_pair(time.ye, time.mo));
}

for (const auto& i: by_segment)
{
segment::WriterConfig writer_config;
writer_config.drop_cached_data_on_commit = false;
writer_config.eatmydata = dataset().eatmydata;

auto segment = file(writer_config, i.first);

// TODO: refuse if md is in the archive

// Delete from DB, and get file name
Pending p_del = segment->idx.begin_transaction();

for (const auto& offset: i.second)
segment->idx.remove(offset);

// Commit delete from DB
p_del.commit();

arki::nag::verbose("%s: %s: %zd data marked as deleted", name().c_str(), i.first.c_str(), i.second.size());
}

// Invalidate the summary cache for every month that lost data
for (const auto& i: months)
scache.invalidate(i.first, i.second);
}

void Writer::test_acquire(std::shared_ptr<Session> session, const core::cfg::Section& cfg, WriterBatch& batch)
{
std::shared_ptr<const iseg::Dataset> dataset(new iseg::Dataset(session, cfg));
Expand Down
1 change: 0 additions & 1 deletion arki/dataset/iseg/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class Writer : public DatasetAccess<iseg::Dataset, segmented::Writer>

WriterAcquireResult acquire(Metadata& md, const AcquireConfig& cfg=AcquireConfig()) override;
void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) override;
void remove(const metadata::Collection& mds) override;

static void test_acquire(std::shared_ptr<Session> session, const core::cfg::Section& cfg, WriterBatch& batch);
};
Expand Down
8 changes: 4 additions & 4 deletions arki/dataset/maintenance-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ void Fixture::delete_one_in_segment()
metadata::Collection mds(*config().create_reader(), Matcher());
metadata::Collection to_delete;
to_delete.push_back(mds.get(0));
auto writer = config().create_writer();
writer->remove(to_delete);
auto checker = config().create_checker();
checker->remove(to_delete);
}

void Fixture::delete_all_in_segment()
Expand All @@ -168,8 +168,8 @@ void Fixture::delete_all_in_segment()
metadata::Collection to_delete;
to_delete.push_back(mds.get(0));
to_delete.push_back(mds.get(1));
auto writer = config().create_writer();
writer->remove(to_delete);
auto checker = config().create_checker();
checker->remove(to_delete);
}

void Fixture::reset_seqfile()
Expand Down
5 changes: 0 additions & 5 deletions arki/dataset/outbound.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ void Writer::acquire_batch(WriterBatch& batch, const AcquireConfig& cfg)
}
}

// Removing items is not supported by the outbound dataset: report the
// failure to the caller. (Removed in this commit; the equivalent error is
// now raised by the base Checker::remove.)
void Writer::remove(const metadata::Collection&)
{
throw std::runtime_error("cannot remove data from outbound dataset: dataset does not support removing items");
}

void Writer::test_acquire(std::shared_ptr<Session> session, const core::cfg::Section& cfg, WriterBatch& batch)
{
std::shared_ptr<const outbound::Dataset> config(new outbound::Dataset(session, cfg));
Expand Down
2 changes: 0 additions & 2 deletions arki/dataset/outbound.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ class Writer : public DatasetAccess<Dataset, segmented::Writer>
WriterAcquireResult acquire(Metadata& md, const AcquireConfig& cfg=AcquireConfig()) override;
void acquire_batch(WriterBatch& batch, const AcquireConfig& cfg=AcquireConfig()) override;

void remove(const metadata::Collection& mds) override;

static void test_acquire(std::shared_ptr<Session> session, const core::cfg::Section& cfg, WriterBatch& batch);
};

Expand Down
42 changes: 35 additions & 7 deletions arki/dataset/pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,41 @@ std::shared_ptr<dataset::Writer> DispatchPool::get_duplicates()
return get_error();
}

void DispatchPool::remove(const metadata::Collection& todolist, bool simulate)
// Flush pending changes of every cached writer to disk.
void DispatchPool::flush()
{
    for (auto& [name, writer]: cache)
        writer->flush();
}


/*
* CheckPool
*/

// Build a checker pool on top of the given dataset pool; checkers are
// instantiated lazily by get() and cached by dataset name.
CheckPool::CheckPool(std::shared_ptr<Pool> pool)
    : pool(pool)
{
}

CheckPool::~CheckPool()
{
}

std::shared_ptr<dataset::Checker> CheckPool::get(const std::string& name)
{
auto ci = cache.find(name);
if (ci == cache.end())
{
auto ds = pool->dataset(name);
auto checker(ds->create_checker());
cache.insert(make_pair(name, checker));
return checker;
} else {
return ci->second;
}
}

void CheckPool::remove(const metadata::Collection& todolist, bool simulate)
{
// Group metadata by dataset
std::unordered_map<std::string, metadata::Collection> by_dataset;
Expand Down Expand Up @@ -291,11 +325,5 @@ void DispatchPool::remove(const metadata::Collection& todolist, bool simulate)
}
}

void DispatchPool::flush()
{
for (auto& i: cache)
i.second->flush();
}

}
}
30 changes: 26 additions & 4 deletions arki/dataset/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,36 @@ class DispatchPool
std::shared_ptr<dataset::Writer> get_duplicates();

/**
* Mark the given data as deleted
* Flush all dataset data to disk
*/
void remove(const arki::metadata::Collection& todolist, bool simulate);
void flush();
};


/**
 * Pool of dataset Checkers, lazily instantiated and cached by dataset
 * name, used to mark already-dispatched data as deleted.
 */
class CheckPool
{
protected:
    std::shared_ptr<Pool> pool;

    // Dataset cache
    std::map<std::string, std::shared_ptr<dataset::Checker>> cache;

public:
    CheckPool(std::shared_ptr<Pool> pool);
    ~CheckPool();

    /**
     * Get a dataset, either from the cache or instantiating it.
     *
     * If \a name does not correspond to any dataset in the configuration,
     * returns 0
     * (NOTE(review): wording inherited from the writer pool — confirm
     * whether Pool::dataset() throws instead of returning null here)
     */
    std::shared_ptr<dataset::Checker> get(const std::string& name);

    /**
     * Mark the given data as deleted
     */
    void remove(const arki::metadata::Collection& todolist, bool simulate);
};

}
Expand Down
Loading

0 comments on commit 7c9ee57

Please sign in to comment.