From: Shilpa Jagannath <smanjara@redhat.com>
Date: Mon, 15 Feb 2021 14:46:29 +0000 (+0530)
Subject: rgw: adding generation number to async notification
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=0ca8bb72cdeeb123fbab588c6faa450ba2da24de;p=ceph.git

rgw: adding generation number to async notification

Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
---

diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index 52519600141b1..1c18d3ea1aa24 100644
--- a/src/rgw/rgw_cr_rados.cc
+++ b/src/rgw/rgw_cr_rados.cc
@@ -8,6 +8,10 @@
 #include "rgw_cr_rados.h"
 #include "rgw_sync_counters.h"
 #include "rgw_bucket.h"
+#include "rgw_datalog_notify.h"
+#include "rgw_cr_rest.h"
+#include "rgw_rest_conn.h"
+#include "rgw_rados.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_zone_utils.h"
@@ -18,6 +22,7 @@
 #include "cls/rgw/cls_rgw_client.h"
 
 #include <boost/asio/yield.hpp>
+#include <boost/container/flat_set.hpp>
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -982,3 +987,34 @@ int RGWRadosNotifyCR::request_complete()
 
   return r;
 }
+
+
+int RGWDataPostNotifyCR::operate(const DoutPrefixProvider* dpp)
+{
+  reenter(this) {
+    using PostNotify2 = RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>>, int>;
+    yield {
+      rgw_http_param_pair pairs[] = { { "type", "data" },
+                                      { "notify2", NULL },
+                                      { "source-zone", source_zone },
+                                      { NULL, NULL } };
+      call(new PostNotify2(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, nullptr));
+    }
+    if (retcode == -ERR_METHOD_NOT_ALLOWED) {
+      using PostNotify1 = RGWPostRESTResourceCR<rgw_data_notify_v1_encoder, int>;
+      yield {
+        rgw_http_param_pair pairs[] = { { "type", "data" },
+                                        { "notify", NULL },
+                                        { "source-zone", source_zone },
+                                        { NULL, NULL } };
+        auto encoder = rgw_data_notify_v1_encoder{shards};
+        call(new PostNotify1(store->ctx(), conn, &http_manager, "/admin/log", pairs, encoder, nullptr));
+      }
+    }
+    if (retcode < 0) {
+      return set_cr_error(retcode);
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index f88b3b556cc06..8dc5a9d17d270 100644
--- a/src/rgw/rgw_cr_rados.h
+++ b/src/rgw/rgw_cr_rados.h
@@ -19,6 +19,9 @@
 
 #define dout_subsys ceph_subsys_rgw
 
+struct rgw_http_param_pair;
+class RGWRESTConn;
+
 class RGWAsyncRadosRequest : public RefCountedObject {
   RGWCoroutine *caller;
   RGWAioCompletionNotifier *notifier;
@@ -1467,4 +1470,20 @@ public:
   int request_complete() override;
 };
 
+class RGWDataPostNotifyCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager& http_manager;
+  bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shards;
+  const char *source_zone;
+  RGWRESTConn *conn;
+
+public:
+  RGWDataPostNotifyCR(RGWRados *_store, RGWHTTPManager& _http_manager,
+                      bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& _shards,
+                      const char *_zone, RGWRESTConn *_conn)
+    : RGWCoroutine(_store->ctx()), store(_store), http_manager(_http_manager),
+      shards(_shards), source_zone(_zone), conn(_conn) {}
+
+  int operate(const DoutPrefixProvider* dpp) override;
+};
+
 #endif
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index bc3aa81228851..c783dcbc19a8b 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1430,10 +1430,10 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   boost::asio::coroutine full_cr;
 
-  set<string> modified_shards;
-  set<string> current_modified;
+  bc::flat_set<rgw_data_notify_entry> modified_shards;
+  bc::flat_set<rgw_data_notify_entry> current_modified;
 
-  set<string>::iterator modified_iter;
+  bc::flat_set<rgw_data_notify_entry>::iterator modified_iter;
 
   uint64_t total_entries = 0;
   static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW;
@@ -1493,9 +1493,9 @@ public:
     }
   }
 
-  void append_modified_shards(set<string>& keys) {
+  void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& entries) {
     std::lock_guard l{inc_lock};
-    modified_shards.insert(keys.begin(), keys.end());
+    modified_shards.insert(entries.begin(), entries.end());
   }
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -1671,13 +1671,13 @@ public:
       }
       /* process out of band updates */
       for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
-        retcode = parse_bucket_key(*modified_iter, source_bs);
+        retcode = parse_bucket_key(modified_iter->key, source_bs);
         if (retcode < 0) {
-          tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter));
+          tn->log(1, SSTR("failed to parse bucket shard: " << modified_iter->key));
           continue;
         }
-        tn->log(20, SSTR("received async update notification: " << *modified_iter));
-        spawn(sync_single_entry(source_bs, std::nullopt, string(),
+        tn->log(20, SSTR("received async update notification: " << modified_iter->key));
+        spawn(sync_single_entry(source_bs, modified_iter->gen, string(),
                                 ceph::real_time{}, false), false);
       }
 
@@ -1816,7 +1816,7 @@ public:
                                          &sync_marker);
   }
 
-  void append_modified_shards(set<string>& keys) {
+  void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
     std::lock_guard l{cr_lock()};
 
     RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
@@ -1949,13 +1949,13 @@ public:
                                          sync_status.sync_info);
   }
 
-  void wakeup(int shard_id, set<string>& keys) {
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
     std::lock_guard l{shard_crs_lock};
     map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
     if (iter == shard_crs.end()) {
       return;
     }
-    iter->second->append_modified_shards(keys);
+    iter->second->append_modified_shards(entries);
     iter->second->wakeup();
   }
 };
@@ -2571,7 +2571,7 @@ public:
     return new RGWDataSyncCR(sc, num_shards, tn, backoff_ptr());
   }
 
-  void wakeup(int shard_id, set<string>& keys) {
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
     ceph::mutex& m = cr_lock();
 
     m.lock();
@@ -2585,20 +2585,19 @@ public:
     m.unlock();
 
     if (cr) {
-      tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
-      cr->wakeup(shard_id, keys);
+      cr->wakeup(shard_id, entries);
     }
 
     cr->put();
   }
 };
 
-void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
+void RGWRemoteDataLog::wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
   std::shared_lock rl{lock};
   if (!data_sync_cr) {
     return;
   }
-  data_sync_cr->wakeup(shard_id, keys);
+  data_sync_cr->wakeup(shard_id, entries);
 }
 
 int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
index 4c97c0d8d2795..97533b4d82c8e 100644
--- a/src/rgw/rgw_data_sync.h
+++ b/src/rgw/rgw_data_sync.h
@@ -389,7 +389,7 @@ public:
   int init_sync_status(const DoutPrefixProvider *dpp, int num_shards);
   int run_sync(const DoutPrefixProvider *dpp, int num_shards);
 
-  void wakeup(int shard_id, std::set<std::string>& keys);
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries);
 };
 
 class RGWDataSyncStatusManager : public DoutPrefixProvider {
@@ -456,7 +456,8 @@ public:
   int run(const DoutPrefixProvider *dpp) { return source_log.run_sync(dpp, num_shards); }
 
-  void wakeup(int shard_id, std::set<std::string>& keys) { return source_log.wakeup(shard_id, keys); }
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) { return source_log.wakeup(shard_id, entries); }
+
   void stop() {
     source_log.finish();
   }
diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc
index 7d9e03a663e4a..0b381313ee562 100644
--- a/src/rgw/rgw_datalog.cc
+++ b/src/rgw/rgw_datalog.cc
@@ -76,6 +76,17 @@ void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entry", entry, obj);
 }
 
+void rgw_data_notify_entry::dump(Formatter *f) const
+{
encode_json("key", key, f); + encode_json("gen", gen, f); +} + +void rgw_data_notify_entry::decode_json(JSONObj *obj) { + JSONDecoder::decode_json("key", key, obj); + JSONDecoder::decode_json("gen", gen, obj); +} + class RGWDataChangesOmap final : public RGWDataChangesBE { using centries = std::list; std::vector oids; @@ -621,7 +632,8 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, rgw_bucket_shard bs(bucket, shard_id); int index = choose_oid(bs); - mark_modified(index, bs); + + mark_modified(index, bs, gen.gen); std::unique_lock l(lock); @@ -998,19 +1010,19 @@ void RGWDataChangesLog::renew_stop() renew_cond.notify_all(); } -void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs) +void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen) { auto key = bs.get_key(); { std::shared_lock rl{modified_lock}; // read lock to check for existence auto shard = modified_shards.find(shard_id); - if (shard != modified_shards.end() && shard->second.count(key)) { + if (shard != modified_shards.end() && shard->second.count({key, gen})) { return; } } std::unique_lock wl{modified_lock}; // write lock for insertion - modified_shards[shard_id].insert(key); + modified_shards[shard_id].insert(rgw_data_notify_entry{key, gen}); } std::string RGWDataChangesLog::max_marker() const { diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h index 35822e1c7f42f..423316bee0d34 100644 --- a/src/rgw/rgw_datalog.h +++ b/src/rgw/rgw_datalog.h @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -138,7 +139,7 @@ struct rgw_data_notify_entry { rgw_data_notify_entry& operator=(const rgw_data_notify_entry&) = default; - bool operator<(const rgw_data_notify_entry& d) const { + bool operator <(const rgw_data_notify_entry& d) const { if (key < d.key) { return true; } @@ -147,6 +148,10 @@ struct rgw_data_notify_entry { } return gen < d.gen; } + friend std::ostream& operator <<(std::ostream& m, + const rgw_data_notify_entry& e) { + return m << "[key: " << e.key << ", gen: " << e.gen << "]"; + } }; class RGWDataChangesBE; @@ -214,7 +219,7 @@ class RGWDataChangesLog { ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock"); ceph::shared_mutex modified_lock = ceph::make_shared_mutex("RGWDataChangesLog::modified_lock"); - bc::flat_map> modified_shards; + bc::flat_map> modified_shards; std::atomic down_flag = { false }; @@ -276,7 +281,7 @@ public: std::vector& entries, LogMarker& marker, bool* ptruncated); - void mark_modified(int shard_id, const rgw_bucket_shard& bs); + void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen); auto read_clear_modified() { std::unique_lock wl{modified_lock}; decltype(modified_shards) modified; diff --git a/src/rgw/rgw_http_errors.h b/src/rgw/rgw_http_errors.h index a06d20529ade4..d8674552ab6a3 100644 --- a/src/rgw/rgw_http_errors.h +++ b/src/rgw/rgw_http_errors.h @@ -31,6 +31,8 @@ static inline int rgw_http_error_to_errno(int http_err) return -EACCES; case 404: return -ENOENT; + case 405: + return -ERR_METHOD_NOT_ALLOWED; case 409: return -ENOTEMPTY; case 503: diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 96518eab01c83..cac073308f7b0 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -47,6 +47,7 @@ #include "rgw_etag_verifier.h" #include "rgw_worker.h" #include "rgw_notify.h" +#include "rgw_http_errors.h" #undef fork // fails to compile RGWPeriod::fork() below @@ -74,6 +75,7 @@ using namespace librados; #include "rgw_data_sync.h" #include 
"rgw_realm_watcher.h" #include "rgw_reshard.h" +#include "rgw_cr_rados.h" #include "services/svc_zone.h" #include "services/svc_zone_utils.h" @@ -329,20 +331,17 @@ public: } int notify_all(const DoutPrefixProvider *dpp, map& conn_map, - bc::flat_map >& shards) { - rgw_http_param_pair pairs[] = { { "type", "data" }, - { "notify", NULL }, - { "source-zone", store->svc.zone->get_zone_params().get_id().c_str() }, - { NULL, NULL } }; + bc::flat_map >& shards) { list stacks; + const char *source_zone = store->svc.zone->get_zone_params().get_id().c_str(); for (auto iter = conn_map.begin(); iter != conn_map.end(); ++iter) { RGWRESTConn *conn = iter->second; RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this); - stack->call(new RGWPostRESTResourceCR >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL)); - + stack->call(new RGWDataPostNotifyCR(store, http_manager, shards, source_zone, conn)); stacks.push_back(stack); } + return run(dpp, stacks); } }; @@ -441,6 +440,7 @@ int RGWMetaNotifier::process(const DoutPrefixProvider *dpp) class RGWDataNotifier : public RGWRadosThread { RGWDataNotifierManager notify_mgr; + bc::flat_set entry; uint64_t interval_msec() override { return cct->_conf.get_val("rgw_data_notify_interval_msec"); @@ -467,9 +467,12 @@ int RGWDataNotifier::process(const DoutPrefixProvider *dpp) return 0; } - for (const auto& [shard_id, keys] : shards) { - ldpp_dout(dpp, 20) << __func__ << "(): notifying datalog change, shard_id=" - << shard_id << ": " << keys << dendl; + for (const auto& [shard_id, entries] : shards) { + bc::flat_set::iterator it; + for (const auto& entry : entries) { + ldpp_dout(dpp, 20) << __func__ << "(): notifying datalog change, shard_id=" + << shard_id << ":" << entry.gen << ":" << entry.key << dendl; + } } notify_mgr.notify_all(dpp, store->svc.zone->get_zone_data_notify_to_map(), shards); @@ -547,11 +550,12 @@ public: sync(_store, async_rados, source_zone->id, counters.get()), initialized(false) {} - void wakeup_sync_shards(map >& shard_ids) { - for (map >::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) { + void wakeup_sync_shards(bc::flat_map >& entries) { + for (bc::flat_map >::iterator iter = entries.begin(); iter != entries.end(); ++iter) { sync.wakeup(iter->first, iter->second); } } + RGWDataSyncStatusManager* get_manager() { return &sync; } int init(const DoutPrefixProvider *dpp) override { @@ -645,9 +649,18 @@ void RGWRados::wakeup_meta_sync_shards(set& shard_ids) } } -void RGWRados::wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, map >& shard_ids) +void RGWRados::wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map >& entries) { - ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", shard_ids=" << shard_ids << dendl; + ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", entries=" << entries << dendl; + for (bc::flat_map >::iterator iter = entries.begin(); iter != entries.end(); ++iter) { + ldpp_dout(dpp, 20) << __func__ << "(): updated shard=" << iter->first << dendl; + bc::flat_set& entries = iter->second; + for (const auto& [key, gen] : entries) { + ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", key=" << key + << ", gen=" << gen << dendl; + } + } + std::lock_guard l{data_sync_thread_lock}; auto iter = data_sync_processor_threads.find(source_zone); if (iter == data_sync_processor_threads.end()) { @@ -657,7 +670,7 @@ void 
   RGWDataSyncProcessorThread *thread = iter->second;
   ceph_assert(thread);
 
-  thread->wakeup_sync_shards(shard_ids);
+  thread->wakeup_sync_shards(entries);
 }
 
 RGWMetaSyncStatusManager* RGWRados::get_meta_sync_manager()
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 532ea398128be..73d0203e543fd 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -6,6 +6,7 @@
 
 #include <functional>
 #include <iostream>
+#include <boost/container/flat_map.hpp>
 
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
@@ -1234,9 +1235,8 @@ public:
   int delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp, bool check_empty = true);
 
   void wakeup_meta_sync_shards(std::set<int>& shard_ids);
-  void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
-                               const rgw_zone_id& source_zone,
-                               std::map<int, std::set<std::string> >& shard_ids);
+
+  void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries);
 
   RGWMetaSyncStatusManager* get_meta_sync_manager();
   RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone);
diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc
index 1554a3a8243ef..c7f672f8724ec 100644
--- a/src/rgw/rgw_rest_log.cc
+++ b/src/rgw/rgw_rest_log.cc
@@ -25,6 +25,7 @@
 #include "rgw_common.h"
 #include "rgw_zone.h"
 #include "rgw_mdlog.h"
+#include "rgw_datalog_notify.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_mdlog.h"
@@ -761,7 +762,56 @@ void RGWOp_DATALog_Notify::execute(optional_yield y) {
     return;
   }
 
-  map<int, set<string> > updated_shards;
+  bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> updated_shards;
+  try {
+    auto decoder = rgw_data_notify_v1_decoder{updated_shards};
+    decode_json_obj(decoder, &p);
+  } catch (JSONDecoder::err& err) {
+    ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
+    op_ret = -EINVAL;
+    return;
+  }
+
+  if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
+    for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+      ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
+      bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+      for (const auto& [key, gen] : entries) {
+        ldpp_dout(this, 20) << __func__ << "(): modified key=" << key
+                            << " of gen=" << gen << dendl;
+      }
+    }
+  }
+
+  store->wakeup_data_sync_shards(this, source_zone, updated_shards);
+
+  op_ret = 0;
+}
+
+void RGWOp_DATALog_Notify2::execute(optional_yield y) {
+  string source_zone = s->info.args.get("source-zone");
+#define LARGE_ENOUGH_BUF (128 * 1024)
+
+  int r = 0;
+  bufferlist data;
+  std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
+  if (r < 0) {
+    op_ret = r;
+    return;
+  }
+
+  char* buf = data.c_str();
+  ldout(s->cct, 20) << __func__ << "(): read data: " << buf << dendl;
+
+  JSONParser p;
+  r = p.parse(buf, data.length());
+  if (r < 0) {
+    ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
+    op_ret = r;
+    return;
+  }
+
+  bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> > updated_shards;
   try {
     decode_json_obj(updated_shards, &p);
   } catch (JSONDecoder::err& err) {
@@ -771,11 +821,13 @@ void RGWOp_DATALog_Notify::execute(optional_yield y) {
   }
 
   if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
-    for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+    for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter =
+         updated_shards.begin(); iter != updated_shards.end(); ++iter) {
       ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
-      set<string>& keys = iter->second;
-      for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
-        ldpp_dout(this, 20) << __func__ << "(): modified key=" << *kiter << dendl;
+      bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+      for (const auto& [key, gen] : entries) {
+        ldpp_dout(this, 20) << __func__ << "(): modified key=" << key <<
+        " of generation=" << gen << dendl;
       }
     }
   }
@@ -1145,8 +1197,11 @@ RGWOp *RGWHandler_Log::op_post() {
     else if (s->info.args.exists("notify"))
       return new RGWOp_MDLog_Notify;
   } else if (type.compare("data") == 0) {
-    if (s->info.args.exists("notify"))
+    if (s->info.args.exists("notify")) {
       return new RGWOp_DATALog_Notify;
+    } else if (s->info.args.exists("notify2")) {
+      return new RGWOp_DATALog_Notify2;
+    }
   }
   return NULL;
 }
diff --git a/src/rgw/rgw_rest_log.h b/src/rgw/rgw_rest_log.h
index 577d799ba56bb..f01fcf857e93f 100644
--- a/src/rgw/rgw_rest_log.h
+++ b/src/rgw/rgw_rest_log.h
@@ -277,6 +277,21 @@ public:
   RGWOpType get_type() override { return RGW_OP_SYNC_DATALOG_NOTIFY; }
 };
 
+class RGWOp_DATALog_Notify2 : public RGWRESTOp {
+  rgw_data_notify_entry data_notify;
+public:
+  RGWOp_DATALog_Notify2() {}
+  ~RGWOp_DATALog_Notify2() override {}
+
+  int check_caps(const RGWUserCaps& caps) override {
+    return caps.check_cap("datalog", RGW_CAP_WRITE);
+  }
+  void execute(optional_yield y) override;
+  const char* name() const override {
+    return "datalog_notify2";
+  }
+};
+
 class RGWOp_DATALog_Delete : public RGWRESTOp {
 public:
   RGWOp_DATALog_Delete() {}
diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h
index 472745cd7bf81..e7421b8cdc869 100644
--- a/src/rgw/rgw_sal.h
+++ b/src/rgw/rgw_sal.h
@@ -17,6 +17,7 @@
 
 #include "rgw_user.h"
 #include "rgw_notify_event_type.h"
+#include "rgw_datalog_notify.h"
 
 class RGWGetDataCB;
 struct RGWObjState;
@@ -25,7 +26,7 @@ class RGWLC;
 class RGWObjManifest;
 struct RGWZoneGroup;
 struct RGWZoneParams;
-struct RGWRealm;
+class RGWRealm;
 struct RGWCtl;
 struct rgw_user_bucket;
 class RGWUsageBatch;
@@ -226,7 +227,11 @@ class Store {
 		       optional_yield y) = 0;
     virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) = 0;
     virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) = 0;
-    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, std::map<int, std::set<std::string> >& shard_ids) = 0;
+    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+                                         const rgw_zone_id& source_zone,
+                                         boost::container::flat_map<
+                                           int,
+                                           boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) = 0;
     virtual int clear_usage(const DoutPrefixProvider *dpp) = 0;
     virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
 			       bool* is_truncated,
diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h
index 69ad2a3f61c5e..8d01546ff7f80 100644
--- a/src/rgw/rgw_sal_dbstore.h
+++ b/src/rgw/rgw_sal_dbstore.h
@@ -319,8 +319,12 @@ namespace rgw { namespace sal {
 	  RGWBucketSyncPolicyHandlerRef *phandler,
 	  optional_yield y) override;
       virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) override;
-      virtual void wakeup_meta_sync_shards(set<int>& shard_ids) override { return; }
-      virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, map<int, set<string> >& shard_ids) override { return; }
+      virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) override { return; }
+      virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+                                           const rgw_zone_id& source_zone,
+                                           boost::container::flat_map<
+                                             int,
+                                             boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) override { return; }
       virtual int clear_usage(const DoutPrefixProvider *dpp) override { return 0; }
       virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
 	  uint32_t max_entries, bool *is_truncated,
diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h
index 0731d2add3826..bf8185b05a9ff 100644
--- a/src/rgw/rgw_sal_rados.h
+++ b/src/rgw/rgw_sal_rados.h
@@ -414,7 +414,11 @@ class RadosStore : public Store {
 		       optional_yield y) override;
     virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) override;
     virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) override { rados->wakeup_meta_sync_shards(shard_ids); }
-    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, std::map<int, std::set<std::string> >& shard_ids) override { rados->wakeup_data_sync_shards(dpp, source_zone, shard_ids); }
+    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+                                         const rgw_zone_id& source_zone,
+                                         bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shard_ids) override {
+      rados->wakeup_data_sync_shards(dpp, source_zone, shard_ids);
+    }
     virtual int clear_usage(const DoutPrefixProvider *dpp) override { return rados->clear_usage(dpp); }
     virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
 			       bool* is_truncated,
diff --git a/src/rgw/rgw_sync_error_repo.cc b/src/rgw/rgw_sync_error_repo.cc
index 75f552de93134..44305b60b6b21 100644
--- a/src/rgw/rgw_sync_error_repo.cc
+++ b/src/rgw/rgw_sync_error_repo.cc
@@ -102,7 +102,7 @@ int write(librados::ObjectWriteOperation& op,
 {
   // overwrite the existing timestamp if value is greater
   const uint64_t value = timestamp.time_since_epoch().count();
-  using namespace cls::cmpomap;
+  using namespace ::cls::cmpomap;
   const bufferlist zero = u64_buffer(0); // compare against 0 for missing keys
   return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero);
 }
@@ -113,7 +113,7 @@ int remove(librados::ObjectWriteOperation& op,
 {
   // remove the omap key if value >= existing
   const uint64_t value = timestamp.time_since_epoch().count();
-  using namespace cls::cmpomap;
+  using namespace ::cls::cmpomap;
   return cmp_rm_keys(op, Mode::U64, Op::GTE, {{key, u64_buffer(value)}});
 }
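
Illustration (not part of the patch): with the rgw_data_notify_entry JSON
encoder added above, each entry posted to /admin/log?type=data&notify2 now
carries the bucket-shard key together with its datalog generation. Assuming
Ceph's generic JSON map encoding for the shard map (shard id as "key", entry
list as "val"), a one-shard notify2 body would look roughly like this, where
the shard id 11, the bucket shard key, and gen 2 are made-up example values:

    [
      { "key": 11,
        "val": [
          { "key": "mybucket:<bucket-instance-id>:3", "gen": 2 }
        ]
      }
    ]

A peer zone that predates this change rejects the notify2 resource with HTTP
405, which rgw_http_error_to_errno() now maps to -ERR_METHOD_NOT_ALLOWED, and
RGWDataPostNotifyCR then falls back to re-sending the same shard map through
rgw_data_notify_v1_encoder on the old /admin/log?type=data&notify resource.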