/* stats - Not for first pass */
int DaosUser::read_stats_async(const DoutPrefixProvider* dpp,
- RGWGetUserStats_CB* cb) {
+ boost::intrusive_ptr<ReadStatsCB> cb) {
return DAOS_NOT_IMPLEMENTED_LOG(dpp);
}
int DaosBucket::read_stats_async(
const DoutPrefixProvider* dpp,
const bucket_index_layout_generation& idx_layout, int shard_id,
- RGWGetBucketStats_CB* ctx) {
+ boost::intrusive_ptr<ReadStatsCB> ctx) {
return DAOS_NOT_IMPLEMENTED_LOG(dpp);
}
ceph::real_time* last_stats_sync = nullptr,
ceph::real_time* last_stats_update = nullptr) override;
virtual int read_stats_async(const DoutPrefixProvider* dpp,
- RGWGetUserStats_CB* cb) override;
+ boost::intrusive_ptr<ReadStatsCB> cb) override;
virtual int complete_flush_stats(const DoutPrefixProvider* dpp,
optional_yield y) override;
virtual int read_usage(
virtual int read_stats_async(const DoutPrefixProvider* dpp,
const bucket_index_layout_generation& idx_layout,
int shard_id,
- RGWGetBucketStats_CB* ctx) override;
+ boost::intrusive_ptr<ReadStatsCB> ctx) override;
virtual int sync_user_stats(const DoutPrefixProvider* dpp,
optional_yield y) override;
virtual int check_bucket_shards(const DoutPrefixProvider* dpp) override;
}
/* stats - Not for first pass */
-int MotrUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb)
+int MotrUser::read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<ReadStatsCB> cb)
{
return 0;
}
int MotrBucket::read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB *ctx)
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx)
{
return 0;
}
optional_yield y, RGWStorageStats* stats,
ceph::real_time *last_stats_sync = nullptr,
ceph::real_time *last_stats_update = nullptr) override;
- virtual int read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB* cb) override;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<ReadStatsCB> cb) override;
virtual int complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
bool* is_truncated, RGWUsageIter& usage_iter,
bool *syncstopped = nullptr) override;
virtual int read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx) override;
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx) override;
int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y,
RGWBucketEnt* ent) override;
int check_bucket_shards(const DoutPrefixProvider *dpp,
int POSIXBucket::read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx)
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx)
{
return 0;
}
bool* syncstopped = nullptr) override;
virtual int read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx) override;
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx) override;
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y,
RGWBucketEnt* ent) override;
virtual int check_bucket_shards(const DoutPrefixProvider* dpp,
}
class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
- RGWGetBucketStats_CB *cb;
+ boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb;
uint32_t pendings;
- map<RGWObjCategory, RGWStorageStats> stats;
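+ // Aggregated across index shards and object categories.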
+ RGWStorageStats stats;
int ret_code;
bool should_cb;
ceph::mutex lock = ceph::make_mutex("RGWGetBucketStatsContext");
public:
- RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings)
- : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true)
+ RGWGetBucketStatsContext(boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb, uint32_t _pendings)
+ : cb(std::move(cb)), pendings(_pendings), stats(), ret_code(0), should_cb(true)
{}
void handle_response(int r, rgw_bucket_dir_header& header) override {
std::lock_guard l{lock};
if (should_cb) {
- if ( r >= 0) {
- accumulate_raw_stats(header, stats);
+ if (r >= 0) {
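+ // Sum this shard's per-category dir-header entries into the single
+ // aggregate; this replaces accumulate_raw_stats() into a per-category map.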
+ for (const auto& [c, s] : header.stats) {
+ stats.size += s.total_size;
+ stats.size_rounded += s.total_size_rounded;
+ stats.size_utilized += s.actual_size;
+ stats.num_objects += s.num_entries;
+ }
} else {
ret_code = r;
}
// Are we all done?
if (--pendings == 0) {
- if (!ret_code) {
- cb->set_response(&stats);
- }
- cb->handle_response(ret_code);
- cb->put();
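+ // Deliver the status and aggregated stats in one call, then drop our
+ // reference; the intrusive_ptr reset replaces the old manual put().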
+ cb->handle_response(ret_code, stats);
+ cb.reset();
}
}
}
}
};
-int RGWRados::get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB *ctx)
+int RGWRados::get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb)
{
int num_aio = 0;
- RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.layout.current_index.layout.normal.num_shards ? : 1);
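+ // One pending dir-header response per bucket index shard (or one for an
+ // unsharded bucket); the callback fires after the last response arrives.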
+ RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(std::move(cb), bucket_info.layout.current_index.layout.normal.num_shards ? : 1);
ceph_assert(get_ctx);
int r = cls_bucket_head_async(dpp, bucket_info, idx_layout, shard_id, get_ctx, &num_aio);
if (r < 0) {
- ctx->put();
if (num_aio) {
get_ctx->unset_cb();
}
int decode_policy(const DoutPrefixProvider *dpp, bufferlist& bl, ACLOwner *owner);
int get_bucket_stats(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, std::string *bucket_ver, std::string *master_ver,
std::map<RGWObjCategory, RGWStorageStats>& stats, std::string *max_marker, bool* syncstopped = NULL);
- int get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB *cb);
+ int get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb);
int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, const std::map<std::string, bufferlist> *pattrs, const DoutPrefixProvider *dpp, optional_yield y);
/* xxx dang obj_ctx -> svc */
return store->ctl()->user->read_stats(dpp, get_id(), stats, y, last_stats_sync, last_stats_update);
}
-int RadosUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB* cb)
+int RadosUser::read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<ReadStatsCB> cb)
{
return store->svc()->user->read_stats_async(dpp, get_id(), cb);
}
int RadosBucket::read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx)
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx)
{
return store->getRados()->get_bucket_stats_async(dpp, get_info(), idx_layout, shard_id, ctx);
}
optional_yield y, RGWStorageStats* stats,
ceph::real_time* last_stats_sync = nullptr,
ceph::real_time* last_stats_update = nullptr) override;
- virtual int read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB* cb) override;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<ReadStatsCB> cb) override;
virtual int complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
bool* is_truncated, RGWUsageIter& usage_iter,
bool* syncstopped = nullptr) override;
virtual int read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx) override;
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx) override;
int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y,
RGWBucketEnt* ent) override;
int check_bucket_shards(const DoutPrefixProvider* dpp, uint64_t num_objs,
class RGWBucketCtl;
class RGWUserBuckets;
-class RGWGetUserStats_CB;
-
/**
* A string wrapper that includes encode/decode functions
* for easily accessing a UID in all forms
}
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
- public RGWGetBucketStats_CB {
+ public rgw::sal::ReadStatsCB {
rgw_user user;
+ rgw_bucket bucket;
public:
BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache,
- const rgw_user& _user, const rgw_bucket& _bucket) :
- RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache),
- RGWGetBucketStats_CB(_bucket), user(_user) {}
+ const rgw_user& _user, const rgw_bucket& _bucket)
+ : RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache),
+ user(_user), bucket(_bucket) {}
- void drop_reference() override { put(); }
- void handle_response(int r) override;
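+ // Lifetime is now managed through ReadStatsCB's intrusive refcount, so the
+ // cache's reference is released here rather than via RefCountedObject::put().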
+ void drop_reference() override {
+ intrusive_ptr_release(this);
+ }
+ void handle_response(int r, const RGWStorageStats& stats) override;
int init_fetch() override;
};
return 0;
}
-void BucketAsyncRefreshHandler::handle_response(const int r)
+void BucketAsyncRefreshHandler::handle_response(const int r, const RGWStorageStats& stats)
{
if (r < 0) {
ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
return;
}
- RGWStorageStats bs;
-
- for (const auto& pair : *stats) {
- const RGWStorageStats& s = pair.second;
-
- bs.size += s.size;
- bs.size_rounded += s.size_rounded;
- bs.num_objects += s.num_objects;
- }
-
- cache->async_refresh_response(user, bucket, bs);
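+ // The driver now delivers a single aggregated RGWStorageStats, so the
+ // per-category summation removed above is no longer needed.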
+ cache->async_refresh_response(user, bucket, stats);
}
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
}
class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
- public RGWGetUserStats_CB {
+ public rgw::sal::ReadStatsCB {
const DoutPrefixProvider *dpp;
rgw_bucket bucket;
rgw_user user;
dpp(_dpp), bucket(_bucket), user(_user)
{}
- void drop_reference() override { put(); }
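+ // Same lifetime handling as BucketAsyncRefreshHandler::drop_reference() above.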
+ void drop_reference() override {
+ intrusive_ptr_release(this);
+ }
int init_fetch() override;
void handle_response(int r, const RGWStorageStats& stats) override;
};
#pragma once
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+
#include "common/tracer.h"
#include "rgw_sal_fwd.h"
#include "rgw_lua.h"
uint64_t num_objects;
};
-class RGWGetBucketStats_CB : public RefCountedObject {
-protected:
- rgw_bucket bucket;
- std::map<RGWObjCategory, RGWStorageStats>* stats;
-public:
- explicit RGWGetBucketStats_CB(const rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
- ~RGWGetBucketStats_CB() override {}
- virtual void handle_response(int r) = 0;
- virtual void set_response(std::map<RGWObjCategory, RGWStorageStats>* _stats) {
- stats = _stats;
- }
-};
-
-class RGWGetUserStats_CB : public RefCountedObject {
- public:
- virtual void handle_response(int r, const RGWStorageStats& stats) = 0;
-};
-
struct RGWObjState {
rgw_obj obj;
bool is_atomic{false};
virtual void register_admin_apis(RGWRESTMgr* mgr) = 0;
};
+
+/// \brief Ref-counted callback object for User/Bucket read_stats_async().
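+/// The driver keeps its own reference for the duration of the asynchronous
+/// request and drops it after invoking handle_response(), so callers do not
+/// manage the callback's lifetime manually.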
+class ReadStatsCB : public boost::intrusive_ref_counter<ReadStatsCB> {
+ public:
+ virtual ~ReadStatsCB() {}
+ virtual void handle_response(int r, const RGWStorageStats& stats) = 0;
+};
+
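For context, a minimal sketch of how a caller might implement and pass the new callback; `LoggingStatsCB`, `read_user_stats_example`, `user`, and `dpp` are illustrative names and are not part of this change:

// Illustrative sketch only; not part of the patch.
class LoggingStatsCB : public rgw::sal::ReadStatsCB {
  const DoutPrefixProvider* dpp;
 public:
  explicit LoggingStatsCB(const DoutPrefixProvider* dpp) : dpp(dpp) {}
  void handle_response(int r, const RGWStorageStats& stats) override {
    ldpp_dout(dpp, 20) << "async user stats: r=" << r
                       << " num_objects=" << stats.num_objects
                       << " size=" << stats.size << dendl;
  }
};

// The caller never calls put() or delete; the driver's intrusive_ptr copy
// keeps the callback alive until handle_response() has run.
int read_user_stats_example(const DoutPrefixProvider* dpp, rgw::sal::User* user) {
  boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb = new LoggingStatsCB(dpp);
  return user->read_stats_async(dpp, std::move(cb));
}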
/**
* @brief A list of buckets
*
ceph::real_time* last_stats_sync = nullptr,
ceph::real_time* last_stats_update = nullptr) = 0;
/** Read the User stats from the backing Store, asynchronously */
- virtual int read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB* cb) = 0;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp,
+ boost::intrusive_ptr<ReadStatsCB> cb) = 0;
/** Flush accumulated stat changes for this User to the backing store */
virtual int complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) = 0;
/** Read detailed usage stats for this User from the backing store */
/** Read the bucket stats from the backing Store, asynchronously */
virtual int read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx) = 0;
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> cb) = 0;
/** Sync this bucket's stats to the owning user's stats in the backing store */
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y,
RGWBucketEnt* optional_ent) = 0;
}
/* stats - Not for first pass */
- int DBUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb)
+ int DBUser::read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<ReadStatsCB> cb)
{
return 0;
}
return 0;
}
- int DBBucket::read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB *ctx)
+ int DBBucket::read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx)
{
return 0;
}
optional_yield y, RGWStorageStats* stats,
ceph::real_time *last_stats_sync = nullptr,
ceph::real_time *last_stats_update = nullptr) override;
- virtual int read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB* cb) override;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<ReadStatsCB> cb) override;
virtual int complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
bool* is_truncated, RGWUsageIter& usage_iter,
std::map<RGWObjCategory, RGWStorageStats>& stats,
std::string *max_marker = nullptr,
bool *syncstopped = nullptr) override;
- virtual int read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB* ctx) override;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx) override;
int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y,
RGWBucketEnt* ent) override;
int check_bucket_shards(const DoutPrefixProvider *dpp,
return next->read_stats(dpp, y, stats, last_stats_sync, last_stats_update);
}
-int FilterUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB* cb)
+int FilterUser::read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<ReadStatsCB> cb)
{
return next->read_stats_async(dpp, cb);
}
int FilterBucket::read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx)
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx)
{
return next->read_stats_async(dpp, idx_layout, shard_id, ctx);
}
ceph::real_time* last_stats_sync = nullptr,
ceph::real_time* last_stats_update = nullptr) override;
virtual int read_stats_async(const DoutPrefixProvider *dpp,
- RGWGetUserStats_CB* cb) override;
+ boost::intrusive_ptr<ReadStatsCB> cb) override;
virtual int complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch,
uint64_t end_epoch, uint32_t max_entries,
bool* syncstopped = nullptr) override;
virtual int read_stats_async(const DoutPrefixProvider *dpp,
const bucket_index_layout_generation& idx_layout,
- int shard_id, RGWGetBucketStats_CB* ctx) override;
+ int shard_id, boost::intrusive_ptr<ReadStatsCB> ctx) override;
int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y,
RGWBucketEnt* ent) override;
int check_bucket_shards(const DoutPrefixProvider* dpp,
class Zone;
class LuaManager;
struct RGWRoleInfo;
+ class DataProcessor;
+ class ObjectProcessor;
+ class ReadStatsCB;
class ConfigStore;
class RealmWriter;
#include "svc_meta_be.h"
#include "rgw_service.h"
+#include "rgw_sal_fwd.h"
class RGWUserBuckets;
-class RGWGetUserStats_CB;
class RGWSI_User : public RGWServiceInstance
{
optional_yield y) = 0; /* last time a stats update was done */
virtual int read_stats_async(const DoutPrefixProvider *dpp,
- const rgw_user& user, RGWGetUserStats_CB *cb) = 0;
+ const rgw_user& user,
+ boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb) = 0;
};
}
class RGWGetUserStatsContext : public RGWGetUserHeader_CB {
- RGWGetUserStats_CB *cb;
+ boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb;
public:
- explicit RGWGetUserStatsContext(RGWGetUserStats_CB * const cb)
- : cb(cb) {}
+ explicit RGWGetUserStatsContext(boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb)
+ : cb(std::move(cb)) {}
void handle_response(int r, cls_user_header& header) override {
const cls_user_stats& hs = header.stats;
stats.num_objects = hs.total_entries;
cb->handle_response(r, stats);
-
- cb->put();
+ cb.reset();
}
};
int RGWSI_User_RADOS::read_stats_async(const DoutPrefixProvider *dpp,
- const rgw_user& user, RGWGetUserStats_CB *_cb)
+ const rgw_user& user,
+ boost::intrusive_ptr<rgw::sal::ReadStatsCB> _cb)
{
string user_str = user.to_str();
- RGWGetUserStatsContext *cb = new RGWGetUserStatsContext(_cb);
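+ // The intrusive reference moves into the completion wrapper; if submitting
+ // the async call fails below, deleting the wrapper drops that reference.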
+ RGWGetUserStatsContext *cb = new RGWGetUserStatsContext(std::move(_cb));
int r = cls_user_get_header_async(dpp, user_str, cb);
if (r < 0) {
delete cb;
struct rgw_cache_entry_info;
class RGWGetUserHeader_CB;
-class RGWGetUserStats_CB;
template <class T>
class RGWChainedCacheImpl;
optional_yield y) override; /* last time a stats update was done */
int read_stats_async(const DoutPrefixProvider *dpp, const rgw_user& user,
- RGWGetUserStats_CB *cb) override;
+ boost::intrusive_ptr<rgw::sal::ReadStatsCB> cb) override;
};
return 0;
}
- virtual int read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb) override {
+ virtual int read_stats_async(const DoutPrefixProvider *dpp, boost::intrusive_ptr<sal::ReadStatsCB> cb) override {
return 0;
}