return true;
}
- return bucket_filter->filter(bucket, y);
+ return bucket_filter(bucket, y);
}
int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
}
};
-bool RGWBucketCtl::DataLogFilter::filter(const rgw_bucket& bucket, optional_yield y) const
-{
- return bucket_ctl->bucket_exports_data(bucket, null_yield);
-}
-
RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
RGWSI_Bucket_Sync *bucket_sync_svc,
- RGWSI_BucketIndex *bi_svc) : cct(zone_svc->ctx()),
- datalog_filter(this)
+ RGWSI_BucketIndex *bi_svc) : cct(zone_svc->ctx())
{
svc.zone = zone_svc;
svc.bucket = bucket_svc;
bucket_be_handler = bm_handler->get_be_handler();
bi_be_handler = bmi_handler->get_be_handler();
- datalog->set_bucket_filter(&datalog_filter);
+ datalog->set_bucket_filter(
+ [this](const rgw_bucket& bucket, optional_yield y) {
+ return bucket_exports_data(bucket, y);
+ });
}
int RGWBucketCtl::call(std::function<int(RGWSI_Bucket_X_Ctx& ctx)> f) {
struct rgw_data_change {
DataLogEntityType entity_type;
- string key;
- real_time timestamp;
+ std::string key;
+ ceph::real_time timestamp;
- void encode(bufferlist& bl) const {
+ void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
- uint8_t t = (uint8_t)entity_type;
+ auto t = std::uint8_t(entity_type);
encode(t, bl);
encode(key, bl);
encode(timestamp, bl);
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- uint8_t t;
+ std::uint8_t t;
decode(t, bl);
- entity_type = (DataLogEntityType)t;
+ entity_type = DataLogEntityType(t);
decode(key, bl);
decode(timestamp, bl);
DECODE_FINISH(bl);
}
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
};
WRITE_CLASS_ENCODER(rgw_data_change)
struct rgw_data_change_log_entry {
- string log_id;
- real_time log_timestamp;
+ std::string log_id;
+ ceph::real_time log_timestamp;
rgw_data_change entry;
- void encode(bufferlist& bl) const {
+ void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode(log_id, bl);
encode(log_timestamp, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::const_iterator& bl) {
+ void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
decode(log_id, bl);
decode(log_timestamp, bl);
DECODE_FINISH(bl);
}
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
};
WRITE_CLASS_ENCODER(rgw_data_change_log_entry)
struct RGWDataChangesLogInfo {
- string marker;
- real_time last_update;
+ std::string marker;
+ ceph::real_time last_update;
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
};
namespace rgw {
}
struct RGWDataChangesLogMarker {
- int shard;
- string marker;
+ int shard = 0;
+ std::string marker;
- RGWDataChangesLogMarker() : shard(0) {}
+ RGWDataChangesLogMarker() = default;
};
class RGWDataChangesLog {
-public:
- class BucketFilter {
- public:
- virtual ~BucketFilter() {}
-
- virtual bool filter(const rgw_bucket& bucket, optional_yield y) const = 0;
- };
-private:
-
CephContext *cct;
rgw::BucketChangeObserver *observer = nullptr;
} svc;
int num_shards;
- string *oids;
+ std::string* oids;
ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
ceph::shared_mutex modified_lock =
ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
- map<int, set<string> > modified_shards;
+ std::map<int, set<string> > modified_shards;
std::atomic<bool> down_flag = { false };
struct ChangeStatus {
std::shared_ptr<const rgw_sync_policy_info> sync_policy;
- real_time cur_expiration;
- real_time cur_sent;
+ ceph::real_time cur_expiration;
+ ceph::real_time cur_sent;
bool pending = false;
RefCountedCond *cond = nullptr;
ceph::mutex lock =
ceph::make_mutex("RGWDataChangesLog::ChangeStatus");
};
- typedef std::shared_ptr<ChangeStatus> ChangeStatusPtr;
+ using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
- map<rgw_bucket_shard, bool> cur_cycle;
+ std::map<rgw_bucket_shard, bool> cur_cycle;
void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
void register_renew(rgw_bucket_shard& bs);
- void update_renewed(rgw_bucket_shard& bs, real_time& expiration);
+ void update_renewed(rgw_bucket_shard& bs, ceph::real_time& expiration);
class ChangesRenewThread : public Thread {
CephContext *cct;
void stop();
};
- ChangesRenewThread *renew_thread;
+ ChangesRenewThread* renew_thread;
- BucketFilter *bucket_filter{nullptr};
+ std::function<bool(const rgw_bucket& bucket, optional_yield y)> bucket_filter;
public:
bool going_down();
- void set_bucket_filter(BucketFilter *f) {
- bucket_filter = f;
+ void set_bucket_filter(decltype(bucket_filter)&& f) {
+ bucket_filter = std::move(f);
}
bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
int call(std::function<int(RGWSI_Bucket_X_Ctx& ctx)> f);
- class DataLogFilter : public RGWDataChangesLog::BucketFilter {
- RGWBucketCtl *bucket_ctl;
- public:
- DataLogFilter(RGWBucketCtl *_bucket_ctl) : bucket_ctl(_bucket_ctl) {}
-
- bool filter(const rgw_bucket& bucket, optional_yield y) const override;
- } datalog_filter;
-
public:
RGWBucketCtl(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,