From f05d85342c9cd735526f9461c08564a13333ed78 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 15 Feb 2019 15:23:32 -0800 Subject: [PATCH] rgw: store data log entry if bucket sync policy not empty Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket.cc | 9 +++++++-- src/rgw/rgw_bucket.h | 3 ++- src/rgw/rgw_rados.cc | 12 ++++++------ src/rgw/services/svc_bi_rados.cc | 2 +- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 77cf9de5740..fd433b07bc1 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -26,6 +26,7 @@ #include "rgw_string.h" #include "rgw_multi.h" #include "rgw_op.h" +#include "rgw_bucket_sync.h" #include "services/svc_zone.h" #include "services/svc_sys_obj.h" @@ -2178,9 +2179,13 @@ int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) { return choose_oid(bs); } -int RGWDataChangesLog::add_entry(const rgw_bucket& bucket, int shard_id) { - if (!svc.zone->need_to_log_data()) +int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) { + if (!svc.zone->need_to_log_data() && + (!bucket_info.sync_policy || !bucket_info.sync_policy->zone_is_source(svc.zone->zone_id()))) { return 0; + } + + auto& bucket = bucket_info.bucket; if (observer) { observer->on_bucket_changed(bucket.get_key()); diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 1c6bd7cc49e..a8e782f07e1 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -511,6 +511,7 @@ class RGWDataChangesLog { std::atomic down_flag = { false }; struct ChangeStatus { + std::shared_ptr sync_policy; real_time cur_expiration; real_time cur_sent; bool pending = false; @@ -550,7 +551,7 @@ public: int choose_oid(const rgw_bucket_shard& bs); const std::string& get_oid(int shard_id) const { return oids[shard_id]; } - int add_entry(const rgw_bucket& bucket, int shard_id); + int add_entry(const RGWBucketInfo& bucket_info, int shard_id); int get_log_shard_id(rgw_bucket& bucket, int shard_id); int renew_entries(); int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries, diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 0d6b0d0fced..5032ac120d3 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -835,7 +835,7 @@ int RGWIndexCompletionThread::process() /* ignoring error, can't do anything about it */ continue; } - r = store->svc.datalog_rados->add_entry(bs.bucket, bs.shard_id); + r = store->svc.datalog_rados->add_entry(bucket_info, bs.shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -4830,7 +4830,7 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y) } if (target->bucket_info.datasync_flag_enabled()) { - r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id); + r = store->data_log->add_entry(target->bucket_info, bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; return r; @@ -5902,7 +5902,7 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch, ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace); if (target->bucket_info.datasync_flag_enabled()) { - int r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id); + int r = store->data_log->add_entry(target->bucket_info, bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -5930,7 +5930,7 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch, ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace); if (target->bucket_info.datasync_flag_enabled()) { - int r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id); + int r = store->data_log->add_entry(target->bucket_info, bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -5958,7 +5958,7 @@ int RGWRados::Bucket::UpdateIndex::cancel() * have no way to tell that they're all caught up */ if (target->bucket_info.datasync_flag_enabled()) { - int r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id); + int r = store->data_log->add_entry(target->bucket_info, bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -6602,7 +6602,7 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat } if (log_data_change && bucket_info.datasync_flag_enabled()) { - svc.datalog_rados->add_entry(bs.bucket, bs.shard_id); + data_log->add_entry(bucket_info, bs.shard_id); } return 0; diff --git a/src/rgw/services/svc_bi_rados.cc b/src/rgw/services/svc_bi_rados.cc index fe8aa5ea891..956deeb26be 100644 --- a/src/rgw/services/svc_bi_rados.cc +++ b/src/rgw/services/svc_bi_rados.cc @@ -427,7 +427,7 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const RGWBucketInfo& info, } for (int i = 0; i < shards_num; ++i, ++shard_id) { - ret = svc.datalog_rados->add_entry(info.bucket, shard_id); + ret = svc.datalog_rados->add_entry(info, shard_id); if (ret < 0) { lderr(cct) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl; return ret; -- 2.39.5