virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override;
virtual bool valid_placement(const rgw_placement_rule& rule) override { return true; }
+ int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b,
+ std::unique_ptr<VectorBucket>* bucket, optional_yield y) override { return -ENOTSUP; }
+ int list_vector_buckets(const DoutPrefixProvider* dpp,
+ const rgw_owner& owner, const std::string& tenant,
+ const std::string& marker, const std::string& end_marker,
+ uint64_t max, BucketList& buckets,
+ optional_yield y) override { return -ENOTSUP; }
virtual void finalize(void) override {}
namespace rgwrados::account {
static constexpr std::string_view buckets_oid_prefix = "buckets.";
+static constexpr std::string_view vector_buckets_oid_prefix = "vectorbuckets.";
static constexpr std::string_view users_oid_prefix = "users.";
static constexpr std::string_view groups_oid_prefix = "groups.";
static constexpr std::string_view roles_oid_prefix = "roles.";
static std::string get_buckets_key(std::string_view account_id) {
return string_cat_reserve(buckets_oid_prefix, account_id);
}
+static std::string get_vector_buckets_key(std::string_view account_id) {
+ return string_cat_reserve(vector_buckets_oid_prefix, account_id);
+}
rgw_raw_obj get_buckets_obj(const RGWZoneParams& zone,
std::string_view account_id) {
return {zone.account_pool, get_buckets_key(account_id)};
}
+rgw_raw_obj get_vector_buckets_obj(const RGWZoneParams& zone,
+ std::string_view account_id) {
+ return {zone.account_pool, get_vector_buckets_key(account_id)};
+}
static std::string get_users_key(std::string_view account_id) {
return string_cat_reserve(users_oid_prefix, account_id);
}
rgw_raw_obj get_buckets_obj(const RGWZoneParams& zone,
std::string_view account_id);
+/// Return the rados object that tracks the given account's vector buckets. This
+/// can be used with the cls_user interface in namespace rgwrados::buckets.
+rgw_raw_obj get_vector_buckets_obj(const RGWZoneParams& zone,
+ std::string_view account_id);
+
/// Return the rados object that tracks the given account's users. This
/// can be used with the cls_user interface in namespace rgwrados::users.
rgw_raw_obj get_users_obj(const RGWZoneParams& zone,
}
};
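+// Entrypoint metadata for vector buckets reuses the regular bucket handler;
+// only the metadata type string differs, so entries register under "vectorbucket".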
+class RGWVectorBucketMetadataHandler : public RGWBucketMetadataHandler {
+ public:
+ using RGWBucketMetadataHandler::RGWBucketMetadataHandler;
+
+ string get_type() override { return "vectorbucket"; }
+};
+
class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {
rgw::sal::Driver* driver;
RGWSI_Zone* svc_zone{nullptr};
RGWSI_BucketIndex* svc_bi{nullptr};
RGWDataChangesLog *svc_datalog{nullptr};
- int put_prepare(const DoutPrefixProvider* dpp, optional_yield y,
+protected:
+ virtual int put_prepare(const DoutPrefixProvider* dpp, optional_yield y,
const std::string& entry, RGWBucketCompleteInfo& bci,
const std::optional<RGWBucketCompleteInfo>& old_bci,
const RGWObjVersionTracker& objv_tracker,
bool from_remote_zone);
- int put_post(const DoutPrefixProvider* dpp, optional_yield y,
+ virtual int put_post(const DoutPrefixProvider* dpp, optional_yield y,
const RGWBucketCompleteInfo& bci,
const std::optional<RGWBucketCompleteInfo>& old_bci,
RGWObjVersionTracker& objv_tracker);
}
};
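+// Vector buckets carry no bucket index or datalog: put_prepare() forces an
+// indexless layout and put_post() skips the index/datalog work entirely.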
+class RGWVectorBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler {
+protected:
+ int put_prepare(const DoutPrefixProvider* dpp, optional_yield y,
+ const std::string& entry, RGWBucketCompleteInfo& bci,
+ const std::optional<RGWBucketCompleteInfo>& old_bci,
+ const RGWObjVersionTracker& objv_tracker,
+ bool from_remote_zone) override {
+ bci.info.layout.current_index.layout.type = rgw::BucketIndexType::Indexless;
+ return 0;
+ }
+ int put_post(const DoutPrefixProvider* dpp, optional_yield y,
+ const RGWBucketCompleteInfo& bci,
+ const std::optional<RGWBucketCompleteInfo>& old_bci,
+ RGWObjVersionTracker& objv_tracker) override { return 0; }
+ public:
+ RGWVectorBucketInstanceMetadataHandler(rgw::sal::Driver* driver,
+ RGWSI_Zone* svc_zone,
+ RGWSI_Bucket* svc_bucket) :
+ RGWBucketInstanceMetadataHandler(driver, svc_zone, svc_bucket, nullptr, nullptr) {}
+
+ string get_type() override { return "vectorbucket.instance"; }
+};
+
RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
RGWSI_Bucket_Sync *bucket_sync_svc,
RGWSI_BucketIndex *bi_svc,
RGWSI_User* user_svc,
- RGWDataChangesLog *datalog_svc)
- : cct(zone_svc->ctx())
+ RGWDataChangesLog *datalog_svc,
+ BucketsObjGetter obj_getter_func,
+ UserBucketsObjGetter user_obj_getter_func)
+ : cct(zone_svc->ctx()), get_buckets_obj(obj_getter_func), get_user_buckets_obj(user_obj_getter_func)
{
svc.zone = zone_svc;
svc.bucket = bucket_svc;
.set_orig_info(&bucket_info));
}
-static rgw_raw_obj get_owner_buckets_obj(RGWSI_User* svc_user,
+rgw_raw_obj RGWBucketCtl::get_owner_buckets_obj(RGWSI_User* svc_user,
RGWSI_Zone* svc_zone,
const rgw_owner& owner)
{
return std::visit(fu2::overload(
[&] (const rgw_user& uid) {
- return svc_user->get_buckets_obj(uid);
+ return (svc_user->*get_user_buckets_obj)(uid);
},
[&] (const rgw_account_id& account_id) {
const RGWZoneParams& zone = svc_zone->get_zone_params();
- return rgwrados::account::get_buckets_obj(zone, account_id);
+ return get_buckets_obj(zone, account_id);
}), owner);
}
// flush stats to the user/account owner object
const rgw_raw_obj& obj = std::visit(fu2::overload(
[&] (const rgw_user& user) {
- return svc.user->get_buckets_obj(user);
+ return (svc.user->*get_user_buckets_obj)(user);
},
[&] (const rgw_account_id& id) {
const RGWZoneParams& zone = svc.zone->get_zone_params();
- return rgwrados::account::get_buckets_obj(zone, id);
+ return get_buckets_obj(zone, id);
}), owner);
return rgwrados::buckets::write_stats(dpp, y, rados, obj, *pent);
}
svc_datalog);
}
+auto create_vector_bucket_metadata_handler(librados::Rados& rados,
+ RGWSI_Bucket* svc_bucket,
+ RGWBucketCtl* ctl_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>
+{
+ return std::make_unique<RGWVectorBucketMetadataHandler>(
+ rados, svc_bucket, ctl_bucket);
+}
+
+auto create_vector_bucket_instance_metadata_handler(rgw::sal::Driver* driver,
+ RGWSI_Zone* svc_zone,
+ RGWSI_Bucket* svc_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>
+{
+ return std::make_unique<RGWVectorBucketInstanceMetadataHandler>(driver, svc_zone,
+ svc_bucket);
+}
+
list<RGWBucketEntryPoint> RGWBucketEntryPoint::generate_test_instances()
{
list<RGWBucketEntryPoint> o;
-> std::unique_ptr<RGWMetadataHandler>;
+// vector bucket entrypoint metadata handler factory
+auto create_vector_bucket_metadata_handler(librados::Rados& rados,
+ RGWSI_Bucket* svc_bucket,
+ RGWBucketCtl* ctl_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>;
+
+// vector bucket instance metadata handler factory
+auto create_vector_bucket_instance_metadata_handler(rgw::sal::Driver* driver,
+ RGWSI_Zone* svc_zone,
+ RGWSI_Bucket* svc_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>;
+
extern int rgw_remove_object(const DoutPrefixProvider* dpp,
rgw::sal::Driver* driver,
rgw::sal::Bucket* bucket,
: ep(ep), attrs(attrs) {}
};
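+// Getter types that select which per-account/per-user "buckets" tracking objects
+// an RGWBucketCtl instance operates on, so the same control logic can be reused
+// for both regular buckets and vector buckets.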
+using BucketsObjGetter = rgw_raw_obj (*)(const RGWZoneParams&, std::string_view);
+using UserBucketsObjGetter = rgw_raw_obj (RGWSI_User::*)(const rgw_user&) const;
+
class RGWBucketCtl {
CephContext *cct;
+ BucketsObjGetter get_buckets_obj;
+ UserBucketsObjGetter get_user_buckets_obj;
+ rgw_raw_obj get_owner_buckets_obj(RGWSI_User* svc_user,
+ RGWSI_Zone* svc_zone,
+ const rgw_owner& owner);
struct Svc {
RGWSI_Zone *zone{nullptr};
RGWSI_Bucket_Sync *bucket_sync_svc,
RGWSI_BucketIndex *bi_svc,
RGWSI_User* user_svc,
- RGWDataChangesLog *datalog_svc);
+ RGWDataChangesLog *datalog_svc,
+ BucketsObjGetter obj_getter_func,
+ UserBucketsObjGetter user_buckets_getter_func);
void init(RGWUserCtl *user_ctl,
RGWDataChangesLog *datalog,
bool rgw_find_bucket_by_id(const DoutPrefixProvider *dpp, CephContext *cct, rgw::sal::Driver* driver, const std::string& marker,
const std::string& bucket_id, rgw_bucket* bucket_out);
+
return 0;
}
+// TODO: add vector bucket support
int RGWAsyncPutBucketInstanceInfo::_send_request(const DoutPrefixProvider *dpp)
{
auto r = store->getRados()->put_bucket_instance_info(bucket_info, exclusive,
- mtime, attrs, dpp, null_yield);
+ mtime, attrs, dpp, null_yield, store->ctl()->bucket);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to put bucket instance info for "
<< bucket_info.bucket << dendl;
if (ret < 0)
return ret;
+ ret = open_vector_pool_ctx(dpp);
+ if (ret < 0)
+ return ret;
+
pools_initialized = true;
index_completion_manager = new RGWIndexCompletionManager(this);
return rgw_init_ioctx(dpp, get_rados_handle(), pool, io_ctx, create, mostly_omap, bulk);
}
+int RGWRados::open_vector_pool_ctx(const DoutPrefixProvider *dpp)
+{
+ return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().vector_pool, vector_pool_ctx, true, true);
+}
+
/**** logs ****/
struct log_list_state {
*bucket_id = buf;
}
+constexpr int MAX_CREATE_RETRIES = 20; /* need to bound retries */
+
int RGWRados::create_bucket(const DoutPrefixProvider* dpp,
optional_yield y,
const rgw_bucket& bucket,
{
int ret = 0;
-#define MAX_CREATE_RETRIES 20 /* need to bound retries */
for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
RGWObjVersionTracker& objv_tracker = info.objv_tracker;
objv_tracker.read_version.clear();
}
constexpr bool exclusive = true;
- ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y);
+ ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y, ctl.bucket);
if (ret == -ECANCELED) {
ret = -EEXIST;
}
if (ret == -EEXIST) {
/* we need to reread the info and return it, caller will have a use for it */
RGWBucketInfo orig_info;
- int r = get_bucket_info(&svc, bucket.tenant, bucket.name, orig_info, NULL, y, dpp);
+ int r = get_bucket_info(&svc, bucket.tenant, bucket.name, orig_info, NULL, y, dpp, ctl.bucket);
if (r < 0) {
if (r == -ENOENT) {
continue;
return -ENOENT;
}
+int RGWRados::create_vector_bucket(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const rgw_bucket& bucket,
+ const rgw_owner& owner,
+ const std::string& zonegroup_id,
+ const rgw_placement_rule& placement_rule,
+ const std::map<std::string, bufferlist>& attrs,
+ const std::optional<RGWQuotaInfo>& quota,
+ std::optional<ceph::real_time> creation_time,
+ obj_version* pep_objv,
+ RGWBucketInfo& info)
+{
+ ldpp_dout(dpp, 20) << "s3vector --- RGWRados::create_vector_bucket called" << dendl;
+ int ret = 0;
+
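+ // same retry scheme as create_bucket(): retry when racing with a concurrent bucket create/remove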
+ for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
+ RGWObjVersionTracker& objv_tracker = info.objv_tracker;
+ objv_tracker.read_version.clear();
+ objv_tracker.generate_new_write_ver(cct);
+
+ if (bucket.marker.empty()) {
+ create_bucket_id(&info.bucket.marker);
+ info.bucket.bucket_id = info.bucket.marker;
+ } else {
+ info.bucket = bucket;
+ }
+
+ info.owner = owner;
+ info.zonegroup = zonegroup_id;
+ info.placement_rule = placement_rule;
+
+ if (creation_time) {
+ info.creation_time = *creation_time;
+ } else {
+ info.creation_time = ceph::real_clock::now();
+ }
+ if (quota) {
+ info.quota = *quota;
+ }
+
+ constexpr bool exclusive = true;
+ ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y, ctl.vector_bucket);
+ if (ret == -ECANCELED) {
+ ret = -EEXIST;
+ }
+ if (ret == -EEXIST) {
+ /* we need to reread the info and return it, caller will have a use for it */
+ RGWBucketInfo orig_info;
+ int r = get_bucket_info(&svc, bucket.tenant, bucket.name, orig_info, NULL, y, dpp, ctl.vector_bucket);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ continue;
+ }
+ ldpp_dout(dpp, 0) << "get s3vector bucket info returned " << r << dendl;
+ return r;
+ }
+
+ /* only remove it if it's a different bucket instance */
+ if (orig_info.bucket.bucket_id != bucket.bucket_id) {
+ r = ctl.vector_bucket->remove_bucket_instance_info(info.bucket, info, y, dpp);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "WARNING: " << __func__ << "(): failed to remove s3vector bucket instance info: bucket instance=" << info.bucket.get_key() << ": r=" << r << dendl;
+ /* continue anyway */
+ }
+ }
+
+ info = std::move(orig_info);
+ /* ret == -EEXIST here */
+ }
+ return ret;
+ }
+
+ /* this is highly unlikely */
+ ldpp_dout(dpp, 0) << "ERROR: could not create s3vector bucket, continuously raced with bucket creation and removal" << dendl;
+ return -ENOENT;
+}
+
bool RGWRados::obj_to_raw(const rgw_placement_rule& placement_rule, const rgw_obj& obj, rgw_raw_obj *raw_obj)
{
get_obj_bucket_and_oid_loc(obj, raw_obj->oid, raw_obj->loc);
RGWBucketInfo dest_bucket_info;
- r = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.swift_ver_location, dest_bucket_info, NULL, y, NULL);
+ r = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.swift_ver_location, dest_bucket_info, NULL, y, dpp, ctl.bucket, NULL);
if (r < 0) {
ldpp_dout(dpp, 10) << "failed to read dest bucket info: r=" << r << dendl;
if (r == -ENOENT) {
int ret = get_bucket_info(&svc, bucket_info.bucket.tenant,
bucket_info.swift_ver_location,
- archive_binfo, nullptr, y, nullptr);
+ archive_binfo, nullptr, y, dpp, ctl.bucket, nullptr);
if (ret < 0) {
return ret;
}
return 0;
}
+int RGWRados::delete_vector_bucket(RGWBucketInfo& bucket_info, std::map<std::string, bufferlist>& attrs, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp)
+{
+ ldpp_dout(dpp, 20) << "s3vector --- RGWRados::delete_vector_bucket called" << dendl;
+ const rgw_bucket& bucket = bucket_info.bucket;
+
+ bool remove_ep = true;
+
+ int r;
+ if (objv_tracker.read_version.empty()) {
+ RGWBucketEntryPoint ep;
+ r = ctl.vector_bucket->read_bucket_entrypoint_info(bucket_info.bucket,
+ &ep,
+ y,
+ dpp,
+ RGWBucketCtl::Bucket::GetParams()
+ .set_objv_tracker(&objv_tracker));
+ if (r < 0 ||
+ (!bucket_info.bucket.bucket_id.empty() &&
+ ep.bucket.bucket_id != bucket_info.bucket.bucket_id)) {
+ if (r != -ENOENT) {
+ ldpp_dout(dpp, 0) << "ERROR: read_bucket_entrypoint_info() s3vector bucket=" << bucket_info.bucket << " returned error: r=" << r << dendl;
+ /* we have no idea what caused the error, will not try to remove it */
+ }
+ /*
+ * either failed to read bucket entrypoint, or it points to a different bucket instance than
+ * requested
+ */
+ remove_ep = false;
+ }
+ }
+
+ if (remove_ep) {
+ r = ctl.vector_bucket->remove_bucket_entrypoint_info(bucket_info.bucket, y, dpp,
+ RGWBucketCtl::Bucket::RemoveParams()
+ .set_objv_tracker(&objv_tracker));
+ if (r < 0)
+ return r;
+ }
+
+ /* if the bucket is not synced we can remove the meta file */
+ if (!svc.zone->is_syncing_bucket_meta()) {
+ r = ctl.vector_bucket->remove_bucket_instance_info(bucket, bucket_info, y, dpp);
+ if (r < 0) {
+ return r;
+ }
+
+ } else {
+ // set 'deleted' flag for multisite replication to handle bucket instance removal
+ // TODO: implement store_delete_bucket_info_flag for vector buckets
+ /*r = store_delete_bucket_info_flag(bucket_info, attrs, y, dpp);
+ if (r < 0) {
+ // no need to treat this as an error
+ ldpp_dout(dpp, 0) << "WARNING: failed to store bucket info flag 'deleted' on bucket: " << bucket.name << " r=" << r << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "INFO: setting bucket info flag to deleted for bucket: " << bucket.name << dendl;
+ }*/
+ }
+
+ return 0;
+}
+
+// TODO add vector bucket support
int RGWRados::set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner, const DoutPrefixProvider *dpp, optional_yield y)
{
RGWBucketInfo info;
int r;
if (bucket.bucket_id.empty()) {
- r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, &attrs);
+ r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, ctl.bucket, &attrs);
} else {
r = get_bucket_instance_info(bucket, info, nullptr, &attrs, y, dpp);
}
info.owner = owner.id;
- r = put_bucket_instance_info(info, false, real_time(), &attrs, dpp, y);
+ r = put_bucket_instance_info(info, false, real_time(), &attrs, dpp, y, ctl.bucket);
if (r < 0) {
ldpp_dout(dpp, 0) << "NOTICE: put_bucket_info on bucket=" << bucket.name << " returned err=" << r << dendl;
return r;
}
+// TODO add vector bucket support
int RGWRados::set_buckets_enabled(vector<rgw_bucket>& buckets, bool enabled, const DoutPrefixProvider *dpp, optional_yield y)
{
int ret = 0;
RGWBucketInfo info;
map<string, bufferlist> attrs;
- int r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, &attrs);
+ int r = get_bucket_info(&svc, bucket.tenant, bucket.name, info, NULL, y, dpp, ctl.bucket, &attrs);
if (r < 0) {
ldpp_dout(dpp, 0) << "NOTICE: get_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl;
ret = r;
info.flags |= BUCKET_SUSPENDED;
}
- r = put_bucket_instance_info(info, false, real_time(), &attrs, dpp, y);
+ r = put_bucket_instance_info(info, false, real_time(), &attrs, dpp, y, ctl.bucket);
if (r < 0) {
ldpp_dout(dpp, 0) << "NOTICE: put_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl;
ret = r;
return ret;
}
+// TODO add vector bucket support
int RGWRados::bucket_suspended(const DoutPrefixProvider *dpp, rgw_bucket& bucket, bool *suspended, optional_yield y)
{
RGWBucketInfo bucket_info;
- int ret = get_bucket_info(&svc, bucket.tenant, bucket.name, bucket_info, NULL, y, dpp);
+ int ret = get_bucket_info(&svc, bucket.tenant, bucket.name, bucket_info, NULL, y, dpp, ctl.bucket);
if (ret < 0) {
return ret;
}
map<string, bufferlist> bucket_attrs;
int ret = get_bucket_info(&svc, bucket_info.bucket.tenant, bucket_info.bucket.name,
- bucket_info, nullptr, y, dpp, &bucket_attrs);
+ bucket_info, nullptr, y, dpp, ctl.bucket, &bucket_attrs);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ <<
" ERROR: failed to refresh bucket info : " << cpp_strerror(-ret) << dendl;
bucket_info.bucket.bucket_id << "; expected if resharding underway" << dendl;
// update the judge time
bucket_info.layout.judge_reshard_lock_time = real_clock::now();
- ret = put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp, y);
+ ret = put_bucket_instance_info(bucket_info, false, real_time(), &bucket_attrs, dpp, y, ctl.bucket);
if (ret < 0) {
ldpp_dout(dpp, 0) << "RGWReshard::" << __func__ <<
" ERROR: error putting bucket instance info: " << cpp_strerror(-ret) << dendl;
auto fetch_new_bucket_info =
[this, bs, &obj_instance, &bucket_info, &bucket_attrs, &y, dpp](const std::string& log_tag) -> int {
int ret = get_bucket_info(&svc, bs->bucket.tenant, bs->bucket.name,
- bucket_info, nullptr, y, dpp, &bucket_attrs);
+ bucket_info, nullptr, y, dpp, ctl.bucket, &bucket_attrs);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ <<
" ERROR: failed to refresh bucket info after reshard at " <<
RGWBucketInfo& info,
real_time *pmtime,
optional_yield y,
- const DoutPrefixProvider *dpp, map<string, bufferlist> *pattrs)
+ const DoutPrefixProvider *dpp, RGWBucketCtl* bucket_ctl, map<string, bufferlist> *pattrs)
{
rgw_bucket bucket;
bucket.tenant = tenant;
bucket.name = bucket_name;
- return ctl.bucket->read_bucket_info(bucket, &info, y, dpp,
+ return bucket_ctl->read_bucket_info(bucket, &info, y, dpp,
RGWBucketCtl::BucketInstance::GetParams()
.set_mtime(pmtime)
.set_attrs(pattrs));
int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
ceph::real_time *pmtime,
const DoutPrefixProvider *dpp, optional_yield y,
- map<string, bufferlist> *pattrs)
+ map<string, bufferlist> *pattrs,
+ RGWBucketCtl* bucket_ctl)
{
rgw_bucket bucket = info.bucket;
bucket.bucket_id.clear();
auto rv = info.objv_tracker.read_version;
- return ctl.bucket->read_bucket_info(bucket, &info, y, dpp,
+ return bucket_ctl->read_bucket_info(bucket, &info, y, dpp,
RGWBucketCtl::BucketInstance::GetParams()
.set_mtime(pmtime)
.set_attrs(pattrs)
int RGWRados::put_bucket_instance_info(RGWBucketInfo& info, bool exclusive,
real_time mtime, const map<string, bufferlist> *pattrs,
- const DoutPrefixProvider *dpp, optional_yield y)
+ const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl)
{
- return ctl.bucket->store_bucket_instance_info(info.bucket, info, y, dpp,
+ return bucket_ctl->store_bucket_instance_info(info.bucket, info, y, dpp,
RGWBucketCtl::BucketInstance::PutParams()
.set_exclusive(exclusive)
.set_mtime(mtime)
int RGWRados::put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, real_time mtime, obj_version *pep_objv,
const map<string, bufferlist> *pattrs, bool create_entry_point,
- const DoutPrefixProvider *dpp, optional_yield y)
+ const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl)
{
bool create_head = !info.has_instance_obj || create_entry_point;
- int ret = put_bucket_instance_info(info, exclusive, mtime, pattrs, dpp, y);
+ int ret = put_bucket_instance_info(info, exclusive, mtime, pattrs, dpp, y, bucket_ctl);
if (ret < 0) {
return ret;
}
*pep_objv = ot.write_version;
}
}
- ret = ctl.bucket->store_bucket_entrypoint_info(info.bucket, entry_point, y, dpp, RGWBucketCtl::Bucket::PutParams()
+ ret = bucket_ctl->store_bucket_entrypoint_info(info.bucket, entry_point, y, dpp, RGWBucketCtl::Bucket::PutParams()
.set_exclusive(exclusive)
.set_objv_tracker(&ot)
.set_mtime(mtime));
int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
int open_logging_pool_ctx(const DoutPrefixProvider *dpp);
+ int open_vector_pool_ctx(const DoutPrefixProvider *dpp);
int open_pool_ctx(const DoutPrefixProvider *dpp, const rgw_pool& pool, librados::IoCtx& io_ctx,
bool mostly_omap, bool bulk);
librados::IoCtx reshard_pool_ctx;
librados::IoCtx notif_pool_ctx; // .rgw.notif
librados::IoCtx logging_pool_ctx; // .rgw.logging
+ librados::IoCtx vector_pool_ctx; // .rgw.meta.vector
bool pools_initialized{false};
obj_version* pep_objv,
RGWBucketInfo& info);
+ int create_vector_bucket(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ const rgw_bucket& bucket,
+ const rgw_owner& owner,
+ const std::string& zonegroup_id,
+ const rgw_placement_rule& placement_rule,
+ const std::map<std::string, bufferlist>& attrs,
+ const std::optional<RGWQuotaInfo>& quota,
+ std::optional<ceph::real_time> creation_time,
+ obj_version* pep_objv,
+ RGWBucketInfo& info);
+
RGWCoroutinesManagerRegistry *get_cr_registry() { return cr_registry; }
struct BucketShard {
*/
int delete_bucket(RGWBucketInfo& bucket_info, std::map<std::string, bufferlist>& attrs, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp, bool check_empty = true);
+ int delete_vector_bucket(RGWBucketInfo& bucket_info, std::map<std::string, bufferlist>& attrs, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp);
+
void wakeup_meta_sync_shards(std::set<int>& shard_ids);
void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries);
std::map<RGWObjCategory, RGWStorageStats>& stats, std::string *max_marker, bool* syncstopped = NULL);
int get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb);
- int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, const std::map<std::string, bufferlist> *pattrs, const DoutPrefixProvider *dpp, optional_yield y);
+ int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, const std::map<std::string, bufferlist> *pattrs, const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl);
/* xxx dang obj_ctx -> svc */
int get_bucket_instance_info(const std::string& meta_key, RGWBucketInfo& info, ceph::real_time *pmtime, std::map<std::string, bufferlist> *pattrs, optional_yield y, const DoutPrefixProvider *dpp);
int get_bucket_instance_info(const rgw_bucket& bucket, RGWBucketInfo& info, ceph::real_time *pmtime, std::map<std::string, bufferlist> *pattrs, optional_yield y, const DoutPrefixProvider *dpp);
const std::string& tenant_name, const std::string& bucket_name,
RGWBucketInfo& info,
ceph::real_time *pmtime, optional_yield y,
- const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *pattrs = NULL);
+ const DoutPrefixProvider *dpp, RGWBucketCtl* bucket_ctl, std::map<std::string, bufferlist> *pattrs = NULL);
RGWChainedCacheImpl_bucket_topics_entry *get_topic_cache() { return topic_cache; }
int try_refresh_bucket_info(RGWBucketInfo& info,
ceph::real_time *pmtime,
const DoutPrefixProvider *dpp, optional_yield y,
- std::map<std::string, bufferlist> *pattrs = nullptr);
+ std::map<std::string, bufferlist> *pattrs, RGWBucketCtl* bucket_ctl);
int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, obj_version *pep_objv,
const std::map<std::string, bufferlist> *pattrs, bool create_entry_point,
- const DoutPrefixProvider *dpp, optional_yield y);
+ const DoutPrefixProvider *dpp, optional_yield y, RGWBucketCtl* bucket_ctl);
int cls_obj_prepare_op(const DoutPrefixProvider *dpp, BucketShard& bs, RGWModifyOp op, std::string& tag, rgw_obj& obj,
optional_yield y);
if (ret = fault.check("set_target_layout");
ret == 0) { // no fault injected, write the bucket instance metadata
ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
- real_time(), &bucket_attrs, dpp, y);
+ real_time(), &bucket_attrs, dpp, y, store->ctl()->bucket);
} else if (ret == -ECANCELED) {
fault.clear(); // clear the fault so a retry can succeed
}
ret == 0) { // no fault injected, revert the bucket instance metadata
ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
real_time(),
- &bucket_attrs, dpp, y);
+ &bucket_attrs, dpp, y, store->ctl()->bucket);
} else if (ret == -ECANCELED) {
fault.clear(); // clear the fault so a retry can succeed
}
if (ret = fault.check("change_reshard_state");
ret == 0) { // no fault injected, write the bucket instance metadata
ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
- real_time(), &bucket_attrs, dpp, y);
+ real_time(), &bucket_attrs, dpp, y, store->ctl()->bucket);
} else if (ret == -ECANCELED) {
fault.clear(); // clear the fault so a retry can succeed
}
int ret = fault.check("commit_target_layout");
if (ret == 0) { // no fault injected, write the bucket instance metadata
ret = store->getRados()->put_bucket_instance_info(
- bucket_info, false, real_time(), &bucket_attrs, dpp, y);
+ bucket_info, false, real_time(), &bucket_attrs, dpp, y, store->ctl()->bucket);
} else if (ret == -ECANCELED) {
fault.clear(); // clear the fault so a retry can succeed
}
entry.bucket_name,
bucket_info, nullptr,
y, dpp,
+ store->ctl()->bucket,
&bucket_attrs);
if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
if (ret < 0) {
return std::visit(visitor{svc_user, svc_zone}, owner);
}
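+// Analogue of get_owner_buckets_obj() above, resolving the per-owner object that tracks vector buckets.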
+static rgw_raw_obj get_owner_vector_buckets_obj(RGWSI_User* svc_user,
+ RGWSI_Zone* svc_zone,
+ const rgw_owner& owner)
+{
+ struct visitor {
+ RGWSI_User* svc_user;
+ RGWSI_Zone* svc_zone;
+
+ rgw_raw_obj operator()(const rgw_user& user) {
+ return svc_user->get_vector_buckets_obj(user);
+ }
+ rgw_raw_obj operator()(const rgw_account_id& id) {
+ const RGWZoneParams& zone = svc_zone->get_zone_params();
+ return rgwrados::account::get_vector_buckets_obj(zone, id);
+ }
+ };
+ return std::visit(visitor{svc_user, svc_zone}, owner);
+}
+
int RadosStore::list_buckets(const DoutPrefixProvider* dpp,
const rgw_owner& owner, const std::string& tenant,
const std::string& marker, const std::string& end_marker,
return ret;
}
+int RadosVectorBucket::unlink(const DoutPrefixProvider* dpp, const rgw_owner& owner, optional_yield y, bool update_entrypoint)
+{
+ ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::unlink called" << dendl;
+ librados::Rados& rados = *store->getRados()->get_rados_handle();
+ return store->ctl()->vector_bucket->unlink_bucket(rados, owner, info.bucket,
+ y, dpp, update_entrypoint);
+}
+
+int RadosVectorBucket::remove(const DoutPrefixProvider* dpp,
+ bool delete_children,
+ optional_yield y)
+{
+ ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::remove called" << dendl;
+ RGWObjVersionTracker ot;
+
+ int ret = store->getRados()->delete_vector_bucket(info, get_attrs(), ot, y, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: could not remove s3vector bucket " <<
+ info.bucket.name << dendl;
+ return ret;
+ }
+
+ librados::Rados& rados = *store->getRados()->get_rados_handle();
+ ret = store->ctl()->vector_bucket->unlink_bucket(rados, info.owner,
+ info.bucket, y, dpp, false);
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: unable to remove user s3vector bucket information" << dendl;
+ }
+
+ return ret;
+}
+
int RadosBucket::load_bucket(const DoutPrefixProvider* dpp, optional_yield y)
{
int ret;
return ret;
}
+int RadosVectorBucket::load_bucket(const DoutPrefixProvider* dpp, optional_yield y)
+{
+ ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::load_bucket called" << dendl;
+ int ret;
+
+ RGWObjVersionTracker ep_ot;
+ if (info.bucket.bucket_id.empty()) {
+ ret = store->ctl()->vector_bucket->read_bucket_info(info.bucket, &info, y, dpp,
+ RGWBucketCtl::BucketInstance::GetParams()
+ .set_mtime(&mtime)
+ .set_attrs(&attrs),
+ &ep_ot);
+ } else {
+ ret = store->ctl()->vector_bucket->read_bucket_instance_info(info.bucket, &info, y, dpp,
+ RGWBucketCtl::BucketInstance::GetParams()
+ .set_mtime(&mtime)
+ .set_attrs(&attrs));
+ }
+ if (ret != 0) {
+ return ret;
+ }
+
+ bucket_version = ep_ot.read_version;
+
+ return ret;
+}
+
+int RadosVectorBucket::create(const DoutPrefixProvider* dpp,
+ const CreateParams& params,
+ optional_yield y)
+{
+ ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::create called" << dendl;
+ rgw_bucket key = get_key();
+ key.marker = params.marker;
+ key.bucket_id = params.bucket_id;
+
+ int ret = store->getRados()->create_vector_bucket(
+ dpp, y, key, params.owner, params.zonegroup_id,
+ params.placement_rule, params.attrs,
+ params.quota, params.creation_time, &bucket_version, info);
+
+ bool existed = false;
+ if (ret == -EEXIST) {
+ existed = true;
+ /* bucket already existed, might have raced with another bucket creation,
+ * or might be partial bucket creation that never completed. Read existing
+ * bucket info, verify that the reported bucket owner is the current user.
+ * If all is ok then update the user's list of buckets. Otherwise inform
+ * client about a name conflict.
+ */
+ if (info.owner != params.owner) {
+ return -ERR_BUCKET_EXISTS;
+ }
+ // prevent re-creation with different index type or shard count
+ if ((params.index_type && *params.index_type !=
+ info.layout.current_index.layout.type) ||
+ (params.index_shards && *params.index_shards !=
+ info.layout.current_index.layout.normal.num_shards)) {
+ return -ERR_BUCKET_EXISTS;
+ }
+ ret = 0;
+ } else if (ret != 0) {
+ return ret;
+ }
+
+ ret = link(dpp, params.owner, y, true);
+ if (ret && !existed && ret != -EEXIST) {
+ /* if it exists (or previously existed), don't remove it! */
+ ret = unlink(dpp, params.owner, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << ret
+ << dendl;
+ }
+ } else if (ret == -EEXIST) {
+ ret = -ERR_BUCKET_EXISTS;
+ } else if (ret == 0) {
+ /* this is to handle the following race condition:
+ * a concurrent DELETE bucket request deletes the bucket entry point and
+ * unlinks it (if the bucket pre-exists) before it's linked in this
+ * bucket creation request. */
+
+ if (existed) {
+ ret = -ERR_BUCKET_EXISTS;
+ }
+
+ RGWBucketEntryPoint ep;
+ RGWObjVersionTracker objv_tracker;
+ int r = store->ctl()->vector_bucket->read_bucket_entrypoint_info(info.bucket,
+ &ep,
+ y,
+ dpp,
+ RGWBucketCtl::Bucket::GetParams()
+ .set_objv_tracker(&objv_tracker));
+ if (r == -ENOENT) {
+ ret = 0;
+
+ ldpp_dout(dpp, 5) << "WARNING: the bucket entry point has been deleted by a concurrent DELETE bucket request."
+ << " Unlinking the bucket." << dendl;
+ r = unlink(dpp, params.owner, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << r << dendl;
+ }
+ }
+ }
+
+ return ret;
+}
+
+int RadosVectorBucket::link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint, RGWObjVersionTracker* objv) {
+ ldpp_dout(dpp, 20) << "s3vector --- RadosVectorBucket::link called" << dendl;
+ RGWBucketEntryPoint ep;
+ ep.bucket = info.bucket;
+ ep.owner = new_owner;
+ ep.creation_time = get_creation_time();
+ ep.linked = true;
+ Attrs ep_attrs;
+ rgw_ep_info ep_data{ep, ep_attrs};
+
+ librados::Rados& rados = *store->getRados()->get_rados_handle();
+ int r = store->ctl()->vector_bucket->link_bucket(rados, new_owner, info.bucket,
+ get_creation_time(), y, dpp, update_entrypoint,
+ &ep_data);
+ if (r < 0)
+ return r;
+
+ if (objv)
+ *objv = ep_data.ep_objv;
+
+ return r;
+}
+
int RadosBucket::read_stats(const DoutPrefixProvider *dpp, optional_yield y,
const bucket_index_layout_generation& idx_layout,
int shard_id, std::string* bucket_ver, std::string* master_ver,
int RadosBucket::put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time _mtime, optional_yield y)
{
mtime = _mtime;
- return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs, dpp, y);
+ return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs, dpp, y, store->ctl()->bucket);
+}
+
+int RadosVectorBucket::put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time _mtime, optional_yield y)
+{
+ mtime = _mtime;
+ return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs, dpp, y, store->ctl()->vector_bucket);
}
int RadosBucket::check_empty(const DoutPrefixProvider* dpp, optional_yield y)
int RadosBucket::try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y)
{
- return store->getRados()->try_refresh_bucket_info(info, pmtime, dpp, y, &attrs);
+ return store->getRados()->try_refresh_bucket_info(info, pmtime, dpp, y, &attrs, store->ctl()->bucket);
+}
+
+int RadosVectorBucket::try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y)
+{
+ return store->getRados()->try_refresh_bucket_info(info, pmtime, dpp, y, &attrs, store->ctl()->vector_bucket);
}
int RadosBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx);
}
+int RadosStore::load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b,
+ std::unique_ptr<VectorBucket>* bucket, optional_yield y) {
+ *bucket = std::make_unique<RadosVectorBucket>(this, b);
+ return (*bucket)->load_bucket(dpp, y);
+}
+
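+// Listing walks the owner's vector-buckets tracking object with the same
+// cls_user-backed helper used for regular bucket listings.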
+int RadosStore::list_vector_buckets(const DoutPrefixProvider* dpp,
+ const rgw_owner& owner, const std::string& tenant,
+ const std::string& marker, const std::string& end_marker,
+ uint64_t max, BucketList& listing,
+ optional_yield y) {
+ librados::Rados& rados = *getRados()->get_rados_handle();
+ const rgw_raw_obj& obj = get_owner_vector_buckets_obj(svc()->user, svc()->zone, owner);
+
+ return rgwrados::buckets::list(dpp, y, rados, obj, tenant,
+ marker, end_marker, max, listing);
+}
+
RadosObject::~RadosObject()
{
if (rados_ctx_owned)
const std::string& unique_tag) override;
virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override;
virtual bool valid_placement(const rgw_placement_rule& rule) override;
+ int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b,
+ std::unique_ptr<VectorBucket>* bucket, optional_yield y) override;
+ int list_vector_buckets(const DoutPrefixProvider* dpp,
+ const rgw_owner& owner, const std::string& tenant,
+ const std::string& marker, const std::string& end_marker,
+ uint64_t max, BucketList& listing,
+ optional_yield y) override;
virtual void shutdown(void) override;
friend class RadosUser;
};
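+// Vector bucket counterpart of RadosBucket: metadata goes through ctl()->vector_bucket,
+// there is no bucket index (check_empty() is a no-op), and link()/unlink() maintain the
+// owner's vector-buckets listing object.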
+class RadosVectorBucket : public StoreVectorBucket {
+ private:
+ RadosStore* store;
+
+ public:
+ RadosVectorBucket(RadosStore *_st)
+ : store(_st) {}
+
+ RadosVectorBucket(RadosStore *_st, const rgw_bucket& _b)
+ : StoreVectorBucket(_b),
+ store(_st) {}
+
+ RadosVectorBucket(RadosStore *_st, const RGWBucketInfo& _i)
+ : StoreVectorBucket(_i),
+ store(_st) {}
+
+ ~RadosVectorBucket() override = default;
+ int remove(const DoutPrefixProvider* dpp, bool delete_children, optional_yield y) override;
+ int create(const DoutPrefixProvider* dpp, const CreateParams& params,
+ optional_yield y) override;
+ int load_bucket(const DoutPrefixProvider* dpp, optional_yield y) override;
+ int check_empty(const DoutPrefixProvider* dpp, optional_yield y) override { return 0; }
+ std::unique_ptr<VectorBucket> clone() override {
+ return std::make_unique<RadosVectorBucket>(*this);
+ }
+ int put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time mtime, optional_yield y) override;
+ int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y) override;
+
+ private:
+ int link(const DoutPrefixProvider* dpp, const rgw_owner& new_owner, optional_yield y, bool update_entrypoint = true, RGWObjVersionTracker* objv = nullptr);
+ int unlink(const DoutPrefixProvider* dpp, const rgw_owner& owner, optional_yield y, bool update_entrypoint = true);
+};
+
class RadosMultipartPart : public StoreMultipartPart {
protected:
RGWUploadPartInfo info;
const rgw::SiteConfig* site)
{
bucket_sobj = std::make_unique<RGWSI_Bucket_SObj>(cct);
+ vector_bucket_sobj = std::make_unique<RGWSI_VectorBucket_SObj>(cct);
bucket_sync_sobj = std::make_unique<RGWSI_Bucket_Sync_SObj>(cct);
bi_rados = std::make_unique<RGWSI_BucketIndex_RADOS>(cct);
bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
bi_rados.get(), mdlog.get(),
sync_modules.get(), bucket_sync_sobj.get());
+ vector_bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
+ bi_rados.get(), mdlog.get(),
+ sync_modules.get(), bucket_sync_sobj.get());
bucket_sync_sobj->init(zone.get(),
sysobj.get(),
sysobj_cache.get(),
return r;
}
+ r = vector_bucket_sobj->start(y, dpp);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to start s3vector bucket service (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
r = bucket_sync_sobj->start(y, dpp);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to start bucket_sync service (" << cpp_strerror(-r) << dendl;
bi_rados->shutdown();
bucket_sync_sobj->shutdown();
bucket_sobj->shutdown();
+ vector_bucket_sobj->shutdown();
sysobj->shutdown();
sysobj_core->shutdown();
bilog_rados = _svc.bilog_rados.get();
bucket_sobj = _svc.bucket_sobj.get();
bucket = bucket_sobj;
+ vector_bucket_sobj = _svc.vector_bucket_sobj.get();
+ vector_bucket = vector_bucket_sobj;
bucket_sync_sobj = _svc.bucket_sync_sobj.get();
bucket_sync = bucket_sync_sobj;
cls = _svc.cls.get();
svc.bucket,
svc.bucket_sync,
svc.bi, svc.user,
- svc.datalog_rados));
+ svc.datalog_rados,
+ rgwrados::account::get_buckets_obj,
+ &RGWSI_User::get_buckets_obj));
+ vector_bucket.reset(new RGWBucketCtl(svc.zone,
+ svc.vector_bucket,
+ svc.bucket_sync,
+ svc.bi, svc.user,
+ svc.datalog_rados,
+ rgwrados::account::get_vector_buckets_obj,
+ &RGWSI_User::get_vector_buckets_obj));
auto sync_module = svc.sync_modules->get_sync_module();
if (sync_module) {
meta.bucket = sync_module->alloc_bucket_meta_handler(rados, svc.bucket, bucket.get());
meta.bucket_instance = sync_module->alloc_bucket_instance_meta_handler(
driver, svc.zone, svc.bucket, svc.bi, svc.datalog_rados);
+ meta.vector_bucket = sync_module->alloc_vector_bucket_meta_handler(rados, svc.vector_bucket, vector_bucket.get());
+ meta.vector_bucket_instance = sync_module->alloc_vector_bucket_instance_meta_handler(
+ driver, svc.zone, svc.vector_bucket);
} else {
meta.bucket = create_bucket_metadata_handler(rados, svc.bucket, bucket.get());
meta.bucket_instance = create_bucket_instance_metadata_handler(
driver, svc.zone, svc.bucket, svc.bi, svc.datalog_rados);
+ meta.vector_bucket = create_vector_bucket_metadata_handler(rados, svc.vector_bucket, vector_bucket.get());
+ meta.vector_bucket_instance = create_vector_bucket_instance_metadata_handler(
+ driver, svc.zone, svc.vector_bucket);
}
meta.otp = rgwrados::otp::create_metadata_handler(
user->init(bucket.get());
bucket->init(user.get(), svc.datalog_rados, dpp);
+ vector_bucket->init(user.get(), svc.datalog_rados, dpp);
return 0;
}
meta.user = _ctl.meta.user.get();
meta.bucket = _ctl.meta.bucket.get();
meta.bucket_instance = _ctl.meta.bucket_instance.get();
+ meta.vector_bucket = _ctl.meta.vector_bucket.get();
+ meta.vector_bucket_instance = _ctl.meta.vector_bucket_instance.get();
meta.otp = _ctl.meta.otp.get();
meta.role = _ctl.meta.role.get();
meta.topic = _ctl.meta.topic.get();
user = _ctl.user.get();
bucket = _ctl.bucket.get();
+ vector_bucket = _ctl.vector_bucket.get();
r = meta.user->attach(meta.mgr);
if (r < 0) {
return r;
}
+ r = meta.vector_bucket->attach(meta.mgr);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start init meta.vector_bucket ctl (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
+ r = meta.vector_bucket_instance->attach(meta.mgr);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start init meta.vector_bucket_instance ctl (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
r = meta.otp->attach(meta.mgr);
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start init otp ctl (" << cpp_strerror(-r) << dendl;
class RGWSI_Bucket;
class RGWSI_Bucket_SObj;
+class RGWSI_VectorBucket_SObj;
class RGWSI_Bucket_Sync;
class RGWSI_Bucket_Sync_SObj;
class RGWSI_BucketIndex;
bool has_shutdown{false};
std::unique_ptr<RGWSI_Bucket_SObj> bucket_sobj;
+ std::unique_ptr<RGWSI_VectorBucket_SObj> vector_bucket_sobj;
std::unique_ptr<RGWSI_Bucket_Sync_SObj> bucket_sync_sobj;
std::unique_ptr<RGWSI_BucketIndex_RADOS> bi_rados;
std::unique_ptr<RGWSI_BILog_RADOS> bilog_rados;
const rgw::SiteConfig* site{nullptr};
RGWSI_Bucket *bucket{nullptr};
+ RGWSI_Bucket *vector_bucket{nullptr};
RGWSI_Bucket_SObj *bucket_sobj{nullptr};
+ RGWSI_VectorBucket_SObj *vector_bucket_sobj{nullptr};
RGWSI_Bucket_Sync *bucket_sync{nullptr};
RGWSI_Bucket_Sync_SObj *bucket_sync_sobj{nullptr};
RGWSI_BucketIndex *bi{nullptr};
std::unique_ptr<RGWMetadataManager> mgr;
std::unique_ptr<RGWMetadataHandler> bucket;
std::unique_ptr<RGWMetadataHandler> bucket_instance;
+ std::unique_ptr<RGWMetadataHandler> vector_bucket;
+ std::unique_ptr<RGWMetadataHandler> vector_bucket_instance;
std::unique_ptr<RGWMetadataHandler> user;
std::unique_ptr<RGWMetadataHandler> otp;
std::unique_ptr<RGWMetadataHandler> role;
std::unique_ptr<RGWUserCtl> user;
std::unique_ptr<RGWBucketCtl> bucket;
+ std::unique_ptr<RGWBucketCtl> vector_bucket;
RGWCtlDef();
~RGWCtlDef();
RGWMetadataHandler *bucket{nullptr};
RGWMetadataHandler *bucket_instance{nullptr};
+ RGWMetadataHandler *vector_bucket{nullptr};
+ RGWMetadataHandler *vector_bucket_instance{nullptr};
RGWMetadataHandler *user{nullptr};
RGWMetadataHandler *otp{nullptr};
RGWMetadataHandler *role{nullptr};
RGWUserCtl *user{nullptr};
RGWBucketCtl *bucket{nullptr};
+ RGWBucketCtl *vector_bucket{nullptr};
int init(RGWServices *_svc, rgw::sal::Driver* driver,
librados::Rados& rados, const DoutPrefixProvider *dpp);
svc_bucket, svc_bi, svc_datalog);
}
+auto RGWSyncModuleInstance::alloc_vector_bucket_meta_handler(librados::Rados& rados,
+ RGWSI_Bucket* svc_bucket,
+ RGWBucketCtl* ctl_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>
+{
+ return create_vector_bucket_metadata_handler(rados, svc_bucket, ctl_bucket);
+}
+
+auto RGWSyncModuleInstance::alloc_vector_bucket_instance_meta_handler(rgw::sal::Driver* driver,
+ RGWSI_Zone* svc_zone,
+ RGWSI_Bucket* svc_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>
+{
+ return create_vector_bucket_instance_metadata_handler(driver, svc_zone,
+ svc_bucket);
+}
RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncCtx *_sc,
rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
RGWSI_BucketIndex* svc_bi,
RGWDataChangesLog *svc_datalog)
-> std::unique_ptr<RGWMetadataHandler>;
+ virtual auto alloc_vector_bucket_meta_handler(librados::Rados& rados,
+ RGWSI_Bucket* svc_bucket,
+ RGWBucketCtl* ctl_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>;
+ virtual auto alloc_vector_bucket_instance_meta_handler(rgw::sal::Driver* driver,
+ RGWSI_Zone* svc_zone,
+ RGWSI_Bucket* svc_bucket)
+ -> std::unique_ptr<RGWMetadataHandler>;
// indication whether the sync module start with full sync (default behavior)
// incremental sync would follow anyway
{ "route53", Service::route53 },
{ "route53domains", Service::route53domains },
{ "s3", Service::s3 },
+ { "s3vectors", Service::s3vectors },
{ "sdb", Service::sdb },
{ "servicecatalog", Service::servicecatalog },
{ "ses", Service::ses },
{ Service::route53, "route53" },
{ Service::route53domains, "route53domains" },
{ Service::s3, "s3" },
+ { Service::s3vectors, "s3vectors" },
{ Service::sdb, "sdb" },
{ Service::servicecatalog, "servicecatalog" },
{ Service::ses, "ses" },
machinelearning, aws_marketplace, aws_marketplace_management,
mobileanalytics, mobilehub, opsworks, opsworks_cm, polly,
redshift, rds, route53, route53domains, sts, servicecatalog,
- ses, sns, sqs, s3, swf, sdb, states, storagegateway, support,
+ ses, sns, sqs, s3, s3vectors, swf, sdb, states, storagegateway, support,
trustedadvisor, waf, workmail, workspaces, wildcard
};
//
// The called function must return an integer, negative on error. In
// general, they should just return op_ret.
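+// Templated on the bucket type (B) so the helper can also be used with other
+// bucket-like types such as rgw::sal::VectorBucket.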
-template<typename F>
+template<typename F, typename B=rgw::sal::Bucket>
int retry_raced_bucket_write(const DoutPrefixProvider *dpp,
- rgw::sal::Bucket *b,
+ B* b,
const F &f,
optional_yield y) {
auto r = f();
new_env.set("HTTP_X_AMZ_CONTENT_SHA256", maybe_payload_hash);
}
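+ // carry the original Content-Type into the forwarded request environment
+ // (s3vector operations use an application/json payload)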
+ const char* content_type = info.env->get("CONTENT_TYPE");
+ if (content_type) {
+ new_env.set("HTTP_CONTENT_TYPE", content_type);
+ }
+
int ret = sign_request(dpp, key, region, s, new_env, new_info, nullptr);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to sign request" << dendl;
set_req_state_err(s, op_ret);
}
dump_errno(s);
- end_header(s);
-
- if (op_ret < 0)
- return;
- if (s->system_request) {
+ if (op_ret == 0 && s->system_request) {
+ s->format = RGWFormat::JSON;
+ end_header(s, this, to_mime_type(s->format));
JSONFormatter f; /* use json formatter for system requests output */
const RGWBucketInfo& info = s->bucket->get_info();
encode_json("bucket_info", info, &f);
f.close_section();
rgw_flush_formatter_and_reset(s, &f);
+ return;
}
+ end_header(s);
}
void RGWDeleteBucket_ObjStore_S3::send_response()
// s3vector requests looks like bucket POST operations
// where the "bucket name" is the operation name.
// with JSON payload
+// and no user-provided params
// POST /<op name> HTTP/1.1
// Content-type: application/json
bool is_s3vector_op(const req_state* s) {
const auto content_type = s->info.env->get_optional("CONTENT_TYPE");
+ const auto& params = s->info.args.get_params();
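+ // tolerate only internal system params (RGW_SYS_PARAM_PREFIX), e.g. those added to forwarded requests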
return std::string_view(s->info.method) == "POST" &&
- s->info.args.get_num_params() == 0 &&
+ std::count_if(params.begin(), params.end(), [](const auto& p) {
+ return !p.first.starts_with(RGW_SYS_PARAM_PREFIX);
+ }) == 0 &&
content_type &&
*content_type == "application/json";
}
#include "rgw_op.h"
#include "rgw_rest_s3vector.h"
#include "rgw_s3vector.h"
+#include "rgw_process_env.h"
#include "common/async/yield_context.h"
#define dout_context g_ceph_context
class RGWS3VectorBase : public RGWDefaultResponseOp {
protected:
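+ // raw JSON payload, preserved so it can be forwarded to the metadata master zone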
+ bufferlist in_data;
template<typename T>
int do_init_processing(T& configuration, optional_yield y) {
const auto max_size = s->cct->_conf->rgw_max_put_param_size;
- bufferlist data;
int ret = 0;
- if (std::tie(ret, data) = read_all_input(s, max_size, false); ret < 0) {
+ if (std::tie(ret, in_data) = read_all_input(s, max_size, false); ret < 0) {
ldpp_dout(this, 1) << "ERROR: failed to read JSON s3vector payload, ret = " << ret << dendl;
return ret;
}
- if (data.length() == 0) {
+ if (in_data.length() == 0) {
ldpp_dout(this, 1) << "ERROR: JSON s3vector payload missing" << dendl;
return -EINVAL;
}
JSONParser parser;
- if (!parser.parse(data.c_str(), data.length())) {
+ if (!parser.parse(in_data.c_str(), in_data.length())) {
ldpp_dout(this, 1) << "ERROR: failed to parse JSON s3vector payload" << dendl;
return -EINVAL;
}
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::create_index(configuration, this, y);
}
};
class RGWS3VectorCreateVectorBucket : public RGWS3VectorBase {
rgw::s3vector::create_vector_bucket_t configuration;
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
int verify_permission(optional_yield y) override {
ldpp_dout(this, 10) << "INFO: verifying permission for s3vector CreateVectorBucket" << dendl;
- // TODO: implement permission check
- /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsCreateVectorBucket)) {
+ if (s->auth.identity->is_anonymous()) {
return -EACCES;
- }*/
+ }
+
+ rgw::ARN arn(rgw::Partition::aws, rgw::Service::s3vectors,
+ "", s->bucket_tenant, configuration.vector_bucket_name);
+ if (!verify_user_permission(this, s, arn, rgw::IAM::s3vectorsCreateVectorBucket, false)) {
+ return -EACCES;
+ }
+
+ if (s->auth.identity->get_tenant() != s->bucket_tenant) {
+ //AssumeRole is meant for cross account access
+ if (s->auth.identity->get_identity_type() != TYPE_ROLE) {
+ ldpp_dout(this, 5) << "WARNING: user cannot create an s3vector bucket in a different tenant"
+ << " (user_id.tenant=" << s->user->get_tenant()
+ << " requested=" << s->bucket_tenant << ")"
+ << dendl;
+ return -EACCES;
+ }
+ }
+
return 0;
}
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ int ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (ret != -ENOENT) {
+ // TODO: verify attributes from the existing bucket match the requested configuration
+ op_ret = ret;
+ return;
+ }
+
+ const auto& zonegroup = s->penv.site->get_zonegroup();
+
+ rgw::sal::VectorBucket::CreateParams createparams;
+ createparams.owner = s->user->get_id();
+ createparams.zonegroup_id = zonegroup.id;
+ // vector buckets are indexless
+ createparams.index_type = rgw::BucketIndexType::Indexless;
+ createparams.placement_rule.storage_class = s->info.storage_class;
+ if (!driver->is_meta_master()) {
+ // apply bucket creation on the master zone first
+ JSONParser jp;
+ op_ret = rgw_forward_request_to_master(this, *s->penv.site, s->owner.id,
+ &in_data, &jp, s->info, s->err, y);
+ if (op_ret < 0) {
+ return;
+ }
+
+ RGWBucketInfo master_info;
+ JSONDecoder::decode_json("bucket_info", master_info, &jp);
+
+ // update params with info from the master
+ createparams.marker = master_info.bucket.marker;
+ createparams.bucket_id = master_info.bucket.bucket_id;
+ createparams.zonegroup_id = master_info.zonegroup;
+ createparams.quota = master_info.quota;
+ createparams.creation_time = master_info.creation_time;
+ }
+
+ op_ret = bucket->create(this, createparams, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to create s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::create_vector_bucket(configuration, this, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to initialize s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
}
+
+ void send_response() override {
+ if (op_ret == -ERR_BUCKET_EXISTS) {
+ const auto eexist_override = s->cct->_conf.get_val<bool>("rgw_bucket_eexist_override");
+ if (! eexist_override) [[likely]] {
+ op_ret = 0;
+ } else {
+ s->err.message = "The requested bucket name is not available. The bucket namespace is shared by all users of the system. Specify a different name and try again.";
+ }
+ }
+
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+
+ if (op_ret == 0 && s->system_request) {
+ s->format = RGWFormat::JSON;
+ end_header(s, this, to_mime_type(s->format));
+ JSONFormatter f; /* use json formatter for system requests output */
+
+ ceph_assert(bucket);
+ ceph_assert(!bucket->empty());
+ const RGWBucketInfo& info = bucket->get_info();
+ const obj_version& ep_objv = bucket->get_version();
+ f.open_object_section("info");
+ encode_json("entry_point_object_ver", ep_objv, &f);
+ encode_json("object_ver", info.objv_tracker.read_version, &f);
+ encode_json("bucket_info", info, &f);
+ f.close_section();
+ rgw_flush_formatter_and_reset(s, &f);
+ return;
+ }
+ end_header(s);
+ }
+
};
class RGWS3VectorDeleteIndex : public RGWS3VectorBase {
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::delete_index(configuration, this, y);
}
};
class RGWS3VectorDeleteVectorBucket : public RGWS3VectorBase {
+ // TODO: collapse with get_vector_bucket_t and create a common function
+ // to build and verify the ARN in init_processing()
rgw::s3vector::delete_vector_bucket_t configuration;
+ boost::optional<rgw::ARN> arn;
int verify_permission(optional_yield y) override {
ldpp_dout(this, 10) << "INFO: verifying permission for s3vector DeleteVectorBucket" << dendl;
- // TODO: implement permission check
- /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsDeleteVectorBucket)) {
- return -EACCES;
- }*/
+ if (!verify_bucket_permission(this, s, arn.get(), rgw::IAM::s3vectorsDeleteVectorBucket)) {
+ // TODO: ignore failure for now "evaluate_iam_policies: implicit deny from identity-based policy"
+ // return -EACCES;
+ return 0;
+ }
return 0;
}
uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
int init_processing(optional_yield y) override {
- return do_init_processing(configuration, y);
+ int ret = do_init_processing(configuration, y);
+ if (ret < 0) {
+ return ret;
+ }
+ if (configuration.vector_bucket_arn.empty()) {
+ arn.emplace(rgw::Partition::aws,
+ rgw::Service::s3vectors,
+ s->penv.site->get_zonegroup().api_name,
+ s->auth.identity->get_tenant(),
+ configuration.vector_bucket_name);
+ } else {
+ arn = rgw::ARN::parse(configuration.vector_bucket_arn);
+ if (!arn) {
+ ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN: " << configuration.vector_bucket_arn << dendl;
+ return -EINVAL;
+ }
+ if (arn->service != rgw::Service::s3vectors ||
+ arn->partition != rgw::Partition::aws ||
+ arn->region != s->penv.site->get_zonegroup().api_name ||
+ arn->account != s->bucket_tenant ||
+ arn->resource.empty()) {
+ ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN service: " << arn->to_string() << dendl;
+ return -EINVAL;
+ }
+ }
+ if (!configuration.vector_bucket_name.empty() &&
+ arn->resource != configuration.vector_bucket_name) {
+ ldpp_dout(this, 1) << "ERROR: s3vector bucket ARN bucket mismatch: " << arn->to_string()
+ << " expected bucket: " << configuration.vector_bucket_name << dendl;
+ return -EINVAL;
+ }
+
+ s->bucket_name = arn->resource;
+ ldpp_dout(this, 20) << "INFO: s3vector bucket ARN: " << arn.get() << dendl;
+ return 0;
}
void execute(optional_yield y) override {
- op_ret = rgw::s3vector::delete_vector_bucket(configuration, this, y);
+ {
+ JSONFormatter f;
+ configuration.dump(&f);
+ std::stringstream ss;
+ f.flush(ss);
+ ldpp_dout(this, 20) << "INFO: executing s3vector DeleteVectorBucket with: " << ss.str() << dendl;
+ }
+ // use the bucket name resolved in init_processing() (it may have come from the ARN)
+ const rgw_bucket bucket_id(s->bucket_tenant, s->bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
+ op_ret = bucket->remove(this, false, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to delete s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
+ // TODO: verify bucket is empty before deletion (or support force delete)
+ // rgw::s3vector::delete_vector_bucket(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::delete_vector_bucket_policy(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::put_vectors(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::get_vectors(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::list_vectors(configuration, this, y);
}
};
class RGWS3VectorListVectorBuckets : public RGWS3VectorBase {
rgw::s3vector::list_vector_buckets_t configuration;
+ rgw::sal::BucketList listing;
+ std::string zonegroup_name;
int verify_permission(optional_yield y) override {
ldpp_dout(this, 10) << "INFO: verifying permission for s3vector ListVectorBuckets" << dendl;
- // TODO: implement permission check
- /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsListVectorBuckets)) {
+ if (s->auth.identity->is_anonymous()) {
return -EACCES;
- }*/
+ }
+
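+ // user-level IAM check against a wildcard s3vectors ARN scoped to the caller's tenant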
+ zonegroup_name = s->penv.site->get_zonegroup().api_name;
+ rgw::ARN arn = rgw::ARN(rgw::Partition::aws,
+ rgw::Service::s3vectors,
+ zonegroup_name,
+ s->auth.identity->get_tenant(),
+ "*");
+ if (!verify_user_permission(this, s, arn, rgw::IAM::s3vectorsListVectorBuckets, false)) {
+ return -EACCES;
+ }
return 0;
}
}
void execute(optional_yield y) override {
- op_ret = rgw::s3vector::list_vector_buckets(configuration, this, y);
+ {
+ JSONFormatter f;
+ configuration.dump(&f);
+ std::stringstream ss;
+ f.flush(ss);
+ ldpp_dout(this, 20) << "INFO: executing s3vector ListVectorBuckets with: " << ss.str() << dendl;
+ }
+
+ op_ret = driver->list_vector_buckets(this, s->owner.id, s->auth.identity->get_tenant(),
+ configuration.next_token, "", configuration.max_results, listing, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 20) << "ERROR: failed to execute ListVectorBuckets. error: " << op_ret << dendl;
+ return;
+ }
+ }
+
+ void send_response() override {
+ if (op_ret < 0) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/json");
+ // convert BucketList to AWS S3 Vector Buckets format
+ JSONFormatter f;
+ f.open_object_section("");
+ if (!listing.next_marker.empty()) {
+ ::encode_json("nextToken", listing.next_marker, &f);
+ }
+ f.open_array_section("vectorBuckets");
+ for (const auto& bucket : listing.buckets) {
+ f.open_object_section("");
+ ::encode_json("creationTime", ceph::to_iso_8601(bucket.creation_time), &f);
+ rgw::ARN arn(rgw::Partition::aws,
+ rgw::Service::s3vectors,
+ zonegroup_name,
+ s->auth.identity->get_tenant(),
+ bucket.bucket.name);
+ ::encode_json("vectorBucketArn", arn.to_string(), &f);
+ ::encode_json("vectorBucketName", bucket.bucket.name, &f);
+ f.close_section();
+ }
+ f.close_section(); // vectorBuckets
+ f.close_section(); // root object
+ std::stringstream ss;
+ f.flush(ss);
+ dump_body(s, ss.str());
}
};
class RGWS3VectorGetVectorBucket : public RGWS3VectorBase {
rgw::s3vector::get_vector_bucket_t configuration;
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ boost::optional<rgw::ARN> arn;
int verify_permission(optional_yield y) override {
ldpp_dout(this, 10) << "INFO: verifying permission for s3vector GetVectorBucket" << dendl;
- // TODO: implement permission check
- /*if (!verify_bucket_permission(this, s, rgw::IAM::s3vectorsGetVectorBucket)) {
- return -EACCES;
- }*/
+
+ if (!verify_bucket_permission(this, s, arn.get(), rgw::IAM::s3vectorsGetVectorBucket)) {
+ // TODO: ignore failure for now "evaluate_iam_policies: implicit deny from identity-based policy"
+ // return -EACCES;
+ return 0;
+ }
return 0;
}
uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
int init_processing(optional_yield y) override {
- return do_init_processing(configuration, y);
+ int ret = do_init_processing(configuration, y);
+ if (ret < 0) {
+ return ret;
+ }
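+ // same ARN build/validation as DeleteVectorBucket::init_processing; see the
+ // TODO there about factoring this into a shared helper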
+ if (configuration.vector_bucket_arn.empty()) {
+ arn.emplace(rgw::Partition::aws,
+ rgw::Service::s3vectors,
+ s->penv.site->get_zonegroup().api_name,
+ s->auth.identity->get_tenant(),
+ configuration.vector_bucket_name);
+ } else {
+ arn = rgw::ARN::parse(configuration.vector_bucket_arn);
+ if (!arn) {
+ ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN: " << configuration.vector_bucket_arn << dendl;
+ return -EINVAL;
+ }
+ if (arn->service != rgw::Service::s3vectors ||
+ arn->partition != rgw::Partition::aws ||
+ arn->region != s->penv.site->get_zonegroup().api_name ||
+ arn->account != s->bucket_tenant ||
+ arn->resource.empty()) {
+ ldpp_dout(this, 1) << "ERROR: invalid s3vector bucket ARN service: " << arn->to_string() << dendl;
+ return -EINVAL;
+ }
+ }
+ if (!configuration.vector_bucket_name.empty() &&
+ arn->resource != configuration.vector_bucket_name) {
+ ldpp_dout(this, 1) << "ERROR: s3vector bucket ARN bucket mismatch: " << arn->to_string()
+ << " expected bucket: " << configuration.vector_bucket_name << dendl;
+ return -EINVAL;
+ }
+
+ s->bucket_name = arn->resource;
+ ldpp_dout(this, 20) << "INFO: s3vector bucket ARN: " << arn.get() << dendl;
+ return 0;
}
void execute(optional_yield y) override {
- op_ret = rgw::s3vector::get_vector_bucket(configuration, this, y);
+ {
+ JSONFormatter f;
+ configuration.dump(&f);
+ std::stringstream ss;
+ f.flush(ss);
+ ldpp_dout(this, 20) << "INFO: executing s3vector GetVectorBucket with: " << ss.str() << dendl;
+ }
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ }
+ }
+
+ void send_response() override {
+ if (op_ret < 0) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/json");
+ if (op_ret < 0) {
+ // nothing else to send on error; "bucket" may not be valid
+ return;
+ }
+ // convert the loaded VectorBucket to the GetVectorBucket response format
+ JSONFormatter f;
+ f.open_object_section("");
+ f.open_object_section("vectorBucket");
+ ::encode_json("creationTime", ceph::to_iso_8601(bucket->get_creation_time()), &f);
+ // reuse the ARN built in init_processing() so the region is populated
+ ::encode_json("vectorBucketArn", arn->to_string(), &f);
+ ::encode_json("vectorBucketName", bucket->get_name(), &f);
+ f.close_section(); // vectorBucket
+ f.close_section(); // root object
+ std::stringstream ss;
+ f.flush(ss);
+ dump_body(s, ss.str());
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::get_index(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::list_indexes(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::put_vector_bucket_policy(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::get_vector_bucket_policy(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::delete_vectors(configuration, this, y);
}
};
}
void execute(optional_yield y) override {
+ const rgw_bucket bucket_id(s->bucket_tenant, configuration.vector_bucket_name);
+ std::unique_ptr<rgw::sal::VectorBucket> bucket;
+ op_ret = driver->load_vector_bucket(this, bucket_id, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to load s3vector bucket " << bucket_id << ". error: " << op_ret << dendl;
+ return;
+ }
op_ret = rgw::s3vector::query_vectors(configuration, this, y);
}
};
#include "common/Formatter.h"
#include "common/dout.h"
#include <fmt/format.h>
+#include "rgw_sal.h"
#define dout_subsys ceph_subsys_rgw
}
int create_vector_bucket(const create_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y) {
- JSONFormatter f;
- configuration.dump(&f);
- std::stringstream ss;
- f.flush(ss);
- ldpp_dout(dpp, 20) << "INFO: executing s3vector CreateVectorBucket with: " << ss.str() << dendl;
return 0;
}
}
int delete_vector_bucket(const delete_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y) {
- JSONFormatter f;
- configuration.dump(&f);
- std::stringstream ss;
- f.flush(ss);
- ldpp_dout(dpp, 20) << "INFO: executing s3vector DeleteVectorBucket with: " << ss.str() << dendl;
return 0;
}
decode_name_or_arn("vectorBucketName", "vectorBucketArn", vector_bucket_name, vector_bucket_arn, obj);
}
-int list_vector_buckets(const list_vector_buckets_t& configuration, DoutPrefixProvider* dpp, optional_yield y) {
- JSONFormatter f;
- configuration.dump(&f);
- std::stringstream ss;
- f.flush(ss);
- ldpp_dout(dpp, 20) << "INFO: executing s3vector ListVectorBuckets with: " << ss.str() << dendl;
- return 0;
-}
-
void get_index_t::dump(ceph::Formatter* f) const {
f->open_object_section("");
::encode_json("indexArn", index_arn, f);
decode_name("vectorBucketName", vector_bucket_name, obj);
}
-int get_vector_bucket(const get_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y) {
- JSONFormatter f;
- configuration.dump(&f);
- std::stringstream ss;
- f.flush(ss);
- ldpp_dout(dpp, 20) << "INFO: executing s3vector GetVectorBucket with: " << ss.str() << dendl;
- return 0;
-}
-
void list_indexes_t::dump(ceph::Formatter* f) const {
f->open_object_section("");
::encode_json("maxResults", max_results, f);
class JSONObj;
class DoutPrefixProvider;
+struct rgw_bucket;
+
+namespace rgw::sal {
+class Driver;
+}
+
namespace rgw::s3vector {
enum class DistanceMetric {
COSINE,
int put_vectors(const put_vectors_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
int get_vectors(const get_vectors_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
int list_vectors(const list_vectors_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
-int list_vector_buckets(const list_vector_buckets_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
-int get_vector_bucket(const get_vector_bucket_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
int get_index(const get_index_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
int list_indexes(const list_indexes_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
int put_vector_bucket_policy(const put_vector_bucket_policy_t& configuration, DoutPrefixProvider* dpp, optional_yield y);
struct MPSerializer;
class GCChain;
class RGWRole;
+class VectorBucket;
enum AttrsMod {
ATTRSMOD_NONE = 0,
/** Check to see if this placement rule is valid */
virtual bool valid_placement(const rgw_placement_rule& rule) = 0;
+ /** Load a VectorBucket by key. Queries driver for vector bucket info. On -ENOENT, the
+ * bucket must still be allocated to support bucket->create(). */
+ virtual int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b,
+ std::unique_ptr<VectorBucket>* bucket, optional_yield y) = 0;
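+ // A sketch of the intended load-or-create flow, assuming the driver
+ // allocates *bucket even on -ENOENT as documented above:
+ //   std::unique_ptr<VectorBucket> vb;
+ //   int r = driver->load_vector_bucket(dpp, key, &vb, y);
+ //   if (r == -ENOENT) { r = vb->create(dpp, params, y); }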
+ /** List the vector buckets of a given owner */
+ virtual int list_vector_buckets(const DoutPrefixProvider* dpp,
+ const rgw_owner& owner, const std::string& tenant,
+ const std::string& marker, const std::string& end_marker,
+ uint64_t max, BucketList& buckets,
+ optional_yield y) = 0;
+
/** Shut down background tasks, to be called while Asio is running. */
virtual void shutdown(void) { };
virtual void set_luarocks_path(const std::string& path) = 0;
};
+class VectorBucket {
+ public:
+ VectorBucket() = default;
+ virtual ~VectorBucket() = default;
+
+ /** Remove this vector bucket from the backing store */
+ virtual int remove(const DoutPrefixProvider* dpp, bool delete_children, optional_yield y) = 0;
+
+ using CreateParams = Bucket::CreateParams;
+ /// Create this bucket in the backing store.
+ virtual int create(const DoutPrefixProvider* dpp,
+ const CreateParams& params,
+ optional_yield y) = 0;
+
+ /** Get the cached attributes associated with this vector bucket */
+ virtual Attrs& get_attrs(void) = 0;
+ /** Set the cached attributes on this vector bucket */
+ virtual int set_attrs(Attrs a) = 0;
+ /** Load this vector bucket from the backing store. Requires the key to be set, fills other fields. */
+ virtual int load_bucket(const DoutPrefixProvider* dpp, optional_yield y) = 0;
+ /** Get the owner of this vector bucket */
+ virtual const rgw_owner& get_owner() const = 0;
+ /** Check in the backing store if this vector bucket is empty */
+ virtual int check_empty(const DoutPrefixProvider* dpp, optional_yield y) = 0;
+ /** Check if this instantiation is empty */
+ virtual bool empty() const = 0;
+ /** Get the cached name of this vector bucket */
+ virtual const std::string& get_name() const = 0;
+ /** Get the cached tenant of this vector bucket */
+ virtual const std::string& get_tenant() const = 0;
+ /** Get the cached marker of this vector bucket */
+ virtual const std::string& get_marker() const = 0;
+ /** Get the cached ID of this vector bucket */
+ virtual const std::string& get_bucket_id() const = 0;
+ /** Get the cached placement rule of this vector bucket */
+ virtual const rgw_placement_rule& get_placement_rule() const = 0;
+ /** Get the cached creation time of this vector bucket */
+ virtual ceph::real_time get_creation_time() const = 0;
+ /** Get the cached modification time of this vector bucket */
+ virtual const ceph::real_time& get_modification_time() const = 0;
+ /** Get the cached version of this vector bucket */
+ virtual const obj_version& get_version() const = 0;
+ /** Set the cached version of this vector bucket */
+ virtual void set_version(obj_version &ver) = 0;
+ /** Get the key for this vector bucket */
+ virtual const rgw_bucket& get_key() const = 0;
+ /** Get the info for this vector bucket */
+ virtual const RGWBucketInfo& get_info() const = 0;
+
+ /** Check if a VectorBucket pointer is empty */
+ static bool empty(const VectorBucket* b) { return (!b || b->empty()); }
+ /** Check if a VectorBucket unique pointer is empty */
+ static bool empty(const std::unique_ptr<VectorBucket>& b) { return (!b || b->empty()); }
+ /** Clone a copy of this vector bucket. Used when modification is necessary of the copy */
+ virtual std::unique_ptr<VectorBucket> clone() = 0;
+
+ /** Print the VectorBucket to @a out */
+ virtual void print(std::ostream& out) const = 0;
+
+ friend inline std::ostream& operator<<(std::ostream& out, const VectorBucket& b) {
+ b.print(out);
+ return out;
+ }
+
+ friend inline std::ostream& operator<<(std::ostream& out, const VectorBucket* b) {
+ if (!b)
+ out << "<NULL>";
+ else
+ b->print(out);
+ return out;
+ }
+
+ friend inline std::ostream& operator<<(std::ostream& out, const std::unique_ptr<VectorBucket>& p) {
+ out << p.get();
+ return out;
+ }
+
+ virtual bool operator==(const VectorBucket& b) const = 0;
+ virtual bool operator!=(const VectorBucket& b) const = 0;
+
+ /** Store the cached bucket info into the backing store */
+ virtual int put_info(const DoutPrefixProvider* dpp, bool exclusive, ceph::real_time mtime, optional_yield y) = 0;
+
+ /** Try to refresh the cached bucket info from the backing store. Used in
+ * read-modify-update loop. */
+ virtual int try_refresh_info(const DoutPrefixProvider* dpp, ceph::real_time* pmtime, optional_yield y) = 0;
+
+ // TODO check if we need these API from Bucket class:
+ /** Read the bucket stats from the backing Store, synchronous */
+ //virtual int read_stats(const DoutPrefixProvider *dpp, optional_yield y,
+ // const bucket_index_layout_generation& idx_layout,
+ // int shard_id, std::string* bucket_ver, std::string* master_ver,
+ // std::map<RGWObjCategory, RGWStorageStats>& stats,
+ // std::string* max_marker = nullptr,
+ // bool* syncstopped = nullptr) = 0;
+ /** Read the bucket stats from the backing Store, asynchronous */
+ //virtual int read_stats_async(const DoutPrefixProvider *dpp,
+ // const bucket_index_layout_generation& idx_layout,
+ // int shard_id, boost::intrusive_ptr<ReadStatsCB> cb) = 0;
+ /** Sync this bucket's stats to the owning user's stats in the backing store */
+ //virtual int sync_owner_stats(const DoutPrefixProvider *dpp, optional_yield y,
+ // RGWBucketEnt* optional_ent) = 0;
+ /** Change the owner of this bucket in the backing store. Current owner must be set. Does not
+ * change ownership of the objects in the bucket. */
+ //virtual int chown(const DoutPrefixProvider* dpp,
+ // const rgw_owner& new_owner,
+ // const std::string& new_owner_name,
+ // optional_yield y) = 0;
+ /** Check if the given size fits within the quota */
+ // virtual int check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size, optional_yield y, bool check_size_only = false) = 0;
+ /** Set the attributes in attrs, leaving any other existing attrs set, and
+ * write them to the backing store; a merge operation */
+ //virtual int merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) = 0;
+ /** Read usage information about this bucket from the backing store */
+ //virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+ // bool* is_truncated, RGWUsageIter& usage_iter,
+ // std::map<rgw_user_bucket, rgw_usage_log_entry>& usage) = 0;
+ /** Trim the usage information to the given epoch range */
+ //virtual int trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, optional_yield y) = 0;
+ /** Remove this specific vector bucket instance from the backing store. May be removed from API */
+ //virtual int purge_instance(const DoutPrefixProvider* dpp, optional_yield y) = 0;
+};
+
/** @} namespace rgw::sal in group RGWSAL */
} } // namespace rgw::sal
virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override;
virtual bool valid_placement(const rgw_placement_rule& rule) override;
+ int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b,
+ std::unique_ptr<VectorBucket>* bucket, optional_yield y) override { return -ENOTSUP; }
+ int list_vector_buckets(const DoutPrefixProvider* dpp,
+ const rgw_owner& owner, const std::string& tenant,
+ const std::string& marker, const std::string& end_marker,
+ uint64_t max, BucketList& buckets,
+ optional_yield y) override { return -ENOTSUP; }
virtual void finalize(void) override;
virtual const std::string& get_compression_type(const rgw_placement_rule& rule) override;
virtual bool valid_placement(const rgw_placement_rule& rule) override;
+ int load_vector_bucket(const DoutPrefixProvider* dpp, const rgw_bucket& b,
+ std::unique_ptr<VectorBucket>* bucket, optional_yield y) override {
+ return next->load_vector_bucket(dpp, b, bucket, y);
+ }
+ int list_vector_buckets(const DoutPrefixProvider* dpp,
+ const rgw_owner& owner, const std::string& tenant,
+ const std::string& marker, const std::string& end_marker,
+ uint64_t max, BucketList& buckets,
+ optional_yield y) override {
+ return next->list_vector_buckets(dpp, owner, tenant, marker,
+ end_marker, max, buckets, y);
+ }
virtual void shutdown(void) override { next->shutdown(); };
friend class BucketList;
};
+class StoreVectorBucket : public VectorBucket {
+ protected:
+ RGWBucketInfo info;
+ Attrs attrs;
+ obj_version bucket_version;
+ ceph::real_time mtime;
+
+ public:
+
+ StoreVectorBucket() = default;
+ StoreVectorBucket(const rgw_bucket& b) { info.bucket = b; }
+ StoreVectorBucket(const RGWBucketInfo& i) : info(i) {}
+ virtual ~StoreVectorBucket() = default;
+
+ virtual Attrs& get_attrs(void) override { return attrs; }
+ virtual int set_attrs(Attrs a) override { attrs = a; return 0; }
+ const rgw_owner& get_owner() const override { return info.owner; }
+ bool empty() const override { return info.bucket.name.empty(); }
+ const std::string& get_name() const override { return info.bucket.name; }
+ const std::string& get_tenant() const override { return info.bucket.tenant; }
+ const std::string& get_marker() const override { return info.bucket.marker; }
+ const std::string& get_bucket_id() const override { return info.bucket.bucket_id; }
+ const rgw_placement_rule& get_placement_rule() const override { return info.placement_rule; }
+ ceph::real_time get_creation_time() const override { return info.creation_time; }
+ const ceph::real_time& get_modification_time() const override { return mtime; }
+ const obj_version& get_version() const override { return bucket_version; }
+ void set_version(obj_version &ver) override { bucket_version = ver; }
+ const rgw_bucket& get_key() const override { return info.bucket; }
+ const RGWBucketInfo& get_info() const override { return info; }
+ void print(std::ostream& out) const override { out << info.bucket; }
+ bool operator==(const VectorBucket& b) const override {
+ if (typeid(*this) != typeid(b)) {
+ return false;
+ }
+ const StoreVectorBucket& sb = dynamic_cast<const StoreVectorBucket&>(b);
+
+ return (info.bucket.tenant == sb.info.bucket.tenant) &&
+ (info.bucket.name == sb.info.bucket.name) &&
+ (info.bucket.bucket_id == sb.info.bucket.bucket_id);
+ }
+ virtual bool operator!=(const VectorBucket& b) const override {
+ if (typeid(*this) != typeid(b)) {
+ return true; // different dynamic types can never be equal
+ }
+ const StoreVectorBucket& sb = dynamic_cast<const StoreVectorBucket&>(b);
+
+ return (info.bucket.tenant != sb.info.bucket.tenant) ||
+ (info.bucket.name != sb.info.bucket.name) ||
+ (info.bucket.bucket_id != sb.info.bucket.bucket_id);
+ }
+};
+
class StoreObject : public Object {
protected:
RGWObjState state;
JSONDecoder::decode_json("tier_config", tier_config, obj);
JSONDecoder::decode_json("realm_id", realm_id, obj);
JSONDecoder::decode_json("restore_pool", restore_pool, obj);
+ JSONDecoder::decode_json("vector_pool", vector_pool, obj);
}
void RGWZoneParams::dump(Formatter *f) const
encode_json("tier_config", tier_config, f);
encode_json("realm_id", realm_id, f);
encode_json("restore_pool", restore_pool, f);
+ encode_json("vector_pool", vector_pool, f);
}
rgw_pool RGWZoneParams::get_pool(CephContext *cct) const
pools.insert(info.topics_pool);
pools.insert(info.account_pool);
pools.insert(info.group_pool);
+ pools.insert(info.vector_pool);
for (const auto& [pname, placement] : info.placement_pools) {
pools.insert(placement.index_pool);
fix_zone_pool_dup(pools, info.name, ".rgw.meta:topics", info.topics_pool);
info.account_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:accounts", info.account_pool);
info.group_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:groups", info.group_pool);
+ info.vector_pool = fix_zone_pool_dup(pools, info.name, ".rgw.meta:vector", info.vector_pool);
for (auto& [pname, placement] : info.placement_pools) {
placement.index_pool = fix_zone_pool_dup(pools, info.name, "." + default_bucket_index_pool_suffix, placement.index_pool);
rgw_pool group_pool;
rgw_pool dedup_pool;
rgw_pool bucket_logging_pool;
+ rgw_pool vector_pool;
RGWAccessKey system_key;
const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
void encode(bufferlist& bl) const {
- ENCODE_START(18, 1, bl);
+ ENCODE_START(19, 1, bl);
encode(domain_root, bl);
encode(control_pool, bl);
encode(gc_pool, bl);
encode(restore_pool, bl);
encode(dedup_pool, bl);
encode(bucket_logging_pool, bl);
+ encode(vector_pool, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(18, bl);
+ DECODE_START(19, bl);
decode(domain_root, bl);
decode(control_pool, bl);
decode(gc_pool, bl);
} else {
bucket_logging_pool = log_pool.name + ":logging";
}
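+ // zones encoded before v19 carry no vector_pool; fall back to a
+ // zone-scoped default so upgraded clusters still get a usable pool name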
+ if (struct_v >= 19) {
+ decode(vector_pool, bl);
+ } else {
+ vector_pool = name + ".rgw.meta:vector";
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
using namespace std;
-static const std::string instance_oid_prefix = ".bucket.meta.";
-
// convert bucket instance oids back to the tenant/ format for metadata keys.
// it's safe to parse 'tenant:' only for oids, because they won't contain the
// optional :shard at the end
-static std::string instance_meta_key_to_oid(const std::string& metadata_key)
+std::string RGWSI_Bucket_SObj::instance_meta_key_to_oid(const std::string& metadata_key) const
{
- std::string oid = string_cat_reserve(instance_oid_prefix, metadata_key);
+ const auto& prefix = instance_oid_prefix();
+ std::string oid = string_cat_reserve(prefix, metadata_key);
// replace tenant/ with tenant:
- auto c = oid.find('/', instance_oid_prefix.size());
+ auto c = oid.find('/', prefix.size());
if (c != string::npos) {
oid[c] = ':';
}
// convert bucket instance oids back to the tenant/ format for metadata keys.
// it's safe to parse 'tenant:' only for oids, because they won't contain the
// optional :shard at the end
-static std::string instance_oid_to_meta_key(const std::string& oid)
+std::string RGWSI_Bucket_SObj::instance_oid_to_meta_key(const std::string& oid) const
{
- if (oid.size() < instance_oid_prefix.size()) { /* just sanity check */
+ const auto prefix_size = instance_oid_prefix().size();
+ if (oid.size() < prefix_size) { /* just sanity check */
return string();
}
- std::string key = oid.substr(instance_oid_prefix.size());
+ std::string key = oid.substr(prefix_size);
// find first : (could be tenant:bucket or bucket:instance)
auto c = key.find(':');
return key;
}
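+// cache keys carry a short prefix ("bi/" here, "vbi/" for vector buckets) so
+// regular and vector bucket instance entries never collide in the shared cache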
+std::string RGWSI_Bucket_SObj::get_cache_key(const std::string& key) const {
+ std::string cache_key("bi/");
+ cache_key.append(key);
+ return cache_key;
+}
RGWSI_Bucket_SObj::RGWSI_Bucket_SObj(CephContext *cct): RGWSI_Bucket(cct) {
}
}
};
+const std::string& RGWSI_Bucket_SObj::instance_oid_prefix() const
+{
+ static const std::string instance_oid_prefix = ".bucket.meta.";
+ return instance_oid_prefix;
+}
+
+const rgw_pool& RGWSI_Bucket_SObj::get_entrypoint_pool() const
+{
+ return svc.zone->get_zone_params().domain_root;
+}
+
int RGWSI_Bucket_SObj::create_entrypoint_lister(
const DoutPrefixProvider* dpp,
const std::string& marker,
std::unique_ptr<RGWMetadataLister>& lister)
{
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
+ const rgw_pool& pool = get_entrypoint_pool();
auto p = std::make_unique<BucketEntrypointLister>(svc.sysobj->get_pool(pool));
int r = p->init(dpp, marker, ""); // empty prefix
if (r < 0) {
return 0;
}
+const rgw_pool& RGWSI_VectorBucket_SObj::get_entrypoint_pool() const
+{
+ return svc.zone->get_zone_params().vector_pool;
+}
class BucketInstanceLister : public RGWMetadataLister {
+ const RGWSI_Bucket_SObj* const bucket_sobj;
public:
- using RGWMetadataLister::RGWMetadataLister;
+ BucketInstanceLister(RGWSI_SysObj::Pool pool, const RGWSI_Bucket_SObj* _bucket_sobj) :
+ RGWMetadataLister(pool),
+ bucket_sobj(_bucket_sobj) {}
+ ~BucketInstanceLister() override = default;
void filter_transform(std::vector<std::string>& oids,
std::list<std::string>& keys) override
// transform instance oids to metadata keys
std::transform(oids.begin(), oids.end(),
std::back_inserter(keys),
- instance_oid_to_meta_key);
+ [this](const std::string& oid) {
+ return bucket_sobj->instance_oid_to_meta_key(oid);
+ });
}
};
const std::string& marker,
std::unique_ptr<RGWMetadataLister>& lister)
{
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
- auto p = std::make_unique<BucketInstanceLister>(svc.sysobj->get_pool(pool));
- int r = p->init(dpp, marker, instance_oid_prefix);
+ const rgw_pool& pool = get_entrypoint_pool();
+ auto p = std::make_unique<BucketInstanceLister>(svc.sysobj->get_pool(pool), this);
+ int r = p->init(dpp, marker, instance_oid_prefix());
if (r < 0) {
return r;
}
rgw_cache_entry_info *cache_info,
boost::optional<obj_version> refresh_version)
{
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
+ const rgw_pool& pool = get_entrypoint_pool();
bufferlist bl;
int ret = rgw_get_system_obj(svc.sysobj, pool, key, bl,
objv_tracker, pmtime, y, dpp,
return 0;
}
+
+int RGWSI_Bucket_SObj::complete_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ const std::string& section, const std::string& key,
+ const RGWObjVersionTracker* objv) {
+ return svc.mdlog->complete_entry(dpp, y, section, key, objv);
+}
+
int RGWSI_Bucket_SObj::store_bucket_entrypoint_info(const string& key,
RGWBucketEntryPoint& info,
bool exclusive,
bufferlist bl;
encode(info, bl);
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
+ const rgw_pool& pool = get_entrypoint_pool();
int ret = rgw_put_system_obj(dpp, svc.sysobj, pool, key, bl, exclusive,
objv_tracker, mtime, y, pattrs);
if (ret < 0) {
return ret;
}
- return svc.mdlog->complete_entry(dpp, y, "bucket", key, objv_tracker);
+ return complete_entry(dpp, y, "bucket", key, objv_tracker);
}
int RGWSI_Bucket_SObj::remove_bucket_entrypoint_info(const string& key,
optional_yield y,
const DoutPrefixProvider *dpp)
{
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
+ const rgw_pool& pool = get_entrypoint_pool();
int ret = rgw_delete_system_obj(dpp, svc.sysobj, pool, key, objv_tracker, y);
if (ret < 0) {
return ret;
}
- return svc.mdlog->complete_entry(dpp, y, "bucket", key, objv_tracker);
+ return complete_entry(dpp, y, "bucket", key, objv_tracker);
}
int RGWSI_Bucket_SObj::read_bucket_instance_info(const string& key,
optional_yield y,
const DoutPrefixProvider *dpp)
{
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
+ const rgw_pool& pool = get_entrypoint_pool();
const std::string oid = instance_meta_key_to_oid(key);
bufferlist bl;
RGWObjVersionTracker objv;
}
}
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
+ const rgw_pool& pool = get_entrypoint_pool();
const std::string oid = instance_meta_key_to_oid(key);
int ret = rgw_put_system_obj(dpp, svc.sysobj, pool, oid, bl, exclusive,
&info.objv_tracker, mtime, y, pattrs);
if (ret >= 0) {
- int r = svc.mdlog->complete_entry(dpp, y, "bucket.instance",
+ int r = complete_entry(dpp, y, "bucket.instance",
key, &info.objv_tracker);
if (r < 0) {
return r;
optional_yield y,
const DoutPrefixProvider *dpp)
{
- const rgw_pool& pool = svc.zone->get_zone_params().domain_root;
+ const rgw_pool& pool = get_entrypoint_pool();
const std::string oid = instance_meta_key_to_oid(key);
int ret = rgw_delete_system_obj(dpp, svc.sysobj, pool, oid, objv_tracker, y);
if (ret < 0 &&
return buckets.size();
}
+
+const std::string& RGWSI_VectorBucket_SObj::instance_oid_prefix() const
+{
+ static const std::string vector_instance_oid_prefix = ".vectorbucket.meta.";
+ return vector_instance_oid_prefix;
+}
+
+std::string RGWSI_VectorBucket_SObj::get_cache_key(const std::string& key) const {
+ std::string cache_key("vbi/");
+ cache_key.append(key);
+ return cache_key;
+}
+
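+// vector bucket metadata changes are recorded under their own mdlog sections
+// ("vectorbucket" and "vectorbucket.instance") rather than the regular bucket sections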
+int RGWSI_VectorBucket_SObj::complete_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ const std::string& section, const std::string& key,
+ const RGWObjVersionTracker* objv) {
+ return svc.mdlog->complete_entry(dpp, y, "vector"+section, key, objv);
+}
class RGWSI_Bucket_SObj : public RGWSI_Bucket
{
+ virtual const std::string& instance_oid_prefix() const;
+ std::string instance_meta_key_to_oid(const std::string& metadata_key) const;
+ virtual const rgw_pool& get_entrypoint_pool() const;
+ virtual std::string get_cache_key(const std::string& key) const;
+ virtual int complete_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ const std::string& section, const std::string& key,
+ const RGWObjVersionTracker* objv);
+
struct bucket_info_cache_entry {
RGWBucketInfo info;
real_time mtime;
} svc;
RGWSI_Bucket_SObj(CephContext *cct);
- ~RGWSI_Bucket_SObj();
+ ~RGWSI_Bucket_SObj() override;
+ std::string instance_oid_to_meta_key(const std::string& oid) const;
void init(RGWSI_Zone *_zone_svc,
RGWSI_SysObj *_sysobj_svc,
RGWSI_SysObj_Cache *_cache_svc,
const DoutPrefixProvider *dpp) override;
};
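+// Reuses the RGWSI_Bucket_SObj machinery for vector buckets, overriding only
+// the metadata pool, oid prefix, cache key prefix and mdlog section so vector
+// bucket metadata is stored and logged separately from regular buckets.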
+class RGWSI_VectorBucket_SObj : public RGWSI_Bucket_SObj
+{
+ const std::string& instance_oid_prefix() const override;
+ const rgw_pool& get_entrypoint_pool() const override;
+ std::string get_cache_key(const std::string& key) const override;
+ int complete_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ const std::string& section, const std::string& key,
+ const RGWObjVersionTracker* objv) override;
+public:
+ RGWSI_VectorBucket_SObj(CephContext *cct) : RGWSI_Bucket_SObj(cct) {}
+ ~RGWSI_VectorBucket_SObj() override = default;
+};
+
virtual rgw_raw_obj get_buckets_obj(const rgw_user& user_id) const = 0;
+ virtual rgw_raw_obj get_vector_buckets_obj(const rgw_user& user_id) const = 0;
+
virtual int create_lister(const DoutPrefixProvider* dpp,
const std::string& marker,
std::unique_ptr<RGWMetadataLister>& lister) = 0;
#define dout_subsys ceph_subsys_rgw
#define RGW_BUCKETS_OBJ_SUFFIX ".buckets"
+#define RGW_VECTOR_BUCKETS_OBJ_SUFFIX ".vectorbuckets"
using namespace std;
return rgw_raw_obj(svc.zone->get_zone_params().user_uid_pool, oid);
}
+rgw_raw_obj RGWSI_User_RADOS::get_vector_buckets_obj(const rgw_user& user) const
+{
+ string oid = user.to_str() + RGW_VECTOR_BUCKETS_OBJ_SUFFIX;
+ return rgw_raw_obj(svc.zone->get_zone_params().user_uid_pool, oid);
+}
+
class UserLister : public RGWMetadataLister {
public:
using RGWMetadataLister::RGWMetadataLister;
ldpp_dout(dpp, 0) << "ERROR: could not remove " << info.user_id << ":" << uid_bucks << ", should be fixed (err=" << ret << ")" << dendl;
return ret;
}
+ // TODO: remove vector buckets
} else if (info.type != TYPE_ROOT) {
// unlink the name from its account
const RGWZoneParams& zone = svc.zone->get_zone_params();
rgw_raw_obj get_buckets_obj(const rgw_user& user_id) const override;
+ rgw_raw_obj get_vector_buckets_obj(const rgw_user& user_id) const override;
+
int get_user_info_from_index(const std::string& key,
const rgw_pool& pool,
RGWUserInfo *info,
* To run a specific tests use:
`S3VTESTS_CONF=s3vtests.conf.SAMPLE tox -- s3vector_test.py::<test_name>`
* To run a group of tests use:
- `S3VTESTS_CONF=s3vtests.conf.SAMPLE tox -- s3vector_test.py -m "<marker name>"
+ `S3VTESTS_CONF=s3vtests.conf.SAMPLE tox -- s3vector_test.py -m "<marker name>"`
+* In a multisite environment, you can configure a "secondary" site in the conf file. See: `s3vtests.conf.multisite`
global main_secret_key
main_secret_key = cfg.get('s3 main',"secret_key")
+ # vars from the secondary section
+ global secondary_host
+ global secondary_port
+ if cfg.has_section("secondary"):
+ secondary_host = cfg.get('secondary',"host")
+ secondary_port = int(cfg.get('secondary',"port"))
+ else:
+ secondary_host = None
+ secondary_port = None
+
def get_config_host():
global default_host
return main_secret_key
+def get_config_host2():
+ global secondary_host
+ return secondary_host
+
+
+def get_config_port2():
+ global secondary_port
+ return secondary_port
+
+
@pytest.fixture(autouse=True, scope="package")
def configfile():
setup()
get_config_host,
get_config_port,
get_access_key,
- get_secret_key
+ get_secret_key,
+ get_config_host2,
+ get_config_port2
)
global num_buckets
num_buckets += 1
-def connection():
+def connection(service_name='s3vectors'):
hostname = get_config_host()
port_no = get_config_port()
access_key = get_access_key()
else:
scheme = 'http://'
- client = boto3.client('s3vectors',
+ if service_name == 's3vectors':
+ config = Config(signature_version='s3')
+ else:
+ config = None
+
+ client = boto3.client(service_name,
endpoint_url=scheme+hostname+':'+str(port_no),
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
- config=Config(signature_version='s3'))
+ config=config)
+
+ return client
+
+def connection2(service_name='s3vectors'):
+ hostname = get_config_host2()
+ if not hostname:
+ log.info("No second host configured")
+ return None
+ port_no = get_config_port2()
+ if not port_no:
+ log.info("No second port configured")
+ return None
+ access_key = get_access_key()
+ secret_key = get_secret_key()
+ if port_no == 443 or port_no == 8443:
+ scheme = 'https://'
+ else:
+ scheme = 'http://'
+
+ if service_name == 's3vectors':
+ config = Config(signature_version='s3')
+ else:
+ config = None
+
+ client = boto3.client(service_name,
+ endpoint_url=scheme+hostname+':'+str(port_no),
+ aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ config=config)
return client
# s3vectors tests
#################
+def _delete_all_vector_buckets(conn):
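+ # best-effort cleanup: list all vector buckets and delete each one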
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ for bucket in result['vectorBuckets']:
+ _ = conn.delete_vector_bucket(vectorBucketName=bucket['vectorBucketName'])
+
+
@pytest.mark.vector_bucket_test
def test_create_vector_bucket():
conn = connection()
result = conn.create_vector_bucket(vectorBucketName=bucket_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
# cleanup
- _ = conn.delete_vector_bucket(vectorBucketName=bucket_name)
+ _delete_all_vector_buckets(conn)
@pytest.mark.vector_bucket_test
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
result = conn.get_vector_bucket(vectorBucketName=bucket_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("get_vector_buckets result: %s", result)
+ invalid_name = bucket_name + '-invalid'
+ pytest.raises(conn.exceptions.ClientError, conn.get_vector_bucket, vectorBucketName=invalid_name)
# cleanup
- _ = conn.delete_vector_bucket(vectorBucketName=bucket_name)
+ _delete_all_vector_buckets(conn)
@pytest.mark.vector_bucket_test
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
result = conn.delete_vector_bucket(vectorBucketName=bucket_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
- # not implemented yet
- #with pytest.raises(conn.exceptions.NoSuchVectorBucket):
- # result = conn.get_vector_bucket(vectorBucketName=bucket_name)
+ pytest.raises(conn.exceptions.ClientError, conn.get_vector_bucket, vectorBucketName=bucket_name)
+ pytest.raises(conn.exceptions.ClientError, conn.delete_vector_bucket, vectorBucketName=bucket_name)
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ assert len(result['vectorBuckets']) == 0
+ # cleanup
+ _delete_all_vector_buckets(conn)
@pytest.mark.vector_bucket_test
-def test_list_vector_bucket():
+def test_list_vector_buckets():
conn = connection()
bucket_name1 = gen_bucket_name()
bucket_name2 = gen_bucket_name()
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
result = conn.list_vector_buckets()
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
- # not implemented yet
- #bucket_names = [b['Name'] for b in result['VectorBuckets']]
- #assert bucket_name1 in bucket_names
- #assert bucket_name2 in bucket_names
+ log.info("list_vector_buckets result: %s", result)
+ bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name1 in bucket_names
+ assert bucket_name2 in bucket_names
+ # cleanup
+ _delete_all_vector_buckets(conn)
+
+
+@pytest.mark.vector_bucket_test
+def test_vector_buckets_sync():
+ conn = connection()
+ conn2 = connection2()
+ if not conn2:
+ log.info("Skipping test_vector_buckets_sync since second connection is not configured")
+ return
+
+ # create buckets from the first connection
+ bucket_name1 = gen_bucket_name()
+ bucket_name2 = gen_bucket_name()
+ result = conn.create_vector_bucket(vectorBucketName=bucket_name1)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = conn.create_vector_bucket(vectorBucketName=bucket_name2)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets result: %s", result)
+ bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name1 in bucket_names
+ assert bucket_name2 in bucket_names
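+ # give multisite metadata sync time to propagate the new vector buckets to the secondary site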
+ time.sleep(5)
+
+ # now check from the second connection
+ result = conn2.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets from conn2 result: %s", result)
+ bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name1 in bucket_names
+ assert bucket_name2 in bucket_names
+
+ # create buckets from the 2nd connection
+ bucket_name3 = gen_bucket_name()
+ bucket_name4 = gen_bucket_name()
+ result = conn2.create_vector_bucket(vectorBucketName=bucket_name3)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = conn2.create_vector_bucket(vectorBucketName=bucket_name4)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = conn2.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets from conn2 result: %s", result)
+ bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name3 in bucket_names
+ assert bucket_name4 in bucket_names
+ time.sleep(5)
+
+ # now check from the first connection
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets result: %s", result)
+ bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name3 in bucket_names
+ assert bucket_name4 in bucket_names
+
+ # cleanup
+ _delete_all_vector_buckets(conn)
+
+
+def _create_s3bucket(s3conn, bucket_name):
+ try:
+ result = s3conn.create_bucket(Bucket=bucket_name)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ except s3conn.exceptions.ClientError as err:
+ log.warning("s3 bucket creation failed with: %s", str(err))
+ assert err.response['ResponseMetadata']['HTTPStatusCode'] == 500
+
+
+@pytest.mark.vector_bucket_test
+def test_vector_buckets_creation_with_buckets():
+ conn = connection()
+ s3conn = connection('s3')
+ bucket_name1 = gen_bucket_name()
+ # create vector bucket
+ result = conn.create_vector_bucket(vectorBucketName=bucket_name1)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # create s3 bucket with the same name
+ _create_s3bucket(s3conn, bucket_name1)
+ # create another s3 bucket
+ bucket_name2 = gen_bucket_name()
+ _create_s3bucket(s3conn, bucket_name2)
+ # list vector buckets and verify only the vector bucket is there
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets result: %s", result)
+ vector_bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name1 in vector_bucket_names
+ assert bucket_name2 not in vector_bucket_names
+ # list s3 buckets and verify both buckets are there
+ result = s3conn.list_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_buckets result: %s", result)
+ s3_bucket_names = [b['Name'] for b in result['Buckets']]
+ assert bucket_name1 in s3_bucket_names
+ assert bucket_name2 in s3_bucket_names
+ # now try to create a vector bucket with a name that already exists as an s3 bucket
+ result = conn.create_vector_bucket(vectorBucketName=bucket_name2)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # list vector buckets and verify both buckets are there
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets result: %s", result)
+ vector_bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name1 in vector_bucket_names
+ assert bucket_name2 in vector_bucket_names
+ # cleanup
+ _delete_all_vector_buckets(conn)
+
+
+@pytest.mark.vector_bucket_test
+def test_vector_buckets_deletion_with_buckets():
+ conn = connection()
+ s3conn = connection('s3')
+ bucket_name1 = gen_bucket_name()
+ result = conn.create_vector_bucket(vectorBucketName=bucket_name1)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # create s3 bucket with the same name
+ _create_s3bucket(s3conn, bucket_name1)
+
+ # verify vector bucket exists (via get and list)
+ result = conn.get_vector_bucket(vectorBucketName=bucket_name1)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets result: %s", result)
+ vector_bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name1 in vector_bucket_names
+ # verify s3 bucket exists (via head and list)
+ result = s3conn.head_bucket(Bucket=bucket_name1)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = s3conn.list_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_buckets result: %s", result)
+ bucket_names = [b['Name'] for b in result['Buckets']]
+ assert bucket_name1 in bucket_names
+
+
+ # delete vector bucket
+ result = conn.delete_vector_bucket(vectorBucketName=bucket_name1)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # verify vector bucket is not there (via get)
+ pytest.raises(conn.exceptions.ClientError, conn.get_vector_bucket, vectorBucketName=bucket_name1)
+ # verify s3 bucket still exists
+ result = s3conn.head_bucket(Bucket=bucket_name1)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = s3conn.list_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_buckets result: %s", result)
+ s3_bucket_names = [b['Name'] for b in result['Buckets']]
+ assert bucket_name1 in s3_bucket_names
+ # create another vector bucket
+ bucket_name2 = gen_bucket_name()
+ result = conn.create_vector_bucket(vectorBucketName=bucket_name2)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # and an s3 bucket with the same name
+ _create_s3bucket(s3conn, bucket_name2)
+ # delete the s3 bucket
+ result = s3conn.delete_bucket(Bucket=bucket_name2)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 204
+ # verify s3 bucket is not there
+ pytest.raises(s3conn.exceptions.ClientError, s3conn.head_bucket, Bucket=bucket_name2)
+ # verify vector bucket still exists (via get and list)
+ result = conn.get_vector_bucket(vectorBucketName=bucket_name2)
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ result = conn.list_vector_buckets()
+ assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ log.info("list_vector_buckets result: %s", result)
+ vector_bucket_names = [b['vectorBucketName'] for b in result['vectorBuckets']]
+ assert bucket_name2 in vector_bucket_names
# cleanup
- _ = conn.delete_vector_bucket(vectorBucketName=bucket_name1)
- _ = conn.delete_vector_bucket(vectorBucketName=bucket_name2)
+ _delete_all_vector_buckets(conn)
@pytest.mark.index_test
index_name = 'test-index'
result = conn.create_index(vectorBucketName=bucket_name, indexName=index_name, dataType='float32', dimension=128, distanceMetric='euclidean')
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # create an index on a bucket that does not exist
+ invalid_bucket_name = bucket_name + '-invalid'
+ pytest.raises(conn.exceptions.ClientError, conn.create_index, vectorBucketName=invalid_bucket_name, indexName=index_name, dataType='float32', dimension=128, distanceMetric='euclidean')
# cleanup
_ = conn.delete_vector_bucket(vectorBucketName=bucket_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
result = conn.get_index(vectorBucketName=bucket_name, indexName=index_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # get an index from a bucket that does not exist
+ invalid_bucket_name = bucket_name + '-invalid'
+ pytest.raises(conn.exceptions.ClientError, conn.get_index, vectorBucketName=invalid_bucket_name, indexName=index_name)
# cleanup
_ = conn.delete_vector_bucket(vectorBucketName=bucket_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
result = conn.get_index(vectorBucketName=bucket_name, indexName=index_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # delete an index from a bucket that does not exist
+ invalid_bucket_name = bucket_name + '-invalid'
+ pytest.raises(conn.exceptions.ClientError, conn.delete_index, vectorBucketName=invalid_bucket_name, indexName=index_name)
+ # delete the index from the right bucket
result = conn.delete_index(vectorBucketName=bucket_name, indexName=index_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
# not implemented yet
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
result = conn.list_indexes(vectorBucketName=bucket_name)
assert result['ResponseMetadata']['HTTPStatusCode'] == 200
+ # list indexes from a bucket that does not exist
+ invalid_bucket_name = bucket_name + '-invalid'
+ pytest.raises(conn.exceptions.ClientError, conn.list_indexes, vectorBucketName=invalid_bucket_name)
# not implemented yet
#index_names = [i['IndexName'] for i in result['Indexes']]
#assert index_name1 in index_names
--- /dev/null
+[DEFAULT]
+port = 8101
+host = localhost
+
+[secondary]
+port = 8201
+host = localhost
+
+[s3 main]
+access_key = 1234567890
+secret_key = pencil
+display_name = M. Tester
+user_id = testid
+email = tester@ceph.com