void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, const RGWStorageStats& stats);
void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);
- class AsyncRefreshHandler {
- protected:
- rgw::sal::Driver* driver;
- RGWQuotaCache<T> *cache;
- boost::intrusive_ptr<RefCountedWaitObject> waiter;
- public:
- AsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<T> *_cache,
- boost::intrusive_ptr<RefCountedWaitObject> waiter)
- : driver(_driver), cache(_cache), waiter(std::move(waiter)) {}
- virtual ~AsyncRefreshHandler() {}
-
- virtual int init_fetch() = 0;
- virtual void drop_reference() = 0;
- };
-
- virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket,
- boost::intrusive_ptr<RefCountedWaitObject> waiter) = 0;
+ /// start an async refresh that will eventually call async_refresh_response or
+ /// async_refresh_fail. hold a reference to the waiter until completion
+ virtual int init_refresh(const rgw_user& user, const rgw_bucket& bucket,
+ boost::intrusive_ptr<RefCountedWaitObject> waiter) = 0;
};
template<class T>
return 0;
}
- AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket, async_refcount);
-
- int ret = handler->init_fetch();
- if (ret < 0) {
- handler->drop_reference();
- return ret;
- }
-
- return 0;
+ return init_refresh(user, bucket, async_refcount);
}
template<class T>
data_modified(user, bucket);
}
-class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
- public rgw::sal::ReadStatsCB {
- rgw_user user;
- rgw_bucket bucket;
-public:
- BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache,
- boost::intrusive_ptr<RefCountedWaitObject> waiter,
- const rgw_user& _user, const rgw_bucket& _bucket)
- : RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache, std::move(waiter)),
- user(_user), bucket(_bucket) {}
-
- void drop_reference() override {
- intrusive_ptr_release(this);
- }
- void handle_response(int r, const RGWStorageStats& stats) override;
- int init_fetch() override;
-};
-
-int BucketAsyncRefreshHandler::init_fetch()
-{
- std::unique_ptr<rgw::sal::Bucket> rbucket;
-
- const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
- int r = driver->load_bucket(&dp, bucket, &rbucket, null_yield);
- if (r < 0) {
- ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
- return r;
- }
-
- ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
-
- const auto& index = rbucket->get_info().get_current_index();
- if (is_layout_indexless(index)) {
- return 0;
- }
-
- r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this);
- if (r < 0) {
- ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
-
- /* read_stats_async() dropped our reference already */
- return r;
- }
-
- return 0;
-}
-
-void BucketAsyncRefreshHandler::handle_response(const int r, const RGWStorageStats& stats)
-{
- if (r < 0) {
- ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
- cache->async_refresh_fail(user, bucket);
- return;
- }
-
- cache->async_refresh_response(user, bucket, stats);
-}
-
class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
protected:
bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
explicit RGWBucketStatsCache(rgw::sal::Driver* _driver) : RGWQuotaCache<rgw_bucket>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size) {
}
- AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket,
- boost::intrusive_ptr<RefCountedWaitObject> waiter) override {
- return new BucketAsyncRefreshHandler(driver, this, std::move(waiter), user, bucket);
- }
+ int init_refresh(const rgw_user& user, const rgw_bucket& bucket,
+ boost::intrusive_ptr<RefCountedWaitObject> waiter) override;
};
int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp)
return 0;
}
-class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
- public rgw::sal::ReadStatsCB {
- const DoutPrefixProvider *dpp;
- rgw_bucket bucket;
+class BucketAsyncRefreshHandler : public rgw::sal::ReadStatsCB {
+ RGWBucketStatsCache* cache;
+ boost::intrusive_ptr<RefCountedWaitObject> waiter;
rgw_user user;
- public:
- UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Driver* _driver,
- RGWQuotaCache<rgw_user> *_cache,
- boost::intrusive_ptr<RefCountedWaitObject> waiter,
- const rgw_user& _user, const rgw_bucket& _bucket)
- : RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_driver, _cache, std::move(waiter)),
- dpp(_dpp), bucket(_bucket), user(_user)
- {}
+ rgw_bucket bucket;
+public:
+ BucketAsyncRefreshHandler(RGWBucketStatsCache* cache,
+ boost::intrusive_ptr<RefCountedWaitObject> waiter,
+ const rgw_user& user, const rgw_bucket& bucket)
+ : cache(cache), waiter(std::move(waiter)), user(user), bucket(bucket) {}
- void drop_reference() override {
- intrusive_ptr_release(this);
+ void handle_response(int r, const RGWStorageStats& stats) override {
+ if (r < 0) {
+ cache->async_refresh_fail(user, bucket);
+ return;
+ }
+
+ cache->async_refresh_response(user, bucket, stats);
}
- int init_fetch() override;
- void handle_response(int r, const RGWStorageStats& stats) override;
};
-int UserAsyncRefreshHandler::init_fetch()
+
+int RGWBucketStatsCache::init_refresh(const rgw_user& user, const rgw_bucket& bucket,
+ boost::intrusive_ptr<RefCountedWaitObject> waiter)
{
- std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user);
+ std::unique_ptr<rgw::sal::Bucket> rbucket;
- ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
- int r = ruser->read_stats_async(dpp, this);
+ const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
+ int r = driver->load_bucket(&dp, bucket, &rbucket, null_yield);
if (r < 0) {
- ldpp_dout(dpp, 0) << "could not get bucket info for user=" << user << dendl;
-
- /* get_bucket_stats_async() dropped our reference already */
+ ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
return r;
}
- return 0;
-}
+ ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
-void UserAsyncRefreshHandler::handle_response(int r, const RGWStorageStats& stats)
-{
+ const auto& index = rbucket->get_info().get_current_index();
+ if (is_layout_indexless(index)) {
+ return 0;
+ }
+
+ boost::intrusive_ptr handler = new BucketAsyncRefreshHandler(
+ this, std::move(waiter), user, bucket);
+
+ r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, std::move(handler));
if (r < 0) {
- ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
- cache->async_refresh_fail(user, bucket);
- return;
+ ldpp_dout(&dp, 0) << "could not get bucket stats for bucket=" << bucket.name << dendl;
+ return r;
}
- cache->async_refresh_response(user, bucket, stats);
+ return 0;
}
class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
stop();
}
- AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket,
- boost::intrusive_ptr<RefCountedWaitObject> waiter) override {
- return new UserAsyncRefreshHandler(dpp, driver, this, std::move(waiter), user, bucket);
- }
+ int init_refresh(const rgw_user& user, const rgw_bucket& bucket,
+ boost::intrusive_ptr<RefCountedWaitObject> waiter) override;
bool going_down() {
return down_flag;
}
};
+class UserAsyncRefreshHandler : public rgw::sal::ReadStatsCB {
+ RGWUserStatsCache* cache;
+ boost::intrusive_ptr<RefCountedWaitObject> waiter;
+ rgw_bucket bucket;
+ rgw_user user;
+ public:
+ UserAsyncRefreshHandler(RGWUserStatsCache* cache,
+ boost::intrusive_ptr<RefCountedWaitObject> waiter,
+ const rgw_user& user, const rgw_bucket& bucket)
+ : cache(cache), waiter(std::move(waiter)), bucket(bucket), user(user)
+ {}
+
+ void handle_response(int r, const RGWStorageStats& stats) override;
+};
+
+int RGWUserStatsCache::init_refresh(const rgw_user& user, const rgw_bucket& bucket,
+ boost::intrusive_ptr<RefCountedWaitObject> waiter)
+{
+ boost::intrusive_ptr handler = new UserAsyncRefreshHandler(
+ this, std::move(waiter), user, bucket);
+
+ std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user);
+
+ ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
+ int r = ruser->read_stats_async(dpp, std::move(handler));
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "could not get bucket info for user=" << user << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+void UserAsyncRefreshHandler::handle_response(int r, const RGWStorageStats& stats)
+{
+ if (r < 0) {
+ cache->async_refresh_fail(user, bucket);
+ return;
+ }
+
+ cache->async_refresh_response(user, bucket, stats);
+}
+
int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u,
const rgw_bucket& _b,
RGWStorageStats& stats,