driver/rados/sync_fairness.cc
driver/rados/topic.cc
driver/rados/topic_migration.cc
+ driver/rados/topics.cc
driver/rados/users.cc)
list(APPEND librgw_common_srcs
static constexpr std::string_view users_oid_prefix = "users.";
static constexpr std::string_view groups_oid_prefix = "groups.";
static constexpr std::string_view roles_oid_prefix = "roles.";
+static constexpr std::string_view topics_oid_prefix = "topics.";
static const std::string account_oid_prefix = "account.";
static constexpr std::string_view name_oid_prefix = "name.";
return {zone.account_pool, get_roles_key(account_id)};
}
+static std::string get_topics_key(std::string_view account_id) {
+ return string_cat_reserve(topics_oid_prefix, account_id);
+}
+rgw_raw_obj get_topics_obj(const RGWZoneParams& zone,
+ std::string_view account_id) {
+ return {zone.account_pool, get_topics_key(account_id)};
+}
+
static std::string get_account_key(std::string_view account_id) {
return string_cat_reserve(account_oid_prefix, account_id);
}
rgw_raw_obj get_roles_obj(const RGWZoneParams& zone,
std::string_view account_id);
+/// Return the rados object that tracks the given account's topics. This
+/// can be used with the cls_user interface in namespace rgwrados::topics.
+rgw_raw_obj get_topics_obj(const RGWZoneParams& zone,
+ std::string_view account_id);
+
/// Read account info by id
int read(const DoutPrefixProvider* dpp,
#include "users.h"
#include "rgw_pubsub.h"
#include "topic.h"
+#include "topics.h"
#define dout_subsys ceph_subsys_rgw
optional_yield y,
const DoutPrefixProvider* dpp)
{
+ librados::Rados& rados = *getRados()->get_rados_handle();
const RGWZoneParams& zone = svc()->zone->get_zone_params();
- return rgwrados::topic::write(dpp, y, *svc()->sysobj, svc()->mdlog, zone,
- topic, objv_tracker, {}, exclusive);
+ return rgwrados::topic::write(dpp, y, *svc()->sysobj, svc()->mdlog, rados,
+ zone, topic, objv_tracker, {}, exclusive);
}
int RadosStore::remove_topic_v2(const std::string& topic_name,
optional_yield y,
const DoutPrefixProvider* dpp)
{
+ librados::Rados& rados = *getRados()->get_rados_handle();
const RGWZoneParams& zone = svc()->zone->get_zone_params();
- const std::string key = get_topic_metadata_key(tenant, topic_name);
return rgwrados::topic::remove(dpp, y, *svc()->sysobj, svc()->mdlog,
- zone, key, objv_tracker);
+ rados, zone, tenant, topic_name, objv_tracker);
+}
+
+int RadosStore::list_account_topics(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ std::string_view account_id,
+ std::string_view marker,
+ uint32_t max_items,
+ TopicList& listing)
+{
+ librados::Rados& rados = *getRados()->get_rados_handle();
+ const RGWZoneParams& zone = svc()->zone->get_zone_params();
+ const rgw_raw_obj& obj = rgwrados::account::get_topics_obj(zone, account_id);
+ return rgwrados::topics::list(dpp, y, rados, obj, marker, max_items,
+ listing.topics, listing.next_marker);
}
int RadosStore::remove_bucket_mapping_from_topics(
RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) override;
+ int list_account_topics(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ std::string_view account_id,
+ std::string_view marker,
+ uint32_t max_items,
+ TopicList& listing) override;
int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
const std::string& bucket_key,
bool add_mapping,
meta.topic_cache->init(svc.cache);
meta.topic = rgwrados::topic::create_metadata_handler(
- *svc.sysobj, svc.cache, *svc.mdlog, svc.zone->get_zone_params(),
- *meta.topic_cache);
+ *svc.sysobj, svc.cache, *svc.mdlog, rados,
+ svc.zone->get_zone_params(), *meta.topic_cache);
RGWOTPMetadataHandlerBase *otp_handler = static_cast<RGWOTPMetadataHandlerBase *>(meta.otp.get());
otp_handler->init(svc.zone, svc.meta_be_otp, svc.otp);
#include "topic.h"
#include "common/errno.h"
+#include "account.h"
+#include "rgw_account.h"
#include "rgw_common.h"
#include "rgw_metadata.h"
#include "rgw_metadata_lister.h"
#include "rgw_zone.h"
#include "svc_mdlog.h"
#include "svc_sys_obj_cache.h"
+#include "topics.h"
namespace rgwrados::topic {
}
int write(const DoutPrefixProvider* dpp, optional_yield y,
- RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+ RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog,
+ librados::Rados& rados, const RGWZoneParams& zone,
const rgw_pubsub_topic& info, RGWObjVersionTracker& objv,
ceph::real_time mtime, bool exclusive)
{
return r;
}
+ if (const auto* id = std::get_if<rgw_account_id>(&info.owner); id) {
+ // link the topic to its account
+ const auto& topics = account::get_topics_obj(zone, *id);
+ r = topics::add(dpp, y, rados, topics, info, false,
+ std::numeric_limits<uint32_t>::max());
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "WARNING: could not link topic to account "
+ << *id << ": " << cpp_strerror(r) << dendl;
+ } // not fatal
+ }
+
// record in the mdlog on success
if (mdlog) {
return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv);
}
int remove(const DoutPrefixProvider* dpp, optional_yield y,
- RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
- const std::string& topic_key, RGWObjVersionTracker& objv)
+ RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& tenant, const std::string& name,
+ RGWObjVersionTracker& objv)
{
+ const std::string topic_key = get_topic_metadata_key(tenant, name);
+
// delete topic info
const rgw_raw_obj topic = get_topic_obj(zone, topic_key);
int r = rgw_delete_system_obj(dpp, &sysobj, topic.pool, topic.oid, &objv, y);
<< buckets.oid << " with: " << cpp_strerror(r) << dendl;
} // not fatal
+ if (rgw::account::validate_id(tenant)) {
+ // unlink the name from its account
+ const auto& topics = account::get_topics_obj(zone, tenant);
+ r = topics::remove(dpp, y, rados, topics, name);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: could not unlink from account "
+ << tenant << ": " << cpp_strerror(r) << dendl;
+ } // not fatal
+ }
+
// record in the mdlog on success
if (mdlog) {
return mdlog->complete_entry(dpp, y, "topic", topic_key, &objv);
RGWSI_SysObj& sysobj;
RGWSI_SysObj_Cache* cache_svc;
RGWSI_MDLog& mdlog;
+ librados::Rados& rados;
const RGWZoneParams& zone;
RGWChainedCacheImpl<cache_entry>& cache;
public:
MetadataHandler(RGWSI_SysObj& sysobj, RGWSI_SysObj_Cache* cache_svc,
- RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+ RGWSI_MDLog& mdlog, librados::Rados& rados,
+ const RGWZoneParams& zone,
RGWChainedCacheImpl<cache_entry>& cache)
: sysobj(sysobj), cache_svc(cache_svc), mdlog(mdlog),
- zone(zone), cache(cache)
+ rados(rados), zone(zone), cache(cache)
{}
std::string get_type() final { return "topic"; }
auto mtime = robj->get_mtime();
constexpr bool exclusive = false;
- int r = write(dpp, y, sysobj, &mdlog, zone, info,
- objv_tracker, mtime, exclusive);
+ int r = write(dpp, y, sysobj, &mdlog, rados, zone,
+ info, objv_tracker, mtime, exclusive);
if (r < 0) {
return r;
}
int remove(std::string& entry, RGWObjVersionTracker& objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override
{
- int r = topic::remove(dpp, y, sysobj, &mdlog, zone, entry, objv_tracker);
+ std::string name;
+ std::string tenant;
+ parse_topic_metadata_key(entry, tenant, name);
+
+ int r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone,
+ tenant, name, objv_tracker);
if (r < 0) {
return r;
}
+
// delete persistent topic queue. expect ENOENT for non-persistent topics
- std::string name;
- std::string tenant;
- parse_topic_metadata_key(entry, tenant, name);
r = rgw::notify::remove_persistent_topic(name, y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
auto create_metadata_handler(RGWSI_SysObj& sysobj,
RGWSI_SysObj_Cache* cache_svc,
- RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+ RGWSI_MDLog& mdlog, librados::Rados& rados,
+ const RGWZoneParams& zone,
RGWChainedCacheImpl<cache_entry>& cache)
-> std::unique_ptr<RGWMetadataHandler>
{
return std::make_unique<MetadataHandler>(sysobj, cache_svc, mdlog,
- zone, cache);
+ rados, zone, cache);
}
} // rgwrados::topic
/// Write or overwrite topic info.
int write(const DoutPrefixProvider* dpp, optional_yield y,
- RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog, const RGWZoneParams& zone,
+ RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog,
+ librados::Rados& rados, const RGWZoneParams& zone,
const rgw_pubsub_topic& info, RGWObjVersionTracker& objv,
ceph::real_time mtime, bool exclusive);
/// Remove a topic by metadata key.
int remove(const DoutPrefixProvider* dpp, optional_yield y,
RGWSI_SysObj& sysobj, RGWSI_MDLog* mdlog,
- const RGWZoneParams& zone, const std::string& topic_key,
+ librados::Rados& rados, const RGWZoneParams& zone,
+ const std::string& tenant, const std::string& name,
RGWObjVersionTracker& objv);
/// Topic metadata handler factory.
auto create_metadata_handler(RGWSI_SysObj& sysobj,
RGWSI_SysObj_Cache* cache_svc,
- RGWSI_MDLog& mdlog, const RGWZoneParams& zone,
+ RGWSI_MDLog& mdlog, librados::Rados& rados,
+ const RGWZoneParams& zone,
RGWChainedCacheImpl<cache_entry>& cache)
-> std::unique_ptr<RGWMetadataHandler>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "topics.h"
+
+#include "include/rados/librados.hpp"
+#include "common/dout.h"
+#include "cls/user/cls_user_client.h"
+#include "rgw_pubsub.h"
+#include "rgw_sal.h"
+
+namespace rgwrados::topics {
+
+int add(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ librados::Rados& rados,
+ const rgw_raw_obj& obj,
+ const rgw_pubsub_topic& topic,
+ bool exclusive, uint32_t limit)
+{
+ cls_user_account_resource resource;
+ resource.name = topic.name;
+
+ rgw_rados_ref ref;
+ int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ ::cls_user_account_resource_add(op, resource, exclusive, limit);
+ return ref.operate(dpp, &op, y);
+}
+
+int remove(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ librados::Rados& rados,
+ const rgw_raw_obj& obj,
+ std::string_view name)
+{
+ rgw_rados_ref ref;
+ int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ ::cls_user_account_resource_rm(op, name);
+ return ref.operate(dpp, &op, y);
+}
+
+int list(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ librados::Rados& rados,
+ const rgw_raw_obj& obj,
+ std::string_view marker,
+ uint32_t max_items,
+ std::vector<std::string>& names,
+ std::string& next_marker)
+{
+ rgw_rados_ref ref;
+ int r = rgw_get_rados_ref(dpp, &rados, obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectReadOperation op;
+ const std::string path_prefix; // unused
+ std::vector<cls_user_account_resource> entries;
+ bool truncated = false;
+ int ret = 0;
+ ::cls_user_account_resource_list(op, marker, path_prefix, max_items,
+ entries, &truncated, &next_marker, &ret);
+
+ r = ref.operate(dpp, &op, nullptr, y);
+ if (r == -ENOENT) {
+ next_marker.clear();
+ return 0;
+ }
+ if (r < 0) {
+ return r;
+ }
+ if (ret < 0) {
+ return ret;
+ }
+
+ for (auto& resource : entries) {
+ names.push_back(std::move(resource.name));
+ }
+
+ if (!truncated) {
+ next_marker.clear();
+ }
+ return 0;
+}
+
+} // namespace rgwrados::topics
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+#include "include/rados/librados_fwd.hpp"
+#include "rgw_sal_fwd.h"
+
+class DoutPrefixProvider;
+class optional_yield;
+struct rgw_raw_obj;
+struct rgw_pubsub_topic;
+
+
+namespace rgwrados::topics {
+
+/// Add the given topic to the list.
+int add(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ librados::Rados& rados,
+ const rgw_raw_obj& obj,
+ const rgw_pubsub_topic& info,
+ bool exclusive, uint32_t limit);
+
+/// Remove the given topic from the list.
+int remove(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ librados::Rados& rados,
+ const rgw_raw_obj& obj,
+ std::string_view name);
+
+/// Return a paginated listing of topic names.
+int list(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ librados::Rados& rados,
+ const rgw_raw_obj& obj,
+ std::string_view marker,
+ uint32_t max_items,
+ std::vector<std::string>& names,
+ std::string& next_marker);
+
+} // namespace rgwrados::topics
// vim: ts=8 sw=2 smarttab ft=cpp
#include "services/svc_zone.h"
+#include "rgw_account.h"
#include "rgw_b64.h"
#include "rgw_sal.h"
#include "rgw_pubsub.h"
rgw_pubsub_topics& result, std::string& next_marker,
optional_yield y) const
{
+ if (rgw::account::validate_id(tenant)) {
+ // if our tenant is an account, return the account listing
+ return list_account_topics(dpp, start_marker, max_items,
+ result, next_marker, y);
+ }
+
if (!use_notification_v2 || driver->stat_topics_v1(tenant, y, dpp) != -ENOENT) {
// in case of v1 or during migration we use v1 topics
// v1 returns all topics, ignoring marker/max_items
return ret;
}
+int RGWPubSub::list_account_topics(const DoutPrefixProvider* dpp,
+ const std::string& start_marker,
+ int max_items, rgw_pubsub_topics& result,
+ std::string& next_marker,
+ optional_yield y) const
+{
+ if (max_items > 1000) {
+ max_items = 1000;
+ }
+
+ rgw::sal::TopicList listing;
+ int ret = driver->list_account_topics(dpp, y, tenant, start_marker,
+ max_items, listing);
+ if (ret < 0) {
+ return ret;
+ }
+
+ for (const auto& topic_name : listing.topics) {
+ rgw_pubsub_topic topic;
+ int r = get_topic(dpp, topic_name, topic, y, nullptr);
+ if (r < 0) {
+ continue;
+ }
+ result.topics[topic_name] = std::move(topic);
+ }
+
+ next_marker = std::move(listing.next_marker);
+ return 0;
+}
+
int RGWPubSub::read_topics_v1(const DoutPrefixProvider *dpp, rgw_pubsub_topics& result,
RGWObjVersionTracker *objv_tracker, optional_yield y) const
{
const rgw_pubsub_topic& topic,
optional_yield y) const;
+ int list_account_topics(const DoutPrefixProvider* dpp,
+ const std::string& start_marker, int max_items,
+ rgw_pubsub_topics& result, std::string& next_marker,
+ optional_yield y) const;
+
public:
RGWPubSub(rgw::sal::Driver* _driver,
const std::string& _tenant,
std::string next_marker;
};
+/// A list of topic names
+struct TopicList {
+ /// The list of results, sorted by name
+ std::vector<std::string> topics;
+ /// The next marker to resume listing, or empty
+ std::string next_marker;
+};
+
/** A list of key-value attributes */
using Attrs = std::map<std::string, ceph::buffer::list>;
RGWObjVersionTracker& objv_tracker,
optional_yield y,
const DoutPrefixProvider* dpp) = 0;
+ /** Return a paginated listing of the account's topic names */
+ virtual int list_account_topics(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ std::string_view account_id,
+ std::string_view marker,
+ uint32_t max_items,
+ TopicList& listing) = 0;
/** Update the bucket-topic mapping in the store, if |add_mapping|=true then
* adding the |bucket_key| |topic| mapping to store, else delete the
* |bucket_key| |topic| mapping from the store. The |bucket_key| is
return std::make_unique<DBNotification>(obj, src_obj, event_types);
}
+ int DBStore::list_account_topics(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ std::string_view account_id,
+ std::string_view marker,
+ uint32_t max_items,
+ TopicList& listing)
+ {
+ return -ENOTSUP;
+ }
+
RGWLC* DBStore::get_rgwlc(void) {
return lc;
}
std::string& _req_id,
optional_yield y) override;
- virtual RGWLC* get_rgwlc(void) override;
+ int list_account_topics(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ std::string_view account_id,
+ std::string_view marker,
+ uint32_t max_items,
+ TopicList& listing) override;
+
+ virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
const DoutPrefixProvider* dpp) override {
return next->remove_topic_v2(topic_name, tenant, objv_tracker, y, dpp);
}
+ int list_account_topics(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ std::string_view account_id,
+ std::string_view marker,
+ uint32_t max_items,
+ TopicList& listing) override {
+ return next->list_account_topics(dpp, y, account_id, marker,
+ max_items, listing);
+ }
int update_bucket_topic_mapping(const rgw_pubsub_topic& topic,
const std::string& bucket_key,
bool add_mapping,
class RGWRole;
struct RoleList;
struct GroupList;
+ struct TopicList;
class DataProcessor;
class ObjectProcessor;
class ReadStatsCB;