Skip to content
This repository has been archived by the owner on Jan 3, 2024. It is now read-only.

Make types.h Object Ownership Story More Explicit #247

Draft
wants to merge 6 commits into
base: s3gw
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/rgw/driver/sfs/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ void SFSBucket::Meta::decode_json(JSONObj* obj) {
JSONDecoder::decode_json("info", info, obj);
}

std::unique_ptr<Object> SFSBucket::_get_object(sfs::ObjectRef obj) {
rgw_obj_key key(obj->name, obj->instance);
return make_unique<SFSObject>(this->store, key, this, bucket);
}

std::unique_ptr<Object> SFSBucket::get_object(const rgw_obj_key& key) {
ldout(store->ceph_context(), SFS_LOG_DEBUG)
<< "bucket::" << __func__ << ": key : " << key << dendl;
Expand All @@ -88,7 +83,7 @@ std::unique_ptr<Object> SFSBucket::get_object(const rgw_obj_key& key) {
// specific version" operation.
// Return the object with the same key as it was requested.
objref->instance = key.instance;
return _get_object(objref);
return make_unique<SFSObject>(this->store, key, this, bucket);
} catch (const sfs::UnknownObjectException& _) {
ldout(store->ceph_context(), SFS_LOG_VERBOSE)
<< "unable to find key " << key << " in bucket " << bucket->get_name()
Expand Down
2 changes: 0 additions & 2 deletions src/rgw/driver/sfs/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ class SFSBucket : public StoreBucket {

void write_meta(const DoutPrefixProvider* dpp);

std::unique_ptr<Object> _get_object(sfs::ObjectRef obj);

/// Verify params passed to list()
int verify_list_params(
const DoutPrefixProvider* dpp, const ListParams& params, int max
Expand Down
12 changes: 3 additions & 9 deletions src/rgw/driver/sfs/multipart.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,12 @@ SFSMultipartUploadV2::SFSMultipartUploadV2(

std::unique_ptr<rgw::sal::Object> SFSMultipartUploadV2::get_meta_obj() {
rgw_obj_key key(meta_str, string(), RGW_OBJ_NS_MULTIPART);
auto mmo =
std::make_unique<SFSMultipartMetaObject>(store, key, bucket, bucketref);

sfs::sqlite::SQLiteMultipart mpdb(store->db_conn);
auto mp = mpdb.get_multipart(upload_id);
ceph_assert(mp.has_value());
mmo->set_attrs(mp->attrs);
// TODO(jecluis): this needs to be fixed once we get rid of the objref
mmo->set_object_ref(
std::shared_ptr<sfs::Object>(sfs::Object::create_from_obj_key(key))
return std::make_unique<SFSMultipartMetaObject>(
store, key, bucket, bucketref, mp->attrs
);
return mmo;
}

int SFSMultipartUploadV2::init(
Expand Down Expand Up @@ -431,7 +425,7 @@ int SFSMultipartUploadV2::complete(
// new object, or a new version, and move the file to its location as if we
// were writing directly to it.

ObjectRef objref;
std::unique_ptr<Object> objref;
try {
objref = bucketref->create_version(target_obj->get_key());
} catch (const std::system_error& e) {
Expand Down
54 changes: 48 additions & 6 deletions src/rgw/driver/sfs/multipart.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,54 @@ class SFStore;
* bits that are relevant for the SAL layer's expected path.
*
* For reference, check 'rgw_op.cc', RGWCompleteMultipart::execute().
*
* Note that during MP uploads, before completing them, the object is
* actually a promise of a future object. To make this promise
* accessible to certain queries (e.g get_attr) initialize the
* underlying objref from MP data rather than object / versioned
* object data.
*/
struct SFSMultipartMetaObject : public rgw::sal::SFSObject {
SFSMultipartMetaObject(SFSMultipartMetaObject&) = default;
SFSMultipartMetaObject(SFSMultipartMetaObject&) = delete;
SFSMultipartMetaObject(
rgw::sal::SFStore* _st, const rgw_obj_key& _k, rgw::sal::Bucket* _b,
BucketRef _bucket
BucketRef _bucket, const rgw::sal::Attrs& attrs
)
: rgw::sal::SFSObject(_st, _k, _b, _bucket, false) {}
: rgw::sal::SFSObject(_st, _k, _b, _bucket, false) {
// Note: objref points to a object that does not actually exists.
objref.reset(sfs::Object::create_from_obj_key(_k));
objref->update_attrs(attrs);
}

struct SFSMetaObjReadOp : public ReadOp {
private:
const sfs::Object& obj;

public:
SFSMetaObjReadOp() = delete;
SFSMetaObjReadOp(const sfs::Object& _obj) : obj(_obj) {}
virtual int prepare(optional_yield, const DoutPrefixProvider*) override {
return 0;
}
virtual int
read(int64_t, int64_t, bufferlist&, optional_yield, const DoutPrefixProvider*)
override {
return -ENOTSUP;
}
virtual int iterate(
const DoutPrefixProvider*, int64_t, int64_t, RGWGetDataCB*,
optional_yield
) override {
return -ENOTSUP;
}
virtual int get_attr(
const DoutPrefixProvider*, const char* name, bufferlist& dest,
optional_yield
) override {
return obj.get_attr(name, dest);
}
const std::string get_cls_name() { return "mp_meta_obj_read"; }
};

struct SFSMetaObjDeleteOp : public DeleteOp {
SFSMetaObjDeleteOp() = default;
Expand All @@ -55,9 +95,7 @@ struct SFSMultipartMetaObject : public rgw::sal::SFSObject {
const std::string get_cls_name() { return "mp_meta_obj_delete"; }
};

virtual std::unique_ptr<Object> clone() override {
return std::unique_ptr<Object>(new SFSMultipartMetaObject{*this});
}
virtual std::unique_ptr<Object> clone() override { return nullptr; }
SFSMultipartMetaObject& operator=(const SFSMultipartMetaObject&) = delete;

virtual std::unique_ptr<DeleteOp> get_delete_op() override {
Expand All @@ -70,6 +108,10 @@ struct SFSMultipartMetaObject : public rgw::sal::SFSObject {
) override {
return 0;
}

virtual std::unique_ptr<ReadOp> get_read_op() override {
return std::make_unique<SFSMetaObjReadOp>(*objref);
}
};

class SFSMultipartPartV2 : public StoreMultipartPart {
Expand Down
72 changes: 32 additions & 40 deletions src/rgw/driver/sfs/object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,8 @@ using namespace std;

namespace rgw::sal {

SFSObject::SFSReadOp::SFSReadOp(SFSObject* _source) : source(_source) {
/*
This initialization code was originally into prepare() but that
was not sufficient to cover all cases.
There are pieces of SAL code that are calling get_*() methods
but they don't call prepare().
In those cases the SFSReadOp is not properly initialized and those
calls are going to fail.
*/
// read op needs to retrieve also the version_id from the db
source->refresh_meta(true);
objref = source->get_object_ref();
}
SFSObject::SFSReadOp::SFSReadOp(const SFSObject& _source)
: source(_source), objref(_source.get_object_ref()) {}

// Handle conditional GET params. If-Match, If-None-Match,
// If-Modified-Since, If-UnModified-Since. Return 0 if we are neutral.
Expand All @@ -52,8 +41,8 @@ int SFSObject::SFSReadOp::handle_conditionals(const DoutPrefixProvider* dpp
!params.unmod_ptr) {
return 0;
}
const std::string etag = objref->get_meta().etag;
const auto mtime = objref->get_meta().mtime;
const std::string etag = objref.get_meta().etag;
const auto mtime = objref.get_meta().mtime;
int result = 0;

if (params.if_match) {
Expand Down Expand Up @@ -114,13 +103,13 @@ int SFSObject::SFSReadOp::handle_conditionals(const DoutPrefixProvider* dpp
int SFSObject::SFSReadOp::prepare(
optional_yield /*y*/, const DoutPrefixProvider* dpp
) {
if (!objref || objref->deleted) {
if (objref.deleted) {
// at this point, we don't have an objectref because
// the object does not exist.
return -ENOENT;
}

objdata = source->store->get_data_path() / objref->get_storage_path();
objdata = source.store->get_data_path() / objref.get_storage_path();
if (!std::filesystem::exists(objdata)) {
lsfs_verb(dpp) << "object data not found at " << objdata << dendl;
return -ENOENT;
Expand All @@ -130,15 +119,15 @@ int SFSObject::SFSReadOp::prepare(
<< fmt::format(
"bucket:{} obj:{} size:{} versionid:{} "
"conditionals:(ifmatch:{} ifnomatch:{} ifmod:{} ifunmod:{})",
source->bucket->get_name(), source->get_name(),
source->get_obj_size(), source->get_instance(),
source.bucket->get_name(), source.get_name(),
source.get_obj_size(), source.get_instance(),
fmt::ptr(params.if_match), fmt::ptr(params.if_nomatch),
fmt::ptr(params.mod_ptr), fmt::ptr(params.unmod_ptr)
)
<< dendl;

if (params.lastmod) {
*params.lastmod = source->get_mtime();
*params.lastmod = source.get_mtime();
}
return handle_conditionals(dpp);
}
Expand All @@ -147,10 +136,10 @@ int SFSObject::SFSReadOp::get_attr(
const DoutPrefixProvider* /*dpp*/, const char* name, bufferlist& dest,
optional_yield /*y*/
) {
if (!objref || objref->deleted) {
if (objref.deleted) {
return -ENOENT;
}
if (!objref->get_attr(name, dest)) {
if (!objref.get_attr(name, dest)) {
return -ENODATA;
}
return 0;
Expand All @@ -163,9 +152,9 @@ int SFSObject::SFSReadOp::read(
) {
// TODO bounds check, etc.
const auto len = end + 1 - ofs;
lsfs_debug(dpp) << "bucket: " << source->bucket->get_name()
<< ", obj: " << source->get_name()
<< ", size: " << source->get_obj_size() << ", offset: " << ofs
lsfs_debug(dpp) << "bucket: " << source.bucket->get_name()
<< ", obj: " << source.get_name()
<< ", size: " << source.get_obj_size() << ", offset: " << ofs
<< ", end: " << end << ", len: " << len << dendl;

ceph_assert(std::filesystem::exists(objdata));
Expand All @@ -187,9 +176,9 @@ int SFSObject::SFSReadOp::iterate(
) {
// TODO bounds check, etc.
const auto len = end + 1 - ofs;
lsfs_debug(dpp) << "bucket: " << source->bucket->get_name()
<< ", obj: " << source->get_name()
<< ", size: " << source->get_obj_size() << ", offset: " << ofs
lsfs_debug(dpp) << "bucket: " << source.bucket->get_name()
<< ", obj: " << source.get_name()
<< ", size: " << source.get_obj_size() << ", offset: " << ofs
<< ", end: " << end << ", len: " << len << dendl;

ceph_assert(std::filesystem::exists(objdata));
Expand Down Expand Up @@ -243,7 +232,7 @@ int SFSObject::SFSDeleteOp::delete_obj(

auto version_id = source->get_instance();
std::string delete_marker_version_id;
if (source->objref) {
if (source->state.exists) {
bucketref->delete_object(
*source->objref, source->get_key(),
source->bucket->versioning_enabled(), delete_marker_version_id
Expand Down Expand Up @@ -335,7 +324,7 @@ int SFSObject::copy_object(
return -ERR_INTERNAL_ERROR;
}

const sfs::ObjectRef dstref =
const std::unique_ptr<sfs::Object> dstref =
dst_bucket_ref->create_version(dst_object->get_key());
if (!dstref) {
::close(src_fd);
Expand Down Expand Up @@ -643,24 +632,27 @@ void SFSObject::refresh_meta(bool update_version_id_from_metadata) {
try {
objref = bucketref->get(rgw_obj_key(get_name(), get_instance()));
} catch (sfs::UnknownObjectException& e) {
// object probably not created yet?
objref = std::unique_ptr<sfs::Object>(sfs::Object::create_from_obj_key(
rgw_obj_key(get_name(), get_instance())
));
objref->deleted = true;
state.exists = false;
// object probably not created yet - return a deleted placeholder
return;
}
_refresh_meta_from_object(objref, update_version_id_from_metadata);
_refresh_meta_from_object(*objref, update_version_id_from_metadata);
}

void SFSObject::_refresh_meta_from_object(
sfs::ObjectRef obj_to_refresh, bool update_version_id_from_metadata
const sfs::Object& obj_to_refresh, bool update_version_id_from_metadata
) {
ceph_assert(obj_to_refresh);
// fill values from objref
set_obj_size(obj_to_refresh->get_meta().size);
set_attrs(obj_to_refresh->get_attrs());
state.accounted_size = obj_to_refresh->get_meta().size;
state.mtime = obj_to_refresh->get_meta().mtime;
set_obj_size(obj_to_refresh.get_meta().size);
set_attrs(obj_to_refresh.get_attrs());
state.accounted_size = obj_to_refresh.get_meta().size;
state.mtime = obj_to_refresh.get_meta().mtime;
state.exists = true;
if (update_version_id_from_metadata) {
set_instance(obj_to_refresh->instance);
set_instance(obj_to_refresh.instance);
}
}

Expand Down
40 changes: 16 additions & 24 deletions src/rgw/driver/sfs/object.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,35 @@ class SFStore;

class SFSObject : public StoreObject {
private:
SFStore* store;
RGWAccessControlPolicy acls;
sfs::BucketRef bucketref;
sfs::ObjectRef objref;

protected:
SFSObject(SFSObject&) = default;
SFStore* store;
sfs::BucketRef bucketref;
std::unique_ptr<sfs::Object> objref;

SFSObject(SFSObject&) = delete;

void _refresh_meta_from_object(
sfs::ObjectRef obj_to_refresh,
const sfs::Object& obj_to_refresh,
bool update_version_id_from_metadata = false
);

const sfs::Object& get_object_ref() const { return *objref; }

public:
/**
* reads an object's contents.
*/
struct SFSReadOp : public ReadOp {
private:
SFSObject* source;
sfs::ObjectRef objref;
const SFSObject& source;
const sfs::Object& objref;
std::filesystem::path objdata;
int handle_conditionals(const DoutPrefixProvider* dpp) const;

public:
SFSReadOp(SFSObject* _source);
SFSReadOp(const SFSObject& _source);

virtual int prepare(optional_yield y, const DoutPrefixProvider* dpp)
override;
Expand Down Expand Up @@ -107,19 +110,11 @@ class SFSObject : public StoreObject {
refresh_meta();
}
}
SFSObject(
SFStore* _st, const rgw_obj_key& _k, Bucket* _b,
sfs::BucketRef _bucketref, sfs::ObjectRef _objref
)
: StoreObject(_k, _b),
store(_st),
bucketref(_bucketref),
objref(_objref) {
_refresh_meta_from_object(objref);
}

virtual std::unique_ptr<Object> clone() override {
return std::unique_ptr<Object>(new SFSObject{*this});
return std::unique_ptr<Object>(
new SFSObject(store, get_key(), get_bucket(), bucketref, true)
);
}

virtual int delete_object(
Expand Down Expand Up @@ -194,7 +189,8 @@ class SFSObject : public StoreObject {
* Obtain a Read Operation.
*/
virtual std::unique_ptr<ReadOp> get_read_op() override {
return std::make_unique<SFSObject::SFSReadOp>(this);
this->refresh_meta(true);
return std::make_unique<SFSObject::SFSReadOp>(*this);
}
/**
* Obtain a Delete Operation.
Expand Down Expand Up @@ -239,10 +235,6 @@ class SFSObject : public StoreObject {

bool get_attr(const std::string& name, bufferlist& dest);

sfs::ObjectRef get_object_ref() { return objref; }

void set_object_ref(sfs::ObjectRef objref) { this->objref = objref; }

// Refresh metadata from db.
// Also retrieves version_id when specified.
// There are situations (like delete operations) in which we don't want to
Expand Down
Loading