return choose_oid(bs);
}
-int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
-#warning FIXME
-#if 0
- if (!ctl.bucket->bucket_exports_data(bucket_info.bucket, null_yield)) {
- return 0;
+bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket, optional_yield y) const
+{
+ if (!bucket_filter) {
+ return true;
}
-#endif
+ return bucket_filter->filter(bucket, y);
+}
+
+int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
auto& bucket = bucket_info.bucket;
+ if (!filter_bucket(bucket, null_yield)) {
+ return 0;
+ }
+
if (observer) {
observer->on_bucket_changed(bucket.get_key());
}
}
};
+bool RGWBucketCtl::DataLogFilter::filter(const rgw_bucket& bucket, optional_yield y) const
+{
+ return bucket_ctl->bucket_exports_data(bucket, null_yield);
+}
+
RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
RGWSI_Bucket_Sync *bucket_sync_svc,
- RGWSI_BucketIndex *bi_svc) : cct(zone_svc->ctx())
+ RGWSI_BucketIndex *bi_svc) : cct(zone_svc->ctx()),
+ datalog_filter(this)
{
svc.zone = zone_svc;
svc.bucket = bucket_svc;
void RGWBucketCtl::init(RGWUserCtl *user_ctl,
RGWBucketMetadataHandler *_bm_handler,
- RGWBucketInstanceMetadataHandler *_bmi_handler)
+ RGWBucketInstanceMetadataHandler *_bmi_handler,
+ RGWDataChangesLog *datalog)
{
ctl.user = user_ctl;
bucket_be_handler = bm_handler->get_be_handler();
bi_be_handler = bmi_handler->get_be_handler();
+
+ datalog->set_bucket_filter(&datalog_filter);
}
int RGWBucketCtl::call(std::function<int(RGWSI_Bucket_X_Ctx& ctx)> f) {
class RGWBucketMetadataHandler;
class RGWBucketInstanceMetadataHandler;
class RGWUserCtl;
+class RGWBucketCtl;
+
namespace rgw { namespace sal {
class RGWRadosStore;
class RGWBucketList;
};
class RGWDataChangesLog {
+public:
+ class BucketFilter {
+ public:
+ virtual ~BucketFilter() {}
+
+ virtual bool filter(const rgw_bucket& bucket, optional_yield y) const = 0;
+ };
+private:
+
CephContext *cct;
rgw::BucketChangeObserver *observer = nullptr;
ChangesRenewThread *renew_thread;
+ BucketFilter *bucket_filter{nullptr};
+
public:
RGWDataChangesLog(RGWSI_Zone *zone_svc, RGWSI_Cls *cls_svc);
}
bool going_down();
+
+ void set_bucket_filter(BucketFilter *f) {
+ bucket_filter = f;
+ }
+
+ bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
};
struct rgw_ep_info {
RGWSI_BucketInstance_BE_Handler bi_be_handler; /* bucket instance backend handler */
int call(std::function<int(RGWSI_Bucket_X_Ctx& ctx)> f);
+
+ class DataLogFilter : public RGWDataChangesLog::BucketFilter {
+ RGWBucketCtl *bucket_ctl;
+ public:
+ DataLogFilter(RGWBucketCtl *_bucket_ctl) : bucket_ctl(_bucket_ctl) {}
+
+ bool filter(const rgw_bucket& bucket, optional_yield y) const override;
+ } datalog_filter;
public:
RGWBucketCtl(RGWSI_Zone *zone_svc,
void init(RGWUserCtl *user_ctl,
RGWBucketMetadataHandler *_bm_handler,
- RGWBucketInstanceMetadataHandler *_bmi_handler);
+ RGWBucketInstanceMetadataHandler *_bmi_handler,
+ RGWDataChangesLog *datalog);
struct Bucket {
struct GetParams {
bool bucket_is_sync_source() const {
- return !targets.empty();
+ return !targets.empty() || !resolved_dests.empty();
}
bool bucket_is_sync_target() const {
user->init(bucket.get());
bucket->init(user.get(),
(RGWBucketMetadataHandler *)bucket_meta_handler,
- (RGWBucketInstanceMetadataHandler *)bi_meta_handler);
+ (RGWBucketInstanceMetadataHandler *)bi_meta_handler,
+ svc.datalog_rados->get_log());
otp->init((RGWOTPMetadataHandler *)meta.otp.get());