From 5ddc95a4891c6515f9ebaecfac9ae86502066001 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Wed, 19 Nov 2025 10:19:08 +0000 Subject: [PATCH] rgw/s3vector: implement the VectorBucket API Signed-off-by: Yuval Lifshitz --- src/rgw/driver/posix/rgw_sal_posix.h | 7 + src/rgw/driver/rados/account.cc | 8 + src/rgw/driver/rados/account.h | 5 + src/rgw/driver/rados/rgw_bucket.cc | 69 ++- src/rgw/driver/rados/rgw_bucket.h | 25 +- src/rgw/driver/rados/rgw_cr_rados.cc | 3 +- src/rgw/driver/rados/rgw_rados.cc | 196 ++++++++- src/rgw/driver/rados/rgw_rados.h | 24 +- src/rgw/driver/rados/rgw_reshard.cc | 9 +- src/rgw/driver/rados/rgw_sal_rados.cc | 215 ++++++++- src/rgw/driver/rados/rgw_sal_rados.h | 40 ++ src/rgw/driver/rados/rgw_service.cc | 46 +- src/rgw/driver/rados/rgw_service.h | 10 + src/rgw/driver/rados/rgw_sync_module.cc | 16 + src/rgw/driver/rados/rgw_sync_module.h | 8 + src/rgw/rgw_arn.cc | 2 + src/rgw/rgw_arn.h | 2 +- src/rgw/rgw_op.h | 4 +- src/rgw/rgw_rest_client.cc | 5 + src/rgw/rgw_rest_s3.cc | 16 +- src/rgw/rgw_rest_s3vector.cc | 415 +++++++++++++++++- src/rgw/rgw_s3vector.cc | 29 +- src/rgw/rgw_s3vector.h | 8 +- src/rgw/rgw_sal.h | 135 ++++++ src/rgw/rgw_sal_dbstore.h | 7 + src/rgw/rgw_sal_filter.h | 12 + src/rgw/rgw_sal_store.h | 52 +++ src/rgw/rgw_zone.cc | 4 + src/rgw/rgw_zone.h | 11 +- src/rgw/services/svc_bucket_sobj.cc | 97 +++- src/rgw/services/svc_bucket_sobj.h | 24 +- src/rgw/services/svc_user.h | 2 + src/rgw/services/svc_user_rados.cc | 8 + src/rgw/services/svc_user_rados.h | 2 + src/test/rgw/s3vectors/README.rst | 3 +- src/test/rgw/s3vectors/__init__.py | 20 + src/test/rgw/s3vectors/s3vector_test.py | 268 ++++++++++- .../rgw/s3vectors/s3vtests.conf.multisite | 14 + 38 files changed, 1671 insertions(+), 150 deletions(-) create mode 100644 src/test/rgw/s3vectors/s3vtests.conf.multisite diff --git a/src/rgw/driver/posix/rgw_sal_posix.h b/src/rgw/driver/posix/rgw_sal_posix.h index 488fbf6e016..a14cc67eeaf 100644 --- 
a/src/rgw/driver/posix/rgw_sal_posix.h +++ b/src/rgw/driver/posix/rgw_sal_posix.h @@ -761,6 +761,13 @@ public: virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override; virtual bool valid_placement(const rgw_placement_rule& rule) override { return true; } + int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b, + std::unique_ptr* bucket, optional_yield y) override { return -ENOTSUP; } + int list_vector_buckets(const DoutPrefixProvider* dpp, + const rgw_owner& owner, const std::string& tenant, + const std::string& marker, const std::string& end_marker, + uint64_t max, BucketList& buckets, + optional_yield y) override { return -ENOTSUP; } virtual void finalize(void) override {} diff --git a/src/rgw/driver/rados/account.cc b/src/rgw/driver/rados/account.cc index cd65f0fdd0f..c22805701cc 100644 --- a/src/rgw/driver/rados/account.cc +++ b/src/rgw/driver/rados/account.cc @@ -33,6 +33,7 @@ namespace rgwrados::account { static constexpr std::string_view buckets_oid_prefix = "buckets."; +static constexpr std::string_view vector_buckets_oid_prefix = "vectorbuckets."; static constexpr std::string_view users_oid_prefix = "users."; static constexpr std::string_view groups_oid_prefix = "groups."; static constexpr std::string_view roles_oid_prefix = "roles."; @@ -44,11 +45,18 @@ static constexpr std::string_view name_oid_prefix = "name."; static std::string get_buckets_key(std::string_view account_id) { return string_cat_reserve(buckets_oid_prefix, account_id); } +static std::string get_vector_buckets_key(std::string_view account_id) { + return string_cat_reserve(vector_buckets_oid_prefix, account_id); +} rgw_raw_obj get_buckets_obj(const RGWZoneParams& zone, std::string_view account_id) { return {zone.account_pool, get_buckets_key(account_id)}; } +rgw_raw_obj get_vector_buckets_obj(const RGWZoneParams& zone, + std::string_view account_id) { + return {zone.account_pool, get_vector_buckets_key(account_id)}; +} static std::string 
get_users_key(std::string_view account_id) { return string_cat_reserve(users_oid_prefix, account_id); } diff --git a/src/rgw/driver/rados/account.h b/src/rgw/driver/rados/account.h index 2a54e4b5c16..1517127602d 100644 --- a/src/rgw/driver/rados/account.h +++ b/src/rgw/driver/rados/account.h @@ -46,6 +46,11 @@ auto create_metadata_handler(RGWSI_SysObj& sysobj, const RGWZoneParams& zone) rgw_raw_obj get_buckets_obj(const RGWZoneParams& zone, std::string_view account_id); +/// Return the rados object that tracks the given account's vector buckets. This +/// can be used with the cls_user interface in namespace rgwrados::buckets. +rgw_raw_obj get_vector_buckets_obj(const RGWZoneParams& zone, + std::string_view account_id); + /// Return the rados object that tracks the given account's users. This /// can be used with the cls_user interface in namespace rgwrados::users. rgw_raw_obj get_users_obj(const RGWZoneParams& zone, diff --git a/src/rgw/driver/rados/rgw_bucket.cc b/src/rgw/driver/rados/rgw_bucket.cc index be0b3d7007b..5de8e56b9e6 100644 --- a/src/rgw/driver/rados/rgw_bucket.cc +++ b/src/rgw/driver/rados/rgw_bucket.cc @@ -2839,6 +2839,13 @@ class RGWArchiveBucketMetadataHandler : public RGWBucketMetadataHandler { } }; +class RGWVectorBucketMetadataHandler : public RGWBucketMetadataHandler { + public: + using RGWBucketMetadataHandler::RGWBucketMetadataHandler; + + string get_type() override { return "vectorbucket"; } +}; + class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler { rgw::sal::Driver* driver; RGWSI_Zone* svc_zone{nullptr}; @@ -2846,12 +2853,13 @@ class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler { RGWSI_BucketIndex* svc_bi{nullptr}; RGWDataChangesLog *svc_datalog{nullptr}; - int put_prepare(const DoutPrefixProvider* dpp, optional_yield y, +protected: + virtual int put_prepare(const DoutPrefixProvider* dpp, optional_yield y, const std::string& entry, RGWBucketCompleteInfo& bci, const std::optional& old_bci, const 
RGWObjVersionTracker& objv_tracker, bool from_remote_zone); - int put_post(const DoutPrefixProvider* dpp, optional_yield y, + virtual int put_post(const DoutPrefixProvider* dpp, optional_yield y, const RGWBucketCompleteInfo& bci, const std::optional& old_bci, RGWObjVersionTracker& objv_tracker); @@ -3195,13 +3203,38 @@ class RGWArchiveBucketInstanceMetadataHandler : public RGWBucketInstanceMetadata } }; +class RGWVectorBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler { +protected: + int put_prepare(const DoutPrefixProvider* dpp, optional_yield y, + const std::string& entry, RGWBucketCompleteInfo& bci, + const std::optional& old_bci, + const RGWObjVersionTracker& objv_tracker, + bool from_remote_zone) override { + bci.info.layout.current_index.layout.type = rgw::BucketIndexType::Indexless; + return 0; + } + int put_post(const DoutPrefixProvider* dpp, optional_yield y, + const RGWBucketCompleteInfo& bci, + const std::optional& old_bci, + RGWObjVersionTracker& objv_tracker) override {return 0;} + public: + RGWVectorBucketInstanceMetadataHandler(rgw::sal::Driver* driver, + RGWSI_Zone* svc_zone, + RGWSI_Bucket* svc_bucket) : + RGWBucketInstanceMetadataHandler(driver, svc_zone, svc_bucket, nullptr, nullptr) {} + + string get_type() override { return "vectorbucket.instance"; } +}; + RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc, RGWSI_Bucket *bucket_svc, RGWSI_Bucket_Sync *bucket_sync_svc, RGWSI_BucketIndex *bi_svc, RGWSI_User* user_svc, - RGWDataChangesLog *datalog_svc) - : cct(zone_svc->ctx()) + RGWDataChangesLog *datalog_svc, + BucketsObjGetter obj_getter_func, + UserBucketsObjGetter user_obj_getter_func) + : cct(zone_svc->ctx()), get_buckets_obj(obj_getter_func), get_user_buckets_obj(user_obj_getter_func) { svc.zone = zone_svc; svc.bucket = bucket_svc; @@ -3480,17 +3513,17 @@ int RGWBucketCtl::set_bucket_instance_attrs(RGWBucketInfo& bucket_info, .set_orig_info(&bucket_info)); } -static rgw_raw_obj get_owner_buckets_obj(RGWSI_User* svc_user, 
+rgw_raw_obj RGWBucketCtl::get_owner_buckets_obj(RGWSI_User* svc_user, RGWSI_Zone* svc_zone, const rgw_owner& owner) { return std::visit(fu2::overload( [&] (const rgw_user& uid) { - return svc_user->get_buckets_obj(uid); + return (svc_user->*get_user_buckets_obj)(uid); }, [&] (const rgw_account_id& account_id) { const RGWZoneParams& zone = svc_zone->get_zone_params(); - return rgwrados::account::get_buckets_obj(zone, account_id); + return get_buckets_obj(zone, account_id); }), owner); } @@ -3633,11 +3666,11 @@ int RGWBucketCtl::sync_owner_stats(const DoutPrefixProvider *dpp, // flush stats to the user/account owner object const rgw_raw_obj& obj = std::visit(fu2::overload( [&] (const rgw_user& user) { - return svc.user->get_buckets_obj(user); + return (svc.user->*get_user_buckets_obj)(user); }, [&] (const rgw_account_id& id) { const RGWZoneParams& zone = svc.zone->get_zone_params(); - return rgwrados::account::get_buckets_obj(zone, id); + return get_buckets_obj(zone, id); }), owner); return rgwrados::buckets::write_stats(dpp, y, rados, obj, *pent); } @@ -3727,6 +3760,24 @@ auto create_archive_bucket_instance_metadata_handler(rgw::sal::Driver* driver, svc_datalog); } +auto create_vector_bucket_metadata_handler(librados::Rados& rados, + RGWSI_Bucket* svc_bucket, + RGWBucketCtl* ctl_bucket) + -> std::unique_ptr +{ + return std::make_unique( + rados, svc_bucket, ctl_bucket); +} + +auto create_vector_bucket_instance_metadata_handler(rgw::sal::Driver* driver, + RGWSI_Zone* svc_zone, + RGWSI_Bucket* svc_bucket) + -> std::unique_ptr +{ + return std::make_unique(driver, svc_zone, + svc_bucket); +} + list RGWBucketEntryPoint::generate_test_instances() { list o; diff --git a/src/rgw/driver/rados/rgw_bucket.h b/src/rgw/driver/rados/rgw_bucket.h index 2bf40f6f45b..34f2e691b68 100644 --- a/src/rgw/driver/rados/rgw_bucket.h +++ b/src/rgw/driver/rados/rgw_bucket.h @@ -199,6 +199,18 @@ auto create_archive_bucket_instance_metadata_handler(rgw::sal::Driver* driver, -> std::unique_ptr; 
+// vector bucket entrypoint metadata handler factory +auto create_vector_bucket_metadata_handler(librados::Rados& rados, + RGWSI_Bucket* svc_bucket, + RGWBucketCtl* ctl_bucket) + -> std::unique_ptr; + +// vector bucket instance metadata handler factory +auto create_vector_bucket_instance_metadata_handler(rgw::sal::Driver* driver, + RGWSI_Zone* svc_zone, + RGWSI_Bucket* svc_bucket) + -> std::unique_ptr; + extern int rgw_remove_object(const DoutPrefixProvider* dpp, rgw::sal::Driver* driver, rgw::sal::Bucket* bucket, @@ -428,8 +440,16 @@ struct rgw_ep_info { : ep(ep), attrs(attrs) {} }; +using BucketsObjGetter = rgw_raw_obj (*)(const RGWZoneParams&, std::string_view); +using UserBucketsObjGetter = rgw_raw_obj (RGWSI_User::*)(const rgw_user&) const; + class RGWBucketCtl { CephContext *cct; + BucketsObjGetter get_buckets_obj; + UserBucketsObjGetter get_user_buckets_obj; + rgw_raw_obj get_owner_buckets_obj(RGWSI_User* svc_user, + RGWSI_Zone* svc_zone, + const rgw_owner& owner); struct Svc { RGWSI_Zone *zone{nullptr}; @@ -450,7 +470,9 @@ public: RGWSI_Bucket_Sync *bucket_sync_svc, RGWSI_BucketIndex *bi_svc, RGWSI_User* user_svc, - RGWDataChangesLog *datalog_svc); + RGWDataChangesLog *datalog_svc, + BucketsObjGetter obj_getter_func, + UserBucketsObjGetter user_buckets_getter_func); void init(RGWUserCtl *user_ctl, RGWDataChangesLog *datalog, @@ -734,3 +756,4 @@ private: bool rgw_find_bucket_by_id(const DoutPrefixProvider *dpp, CephContext *cct, rgw::sal::Driver* driver, const std::string& marker, const std::string& bucket_id, rgw_bucket* bucket_out); + diff --git a/src/rgw/driver/rados/rgw_cr_rados.cc b/src/rgw/driver/rados/rgw_cr_rados.cc index 32f36f58db2..23b7dc4acde 100644 --- a/src/rgw/driver/rados/rgw_cr_rados.cc +++ b/src/rgw/driver/rados/rgw_cr_rados.cc @@ -662,10 +662,11 @@ int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp) return 0; } +// TODO: add vector bucket support int RGWAsyncPutBucketInstanceInfo::_send_request(const 
DoutPrefixProvider *dpp) { auto r = store->getRados()->put_bucket_instance_info(bucket_info, exclusive, - mtime, attrs, dpp, null_yield); + mtime, attrs, dpp, null_yield, store->ctl()->bucket); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to put bucket instance info for " << bucket_info.bucket << dendl; diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 4e861f93776..095bc448038 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -1285,6 +1285,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw if (ret < 0) return ret; + ret = open_vector_pool_ctx(dpp); + if (ret < 0) + return ret; + pools_initialized = true; index_completion_manager = new RGWIndexCompletionManager(this); @@ -1578,6 +1582,11 @@ int RGWRados::open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, return rgw_init_ioctx(dpp, get_rados_handle(), pool, io_ctx, create, mostly_omap, bulk); } +int RGWRados::open_vector_pool_ctx(const DoutPrefixProvider *dpp) +{ + return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().vector_pool, vector_pool_ctx, true, true); +} + /**** logs ****/ struct log_list_state { @@ -2459,6 +2468,8 @@ void RGWRados::create_bucket_id(string *bucket_id) *bucket_id = buf; } +constexpr int MAX_CREATE_RETRIES= 20; /* need to bound retries */ + int RGWRados::create_bucket(const DoutPrefixProvider* dpp, optional_yield y, const rgw_bucket& bucket, @@ -2478,7 +2489,6 @@ int RGWRados::create_bucket(const DoutPrefixProvider* dpp, { int ret = 0; -#define MAX_CREATE_RETRIES 20 /* need to bound retries */ for (int i = 0; i < MAX_CREATE_RETRIES; i++) { RGWObjVersionTracker& objv_tracker = info.objv_tracker; objv_tracker.read_version.clear(); @@ -2528,14 +2538,14 @@ int RGWRados::create_bucket(const DoutPrefixProvider* dpp, } constexpr bool exclusive = true; - ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, 
y); + ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y, ctl.bucket); if (ret == -ECANCELED) { ret = -EEXIST; } if (ret == -EEXIST) { /* we need to reread the info and return it, caller will have a use for it */ RGWBucketInfo orig_info; - int r = get_bucket_info(&svc, bucket.tenant, bucket.name, orig_info, NULL, y, dpp); + int r = get_bucket_info(&svc, bucket.tenant, bucket.name, orig_info, NULL, y, dpp, ctl.bucket); if (r < 0) { if (r == -ENOENT) { continue; } @@ -2570,6 +2580,83 @@ int RGWRados::create_bucket(const DoutPrefixProvider* dpp, return -ENOENT; } +int RGWRados::create_vector_bucket(const DoutPrefixProvider* dpp, + optional_yield y, + const rgw_bucket& bucket, + const rgw_owner& owner, + const std::string& zonegroup_id, + const rgw_placement_rule& placement_rule, + const std::map& attrs, + const std::optional& quota, + std::optional creation_time, + obj_version* pep_objv, + RGWBucketInfo& info) +{ + ldpp_dout(dpp, 20) << "s3vector --- RGWRados::create_vector_bucket called" << dendl; + int ret = 0; + + for (int i = 0; i < MAX_CREATE_RETRIES; i++) { + RGWObjVersionTracker& objv_tracker = info.objv_tracker; + objv_tracker.read_version.clear(); + objv_tracker.generate_new_write_ver(cct); + + if (bucket.marker.empty()) { + create_bucket_id(&info.bucket.marker); + info.bucket.bucket_id = info.bucket.marker; + } else { + info.bucket = bucket; + } + + info.owner = owner; + info.zonegroup = zonegroup_id; + info.placement_rule = placement_rule; + + if (creation_time) { + info.creation_time = *creation_time; + } else { + info.creation_time = ceph::real_clock::now(); + } + if (quota) { + info.quota = *quota; + } + + constexpr bool exclusive = true; + ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y, ctl.vector_bucket); + if (ret == -ECANCELED) { + ret = -EEXIST; + } + if (ret == -EEXIST) { + /* we need to reread the info and return it, caller will have a use for it */ + 
RGWBucketInfo orig_info; + int r = get_bucket_info(&svc, bucket.tenant, bucket.name, orig_info, NULL, y, dpp, ctl.vector_bucket); + if (r < 0) { + if (r == -ENOENT) { + continue; + } + ldpp_dout(dpp, 0) << "get s3vector bucket info returned " << r << dendl; + return r; + } + + /* only remove it if it's a different bucket instance */ + if (orig_info.bucket.bucket_id != bucket.bucket_id) { + r = ctl.vector_bucket->remove_bucket_instance_info(info.bucket, info, y, dpp); + if (r < 0) { + ldpp_dout(dpp, 0) << "WARNING: " << __func__ << "(): failed to remove s3vector bucket instance info: bucket instance=" << info.bucket.get_key() << ": r=" << r << dendl; + /* continue anyway */ + } + } + + info = std::move(orig_info); + /* ret == -EEXIST here */ + } + return ret; + } + + /* this is highly unlikely */ + ldpp_dout(dpp, 0) << "ERROR: could not create s3vector bucket, continuously raced with bucket creation and removal" << dendl; + return -ENOENT; +} + bool RGWRados::obj_to_raw(const rgw_placement_rule& placement_rule, const rgw_obj& obj, rgw_raw_obj *raw_obj) { get_obj_bucket_and_oid_loc(obj, raw_obj->oid, raw_obj->loc); @@ -3052,7 +3139,7 @@ int RGWRados::swift_versioning_copy(RGWObjectCtx& obj_ctx, RGWBucketInfo dest_bucket_info; - r = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.swift_ver_location, dest_bucket_info, NULL, y, NULL); + r = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.swift_ver_location, dest_bucket_info, NULL, y, dpp, ctl.bucket, NULL); if (r < 0) { ldpp_dout(dpp, 10) << "failed to read dest bucket info: r=" << r << dendl; if (r == -ENOENT) { @@ -3136,7 +3223,7 @@ int RGWRados::swift_versioning_restore(RGWObjectCtx& obj_ctx, int ret = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.swift_ver_location, - archive_binfo, nullptr, y, nullptr); + archive_binfo, nullptr, y, dpp, ctl.bucket, nullptr); if (ret < 0) { return ret; } @@ -6074,6 +6161,68 @@ int RGWRados::delete_bucket(RGWBucketInfo& bucket_info, std::map& 
attrs, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) +{ + ldpp_dout(dpp, 20) << "s3vector --- RGWRados::delete_vector_bucket called" << dendl; + const rgw_bucket& bucket = bucket_info.bucket; + + bool remove_ep = true; + + int r; + if (objv_tracker.read_version.empty()) { + RGWBucketEntryPoint ep; + r = ctl.vector_bucket->read_bucket_entrypoint_info(bucket_info.bucket, + &ep, + y, + dpp, + RGWBucketCtl::Bucket::GetParams() + .set_objv_tracker(&objv_tracker)); + if (r < 0 || + (!bucket_info.bucket.bucket_id.empty() && + ep.bucket.bucket_id != bucket_info.bucket.bucket_id)) { + if (r != -ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: read_bucket_entrypoint_info() s3vector bucket=" << bucket_info.bucket << " returned error: r=" << r << dendl; + /* we have no idea what caused the error, will not try to remove it */ + } + /* + * either failed to read bucket entrypoint, or it points to a different bucket instance than + * requested + */ + remove_ep = false; + } + } + + if (remove_ep) { + r = ctl.vector_bucket->remove_bucket_entrypoint_info(bucket_info.bucket, y, dpp, + RGWBucketCtl::Bucket::RemoveParams() + .set_objv_tracker(&objv_tracker)); + if (r < 0) + return r; + } + + /* if the bucket is not synced we can remove the meta file */ + if (!svc.zone->is_syncing_bucket_meta()) { + r = ctl.vector_bucket->remove_bucket_instance_info(bucket, bucket_info, y, dpp); + if (r < 0) { + return r; + } + + } else { + // set 'deleted' flag for multisite replication to handle bucket instance removal + // TODO: implement store_delete_bucket_info_flag for vector buckets + /*r = store_delete_bucket_info_flag(bucket_info, attrs, y, dpp); + if (r < 0) { + // no need to treat this as an error + ldpp_dout(dpp, 0) << "WARNING: failed to store bucket info flag 'deleted' on bucket: " << bucket.name << " r=" << r << dendl; + } else { + ldpp_dout(dpp, 20) << "INFO: setting bucket info flag to deleted for bucket: " << bucket.name << dendl; + }*/ + } + + return 0; 
+} + +// TODO add vector bucket support int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner, const DoutPrefixProvider *dpp, optional_yield y) { RGWBucketInfo info; @@ -6081,7 +6230,7 @@ int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner, const DoutPr int r; if (bucket.bucket_id.empty()) { - r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, &attrs); + r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, ctl.bucket, &attrs); } else { r = get_bucket_instance_info(bucket, info, nullptr, &attrs, y, dpp); } @@ -6092,7 +6241,7 @@ int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner, const DoutPr info.owner = owner.id; - r = put_bucket_instance_info(info, false, real_time(), &attrs, dpp, y); + r = put_bucket_instance_info(info, false, real_time(), &attrs, dpp, y, ctl.bucket); if (r < 0) { ldpp_dout(dpp, 0) << "NOTICE: put_bucket_info on bucket=" << bucket.name << " returned err=" << r << dendl; return r; @@ -6102,6 +6251,7 @@ int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner, const DoutPr } +// TODO add vector bucket support int RGWRados::set_buckets_enabled(vector& buckets, bool enabled, const DoutPrefixProvider *dpp, optional_yield y) { int ret = 0; @@ -6118,7 +6268,7 @@ int RGWRados::set_buckets_enabled(vector& buckets, bool enabled, con RGWBucketInfo info; map attrs; - int r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, &attrs); + int r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, ctl.bucket, &attrs); if (r < 0) { ldpp_dout(dpp, 0) << "NOTICE: get_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl; ret = r; @@ -6130,7 +6280,7 @@ int RGWRados::set_buckets_enabled(vector& buckets, bool enabled, con info.flags |= BUCKET_SUSPENDED; } - r = put_bucket_instance_info(info, false, real_time(), &attrs, dpp, y); + r = put_bucket_instance_info(info, false, real_time(), &attrs, 
dpp, y, ctl.bucket); if (r < 0) { ldpp_dout(dpp, 0) << "NOTICE: put_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl; ret = r; @@ -6140,10 +6290,11 @@ int RGWRados::set_buckets_enabled(vector& buckets, bool enabled, con return ret; } +// TODO add vector bucket support int RGWRados::bucket_suspended(const DoutPrefixProvider *dpp, rgw_bucket& bucket, bool *suspended, optional_yield y) { RGWBucketInfo bucket_info; - int ret = get_bucket_info(&svc, bucket.tenant, bucket.name, bucket_info, NULL, y, dpp); + int ret = get_bucket_info(&svc, bucket.tenant, bucket.name, bucket_info, NULL, y, dpp, ctl.bucket); if (ret < 0) { return ret; } @@ -8729,7 +8880,7 @@ int RGWRados::check_reshard_logrecord_status(RGWBucketInfo& bucket_info, optiona map bucket_attrs; int ret = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.bucket.name, - bucket_info, nullptr, y, dpp, &bucket_attrs); + bucket_info, nullptr, y, dpp, ctl.bucket, &bucket_attrs); if (ret < 0) { ldpp_dout(dpp, 0) << __func__ << " ERROR: failed to refresh bucket info : " << cpp_strerror(-ret) << dendl; @@ -8755,7 +8906,7 @@ int RGWRados::recover_reshard_logrecord(RGWBucketInfo& bucket_info, bucket_info.bucket.bucket_id << "; expected if resharding underway" << dendl; // update the judge time bucket_info.layout.judge_reshard_lock_time = real_clock::now(); - ret = put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp, y); + ret = put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp, y, ctl.bucket); if (ret < 0) { ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ << " ERROR: error putting bucket instance info: " << cpp_strerror(-ret) << dendl; @@ -8821,7 +8972,7 @@ int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, auto fetch_new_bucket_info = [this, bs, &obj_instance, &bucket_info, &bucket_attrs, &y, dpp](const std::string& log_tag) -> int { int ret = get_bucket_info(&svc, bs->bucket.tenant, bs->bucket.name, - 
bucket_info, nullptr, y, dpp, &bucket_attrs); + bucket_info, nullptr, y, dpp, ctl.bucket, &bucket_attrs); if (ret < 0) { ldpp_dout(dpp, 0) << __func__ << " ERROR: failed to refresh bucket info after reshard at " << @@ -10035,12 +10186,12 @@ int RGWRados::get_bucket_info(RGWServices *svc, RGWBucketInfo& info, real_time *pmtime, optional_yield y, - const DoutPrefixProvider *dpp, map *pattrs) + const DoutPrefixProvider *dpp, RGWBucketCtl* bucket_ctl, map *pattrs) { rgw_bucket bucket; bucket.tenant = tenant; bucket.name = bucket_name; - return ctl.bucket->read_bucket_info(bucket, &info, y, dpp, + return bucket_ctl->read_bucket_info(bucket, &info, y, dpp, RGWBucketCtl::BucketInstance::GetParams() .set_mtime(pmtime) .set_attrs(pattrs)); @@ -10049,14 +10200,15 @@ int RGWRados::get_bucket_info(RGWServices *svc, int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info, ceph::real_time *pmtime, const DoutPrefixProvider *dpp, optional_yield y, - map *pattrs) + map *pattrs, + RGWBucketCtl* bucket_ctl) { rgw_bucket bucket = info.bucket; bucket.bucket_id.clear(); auto rv = info.objv_tracker.read_version; - return ctl.bucket->read_bucket_info(bucket, &info, y, dpp, + return bucket_ctl->read_bucket_info(bucket, &info, y, dpp, RGWBucketCtl::BucketInstance::GetParams() .set_mtime(pmtime) .set_attrs(pattrs) @@ -10065,9 +10217,9 @@ int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info, int RGWRados::put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, real_time mtime, const map *pattrs, - const DoutPrefixProvider *dpp, optional_yield y) + const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl) { - return ctl.bucket->store_bucket_instance_info(info.bucket, info, y, dpp, + return bucket_ctl->store_bucket_instance_info(info.bucket, info, y, dpp, RGWBucketCtl::BucketInstance::PutParams() .set_exclusive(exclusive) .set_mtime(mtime) @@ -10076,11 +10228,11 @@ int RGWRados::put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, int 
RGWRados::put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, real_time mtime, obj_version *pep_objv, const map *pattrs, bool create_entry_point, - const DoutPrefixProvider *dpp, optional_yield y) + const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl) { bool create_head = !info.has_instance_obj || create_entry_point; - int ret = put_bucket_instance_info(info, exclusive, mtime, pattrs, dpp, y); + int ret = put_bucket_instance_info(info, exclusive, mtime, pattrs, dpp, y, bucket_ctl); if (ret < 0) { return ret; } @@ -10102,7 +10254,7 @@ int RGWRados::put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, real_t *pep_objv = ot.write_version; } } - ret = ctl.bucket->store_bucket_entrypoint_info(info.bucket, entry_point, y, dpp, RGWBucketCtl::Bucket::PutParams() + ret = bucket_ctl->store_bucket_entrypoint_info(info.bucket, entry_point, y, dpp, RGWBucketCtl::Bucket::PutParams() .set_exclusive(exclusive) .set_objv_tracker(&ot) .set_mtime(mtime)); diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 8990faf457f..00b13022381 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -342,6 +342,7 @@ class RGWRados int open_reshard_pool_ctx(const DoutPrefixProvider *dpp); int open_notif_pool_ctx(const DoutPrefixProvider *dpp); int open_logging_pool_ctx(const DoutPrefixProvider *dpp); + int open_vector_pool_ctx(const DoutPrefixProvider *dpp); int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx, bool mostly_omap, bool bulk); @@ -439,6 +440,7 @@ protected: librados::IoCtx reshard_pool_ctx; librados::IoCtx notif_pool_ctx; // .rgw.notif librados::IoCtx logging_pool_ctx; // .rgw.logging + librados::IoCtx vector_pool_ctx; // .rgw.meta.vector bool pools_initialized{false}; @@ -661,6 +663,18 @@ public: obj_version* pep_objv, RGWBucketInfo& info); + int create_vector_bucket(const DoutPrefixProvider* dpp, + optional_yield y, + const rgw_bucket& 
bucket, + const rgw_owner& owner, + const std::string& zonegroup_id, + const rgw_placement_rule& placement_rule, + const std::map& attrs, + const std::optional& quota, + std::optional creation_time, + obj_version* pep_objv, + RGWBucketInfo& info); + RGWCoroutinesManagerRegistry *get_cr_registry() { return cr_registry; } struct BucketShard { @@ -1300,6 +1314,8 @@ int restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, */ int delete_bucket(RGWBucketInfo& bucket_info, std::map& attrs, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp, bool check_empty = true); + int delete_vector_bucket(RGWBucketInfo& bucket_info, std::map& attrs, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp); + void wakeup_meta_sync_shards(std::set& shard_ids); void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map >& entries); @@ -1501,7 +1517,7 @@ public: std::map& stats, std::string *max_marker, bool* syncstopped = NULL); int get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, boost::intrusive_ptr cb); - int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, const std::map *pattrs, const DoutPrefixProvider *dpp, optional_yield y); + int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, const std::map *pattrs, const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl); /* xxx dang obj_ctx -> svc */ int get_bucket_instance_info(const std::string& meta_key, RGWBucketInfo& info, ceph::real_time *pmtime, std::map *pattrs, optional_yield y, const DoutPrefixProvider *dpp); int get_bucket_instance_info(const rgw_bucket& bucket, RGWBucketInfo& info, ceph::real_time *pmtime, std::map *pattrs, optional_yield y, const DoutPrefixProvider *dpp); @@ -1512,7 +1528,7 @@ public: const std::string& tenant_name, 
const std::string& bucket_name, RGWBucketInfo& info, ceph::real_time *pmtime, optional_yield y, - const DoutPrefixProvider *dpp, std::map *pattrs = NULL); + const DoutPrefixProvider *dpp, RGWBucketCtl* bucket_ctl, std::map *pattrs = NULL); RGWChainedCacheImpl_bucket_topics_entry *get_topic_cache() { return topic_cache; } @@ -1523,11 +1539,11 @@ public: int try_refresh_bucket_info(RGWBucketInfo& info, ceph::real_time *pmtime, const DoutPrefixProvider *dpp, optional_yield y, - std::map *pattrs = nullptr); + std::map *pattrs, RGWBucketCtl* bucket_ctl); int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, obj_version *pep_objv, const std::map *pattrs, bool create_entry_point, - const DoutPrefixProvider *dpp, optional_yield y); + const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl); int cls_obj_prepare_op(const DoutPrefixProvider *dpp, BucketShard& bs, RGWModifyOp op, std::string& tag, rgw_obj& obj, optional_yield y); diff --git a/src/rgw/driver/rados/rgw_reshard.cc b/src/rgw/driver/rados/rgw_reshard.cc index 10816ff7445..b3d58587592 100644 --- a/src/rgw/driver/rados/rgw_reshard.cc +++ b/src/rgw/driver/rados/rgw_reshard.cc @@ -490,7 +490,7 @@ static int init_target_layout(rgw::sal::RadosStore* store, if (ret = fault.check("set_target_layout"); ret == 0) { // no fault injected, write the bucket instance metadata ret = store->getRados()->put_bucket_instance_info(bucket_info, false, - real_time(), &bucket_attrs, dpp, y); + real_time(), &bucket_attrs, dpp, y, store->ctl()->bucket); } else if (ret == -ECANCELED) { fault.clear(); // clear the fault so a retry can succeed } @@ -571,7 +571,7 @@ static int revert_target_layout(rgw::sal::RadosStore* store, ret == 0) { // no fault injected, revert the bucket instance metadata ret = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), - &bucket_attrs, dpp, y); + &bucket_attrs, dpp, y, store->ctl()->bucket); } else if (ret == -ECANCELED) { fault.clear(); 
// clear the fault so a retry can succeed } @@ -695,7 +695,7 @@ static int change_reshard_state(rgw::sal::RadosStore* store, if (ret = fault.check("change_reshard_state"); ret == 0) { // no fault injected, write the bucket instance metadata ret = store->getRados()->put_bucket_instance_info(bucket_info, false, - real_time(), &bucket_attrs, dpp, y); + real_time(), &bucket_attrs, dpp, y, store->ctl()->bucket); } else if (ret == -ECANCELED) { fault.clear(); // clear the fault so a retry can succeed } @@ -805,7 +805,7 @@ static int commit_target_layout(rgw::sal::RadosStore* store, int ret = fault.check("commit_target_layout"); if (ret == 0) { // no fault injected, write the bucket instance metadata ret = store->getRados()->put_bucket_instance_info( - bucket_info, false, real_time(), &bucket_attrs, dpp, y); + bucket_info, false, real_time(), &bucket_attrs, dpp, y, store->ctl()->bucket); } else if (ret == -ECANCELED) { fault.clear(); // clear the fault so a retry can succeed } @@ -1568,6 +1568,7 @@ int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry, entry.bucket_name, bucket_info, nullptr, y, dpp, + store->ctl()->bucket, &bucket_attrs); if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) { if (ret < 0) { diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 481822bb1cc..3b0e7653591 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -140,6 +140,25 @@ static rgw_raw_obj get_owner_buckets_obj(RGWSI_User* svc_user, return std::visit(visitor{svc_user, svc_zone}, owner); } +static rgw_raw_obj get_owner_vector_buckets_obj(RGWSI_User* svc_user, + RGWSI_Zone* svc_zone, + const rgw_owner& owner) +{ + struct visitor { + RGWSI_User* svc_user; + RGWSI_Zone* svc_zone; + + rgw_raw_obj operator()(const rgw_user& user) { + return svc_user->get_vector_buckets_obj(user); + } + rgw_raw_obj operator()(const rgw_account_id& id) { + const RGWZoneParams& zone = 
svc_zone->get_zone_params(); + return rgwrados::account::get_vector_buckets_obj(zone, id); + } + }; + return std::visit(visitor{svc_user, svc_zone}, owner); +} + int RadosStore::list_buckets(const DoutPrefixProvider* dpp, const rgw_owner& owner, const std::string& tenant, const std::string& marker, const std::string& end_marker, @@ -628,6 +647,38 @@ int RadosBucket::remove_bypass_gc(int concurrent_max, bool return ret; } +int RadosVectorBucket::unlink(const DoutPrefixProvider* dpp, const rgw_owner& owner, optional_yield y, bool update_entrypoint) +{ + ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::unlink called" << dendl; + librados::Rados& rados = *store->getRados()->get_rados_handle(); + return store->ctl()->vector_bucket->unlink_bucket(rados, owner, info.bucket, + y, dpp, update_entrypoint); +} + +int RadosVectorBucket::remove(const DoutPrefixProvider* dpp, + bool delete_children, + optional_yield y) +{ + ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::remove called" << dendl; + RGWObjVersionTracker ot; + + int ret = store->getRados()->delete_vector_bucket(info, get_attrs(), ot, y, dpp); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: could not remove s3vector bucket " << + info.bucket.name << dendl; + return ret; + } + + librados::Rados& rados = *store->getRados()->get_rados_handle(); + ret = store->ctl()->vector_bucket->unlink_bucket(rados, info.owner, + info.bucket, y, dpp, false); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: unable to remove user s3vector bucket information" << dendl; + } + + return ret; +} + int RadosBucket::load_bucket(const DoutPrefixProvider* dpp, optional_yield y) { int ret; @@ -654,6 +705,137 @@ int RadosBucket::load_bucket(const DoutPrefixProvider* dpp, optional_yield y) return ret; } +int RadosVectorBucket::load_bucket(const DoutPrefixProvider* dpp, optional_yield y) +{ + ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::load_bucket called" << dendl; + int ret; + + RGWObjVersionTracker ep_ot; + if 
(info.bucket.bucket_id.empty()) { + ret = store->ctl()->vector_bucket->read_bucket_info(info.bucket, &info, y, dpp, + RGWBucketCtl::BucketInstance::GetParams() + .set_mtime(&mtime) + .set_attrs(&attrs), + &ep_ot); + } else { + ret = store->ctl()->vector_bucket->read_bucket_instance_info(info.bucket, &info, y, dpp, + RGWBucketCtl::BucketInstance::GetParams() + .set_mtime(&mtime) + .set_attrs(&attrs)); + } + if (ret != 0) { + return ret; + } + + bucket_version = ep_ot.read_version; + + return ret; +} + +int RadosVectorBucket::create(const DoutPrefixProvider* dpp, + const CreateParams& params, + optional_yield y) +{ + ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::create called" << dendl; + rgw_bucket key = get_key(); + key.marker = params.marker; + key.bucket_id = params.bucket_id; + + int ret = store->getRados()->create_vector_bucket( + dpp, y, key, params.owner, params.zonegroup_id, + params.placement_rule, params.attrs, + params.quota, params.creation_time, &bucket_version, info); + + bool existed = false; + if (ret == -EEXIST) { + existed = true; + /* bucket already existed, might have raced with another bucket creation, + * or might be partial bucket creation that never completed. Read existing + * bucket info, verify that the reported bucket owner is the current user. + * If all is ok then update the user's list of buckets. Otherwise inform + * client about a name conflict. + */ + if (info.owner != params.owner) { + return -ERR_BUCKET_EXISTS; + } + // prevent re-creation with different index type or shard count + if ((params.index_type && *params.index_type != + info.layout.current_index.layout.type) || + (params.index_shards && *params.index_shards != + info.layout.current_index.layout.normal.num_shards)) { + return -ERR_BUCKET_EXISTS; + } + ret = 0; + } else if (ret != 0) { + return ret; + } + + ret = link(dpp, params.owner, y, true); + if (ret && !existed && ret != -EEXIST) { + /* if it exists (or previously existed), don't remove it! 
*/ + ret = unlink(dpp, params.owner, y); + if (ret < 0) { + ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << ret + << dendl; + } + } else if (ret == -EEXIST) { + ret = -ERR_BUCKET_EXISTS; + } else if (ret == 0) { + /* this is to handle the following race condition: + * a concurrent DELETE bucket request deletes the bucket entry point and + * unlinks it (if the bucket pre-exists) before it's linked in this + * bucket creation request. */ + + if (existed) { + ret = -ERR_BUCKET_EXISTS; + } + + RGWBucketEntryPoint ep; + RGWObjVersionTracker objv_tracker; + int r = store->ctl()->vector_bucket->read_bucket_entrypoint_info(info.bucket, + &ep, + y, + dpp, + RGWBucketCtl::Bucket::GetParams() + .set_objv_tracker(&objv_tracker)); + if (r == -ENOENT) { + ret = 0; + + ldpp_dout(dpp, 5) << "WARNING: the bucket entry point has been deleted by a concurrent DELETE bucket request." + << " Unlinking the bucket." << dendl; + r = unlink(dpp, params.owner, y); + if (r < 0) { + ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << r << dendl; + } + } + } + + return ret; +} + +int RadosVectorBucket::link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint, RGWObjVersionTracker* objv) { + ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::link called" << dendl; + RGWBucketEntryPoint ep; + ep.bucket = info.bucket; + ep.owner = new_owner; + ep.creation_time = get_creation_time(); + ep.linked = true; + Attrs ep_attrs; + rgw_ep_info ep_data{ep, ep_attrs}; + + librados::Rados& rados = *store->getRados()->get_rados_handle(); + int r = store->ctl()->vector_bucket->link_bucket(rados, new_owner, info.bucket, + get_creation_time(), y, dpp, update_entrypoint, + &ep_data); + if (r < 0) + return r; + + if (objv) + *objv = ep_data.ep_objv; + + return r; +} + int RadosBucket::read_stats(const DoutPrefixProvider *dpp, optional_yield y, const bucket_index_layout_generation& idx_layout, int shard_id, std::string* bucket_ver, 
std::string* master_ver, @@ -771,7 +953,13 @@ int RadosBucket::chown(const DoutPrefixProvider* dpp, int RadosBucket::put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time _mtime, optional_yield y) { mtime = _mtime; - return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs, dpp, y); + return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs, dpp, y, store->ctl()->bucket); +} + +int RadosVectorBucket::put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time _mtime, optional_yield y) +{ + mtime = _mtime; + return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs, dpp, y, store->ctl()->vector_bucket); } int RadosBucket::check_empty(const DoutPrefixProvider* dpp, optional_yield y) @@ -797,7 +985,12 @@ int RadosBucket::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new int RadosBucket::try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y) { - return store->getRados()->try_refresh_bucket_info(info, pmtime, dpp, y, &attrs); + return store->getRados()->try_refresh_bucket_info(info, pmtime, dpp, y, &attrs, store->ctl()->bucket); +} + +int RadosVectorBucket::try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y) +{ + return store->getRados()->try_refresh_bucket_info(info, pmtime, dpp, y, &attrs, store->ctl()->vector_bucket); } int RadosBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, @@ -2790,6 +2983,24 @@ int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucke return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx); } +int RadosStore::load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b, + std::unique_ptr* bucket, optional_yield y) { + *bucket = std::make_unique(this, b); + return (*bucket)->load_bucket(dpp, y); +} + +int RadosStore::list_vector_buckets(const DoutPrefixProvider* dpp, + const 
rgw_owner& owner, const std::string& tenant, + const std::string& marker, const std::string& end_marker, + uint64_t max, BucketList& listing, + optional_yield y) { + librados::Rados& rados = *getRados()->get_rados_handle(); + const rgw_raw_obj& obj = get_owner_vector_buckets_obj(svc()->user, svc()->zone, owner); + + return rgwrados::buckets::list(dpp, y, rados, obj, tenant, + marker, end_marker, max, listing); +} + RadosObject::~RadosObject() { if (rados_ctx_owned) diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 152bb87413c..afd59bb195f 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -427,6 +427,13 @@ class RadosStore : public StoreDriver { const std::string& unique_tag) override; virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override; virtual bool valid_placement(const rgw_placement_rule& rule) override; + int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b, + std::unique_ptr* bucket, optional_yield y) override; + int list_vector_buckets(const DoutPrefixProvider* dpp, + const rgw_owner& owner, const std::string& tenant, + const std::string& marker, const std::string& end_marker, + uint64_t max, BucketList& listing, + optional_yield y) override; virtual void shutdown(void) override; @@ -817,6 +824,39 @@ class RadosBucket : public StoreBucket { friend class RadosUser; }; +class RadosVectorBucket : public StoreVectorBucket { + private: + RadosStore* store; + + public: + RadosVectorBucket(RadosStore *_st) + : store(_st) {} + + RadosVectorBucket(RadosStore *_st, const rgw_bucket& _b) + : StoreVectorBucket(_b), + store(_st) {} + + RadosVectorBucket(RadosStore *_st, const RGWBucketInfo& _i) + : StoreVectorBucket(_i), + store(_st) {} + + ~RadosVectorBucket() override = default; + int remove(const DoutPrefixProvider* dpp, bool delete_children, optional_yield y) override; + int create(const DoutPrefixProvider* dpp, const 
CreateParams& params, + optional_yield y) override; + int load_bucket(const DoutPrefixProvider* dpp, optional_yield y) override; + int check_empty(const DoutPrefixProvider* dpp, optional_yield y) override { return 0; } + std::unique_ptr clone() override { + return std::make_unique(*this); + } + int put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time mtime, optional_yield y) override; + int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y) override; + + private: + int link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr); + int unlink(const DoutPrefixProvider* dpp, const rgw_owner& owner, optional_yield y, bool update_entrypoint = true); +}; + class RadosMultipartPart : public StoreMultipartPart { protected: RGWUploadPartInfo info; diff --git a/src/rgw/driver/rados/rgw_service.cc b/src/rgw/driver/rados/rgw_service.cc index b69ac11462c..bab88034d10 100644 --- a/src/rgw/driver/rados/rgw_service.cc +++ b/src/rgw/driver/rados/rgw_service.cc @@ -57,6 +57,7 @@ int RGWServices_Def::init(CephContext *cct, const rgw::SiteConfig* site) { bucket_sobj = std::make_unique(cct); + vector_bucket_sobj = std::make_unique(cct); bucket_sync_sobj = std::make_unique(cct); bi_rados = std::make_unique(cct); bilog_rados = std::make_unique(cct); @@ -88,6 +89,9 @@ int RGWServices_Def::init(CephContext *cct, bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(), bi_rados.get(), mdlog.get(), sync_modules.get(), bucket_sync_sobj.get()); + vector_bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(), + bi_rados.get(), mdlog.get(), + sync_modules.get(), bucket_sync_sobj.get()); bucket_sync_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(), @@ -205,6 +209,12 @@ int RGWServices_Def::init(CephContext *cct, return r; } + r = vector_bucket_sobj->start(y, dpp); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to start 
s3vector bucket service (" << cpp_strerror(-r) << dendl; + return r; + } + r = bucket_sync_sobj->start(y, dpp); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to start bucket_sync service (" << cpp_strerror(-r) << dendl; @@ -246,6 +256,7 @@ void RGWServices_Def::shutdown() bi_rados->shutdown(); bucket_sync_sobj->shutdown(); bucket_sobj->shutdown(); + vector_bucket_sobj->shutdown(); sysobj->shutdown(); sysobj_core->shutdown(); @@ -275,6 +286,8 @@ int RGWServices::do_init(CephContext *_cct, rgw::sal::RadosStore* driver, bool h bilog_rados = _svc.bilog_rados.get(); bucket_sobj = _svc.bucket_sobj.get(); bucket = bucket_sobj; + vector_bucket_sobj = _svc.vector_bucket_sobj.get(); + vector_bucket = vector_bucket_sobj; bucket_sync_sobj = _svc.bucket_sync_sobj.get(); bucket_sync = bucket_sync_sobj; cls = _svc.cls.get(); @@ -333,17 +346,32 @@ int RGWCtlDef::init(RGWServices& svc, rgw::sal::Driver* driver, svc.bucket, svc.bucket_sync, svc.bi, svc.user, - svc.datalog_rados)); + svc.datalog_rados, + rgwrados::account::get_buckets_obj, + &RGWSI_User::get_buckets_obj)); + vector_bucket.reset(new RGWBucketCtl(svc.zone, + svc.vector_bucket, + svc.bucket_sync, + svc.bi, svc.user, + svc.datalog_rados, + rgwrados::account::get_vector_buckets_obj, + &RGWSI_User::get_vector_buckets_obj)); auto sync_module = svc.sync_modules->get_sync_module(); if (sync_module) { meta.bucket = sync_module->alloc_bucket_meta_handler(rados, svc.bucket, bucket.get()); meta.bucket_instance = sync_module->alloc_bucket_instance_meta_handler( driver, svc.zone, svc.bucket, svc.bi, svc.datalog_rados); + meta.vector_bucket = sync_module->alloc_vector_bucket_meta_handler(rados, svc.vector_bucket, vector_bucket.get()); + meta.vector_bucket_instance = sync_module->alloc_vector_bucket_instance_meta_handler( + driver, svc.zone, svc.vector_bucket); } else { meta.bucket = create_bucket_metadata_handler(rados, svc.bucket, bucket.get()); meta.bucket_instance = create_bucket_instance_metadata_handler( driver, svc.zone, 
svc.bucket, svc.bi, svc.datalog_rados); + meta.vector_bucket = create_vector_bucket_metadata_handler(rados, svc.vector_bucket, vector_bucket.get()); + meta.vector_bucket_instance = create_vector_bucket_instance_metadata_handler( + driver, svc.zone, svc.vector_bucket); } meta.otp = rgwrados::otp::create_metadata_handler( @@ -366,6 +394,7 @@ int RGWCtlDef::init(RGWServices& svc, rgw::sal::Driver* driver, user->init(bucket.get()); bucket->init(user.get(), svc.datalog_rados, dpp); + vector_bucket->init(user.get(), svc.datalog_rados, dpp); return 0; } @@ -386,6 +415,8 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, meta.user = _ctl.meta.user.get(); meta.bucket = _ctl.meta.bucket.get(); meta.bucket_instance = _ctl.meta.bucket_instance.get(); + meta.vector_bucket = _ctl.meta.vector_bucket.get(); + meta.vector_bucket_instance = _ctl.meta.vector_bucket_instance.get(); meta.otp = _ctl.meta.otp.get(); meta.role = _ctl.meta.role.get(); meta.topic = _ctl.meta.topic.get(); @@ -393,6 +424,7 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, user = _ctl.user.get(); bucket = _ctl.bucket.get(); + vector_bucket = _ctl.vector_bucket.get(); r = meta.user->attach(meta.mgr); if (r < 0) { @@ -412,6 +444,18 @@ int RGWCtl::init(RGWServices *_svc, rgw::sal::Driver* driver, return r; } + r = meta.vector_bucket->attach(meta.mgr); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to start init meta.vector_bucket ctl (" << cpp_strerror(-r) << dendl; + return r; + } + + r = meta.vector_bucket_instance->attach(meta.mgr); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to start init meta.vector_bucket_instance ctl (" << cpp_strerror(-r) << dendl; + return r; + } + r = meta.otp->attach(meta.mgr); if (r < 0) { ldout(cct, 0) << "ERROR: failed to start init otp ctl (" << cpp_strerror(-r) << dendl; diff --git a/src/rgw/driver/rados/rgw_service.h b/src/rgw/driver/rados/rgw_service.h index cdc2392da18..86399ef01ee 100644 --- a/src/rgw/driver/rados/rgw_service.h +++ 
b/src/rgw/driver/rados/rgw_service.h @@ -55,6 +55,7 @@ public: class RGWSI_Bucket; class RGWSI_Bucket_SObj; +class RGWSI_VectorBucket_SObj; class RGWSI_Bucket_Sync; class RGWSI_Bucket_Sync_SObj; class RGWSI_BucketIndex; @@ -84,6 +85,7 @@ struct RGWServices_Def bool has_shutdown{false}; std::unique_ptr bucket_sobj; + std::unique_ptr vector_bucket_sobj; std::unique_ptr bucket_sync_sobj; std::unique_ptr bi_rados; std::unique_ptr bilog_rados; @@ -121,7 +123,9 @@ struct RGWServices const rgw::SiteConfig* site{nullptr}; RGWSI_Bucket *bucket{nullptr}; + RGWSI_Bucket *vector_bucket{nullptr}; RGWSI_Bucket_SObj *bucket_sobj{nullptr}; + RGWSI_VectorBucket_SObj *vector_bucket_sobj{nullptr}; RGWSI_Bucket_Sync *bucket_sync{nullptr}; RGWSI_Bucket_Sync_SObj *bucket_sync_sobj{nullptr}; RGWSI_BucketIndex *bi{nullptr}; @@ -176,6 +180,8 @@ struct RGWCtlDef { std::unique_ptr mgr; std::unique_ptr bucket; std::unique_ptr bucket_instance; + std::unique_ptr vector_bucket; + std::unique_ptr vector_bucket_instance; std::unique_ptr user; std::unique_ptr otp; std::unique_ptr role; @@ -191,6 +197,7 @@ struct RGWCtlDef { std::unique_ptr user; std::unique_ptr bucket; + std::unique_ptr vector_bucket; RGWCtlDef(); ~RGWCtlDef(); @@ -210,6 +217,8 @@ struct RGWCtl { RGWMetadataHandler *bucket{nullptr}; RGWMetadataHandler *bucket_instance{nullptr}; + RGWMetadataHandler *vector_bucket{nullptr}; + RGWMetadataHandler *vector_bucket_instance{nullptr}; RGWMetadataHandler *user{nullptr}; RGWMetadataHandler *otp{nullptr}; RGWMetadataHandler *role{nullptr}; @@ -220,6 +229,7 @@ struct RGWCtl { RGWUserCtl *user{nullptr}; RGWBucketCtl *bucket{nullptr}; + RGWBucketCtl *vector_bucket{nullptr}; int init(RGWServices *_svc, rgw::sal::Driver* driver, librados::Rados& rados, const DoutPrefixProvider *dpp); diff --git a/src/rgw/driver/rados/rgw_sync_module.cc b/src/rgw/driver/rados/rgw_sync_module.cc index 0db00e9e49a..59e508f0b99 100644 --- a/src/rgw/driver/rados/rgw_sync_module.cc +++ 
b/src/rgw/driver/rados/rgw_sync_module.cc @@ -35,6 +35,22 @@ auto RGWSyncModuleInstance::alloc_bucket_instance_meta_handler(rgw::sal::Driver* svc_bucket, svc_bi, svc_datalog); } +auto RGWSyncModuleInstance::alloc_vector_bucket_meta_handler(librados::Rados& rados, + RGWSI_Bucket* svc_bucket, + RGWBucketCtl* ctl_bucket) + -> std::unique_ptr +{ + return create_vector_bucket_metadata_handler(rados, svc_bucket, ctl_bucket); +} + +auto RGWSyncModuleInstance::alloc_vector_bucket_instance_meta_handler(rgw::sal::Driver* driver, + RGWSI_Zone* svc_zone, + RGWSI_Bucket* svc_bucket) + -> std::unique_ptr +{ + return create_vector_bucket_instance_metadata_handler(driver, svc_zone, + svc_bucket); +} RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncCtx *_sc, rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), diff --git a/src/rgw/driver/rados/rgw_sync_module.h b/src/rgw/driver/rados/rgw_sync_module.h index e27c89bc03e..5c19f51f820 100644 --- a/src/rgw/driver/rados/rgw_sync_module.h +++ b/src/rgw/driver/rados/rgw_sync_module.h @@ -71,6 +71,14 @@ public: RGWSI_BucketIndex* svc_bi, RGWDataChangesLog *svc_datalog) -> std::unique_ptr; + virtual auto alloc_vector_bucket_meta_handler(librados::Rados& rados, + RGWSI_Bucket* svc_bucket, + RGWBucketCtl* ctl_bucket) + -> std::unique_ptr; + virtual auto alloc_vector_bucket_instance_meta_handler(rgw::sal::Driver* driver, + RGWSI_Zone* svc_zone, + RGWSI_Bucket* svc_bucket) + -> std::unique_ptr; // indication whether the sync module start with full sync (default behavior) // incremental sync would follow anyway diff --git a/src/rgw/rgw_arn.cc b/src/rgw/rgw_arn.cc index 00938b9ed6e..5b3219fff76 100644 --- a/src/rgw/rgw_arn.cc +++ b/src/rgw/rgw_arn.cc @@ -95,6 +95,7 @@ boost::optional to_service(const smatch::value_type& s, { "route53", Service::route53 }, { "route53domains", Service::route53domains }, { "s3", Service::s3 }, + { "s3vectors", Service::s3vectors }, { "sdb", Service::sdb }, { 
"servicecatalog", Service::servicecatalog }, { "ses", Service::ses }, @@ -264,6 +265,7 @@ std::string ARN::to_string() const { { Service::route53, "route53" }, { Service::route53domains, "route53domains" }, { Service::s3, "s3" }, + { Service::s3vectors, "s3vectors" }, { Service::sdb, "sdb" }, { Service::servicecatalog, "servicecatalog" }, { Service::ses, "ses" }, diff --git a/src/rgw/rgw_arn.h b/src/rgw/rgw_arn.h index 15ef452671d..22ade02471b 100644 --- a/src/rgw/rgw_arn.h +++ b/src/rgw/rgw_arn.h @@ -31,7 +31,7 @@ enum struct Service { machinelearning, aws_marketplace, aws_marketplace_management, mobileanalytics, mobilehub, opsworks, opsworks_cm, polly, redshift, rds, route53, route53domains, sts, servicecatalog, - ses, sns, sqs, s3, swf, sdb, states, storagegateway, support, + ses, sns, sqs, s3, s3vectors, swf, sdb, states, storagegateway, support, trustedadvisor, waf, workmail, workspaces, wildcard }; diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 4408bbb2e86..9aab9941de0 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -191,9 +191,9 @@ int rgw_rest_get_json_input(CephContext *cct, req_state *s, T& out, // // The called function must return an integer, negative on error. In // general, they should just return op_ret. 
-template +template int retry_raced_bucket_write(const DoutPrefixProvider *dpp, - rgw::sal::Bucket *b, + B* b, const F &f, optional_yield y) { auto r = f(); diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 41bbbebd7b2..09b952b1ec0 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -414,6 +414,11 @@ auto RGWRESTSimpleRequest::forward_request(const DoutPrefixProvider *dpp, const new_env.set("HTTP_X_AMZ_CONTENT_SHA256", maybe_payload_hash); } + const char* content_type = info.env->get("CONTENT_TYPE"); + if (content_type) { + new_env.set("HTTP_CONTENT_TYPE", content_type); + } + int ret = sign_request(dpp, key, region, s, new_env, new_info, nullptr); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl; diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 000b1325cb1..03f857b8c0f 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -2765,12 +2765,10 @@ void RGWCreateBucket_ObjStore_S3::send_response() set_req_state_err(s, op_ret); } dump_errno(s); - end_header(s); - - if (op_ret < 0) - return; - if (s->system_request) { + if (op_ret == 0 && s->system_request) { + s->format = RGWFormat::JSON; + end_header(s, this, to_mime_type(s->format)); JSONFormatter f; /* use json formatter for system requests output */ const RGWBucketInfo& info = s->bucket->get_info(); @@ -2781,7 +2779,9 @@ void RGWCreateBucket_ObjStore_S3::send_response() encode_json("bucket_info", info, &f); f.close_section(); rgw_flush_formatter_and_reset(s, &f); + return; } + end_header(s); } void RGWDeleteBucket_ObjStore_S3::send_response() @@ -5944,12 +5944,16 @@ void parse_post_action(const std::string& post_body, req_state* s) // s3vector requests looks like bucket POST operations // where the "bucket name" is the operation name. 
// with JSON payload +// and no user provided params // POST / HTTP/1.1 // Content-type: application/json bool is_s3vector_op(const req_state* s) { const auto content_type = s->info.env->get_optional("CONTENT_TYPE"); + const auto& params = s->info.args.get_params(); return std::string_view(s->info.method) == "POST" && - s->info.args.get_num_params() == 0 && + std::count_if(params.begin(), params.end(), [](const auto& p) { + return !p.first.starts_with(RGW_SYS_PARAM_PREFIX); + }) == 0 && content_type && *content_type == "application/json"; } diff --git a/src/rgw/rgw_rest_s3vector.cc b/src/rgw/rgw_rest_s3vector.cc index 059e171993a..10aac50112b 100644 --- a/src/rgw/rgw_rest_s3vector.cc +++ b/src/rgw/rgw_rest_s3vector.cc @@ -4,6 +4,7 @@ #include "rgw_op.h" #include "rgw_rest_s3vector.h" #include "rgw_s3vector.h" +#include "rgw_process_env.h" #include "common/async/yield_context.h" #define dout_context g_ceph_context @@ -13,22 +14,22 @@ namespace { class RGWS3VectorBase : public RGWDefaultResponseOp { protected: + bufferlist in_data; template int do_init_processing(T& configuration, optional_yield y) { const auto max_size = s->cct->_conf->rgw_max_put_param_size; - bufferlist data; int ret = 0; - if (std::tie(ret, data) = read_all_input(s, max_size, false); ret < 0) { + if (std::tie(ret, in_data) = read_all_input(s, max_size, false); ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to read JSON s3vector payload, ret = " << ret << dendl; return ret; } - if (data.length() == 0) { + if (in_data.length() == 0) { ldpp_dout(this, 1) << "ERROR: JSON s3vector payload missing" << dendl; return -EINVAL; } JSONParser parser; - if (!parser.parse(data.c_str(), data.length())) { + if (!parser.parse(in_data.c_str(), in_data.length())) { ldpp_dout(this, 1) << "ERROR: failed to parse JSON s3vector payload" << dendl; return -EINVAL; } @@ -65,19 +66,44 @@ class RGWS3VectorCreateIndex : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket 
bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::create_index(configuration, this, y); } }; class RGWS3VectorCreateVectorBucket : public RGWS3VectorBase { rgw::s3vector::create_vector_bucket_t configuration; + std::unique_ptr bucket; int verify_permission(optional_yield y) override { ldpp_dout(this, 10) << "INFO: verifying permission for s3vector CreateVectorBucket" << dendl; - // TODO: implement permission check - /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsCreateVectorBucket)) { + if (s->auth.identity->is_anonymous()) { return -EACCES; - }*/ + } + + rgw::ARN arn(rgw::Partition::aws, rgw::Service::s3vectors, + "", s->bucket_tenant, configuration.vector_bucket_name); + if (!verify_user_permission(this, s, arn, rgw::IAM::s3vectorsCreateVectorBucket, false)) { + return -EACCES; + } + + if (s->auth.identity->get_tenant() != s->bucket_tenant) { + //AssumeRole is meant for cross account access + if (s->auth.identity->get_identity_type() != TYPE_ROLE) { + ldpp_dout(this, 5) << "WARNING: user cannot create an s3vector bucket in a different tenant" + << " (user_id.tenant=" << s->user->get_tenant() + << " requested=" << s->bucket_tenant << ")" + << dendl; + return -EACCES; + } + } + return 0; } @@ -91,8 +117,89 @@ class RGWS3VectorCreateVectorBucket : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + int ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (ret != -ENOENT) { + // TODO: verify attributes from the existing bucket match the requested configuration + op_ret = ret; + return; + } + + const auto& zonegroup = s->penv.site->get_zonegroup(); + + 
rgw::sal::VectorBucket::CreateParams createparams; + createparams.owner = s->user->get_id(); + createparams.zonegroup_id = zonegroup.id; + // vector buckets are indexless + createparams.index_type = rgw::BucketIndexType::Indexless; + createparams.placement_rule.storage_class = s->info.storage_class; + if (!driver->is_meta_master()) { + // apply bucket creation on the master zone first + JSONParser jp; + op_ret = rgw_forward_request_to_master(this, *s->penv.site, s->owner.id, + &in_data, &jp, s->info, s->err, y); + if (op_ret < 0) { + return; + } + + RGWBucketInfo master_info; + JSONDecoder::decode_json("bucket_info", master_info, &jp); + + // update params with info from the master + createparams.marker = master_info.bucket.marker; + createparams.bucket_id = master_info.bucket.bucket_id; + createparams.zonegroup_id = master_info.zonegroup; + createparams.quota = master_info.quota; + createparams.creation_time = master_info.creation_time; + } + + op_ret = bucket->create(this, createparams, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to create s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::create_vector_bucket(configuration, this, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to initialize s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } + } + + void send_response() override { + if (op_ret == -ERR_BUCKET_EXISTS) { + const auto eexist_override = s->cct->_conf.get_val("rgw_bucket_eexist_override"); + if (! eexist_override) [[likely]] { + op_ret = 0; + } else { + s->err.message = "The requested bucket name is not available. The bucket namespace is shared by all users of the system. 
Specify a different name and try again."; + } + } + + if (op_ret) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + + if (op_ret == 0 && s->system_request) { + s->format = RGWFormat::JSON; + end_header(s, this, to_mime_type(s->format)); + JSONFormatter f; /* use json formatter for system requests output */ + + ceph_assert(bucket); + ceph_assert(!bucket->empty()); + const RGWBucketInfo& info = bucket->get_info(); + const obj_version& ep_objv = bucket->get_version(); + f.open_object_section("info"); + encode_json("entry_point_object_ver", ep_objv, &f); + encode_json("object_ver", info.objv_tracker.read_version, &f); + encode_json("bucket_info", info, &f); + f.close_section(); + rgw_flush_formatter_and_reset(s, &f); + return; + } + end_header(s); + } + }; class RGWS3VectorDeleteIndex : public RGWS3VectorBase { @@ -117,19 +224,30 @@ class RGWS3VectorDeleteIndex : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". 
error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::delete_index(configuration, this, y); } }; class RGWS3VectorDeleteVectorBucket : public RGWS3VectorBase { + // TODO: collapse with get_vector_bucket_t and create a common function + // to build and verify the ARN in init_processing() rgw::s3vector::delete_vector_bucket_t configuration; + boost::optional arn; int verify_permission(optional_yield y) override { ldpp_dout(this, 10) << "INFO: verifying permission for s3vector DeleteVectorBucket" << dendl; - // TODO: implement permission check - /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsDeleteVectorBucket)) { - return -EACCES; - }*/ + if (!verify_bucket_permission(this, s, arn.get(), rgw::IAM::s3vectorsDeleteVectorBucket)) { + // TODO: ignore failure for now "evaluate_iam_policies: implicit deny from identity-based policy" + // return -EACCES; + return 0; + } return 0; } @@ -139,11 +257,65 @@ class RGWS3VectorDeleteVectorBucket : public RGWS3VectorBase { uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } int init_processing(optional_yield y) override { - return do_init_processing(configuration, y); + int ret = do_init_processing(configuration, y); + if (ret < 0) { + return ret; + } + if (configuration.vector_bucket_arn.empty()) { + arn.emplace(rgw::Partition::aws, + rgw::Service::s3vectors, + s->penv.site->get_zonegroup().api_name, + s->auth.identity->get_tenant(), + configuration.vector_bucket_name); + } else { + arn = rgw::ARN::parse(configuration.vector_bucket_arn); + if (!arn) { + ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN: " << configuration.vector_bucket_arn << dendl; + return -EINVAL; + } + if (arn->service != rgw::Service::s3vectors || + arn->partition != rgw::Partition::aws || + arn->region != s->penv.site->get_zonegroup().api_name || + arn->account != s->bucket_tenant || + arn->resource.empty()) { + ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN service: " << arn->to_string() << dendl; + return 
-EINVAL; + } + } + if (!configuration.vector_bucket_name.empty() && + arn->resource != configuration.vector_bucket_name) { + ldpp_dout(this, 1) << "ERROR: s3vector bucket ARN bucket mismatch: " << arn->to_string() + << " expected bucket: " << configuration.vector_bucket_name << dendl; + return -EINVAL; + } + + s->bucket_name = arn->resource; + ldpp_dout(this, 20) << "INFO: s3vector bucket ARN: " << arn.get() << dendl; + return 0; } void execute(optional_yield y) override { - op_ret = rgw::s3vector::delete_vector_bucket(configuration, this, y); + { + JSONFormatter f; + configuration.dump(&f); + std::stringstream ss; + f.flush(ss); + ldpp_dout(this, 20) << "INFO: executing s3vector DeleteVectorBucket with: " << ss.str() << dendl; + } + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } + op_ret = bucket->remove(this, false, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to delete s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } + // TODO: verify bucket is empty before deletion (or support force delete) + // rgw::s3vector::delete_vector_bucket(configuration, this, y); } }; @@ -169,6 +341,13 @@ class RGWS3VectorDeleteVectorBucketPolicy : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". 
error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::delete_vector_bucket_policy(configuration, this, y); } }; @@ -195,6 +374,13 @@ class RGWS3VectorPutVectors : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::put_vectors(configuration, this, y); } }; @@ -221,6 +407,13 @@ class RGWS3VectorGetVectors : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::get_vectors(configuration, this, y); } }; @@ -247,19 +440,37 @@ class RGWS3VectorListVectors : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". 
error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::list_vectors(configuration, this, y); } }; class RGWS3VectorListVectorBuckets : public RGWS3VectorBase { rgw::s3vector::list_vector_buckets_t configuration; + rgw::sal::BucketList listing; + std::string zonegroup_name; int verify_permission(optional_yield y) override { ldpp_dout(this, 10) << "INFO: verifying permission for s3vector ListVectorBuckets" << dendl; - // TODO: implement permission check - /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsListVectorBuckets)) { + if (s->auth.identity->is_anonymous()) { return -EACCES; - }*/ + } + + zonegroup_name = s->penv.site->get_zonegroup().api_name; + rgw::ARN arn = rgw::ARN(rgw::Partition::aws, + rgw::Service::s3vectors, + zonegroup_name, + s->auth.identity->get_tenant(), + "*"); + if (!verify_user_permission(this, s, arn, rgw::IAM::s3vectorsListVectorBuckets, false)) { + return -EACCES; + } return 0; } @@ -273,19 +484,68 @@ class RGWS3VectorListVectorBuckets : public RGWS3VectorBase { } void execute(optional_yield y) override { - op_ret = rgw::s3vector::list_vector_buckets(configuration, this, y); + { + JSONFormatter f; + configuration.dump(&f); + std::stringstream ss; + f.flush(ss); + ldpp_dout(this, 20) << "INFO: executing s3vector ListVectorBuckets with: " << ss.str() << dendl; + } + + op_ret = driver->list_vector_buckets(this, s->owner.id, s->auth.identity->get_tenant(), + configuration.next_token, "", configuration.max_results, listing, y); + if (op_ret < 0) { + ldpp_dout(this, 20) << "ERROR: failed to execute ListVectorBuckets. 
error: " << op_ret << dendl; + return; + } + } + + void send_response() override { + if (op_ret < 0) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/json"); + // convert BucketList to AWS S3 Vector Buckets format + JSONFormatter f; + f.open_object_section(""); + if (!listing.next_marker.empty()) { + ::encode_json("nextToken", listing.next_marker, &f); + } + f.open_array_section("vectorBuckets"); + for (const auto& bucket : listing.buckets) { + f.open_object_section(""); + ::encode_json("creationTime", ceph::to_iso_8601(bucket.creation_time), &f); + rgw::ARN arn(rgw::Partition::aws, + rgw::Service::s3vectors, + zonegroup_name, + s->auth.identity->get_tenant(), + bucket.bucket.name); + ::encode_json("vectorBucketArn", arn.to_string(), &f); + ::encode_json("vectorBucketName", bucket.bucket.name, &f); + f.close_section(); + } + f.close_section(); // vectorBuckets + f.close_section(); // root object + std::stringstream ss; + f.flush(ss); + dump_body(s, ss.str()); } }; class RGWS3VectorGetVectorBucket : public RGWS3VectorBase { rgw::s3vector::get_vector_bucket_t configuration; + std::unique_ptr bucket; + boost::optional arn; int verify_permission(optional_yield y) override { ldpp_dout(this, 10) << "INFO: verifying permission for s3vector GetVectorBucket" << dendl; - // TODO: implement permission check - /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsGetVectorBucket)) { - return -EACCES; - }*/ + + if (!verify_bucket_permission(this, s, arn.get(), rgw::IAM::s3vectorsGetVectorBucket)) { + // TODO: ignore failure for now "evaluate_iam_policies: implicit deny from identity-based policy" + // return -EACCES; + return 0; + } return 0; } @@ -295,11 +555,78 @@ class RGWS3VectorGetVectorBucket : public RGWS3VectorBase { uint32_t op_mask() override { return RGW_OP_TYPE_READ; } int init_processing(optional_yield y) override { - return do_init_processing(configuration, y); + int ret = do_init_processing(configuration, y); + if 
(ret < 0) { + return ret; + } + if (configuration.vector_bucket_arn.empty()) { + arn.emplace(rgw::Partition::aws, + rgw::Service::s3vectors, + s->penv.site->get_zonegroup().api_name, + s->auth.identity->get_tenant(), + configuration.vector_bucket_name); + } else { + arn = rgw::ARN::parse(configuration.vector_bucket_arn); + if (!arn) { + ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN: " << configuration.vector_bucket_arn << dendl; + return -EINVAL; + } + if (arn->service != rgw::Service::s3vectors || + arn->partition != rgw::Partition::aws || + arn->region != s->penv.site->get_zonegroup().api_name || + arn->account != s->bucket_tenant || + arn->resource.empty()) { + ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN service: " << arn->to_string() << dendl; + return -EINVAL; + } + } + if (!configuration.vector_bucket_name.empty() && + arn->resource != configuration.vector_bucket_name) { + ldpp_dout(this, 1) << "ERROR: s3vector bucket ARN bucket mismatch: " << arn->to_string() + << " expected bucket: " << configuration.vector_bucket_name << dendl; + return -EINVAL; + } + + s->bucket_name = arn->resource; + ldpp_dout(this, 20) << "INFO: s3vector bucket ARN: " << arn.get() << dendl; + return 0; } void execute(optional_yield y) override { - op_ret = rgw::s3vector::get_vector_bucket(configuration, this, y); + { + JSONFormatter f; + configuration.dump(&f); + std::stringstream ss; + f.flush(ss); + ldpp_dout(this, 20) << "INFO: executing s3vector GetVectorBucket with: " << ss.str() << dendl; + } + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". 
error: " << op_ret << dendl; + } + } + + void send_response() override { + if (op_ret < 0) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/json"); + // convert BucketList to AWS S3 Vector Buckets format + JSONFormatter f; + f.open_object_section(""); + f.open_object_section("vectorBucket"); + ::encode_json("creationTime", ceph::to_iso_8601(bucket->get_creation_time()), &f); + rgw::ARN arn(rgw::Partition::aws, rgw::Service::s3vectors, + "", bucket->get_tenant(), bucket->get_name()); + ::encode_json("vectorBucketArn", arn.to_string(), &f); + ::encode_json("vectorBucketName", bucket->get_name(), &f); + f.close_section(); // vectorBucket + f.close_section(); // root object + std::stringstream ss; + f.flush(ss); + dump_body(s, ss.str()); } }; @@ -325,6 +652,13 @@ class RGWS3VectorGetIndex : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::get_index(configuration, this, y); } }; @@ -351,6 +685,13 @@ class RGWS3VectorListIndexes : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". 
error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::list_indexes(configuration, this, y); } }; @@ -377,6 +718,13 @@ class RGWS3VectorPutVectorBucketPolicy : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::put_vector_bucket_policy(configuration, this, y); } }; @@ -403,6 +751,13 @@ class RGWS3VectorGetVectorBucketPolicy : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::get_vector_bucket_policy(configuration, this, y); } }; @@ -429,6 +784,13 @@ class RGWS3VectorDeleteVectors : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". 
error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::delete_vectors(configuration, this, y); } }; @@ -455,6 +817,13 @@ class RGWS3VectorQueryVectors : public RGWS3VectorBase { } void execute(optional_yield y) override { + const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name); + std::unique_ptr bucket; + op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl; + return; + } op_ret = rgw::s3vector::query_vectors(configuration, this, y); } }; diff --git a/src/rgw/rgw_s3vector.cc b/src/rgw/rgw_s3vector.cc index 513d66865e4..e1fe0754b18 100644 --- a/src/rgw/rgw_s3vector.cc +++ b/src/rgw/rgw_s3vector.cc @@ -6,6 +6,7 @@ #include "common/Formatter.h" #include "common/dout.h" #include +#include "rgw_sal.h" #define dout_subsys ceph_subsys_rgw @@ -194,11 +195,6 @@ void delete_vector_bucket_policy_t::decode_json(JSONObj* obj) { } int create_vector_bucket(const create_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y) { - JSONFormatter f; - configuration.dump(&f); - std::stringstream ss; - f.flush(ss); - ldpp_dout(dpp, 20) << "INFO: executing s3vector CreateVectorBucket with: " << ss.str() << dendl; return 0; } @@ -212,11 +208,6 @@ int delete_index(const delete_index_t& configuration, DoutPrefixProvider* dpp, o } int delete_vector_bucket(const delete_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y) { - JSONFormatter f; - configuration.dump(&f); - std::stringstream ss; - f.flush(ss); - ldpp_dout(dpp, 20) << "INFO: executing s3vector DeleteVectorBucket with: " << ss.str() << dendl; return 0; } @@ -414,15 +405,6 @@ void get_vector_bucket_t::decode_json(JSONObj* obj) { decode_name_or_arn("vectorBucketName", "vectorBucketArn", vector_bucket_name, vector_bucket_arn, obj); } -int list_vector_buckets(const list_vector_buckets_t& configuration, 
DoutPrefixProvider* dpp, optional_yield y) { - JSONFormatter f; - configuration.dump(&f); - std::stringstream ss; - f.flush(ss); - ldpp_dout(dpp, 20) << "INFO: executing s3vector ListVectorBuckets with: " << ss.str() << dendl; - return 0; -} - void get_index_t::dump(ceph::Formatter* f) const { f->open_object_section(""); ::encode_json("indexArn", index_arn, f); @@ -436,15 +418,6 @@ void get_index_t::decode_json(JSONObj* obj) { decode_name("vectorBucketName", vector_bucket_name, obj); } -int get_vector_bucket(const get_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y) { - JSONFormatter f; - configuration.dump(&f); - std::stringstream ss; - f.flush(ss); - ldpp_dout(dpp, 20) << "INFO: executing s3vector GetVectorBucket with: " << ss.str() << dendl; - return 0; -} - void list_indexes_t::dump(ceph::Formatter* f) const { f->open_object_section(""); ::encode_json("maxResults", max_results, f); diff --git a/src/rgw/rgw_s3vector.h b/src/rgw/rgw_s3vector.h index 41dc92180fa..95ca7a0f78c 100644 --- a/src/rgw/rgw_s3vector.h +++ b/src/rgw/rgw_s3vector.h @@ -14,6 +14,12 @@ class Formatter; class JSONObj; class DoutPrefixProvider; +struct rgw_bucket; + +namespace rgw::sal { +class Driver; +} + namespace rgw::s3vector { enum class DistanceMetric { COSINE, @@ -560,8 +566,6 @@ int delete_vector_bucket_policy(const delete_vector_bucket_policy_t& configurati int put_vectors(const put_vectors_t& configuration, DoutPrefixProvider* dpp, optional_yield y); int get_vectors(const get_vectors_t& configuration, DoutPrefixProvider* dpp, optional_yield y); int list_vectors(const list_vectors_t& configuration, DoutPrefixProvider* dpp, optional_yield y); -int list_vector_buckets(const list_vector_buckets_t& configuration, DoutPrefixProvider* dpp, optional_yield y); -int get_vector_bucket(const get_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y); int get_index(const get_index_t& configuration, DoutPrefixProvider* dpp, optional_yield y); int 
list_indexes(const list_indexes_t& configuration, DoutPrefixProvider* dpp, optional_yield y); int put_vector_bucket_policy(const put_vector_bucket_policy_t& configuration, DoutPrefixProvider* dpp, optional_yield y); diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 1f6342a2570..b4d82b76d5d 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -151,6 +151,7 @@ namespace rgw { namespace sal { struct MPSerializer; class GCChain; class RGWRole; +class VectorBucket; enum AttrsMod { ATTRSMOD_NONE = 0, @@ -713,6 +714,17 @@ class Driver { /** Check to see if this placement rule is valid */ virtual bool valid_placement(const rgw_placement_rule& rule) = 0; + /** Load a VectorBucket by key. Queries driver for vector bucket info. On -ENOENT, the + * bucket must still be allocated to support bucket->create(). */ + virtual int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b, + std::unique_ptr* bucket, optional_yield y) = 0; + /** List the vector buckets of a given owner */ + virtual int list_vector_buckets(const DoutPrefixProvider* dpp, + const rgw_owner& owner, const std::string& tenant, + const std::string& marker, const std::string& end_marker, + uint64_t max, BucketList& buckets, + optional_yield y) = 0; + /** Shut down background tasks, to be called while Asio is running. */ virtual void shutdown(void) { }; @@ -1940,6 +1952,129 @@ public: virtual void set_luarocks_path(const std::string& path) = 0; }; +class VectorBucket { + public: + VectorBucket() = default; + virtual ~VectorBucket() = default; + + /** Remove this vector bucket from the backing store */ + virtual int remove(const DoutPrefixProvider* dpp, bool delete_children, optional_yield y) = 0; + + using CreateParams = Bucket::CreateParams; + /// Create this bucket in the backing store. 
+ virtual int create(const DoutPrefixProvider* dpp, + const CreateParams& params, + optional_yield y) = 0; + + /** Get the cached attributes associated with this vector bucket */ + virtual Attrs& get_attrs(void) = 0; + /** Set the cached attributes on this vector bucket */ + virtual int set_attrs(Attrs a) = 0; + /** Load this vector bucket from the backing store. Requires the key to be set, fills other fields. */ + virtual int load_bucket(const DoutPrefixProvider* dpp, optional_yield y) = 0; + /** Get the owner of this vector bucket */ + virtual const rgw_owner& get_owner() const = 0; + /** Check in the backing store if this vector bucket is empty */ + virtual int check_empty(const DoutPrefixProvider* dpp, optional_yield y) = 0; + /** Check if this instantiation is empty */ + virtual bool empty() const = 0; + /** Get the cached name of this vector bucket */ + virtual const std::string& get_name() const = 0; + /** Get the cached tenant of this vector bucket */ + virtual const std::string& get_tenant() const = 0; + /** Get the cached marker of this vector bucket */ + virtual const std::string& get_marker() const = 0; + /** Get the cached ID of this vector bucket */ + virtual const std::string& get_bucket_id() const = 0; + /** Get the cached placement rule of this vector bucket */ + virtual const rgw_placement_rule& get_placement_rule() const = 0; + /** Get the cached creation time of this vector bucket */ + virtual ceph::real_time get_creation_time() const = 0; + /** Get the cached modification time of this vector bucket */ + virtual const ceph::real_time& get_modification_time() const = 0; + /** Get the cached version of this vector bucket */ + virtual const obj_version& get_version() const = 0; + /** Set the cached version of this vector bucket */ + virtual void set_version(obj_version &ver) = 0; + /** Get the key for this vector bucket */ + virtual const rgw_bucket& get_key() const = 0; + /** Get the info for this vector bucket */ + virtual const RGWBucketInfo& 
get_info() const = 0; + + /** Check if a VectorBucket pointer is empty */ + static bool empty(const VectorBucket* b) { return (!b || b->empty()); } + /** Check if a VectorBucket unique pointer is empty */ + static bool empty(const std::unique_ptr& b) { return (!b || b->empty()); } + /** Clone a copy of this vector bucket. Used when modification is necessary of the copy */ + virtual std::unique_ptr clone() = 0; + + /** Print the User to @a out */ + virtual void print(std::ostream& out) const = 0; + + friend inline std::ostream& operator<<(std::ostream& out, const VectorBucket& b) { + b.print(out); + return out; + } + + friend inline std::ostream& operator<<(std::ostream& out, const VectorBucket* b) { + if (!b) + out << ""; + else + b->print(out); + return out; + } + + friend inline std::ostream& operator<<(std::ostream& out, const std::unique_ptr& p) { + out << p.get(); + return out; + } + + virtual bool operator==(const VectorBucket& b) const = 0; + virtual bool operator!=(const VectorBucket& b) const = 0; + + /** Store the cached bucket info into the backing store */ + virtual int put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time mtime, optional_yield y) = 0; + + /** Try to refresh the cached bucket info from the backing store. Used in + * read-modify-update loop. 
*/ + virtual int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y) = 0; + + // TODO check if we need these API from Bucket class: + /** Read the bucket stats from the backing Store, synchronous */ + //virtual int read_stats(const DoutPrefixProvider *dpp, optional_yield y, + // const bucket_index_layout_generation& idx_layout, + // int shard_id, std::string* bucket_ver, std::string* master_ver, + // std::map& stats, + // std::string* max_marker = nullptr, + // bool* syncstopped = nullptr) = 0; + /** Read the bucket stats from the backing Store, asynchronous */ + //virtual int read_stats_async(const DoutPrefixProvider *dpp, + // const bucket_index_layout_generation& idx_layout, + // int shard_id, boost::intrusive_ptr cb) = 0; + /** Sync this bucket's stats to the owning user's stats in the backing store */ + //virtual int sync_owner_stats(const DoutPrefixProvider *dpp, optional_yield y, + // RGWBucketEnt* optional_ent) = 0; + /** Change the owner of this bucket in the backing store. Current owner must be set. Does not + * change ownership of the objects in the bucket. 
*/ + //virtual int chown(const DoutPrefixProvider* dpp, + // const rgw_owner& new_owner, + // const std::string& new_owner_name, + // optional_yield y) = 0; + /** Check if the given size fits within the quota */ + // virtual int check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) = 0; + /** Set the attributes in attrs, leaving any other existing attrs set, and + * write them to the backing store; a merge operation */ + //virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0; + /** Read usage information about this bucket from the backing store */ + //virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, + // bool* is_truncated, RGWUsageIter& usage_iter, + // std::map& usage) = 0; + /** Trim the usage information to the given epoch range */ + //virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) = 0; + /** Remove this specific vector bucket instance from the backing store. 
May be removed from API */ + //virtual int purge_instance(const DoutPrefixProvider* dpp, optional_yield y) = 0; +}; + /** @} namespace rgw::sal in group RGWSAL */ } } // namespace rgw::sal diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index 7db6a0424f4..dd444d3c1e6 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -1022,6 +1022,13 @@ public: virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override; virtual bool valid_placement(const rgw_placement_rule& rule) override; + int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b, + std::unique_ptr* bucket, optional_yield y) override { return -ENOTSUP; } + int list_vector_buckets(const DoutPrefixProvider* dpp, + const rgw_owner& owner, const std::string& tenant, + const std::string& marker, const std::string& end_marker, + uint64_t max, BucketList& buckets, + optional_yield y) override { return -ENOTSUP; } virtual void finalize(void) override; diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 0524c687d96..aadcf9270d6 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -489,6 +489,18 @@ public: virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override; virtual bool valid_placement(const rgw_placement_rule& rule) override; + int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b, + std::unique_ptr* bucket, optional_yield y) override { + return next->load_vector_bucket(dpp, b, bucket, y); + } + int list_vector_buckets(const DoutPrefixProvider* dpp, + const rgw_owner& owner, const std::string& tenant, + const std::string& marker, const std::string& end_marker, + uint64_t max, BucketList& buckets, + optional_yield y) override { + return next->list_vector_buckets(dpp, owner, tenant, marker, + end_marker, max, buckets, y); + } virtual void shutdown(void) override { next->shutdown(); }; diff --git a/src/rgw/rgw_sal_store.h 
b/src/rgw/rgw_sal_store.h index 0f2f91f5fef..a2e6a230ddb 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -276,6 +276,58 @@ class StoreBucket : public Bucket { friend class BucketList; }; +class StoreVectorBucket : public VectorBucket { + protected: + RGWBucketInfo info; + Attrs attrs; + obj_version bucket_version; + ceph::real_time mtime; + + public: + + StoreVectorBucket() = default; + StoreVectorBucket(const rgw_bucket& b) { info.bucket = b; } + StoreVectorBucket(const RGWBucketInfo& i) : info(i) {} + virtual ~StoreVectorBucket() = default; + + virtual Attrs& get_attrs(void) override { return attrs; } + virtual int set_attrs(Attrs a) override { attrs = a; return 0; } + const rgw_owner& get_owner() const override { return info.owner; } + bool empty() const override { return info.bucket.name.empty(); } + const std::string& get_name() const override { return info.bucket.name; } + const std::string& get_tenant() const override { return info.bucket.tenant; } + const std::string& get_marker() const override { return info.bucket.marker; } + const std::string& get_bucket_id() const override { return info.bucket.bucket_id; } + const rgw_placement_rule& get_placement_rule() const override { return info.placement_rule; } + ceph::real_time get_creation_time() const override { return info.creation_time; } + const ceph::real_time& get_modification_time() const override { return mtime; } + const obj_version& get_version() const override { return bucket_version; } + void set_version(obj_version &ver) override { bucket_version = ver; } + const rgw_bucket& get_key() const override { return info.bucket; } + const RGWBucketInfo& get_info() const override { return info; } + void print(std::ostream& out) const override { out << info.bucket; } + bool operator==(const VectorBucket& b) const override { + if (typeid(*this) != typeid(b)) { + return false; + } + const StoreVectorBucket& sb = dynamic_cast(b); + + return (info.bucket.tenant == sb.info.bucket.tenant) && + 
(info.bucket.name == sb.info.bucket.name) && + (info.bucket.bucket_id == sb.info.bucket.bucket_id); + } + virtual bool operator!=(const VectorBucket& b) const override { + if (typeid(*this) != typeid(b)) { + return false; + } + const StoreVectorBucket& sb = dynamic_cast(b); + + return (info.bucket.tenant != sb.info.bucket.tenant) || + (info.bucket.name != sb.info.bucket.name) || + (info.bucket.bucket_id != sb.info.bucket.bucket_id); + } +}; + class StoreObject : public Object { protected: RGWObjState state; diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc index ad08e4b69be..ea4534834fa 100644 --- a/src/rgw/rgw_zone.cc +++ b/src/rgw/rgw_zone.cc @@ -172,6 +172,7 @@ void RGWZoneParams::decode_json(JSONObj *obj) JSONDecoder::decode_json("tier_config", tier_config, obj); JSONDecoder::decode_json("realm_id", realm_id, obj); JSONDecoder::decode_json("restore_pool", restore_pool, obj); + JSONDecoder::decode_json("vector_pool", vector_pool, obj); } void RGWZoneParams::dump(Formatter *f) const @@ -203,6 +204,7 @@ void RGWZoneParams::dump(Formatter *f) const encode_json("tier_config", tier_config, f); encode_json("realm_id", realm_id, f); encode_json("restore_pool", restore_pool, f); + encode_json("vector_pool", vector_pool, f); } rgw_pool RGWZoneParams::get_pool(CephContext *cct) const @@ -260,6 +262,7 @@ void add_zone_pools(const RGWZoneParams& info, pools.insert(info.topics_pool); pools.insert(info.account_pool); pools.insert(info.group_pool); + pools.insert(info.vector_pool); for (const auto& [pname, placement] : info.placement_pools) { pools.insert(placement.index_pool); @@ -853,6 +856,7 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y, fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool); info.account_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:accounts", info.account_pool); info.group_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:groups", info.group_pool); + info.vector_pool = 
fix_zone_pool_dup(pools, info.name, ".rgw.meta:vector", info.vector_pool); for (auto& [pname, placement] : info.placement_pools) { placement.index_pool = fix_zone_pool_dup(pools, info.name, "." + default_bucket_index_pool_suffix, placement.index_pool); diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h index 27e0919c924..a2a208af58f 100644 --- a/src/rgw/rgw_zone.h +++ b/src/rgw/rgw_zone.h @@ -34,6 +34,7 @@ struct RGWZoneParams { rgw_pool group_pool; rgw_pool dedup_pool; rgw_pool bucket_logging_pool; + rgw_pool vector_pool; RGWAccessKey system_key; @@ -63,7 +64,7 @@ struct RGWZoneParams { const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const; void encode(bufferlist& bl) const { - ENCODE_START(18, 1, bl); + ENCODE_START(19, 1, bl); encode(domain_root, bl); encode(control_pool, bl); encode(gc_pool, bl); @@ -102,11 +103,12 @@ struct RGWZoneParams { encode(restore_pool, bl); encode(dedup_pool, bl); encode(bucket_logging_pool, bl); + encode(vector_pool, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { - DECODE_START(18, bl); + DECODE_START(19, bl); decode(domain_root, bl); decode(control_pool, bl); decode(gc_pool, bl); @@ -206,6 +208,11 @@ struct RGWZoneParams { } else { bucket_logging_pool = log_pool.name + ":logging"; } + if (struct_v >= 19) { + decode(vector_pool, bl); + } else { + vector_pool = name + ".rgw..meta.vector"; + } DECODE_FINISH(bl); } void dump(Formatter *f) const; diff --git a/src/rgw/services/svc_bucket_sobj.cc b/src/rgw/services/svc_bucket_sobj.cc index 44e5e9f20d5..9189d1ed5a3 100644 --- a/src/rgw/services/svc_bucket_sobj.cc +++ b/src/rgw/services/svc_bucket_sobj.cc @@ -20,17 +20,16 @@ using namespace std; -static const std::string instance_oid_prefix = ".bucket.meta."; - // convert bucket instance oids back to the tenant/ format for metadata keys. 
// it's safe to parse 'tenant:' only for oids, because they won't contain the // optional :shard at the end -static std::string instance_meta_key_to_oid(const std::string& metadata_key) +std::string RGWSI_Bucket_SObj::instance_meta_key_to_oid(const std::string& metadata_key) const { - std::string oid = string_cat_reserve(instance_oid_prefix, metadata_key); + const auto& prefix = instance_oid_prefix(); + std::string oid = string_cat_reserve(prefix, metadata_key); // replace tenant/ with tenant: - auto c = oid.find('/', instance_oid_prefix.size()); + auto c = oid.find('/', prefix.size()); if (c != string::npos) { oid[c] = ':'; } @@ -41,13 +40,14 @@ static std::string instance_meta_key_to_oid(const std::string& metadata_key) // convert bucket instance oids back to the tenant/ format for metadata keys. // it's safe to parse 'tenant:' only for oids, because they won't contain the // optional :shard at the end -static std::string instance_oid_to_meta_key(const std::string& oid) +std::string RGWSI_Bucket_SObj::instance_oid_to_meta_key(const std::string& oid) const { - if (oid.size() < instance_oid_prefix.size()) { /* just sanity check */ + const auto prefix_size = instance_oid_prefix().size(); + if (oid.size() < prefix_size) { /* just sanity check */ return string(); } - std::string key = oid.substr(instance_oid_prefix.size()); + std::string key = oid.substr(prefix_size); // find first : (could be tenant:bucket or bucket:instance) auto c = key.find(':'); @@ -61,6 +61,11 @@ static std::string instance_oid_to_meta_key(const std::string& oid) return key; } +std::string RGWSI_Bucket_SObj::get_cache_key(const std::string& key) const { + std::string cache_key("bi/"); + cache_key.append(key); + return cache_key; +} RGWSI_Bucket_SObj::RGWSI_Bucket_SObj(CephContext *cct): RGWSI_Bucket(cct) { } @@ -111,12 +116,23 @@ class BucketEntrypointLister : public RGWMetadataLister { } }; +const std::string& RGWSI_Bucket_SObj::instance_oid_prefix() const +{ + static const std::string 
instance_oid_prefix = ".bucket.meta."; + return instance_oid_prefix; +} + +const rgw_pool& RGWSI_Bucket_SObj::get_entrypoint_pool() const +{ + return svc.zone->get_zone_params().domain_root; +} + int RGWSI_Bucket_SObj::create_entrypoint_lister( const DoutPrefixProvider* dpp, const std::string& marker, std::unique_ptr& lister) { - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; + const rgw_pool& pool = get_entrypoint_pool(); auto p = std::make_unique(svc.sysobj->get_pool(pool)); int r = p->init(dpp, marker, ""); // empty prefix if (r < 0) { @@ -126,10 +142,18 @@ int RGWSI_Bucket_SObj::create_entrypoint_lister( return 0; } +const rgw_pool& RGWSI_VectorBucket_SObj::get_entrypoint_pool() const +{ + return svc.zone->get_zone_params().vector_pool; +} class BucketInstanceLister : public RGWMetadataLister { + const RGWSI_Bucket_SObj* const bucket_sobj; public: - using RGWMetadataLister::RGWMetadataLister; + BucketInstanceLister(RGWSI_SysObj::Pool pool, const RGWSI_Bucket_SObj* _bucket_sobj) : + RGWMetadataLister(pool), + bucket_sobj(_bucket_sobj) {} + ~BucketInstanceLister() override = default; void filter_transform(std::vector& oids, std::list& keys) override @@ -137,7 +161,9 @@ class BucketInstanceLister : public RGWMetadataLister { // transform instance oids to metadata keys std::transform(oids.begin(), oids.end(), std::back_inserter(keys), - instance_oid_to_meta_key); + [this](const std::string& oid) { + return bucket_sobj->instance_oid_to_meta_key(oid); + }); } }; @@ -146,9 +172,9 @@ int RGWSI_Bucket_SObj::create_instance_lister( const std::string& marker, std::unique_ptr& lister) { - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; - auto p = std::make_unique(svc.sysobj->get_pool(pool)); - int r = p->init(dpp, marker, instance_oid_prefix); + const rgw_pool& pool = get_entrypoint_pool(); + auto p = std::make_unique(svc.sysobj->get_pool(pool), this); + int r = p->init(dpp, marker, instance_oid_prefix()); if (r < 0) { return r; } @@ -166,7 
+192,7 @@ int RGWSI_Bucket_SObj::read_bucket_entrypoint_info(const string& key, rgw_cache_entry_info *cache_info, boost::optional refresh_version) { - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; + const rgw_pool& pool = get_entrypoint_pool(); bufferlist bl; int ret = rgw_get_system_obj(svc.sysobj, pool, key, bl, objv_tracker, pmtime, y, dpp, @@ -185,6 +211,13 @@ int RGWSI_Bucket_SObj::read_bucket_entrypoint_info(const string& key, return 0; } + +int RGWSI_Bucket_SObj::complete_entry(const DoutPrefixProvider* dpp, optional_yield y, + const std::string& section, const std::string& key, + const RGWObjVersionTracker* objv) { + return svc.mdlog->complete_entry(dpp, y, section, key, objv); +} + int RGWSI_Bucket_SObj::store_bucket_entrypoint_info(const string& key, RGWBucketEntryPoint& info, bool exclusive, @@ -197,14 +230,14 @@ int RGWSI_Bucket_SObj::store_bucket_entrypoint_info(const string& key, bufferlist bl; encode(info, bl); - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; + const rgw_pool& pool = get_entrypoint_pool(); int ret = rgw_put_system_obj(dpp, svc.sysobj, pool, key, bl, exclusive, objv_tracker, mtime, y, pattrs); if (ret < 0) { return ret; } - return svc.mdlog->complete_entry(dpp, y, "bucket", key, objv_tracker); + return complete_entry(dpp, y, "bucket", key, objv_tracker); } int RGWSI_Bucket_SObj::remove_bucket_entrypoint_info(const string& key, @@ -212,13 +245,13 @@ int RGWSI_Bucket_SObj::remove_bucket_entrypoint_info(const string& key, optional_yield y, const DoutPrefixProvider *dpp) { - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; + const rgw_pool& pool = get_entrypoint_pool(); int ret = rgw_delete_system_obj(dpp, svc.sysobj, pool, key, objv_tracker, y); if (ret < 0) { return ret; } - return svc.mdlog->complete_entry(dpp, y, "bucket", key, objv_tracker); + return complete_entry(dpp, y, "bucket", key, objv_tracker); } int RGWSI_Bucket_SObj::read_bucket_instance_info(const string& key, @@ -298,7 
+331,7 @@ int RGWSI_Bucket_SObj::do_read_bucket_instance_info(const string& key, optional_yield y, const DoutPrefixProvider *dpp) { - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; + const rgw_pool& pool = get_entrypoint_pool(); const std::string oid = instance_meta_key_to_oid(key); bufferlist bl; RGWObjVersionTracker objv; @@ -470,12 +503,12 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(const string& key, } } - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; + const rgw_pool& pool = get_entrypoint_pool(); const std::string oid = instance_meta_key_to_oid(key); int ret = rgw_put_system_obj(dpp, svc.sysobj, pool, oid, bl, exclusive, &info.objv_tracker, mtime, y, pattrs); if (ret >= 0) { - int r = svc.mdlog->complete_entry(dpp, y, "bucket.instance", + int r = complete_entry(dpp, y, "bucket.instance", key, &info.objv_tracker); if (r < 0) { return r; @@ -510,7 +543,7 @@ int RGWSI_Bucket_SObj::remove_bucket_instance_info(const string& key, optional_yield y, const DoutPrefixProvider *dpp) { - const rgw_pool& pool = svc.zone->get_zone_params().domain_root; + const rgw_pool& pool = get_entrypoint_pool(); const std::string oid = instance_meta_key_to_oid(key); int ret = rgw_delete_system_obj(dpp, svc.sysobj, pool, oid, objv_tracker, y); if (ret < 0 && @@ -577,3 +610,21 @@ int RGWSI_Bucket_SObj::read_buckets_stats(std::vector& buckets, return buckets.size(); } + +const std::string& RGWSI_VectorBucket_SObj::instance_oid_prefix() const +{ + static const std::string vector_instance_oid_prefix = ".vectorbucket.meta."; + return vector_instance_oid_prefix; +} + +std::string RGWSI_VectorBucket_SObj::get_cache_key(const std::string& key) const { + std::string cache_key("vbi/"); + cache_key.append(key); + return cache_key; +} + +int RGWSI_VectorBucket_SObj::complete_entry(const DoutPrefixProvider* dpp, optional_yield y, + const std::string& section, const std::string& key, + const RGWObjVersionTracker* objv) { + return 
svc.mdlog->complete_entry(dpp, y, "vector"+section, key, objv); +} diff --git a/src/rgw/services/svc_bucket_sobj.h b/src/rgw/services/svc_bucket_sobj.h index 1e763716570..30842904b1b 100644 --- a/src/rgw/services/svc_bucket_sobj.h +++ b/src/rgw/services/svc_bucket_sobj.h @@ -35,6 +35,14 @@ class RGWChainedCacheImpl; class RGWSI_Bucket_SObj : public RGWSI_Bucket { + virtual const std::string& instance_oid_prefix() const; + std::string instance_meta_key_to_oid(const std::string& metadata_key) const; + virtual const rgw_pool& get_entrypoint_pool() const; + virtual std::string get_cache_key(const std::string& key) const; + virtual int complete_entry(const DoutPrefixProvider* dpp, optional_yield y, + const std::string& section, const std::string& key, + const RGWObjVersionTracker* objv); + struct bucket_info_cache_entry { RGWBucketInfo info; real_time mtime; @@ -73,8 +81,9 @@ public: } svc; RGWSI_Bucket_SObj(CephContext *cct); - ~RGWSI_Bucket_SObj(); + ~RGWSI_Bucket_SObj() override; + std::string instance_oid_to_meta_key(const std::string& oid) const; void init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_SysObj_Cache *_cache_svc, @@ -158,3 +167,16 @@ public: const DoutPrefixProvider *dpp) override; }; +class RGWSI_VectorBucket_SObj : public RGWSI_Bucket_SObj +{ + const std::string& instance_oid_prefix() const override; + const rgw_pool& get_entrypoint_pool() const override; + std::string get_cache_key(const std::string& key) const override; + int complete_entry(const DoutPrefixProvider* dpp, optional_yield y, + const std::string& section, const std::string& key, + const RGWObjVersionTracker* objv) override; +public: + RGWSI_VectorBucket_SObj(CephContext *cct) : RGWSI_Bucket_SObj(cct) {} + ~RGWSI_VectorBucket_SObj() override = default; +}; + diff --git a/src/rgw/services/svc_user.h b/src/rgw/services/svc_user.h index 699bf453f49..66a4a56d0c6 100644 --- a/src/rgw/services/svc_user.h +++ b/src/rgw/services/svc_user.h @@ -42,6 +42,8 @@ public: virtual 
rgw_raw_obj get_buckets_obj(const rgw_user& user_id) const = 0; + virtual rgw_raw_obj get_vector_buckets_obj(const rgw_user& user_id) const = 0; + virtual int create_lister(const DoutPrefixProvider* dpp, const std::string& marker, std::unique_ptr& lister) = 0; diff --git a/src/rgw/services/svc_user_rados.cc b/src/rgw/services/svc_user_rados.cc index 56b9a0a06af..3f5423adb80 100644 --- a/src/rgw/services/svc_user_rados.cc +++ b/src/rgw/services/svc_user_rados.cc @@ -26,6 +26,7 @@ #define dout_subsys ceph_subsys_rgw #define RGW_BUCKETS_OBJ_SUFFIX ".buckets" +#define RGW_VECTOR_BUCKETS_OBJ_SUFFIX ".vectorbuckets" using namespace std; @@ -62,6 +63,12 @@ rgw_raw_obj RGWSI_User_RADOS::get_buckets_obj(const rgw_user& user) const return rgw_raw_obj(svc.zone->get_zone_params().user_uid_pool, oid); } +rgw_raw_obj RGWSI_User_RADOS::get_vector_buckets_obj(const rgw_user& user) const +{ + string oid = user.to_str() + RGW_VECTOR_BUCKETS_OBJ_SUFFIX; + return rgw_raw_obj(svc.zone->get_zone_params().user_uid_pool, oid); +} + class UserLister : public RGWMetadataLister { public: using RGWMetadataLister::RGWMetadataLister; @@ -572,6 +579,7 @@ int RGWSI_User_RADOS::remove_user_info(const RGWUserInfo& info, ldpp_dout(dpp, 0) << "ERROR: could not remove " << info.user_id << ":" << uid_bucks << ", should be fixed (err=" << ret << ")" << dendl; return ret; } + // TODO: remove vector buckets } else if (info.type != TYPE_ROOT) { // unlink the name from its account const RGWZoneParams& zone = svc.zone->get_zone_params(); diff --git a/src/rgw/services/svc_user_rados.h b/src/rgw/services/svc_user_rados.h index 9b6cad8ccd2..8c723e2ebc5 100644 --- a/src/rgw/services/svc_user_rados.h +++ b/src/rgw/services/svc_user_rados.h @@ -51,6 +51,8 @@ class RGWSI_User_RADOS : public RGWSI_User rgw_raw_obj get_buckets_obj(const rgw_user& user_id) const override; + rgw_raw_obj get_vector_buckets_obj(const rgw_user& user_id) const override; + int get_user_info_from_index(const std::string& key, const rgw_pool& 
pool, RGWUserInfo *info, diff --git a/src/test/rgw/s3vectors/README.rst b/src/test/rgw/s3vectors/README.rst index e5d16c1821d..be5ed61bb7f 100644 --- a/src/test/rgw/s3vectors/README.rst +++ b/src/test/rgw/s3vectors/README.rst @@ -8,5 +8,6 @@ s3vectors Tests * To run a specific tests use: `S3VTESTS_CONF=s3vtests.conf.SAMPLE tox -- s3vector_test.py::` * To run a group of tests use: - `S3VTESTS_CONF=s3vtests.conf.SAMPLE tox -- s3vector_test.py -m "" + `S3VTESTS_CONF=s3vtests.conf.SAMPLE tox -- s3vector_test.py -m ""` +* In case of multisite environment, you can set a "secondary" site in the conf file. See: `s3vtests.conf.multisite` diff --git a/src/test/rgw/s3vectors/__init__.py b/src/test/rgw/s3vectors/__init__.py index 6843ecb077a..2ef443a267b 100644 --- a/src/test/rgw/s3vectors/__init__.py +++ b/src/test/rgw/s3vectors/__init__.py @@ -33,6 +33,16 @@ def setup(): global main_secret_key main_secret_key = cfg.get('s3 main',"secret_key") + # vars from the secondary section + global secondary_host + global secondary_port + if cfg.has_section("secondary"): + secondary_host = cfg.get('secondary',"host") + secondary_port = int(cfg.get('secondary',"port")) + else: + secondary_host = None + secondary_port = None + def get_config_host(): global default_host @@ -54,6 +64,16 @@ def get_secret_key(): return main_secret_key +def get_config_host2(): + global secondary_host + return secondary_host + + +def get_config_port2(): + global secondary_port + return secondary_port + + @pytest.fixture(autouse=True, scope="package") def configfile(): setup() diff --git a/src/test/rgw/s3vectors/s3vector_test.py b/src/test/rgw/s3vectors/s3vector_test.py index 510ca85fc42..9144288ed78 100644 --- a/src/test/rgw/s3vectors/s3vector_test.py +++ b/src/test/rgw/s3vectors/s3vector_test.py @@ -18,7 +18,9 @@ from . 
import( get_config_host, get_config_port, get_access_key, - get_secret_key + get_secret_key, + get_config_host2, + get_config_port2 ) @@ -48,10 +50,10 @@ def gen_bucket_name(): global num_buckets num_buckets += 1 - return run_prefix + '-' + str(num_buckets) + return run_prefix + '-' + str(num_buckets) -def connection(): +def connection(service_name='s3vectors'): hostname = get_config_host() port_no = get_config_port() access_key = get_access_key() @@ -61,11 +63,45 @@ def connection(): else: scheme = 'http://' - client = boto3.client('s3vectors', + if service_name == 's3vectors': + config = Config(signature_version='s3') + else: + config = None + + client = boto3.client(service_name, endpoint_url=scheme+hostname+':'+str(port_no), aws_access_key_id=access_key, aws_secret_access_key=secret_key, - config=Config(signature_version='s3')) + config=config) + + return client + +def connection2(service_name='s3vectors'): + hostname = get_config_host2() + if not hostname: + log.info("No second host configured") + return None + port_no = get_config_port2() + if not port_no: + log.info("No second port configured") + return None + access_key = get_access_key() + secret_key = get_secret_key() + if port_no == 443 or port_no == 8443: + scheme = 'https://' + else: + scheme = 'http://' + + if service_name == 's3vectors': + config = Config(signature_version='s3') + else: + config = None + + client = boto3.client(service_name, + endpoint_url=scheme+hostname+':'+str(port_no), + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + config=config) return client @@ -100,6 +136,13 @@ def another_user(tenant=None): # s3vectors tests ################# +def _delete_all_vector_buckets(conn): + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + for bucket in result['vectorBuckets']: + _ = conn.delete_vector_bucket(vectorBucketName=bucket['vectorBucketName']) + + @pytest.mark.vector_bucket_test def 
test_create_vector_bucket(): conn = connection() @@ -107,7 +150,7 @@ def test_create_vector_bucket(): result = conn.create_vector_bucket(vectorBucketName=bucket_name) assert result['ResponseMetadata']['HTTPStatusCode'] == 200 # cleanup - _ = conn.delete_vector_bucket(vectorBucketName=bucket_name) + _delete_all_vector_buckets(conn) @pytest.mark.vector_bucket_test @@ -118,8 +161,11 @@ def test_get_vector_bucket(): assert result['ResponseMetadata']['HTTPStatusCode'] == 200 result = conn.get_vector_bucket(vectorBucketName=bucket_name) assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("get_vector_buckets result: %s", result) + invalid_name = bucket_name + '-invalid' + pytest.raises(conn.exceptions.ClientError, conn.get_vector_bucket, vectorBucketName=invalid_name) # cleanup - _ = conn.delete_vector_bucket(vectorBucketName=bucket_name) + _delete_all_vector_buckets(conn) @pytest.mark.vector_bucket_test @@ -132,13 +178,17 @@ def test_delete_vector_bucket(): assert result['ResponseMetadata']['HTTPStatusCode'] == 200 result = conn.delete_vector_bucket(vectorBucketName=bucket_name) assert result['ResponseMetadata']['HTTPStatusCode'] == 200 - # not implemented yet - #with pytest.raises(conn.exceptions.NoSuchVectorBucket): - # result = conn.get_vector_bucket(vectorBucketName=bucket_name) + pytest.raises(conn.exceptions.ClientError, conn.get_vector_bucket, vectorBucketName=bucket_name) + pytest.raises(conn.exceptions.ClientError, conn.delete_vector_bucket, vectorBucketName=bucket_name) + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + assert len(result['vectorBuckets']) == 0 + # cleanup + _delete_all_vector_buckets(conn) @pytest.mark.vector_bucket_test -def test_list_vector_bucket(): +def test_list_vector_buckets(): conn = connection() bucket_name1 = gen_bucket_name() bucket_name2 = gen_bucket_name() @@ -148,13 +198,184 @@ def test_list_vector_bucket(): assert result['ResponseMetadata']['HTTPStatusCode'] == 
200 result = conn.list_vector_buckets() assert result['ResponseMetadata']['HTTPStatusCode'] == 200 - # not implemented yet - #bucket_names = [b['Name'] for b in result['VectorBuckets']] - #assert bucket_name1 in bucket_names - #assert bucket_name2 in bucket_names + log.info("list_vector_buckets result: %s", result) + bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name1 in bucket_names + assert bucket_name2 in bucket_names + # cleanup + _delete_all_vector_buckets(conn) + + +@pytest.mark.vector_bucket_test +def test_vector_buckets_sync(): + conn = connection() + conn2 = connection2() + if not conn2: + log.info("Skipping test_vector_buckets_sync since second connection is not configured") + return + + # create buckets from the first connection + bucket_name1 = gen_bucket_name() + bucket_name2 = gen_bucket_name() + result = conn.create_vector_bucket(vectorBucketName=bucket_name1) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = conn.create_vector_bucket(vectorBucketName=bucket_name2) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets result: %s", result) + bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name1 in bucket_names + assert bucket_name2 in bucket_names + time.sleep(5) + + # now check from the second connection + result = conn2.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets from conn2 result: %s", result) + bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name1 in bucket_names + assert bucket_name2 in bucket_names + + # create buckets from the 2nd connection + bucket_name3 = gen_bucket_name() + bucket_name4 = gen_bucket_name() + result = conn2.create_vector_bucket(vectorBucketName=bucket_name3) + assert 
result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = conn2.create_vector_bucket(vectorBucketName=bucket_name4) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = conn2.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets from conn2 result: %s", result) + bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name3 in bucket_names + assert bucket_name4 in bucket_names + time.sleep(5) + + # now check from the first connection + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets result: %s", result) + bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name3 in bucket_names + assert bucket_name4 in bucket_names + + # cleanup + _delete_all_vector_buckets(conn) + + +def _create_s3bucket(s3conn, bucket_name): + try: + result = s3conn.create_bucket(Bucket=bucket_name) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + except s3conn.exceptions.ClientError as err: + log.warning("s3 bucket creation failed with: %s", str(err)) + assert err.response['ResponseMetadata']['HTTPStatusCode'] == 500 + + +@pytest.mark.vector_bucket_test +def test_vector_buckets_creation_with_buckets(): + conn = connection() + s3conn = connection('s3') + bucket_name1 = gen_bucket_name() + # create vector bucket + result = conn.create_vector_bucket(vectorBucketName=bucket_name1) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # create s3 bucket with the same name + _create_s3bucket(s3conn, bucket_name1) + # create another s3 bucket + bucket_name2 = gen_bucket_name() + _create_s3bucket(s3conn, bucket_name2) + # list vector buckets and verify only one bucket there + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets result: %s", result) + vector_bucket_names = 
[b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name1 in vector_bucket_names + assert bucket_name2 not in vector_bucket_names + # list s3 buckets and verify both bucket there + result = s3conn.list_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_buckets result: %s", result) + s3_bucket_names = [b['Name'] for b in result['Buckets']] + assert bucket_name1 in s3_bucket_names + assert bucket_name2 in s3_bucket_names + # now try to create a vector bucket with a name that already exists as an s3 bucket + result = conn.create_vector_bucket(vectorBucketName=bucket_name2) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # list vector buckets and verify both bucket there + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets result: %s", result) + vector_bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name1 in vector_bucket_names + assert bucket_name2 in vector_bucket_names + # cleanup + _delete_all_vector_buckets(conn) + + +@pytest.mark.vector_bucket_test +def test_vector_buckets_deletion_with_buckets(): + conn = connection() + s3conn = connection('s3') + bucket_name1 = gen_bucket_name() + result = conn.create_vector_bucket(vectorBucketName=bucket_name1) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # create s3 bucket with the same name + _create_s3bucket(s3conn, bucket_name1) + + # verify vector bucket exists (via get and list) + result = conn.get_vector_bucket(vectorBucketName=bucket_name1) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets result: %s", result) + vector_bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']] + assert bucket_name1 in vector_bucket_names + # verify s3 bucket exists (via get and 
list) + result = s3conn.head_bucket(Bucket=bucket_name1) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = s3conn.list_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_buckets result: %s", result) + bucket_names = [b['Name'] for b in result['Buckets']] + assert bucket_name1 in bucket_names + + + # delete vector bucket + result = conn.delete_vector_bucket(vectorBucketName=bucket_name1) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # verify vector bucket is not there (via get and list) + pytest.raises(conn.exceptions.ClientError, conn.get_vector_bucket, vectorBucketName=bucket_name1) + # verify s3 bucket still exists + result = s3conn.head_bucket(Bucket=bucket_name1) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = s3conn.list_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_buckets result: %s", result) + s3_bucket_names = [b['Name'] for b in result['Buckets']] + assert bucket_name1 in s3_bucket_names + # create another vector bucket + bucket_name2 = gen_bucket_name() + result = conn.create_vector_bucket(vectorBucketName=bucket_name2) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # and an s3 bucket with the same name + _create_s3bucket(s3conn, bucket_name2) + # delete the s3 bucket + result = s3conn.delete_bucket(Bucket=bucket_name2) + assert result['ResponseMetadata']['HTTPStatusCode'] == 204 + # verify s3 bucket is not there + pytest.raises(s3conn.exceptions.ClientError, s3conn.head_bucket, Bucket=bucket_name2) + # verify vector bucket still exists (via get and list) + result = conn.get_vector_bucket(vectorBucketName=bucket_name2) + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + result = conn.list_vector_buckets() + assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + log.info("list_vector_buckets result: %s", result) + vector_bucket_names = [b['vectorBucketName'] for b in 
result['vectorBuckets']] + assert bucket_name2 in vector_bucket_names # cleanup - _ = conn.delete_vector_bucket(vectorBucketName=bucket_name1) - _ = conn.delete_vector_bucket(vectorBucketName=bucket_name2) + _delete_all_vector_buckets(conn) @pytest.mark.index_test @@ -166,6 +387,9 @@ def test_create_index(): index_name = 'test-index' result = conn.create_index(vectorBucketName=bucket_name, indexName=index_name, dataType='float32', dimension=128, distanceMetric='euclidean') assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # create an index on bucket that does not exist + invalid_bucket_name = bucket_name + '-invalid' + pytest.raises(conn.exceptions.ClientError, conn.create_index, vectorBucketName=invalid_bucket_name, indexName=index_name, dataType='float32', dimension=128, distanceMetric='euclidean') # cleanup _ = conn.delete_vector_bucket(vectorBucketName=bucket_name) @@ -181,6 +405,9 @@ def test_get_index(): assert result['ResponseMetadata']['HTTPStatusCode'] == 200 result = conn.get_index(vectorBucketName=bucket_name, indexName=index_name) assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # get an index from bucket that does not exist + invalid_bucket_name = bucket_name + '-invalid' + pytest.raises(conn.exceptions.ClientError, conn.get_index, vectorBucketName=invalid_bucket_name, indexName=index_name) # cleanup _ = conn.delete_vector_bucket(vectorBucketName=bucket_name) @@ -196,6 +423,10 @@ def test_delete_index(): assert result['ResponseMetadata']['HTTPStatusCode'] == 200 result = conn.get_index(vectorBucketName=bucket_name, indexName=index_name) assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # delete an index from bucket that does not exist + invalid_bucket_name = bucket_name + '-invalid' + pytest.raises(conn.exceptions.ClientError, conn.delete_index, vectorBucketName=invalid_bucket_name, indexName=index_name) + # delete the index from the right bucket result = conn.delete_index(vectorBucketName=bucket_name, 
indexName=index_name) assert result['ResponseMetadata']['HTTPStatusCode'] == 200 # not implemented yet @@ -219,6 +450,9 @@ def test_list_indexes(): assert result['ResponseMetadata']['HTTPStatusCode'] == 200 result = conn.list_indexes(vectorBucketName=bucket_name) assert result['ResponseMetadata']['HTTPStatusCode'] == 200 + # list indexes from bucket that does not exist + invalid_bucket_name = bucket_name + '-invalid' + pytest.raises(conn.exceptions.ClientError, conn.list_indexes, vectorBucketName=invalid_bucket_name) + # not implemented yet #index_names = [i['IndexName'] for i in result['Indexes']] #assert index_name1 in index_names diff --git a/src/test/rgw/s3vectors/s3vtests.conf.multisite b/src/test/rgw/s3vectors/s3vtests.conf.multisite new file mode 100644 index 00000000000..3306db5b2ee --- /dev/null +++ b/src/test/rgw/s3vectors/s3vtests.conf.multisite @@ -0,0 +1,14 @@ +[DEFAULT] +port = 8101 +host = localhost + +[secondary] +port = 8201 +host = localhost + +[s3 main] +access_key = 1234567890 +secret_key = pencil +display_name = M. Tester +user_id = testid +email = tester@ceph.com -- 2.47.3