From 3773780a635eec6ea169434e1da5f122070420af Mon Sep 17 00:00:00 2001 From: Kalpesh Pandya Date: Wed, 23 Jun 2021 14:26:31 +0530 Subject: [PATCH] rgw: DPP addition 5 Following files have been covered here: 1. rgw_sync_module_aws.cc 2. rgw_sync_module_es.cc 3. rgw_sync_module_pubsub.cc 4. rgw_rest_client.cc 5. cls_fifo_legacy.cc Signed-off-by: Kalpesh Pandya --- src/rgw/cls_fifo_legacy.cc | 70 +++++++++++------------ src/rgw/cls_fifo_legacy.h | 5 +- src/rgw/rgw_admin.cc | 2 +- src/rgw/rgw_cr_rest.cc | 8 +-- src/rgw/rgw_cr_rest.h | 8 +-- src/rgw/rgw_data_sync.cc | 38 ++++++------- src/rgw/rgw_data_sync.h | 4 +- src/rgw/rgw_rest_client.cc | 34 ++++++------ src/rgw/rgw_sync_module.h | 16 +++--- src/rgw/rgw_sync_module_aws.cc | 83 ++++++++++++++-------------- src/rgw/rgw_sync_module_aws.h | 2 +- src/rgw/rgw_sync_module_es.cc | 47 ++++++++-------- src/rgw/rgw_sync_module_es.h | 4 +- src/rgw/rgw_sync_module_log.cc | 14 ++--- src/rgw/rgw_sync_module_log.h | 2 +- src/rgw/rgw_sync_module_pubsub.cc | 62 ++++++++++----------- src/rgw/rgw_sync_module_pubsub.h | 4 +- src/rgw/services/svc_sync_modules.cc | 2 +- 18 files changed, 205 insertions(+), 200 deletions(-) diff --git a/src/rgw/cls_fifo_legacy.cc b/src/rgw/cls_fifo_legacy.cc index 6c752814b1594..d164a0e409520 100644 --- a/src/rgw/cls_fifo_legacy.cc +++ b/src/rgw/cls_fifo_legacy.cc @@ -35,7 +35,6 @@ #include "cls_fifo_legacy.h" namespace rgw::cls::fifo { -static constexpr auto dout_subsys = ceph_subsys_objclass; namespace cb = ceph::buffer; namespace fifo = rados::cls::fifo; @@ -432,22 +431,23 @@ std::string FIFO::generate_tag() const } -int FIFO::apply_update(fifo::info* info, +int FIFO::apply_update(const DoutPrefixProvider *dpp, + fifo::info* info, const fifo::objv& objv, const fifo::update& update, std::uint64_t tid) { - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; std::unique_lock l(m); if (objv != info->version) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " version mismatch, canceling: tid=" << tid << dendl; return -ECANCELED; } auto err = info->apply_update(update); if (err) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " error applying update: " << *err << " tid=" << tid << dendl; return -ECANCELED; } @@ -470,7 +470,7 @@ int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update if (r >= 0 || r == -ECANCELED) { canceled = (r == -ECANCELED); if (!canceled) { - r = apply_update(&info, version, update, tid); + r = apply_update(dpp, &info, version, update, tid); if (r < 0) canceled = true; } if (canceled) { @@ -507,7 +507,7 @@ struct Updater : public Completion { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; if (reread) - handle_reread(std::move(p), r); + handle_reread(dpp, std::move(p), r); else handle_update(dpp, std::move(p), r); } @@ -524,7 +524,7 @@ struct Updater : public Completion { } bool canceled = (r == -ECANCELED); if (!canceled) { - int r = fifo->apply_update(&fifo->info, version, update, tid); + int r = fifo->apply_update(dpp, &fifo->info, version, update, tid); if (r < 0) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " update failed, marking canceled: r=" << r @@ -544,8 +544,8 @@ struct Updater : public Completion { complete(std::move(p), 0); } - void handle_reread(Ptr&& p, int r) { - ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + void handle_reread(const DoutPrefixProvider *dpp, Ptr&& p, int r) { + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " handling async read_meta: tid=" << tid << dendl; if (r < 0 && pcanceled) { @@ -554,11 +554,11 @@ struct Updater : public Completion { *pcanceled = true; } if (r < 0) { - lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " failed dispatching read_meta: r=" << r << " tid=" << tid << dendl; } else { - ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " completing: tid=" << tid << dendl; } complete(std::move(p), r); @@ -1137,12 +1137,12 @@ int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t of return 0; } -void FIFO::trim_part(int64_t part_num, uint64_t ofs, +void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, std::optional tag, bool exclusive, std::uint64_t tid, lr::AioCompletion* c) { - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; lr::ObjectWriteOperation op; std::unique_lock l(m); @@ -1431,13 +1431,13 @@ struct Pusher : public Completion { std::uint64_t tid; bool new_heading = false; - void prep_then_push(Ptr&& p, const unsigned successes) { + void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) { std::unique_lock l(f->m); auto max_part_size = f->info.params.max_part_size; auto part_entry_overhead = f->part_entry_overhead; l.unlock(); - ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " preparing push: remaining=" << remaining.size() << " batch=" << batch.size() << " i=" << i << " tid=" << tid << dendl; @@ -1470,7 +1470,7 @@ struct Pusher : public Completion { batch.push_back(std::move(remaining.front())); remaining.pop_front(); } - ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " prepared push: remaining=" << remaining.size() << " batch=" << batch.size() << " i=" << i << " batch_len=" << batch_len @@ -1503,7 +1503,7 @@ struct Pusher : public Completion { return; } i = 0; // We've made forward progress, so reset the race counter! - prep_then_push(std::move(p), r); + prep_then_push(dpp, std::move(p), r); } else { if (r < 0) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -1513,14 +1513,14 @@ struct Pusher : public Completion { return; } new_heading = false; - handle_new_head(std::move(p), r); + handle_new_head(dpp, std::move(p), r); } } - void handle_new_head(Ptr&& p, int r) { + void handle_new_head(const DoutPrefixProvider *dpp, Ptr&& p, int r) { if (r == -ECANCELED) { if (p->i == MAX_RACE_RETRIES) { - lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " canceled too many times, giving up: tid=" << tid << dendl; complete(std::move(p), -ECANCELED); return; @@ -1532,7 +1532,7 @@ struct Pusher : public Completion { } if (p->batch.empty()) { - prep_then_push(std::move(p), 0); + prep_then_push(dpp, std::move(p), 0); return; } else { push(std::move(p)); @@ -1580,7 +1580,7 @@ void FIFO::push(const DoutPrefixProvider *dpp, const std::vector& data << " need new head tid=" << tid << dendl; p->new_head(dpp, std::move(p)); } else { - p->prep_then_push(std::move(p), 0); + p->prep_then_push(dpp, std::move(p), 0); } } @@ -1839,12 +1839,12 @@ struct Trimmer : public Completion { if (pn < part_num) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " pn=" << pn << " tid=" << tid << dendl; - fifo->trim_part(pn++, max_part_size, std::nullopt, + fifo->trim_part(dpp, pn++, max_part_size, std::nullopt, false, tid, call(std::move(p))); } else { update = true; canceled = tail_part_num < part_num; - fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid, + fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, call(std::move(p))); } return; @@ -1872,7 +1872,7 @@ struct Trimmer : public Completion { std::unique_lock l(fifo->m); const auto max_part_size = fifo->info.params.max_part_size; l.unlock(); - fifo->trim_part(pn++, max_part_size, std::nullopt, + fifo->trim_part(dpp, pn++, max_part_size, std::nullopt, false, tid, call(std::move(p))); return; } @@ -1882,7 +1882,7 @@ struct Trimmer : public Completion { l.unlock(); update = true; canceled = tail_part_num < part_num; - fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid, + fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, call(std::move(p))); return; } @@ -1944,7 +1944,7 @@ void FIFO::trim(const DoutPrefixProvider *dpp, std::string_view markstr, bool ex } else { trimmer->update = true; } - trim_part(pn, ofs, std::nullopt, exclusive, + trim_part(dpp, pn, ofs, std::nullopt, exclusive, tid, Trimmer::call(std::move(trimmer))); } @@ -2066,9 +2066,9 @@ private: pp_callback, } state; - void create_part(Ptr&& p, int64_t part_num, + void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num, std::string_view tag) { - ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; state = entry_callback; lr::ObjectWriteOperation op; @@ -2083,9 +2083,9 @@ private: return; } - void remove_part(Ptr&& p, int64_t part_num, + void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num, std::string_view tag) { - ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; state = entry_callback; lr::ObjectWriteOperation op; @@ -2272,7 +2272,7 @@ public: const auto entry = iter->second; switch (entry.op) { case fifo::journal_entry::Op::create: - create_part(std::move(p), entry.part_num, entry.part_tag); + create_part(dpp, std::move(p), entry.part_num, entry.part_tag); return; case fifo::journal_entry::Op::set_head: if (entry.part_num > new_head) { @@ -2282,10 +2282,10 @@ public: ++iter; continue; case fifo::journal_entry::Op::remove: - remove_part(std::move(p), entry.part_num, entry.part_tag); + remove_part(dpp, std::move(p), entry.part_num, entry.part_tag); return; default: - lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " unknown journaled op: entry=" << entry << " tid=" << tid << dendl; complete(std::move(p), -EIO); diff --git a/src/rgw/cls_fifo_legacy.h b/src/rgw/cls_fifo_legacy.h index 21d4b72bb1775..9a35e4dd251ce 100644 --- a/src/rgw/cls_fifo_legacy.h +++ b/src/rgw/cls_fifo_legacy.h @@ -129,7 +129,8 @@ class FIFO { std::string generate_tag() const; - int apply_update(fifo::info* info, + int apply_update(const DoutPrefixProvider *dpp, + fifo::info* info, const fifo::objv& objv, const fifo::update& update, std::uint64_t tid); @@ -156,7 +157,7 @@ class FIFO { int trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, std::optional tag, bool exclusive, std::uint64_t tid, optional_yield y); - void trim_part(int64_t part_num, uint64_t ofs, + void trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs, std::optional tag, bool exclusive, std::uint64_t tid, lr::AioCompletion* c); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index d69a9e99f6aee..c226639f5138a 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -8108,7 +8108,7 @@ next: } RGWSyncModuleInstanceRef sync_module; - int ret = static_cast(store)->svc()->sync_modules->get_manager()->create_instance(g_ceph_context, static_cast(store)->svc()->zone->get_zone().tier_type, + int ret = static_cast(store)->svc()->sync_modules->get_manager()->create_instance(dpp(), g_ceph_context, static_cast(store)->svc()->zone->get_zone().tier_type, store->get_zone()->get_params().tier_config, &sync_module); if (ret < 0) { ldpp_dout(dpp(), -1) << "ERROR: failed to init sync module instance, ret=" << ret << dendl; diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index bc013b3c74835..0bd169f99e7d3 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -127,7 +127,7 @@ void RGWStreamReadHTTPResourceCRF::get_attrs(std::map *attrs) req->get_out_headers(attrs); } -int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map& headers, bufferlist& extra_data) { +int RGWStreamReadHTTPResourceCRF::decode_rest_obj(const DoutPrefixProvider *dpp, map& headers, bufferlist& extra_data) { /* basic generic implementation */ for (auto header : headers) { const string& val = header.second; @@ -138,7 +138,7 @@ int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map& headers, return 0; } -int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) +int RGWStreamReadHTTPResourceCRF::read(const DoutPrefixProvider *dpp, bufferlist *out, uint64_t max_size, bool *io_pending) { reenter(&read_state) { io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL); @@ -156,7 +156,7 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool extra_data.claim_append(in_cb->get_extra_data()); map attrs; req->get_out_headers(&attrs); - int ret = decode_rest_obj(attrs, extra_data); + int ret = decode_rest_obj(dpp, attrs, extra_data); if (ret < 0) { ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl; return ret; @@ -281,7 +281,7 @@ int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) { do { yield { - ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry); + ret = in_crf->read(dpp, &bl, 4 * 1024 * 1024, &need_retry); if (ret < 0) { return set_cr_error(ret); } diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 59045305edfa0..cb103aeb83455 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -422,8 +422,8 @@ protected: public: virtual int init(const DoutPrefixProvider *dpp) = 0; - virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ - virtual int decode_rest_obj(std::map& headers, bufferlist& extra_data) = 0; + virtual int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ + virtual int decode_rest_obj(const DoutPrefixProvider *dpp, std::map& headers, bufferlist& extra_data) = 0; virtual bool has_attrs() = 0; virtual void get_attrs(std::map *attrs) = 0; virtual ~RGWStreamReadResourceCRF() = default; @@ -487,8 +487,8 @@ public: ~RGWStreamReadHTTPResourceCRF(); int init(const DoutPrefixProvider *dpp) override; - int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ - int decode_rest_obj(std::map& headers, bufferlist& extra_data) override; + int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ + int decode_rest_obj(const DoutPrefixProvider *dpp, std::map& headers, bufferlist& extra_data) override; bool has_attrs() override; void get_attrs(std::map *attrs) override; bool is_done(); diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 4f37e3603f373..cb7564f0019f2 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1895,7 +1895,7 @@ public: tn->log(10, SSTR("building full sync maps")); /* call sync module init here */ sync_status.sync_info.num_shards = num_shards; - yield call(data_sync_module->init_sync(sc)); + yield call(data_sync_module->init_sync(dpp, sc)); if (retcode < 0) { tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode)); return set_cr_error(retcode); @@ -1918,7 +1918,7 @@ public: *reset_backoff = true; } - yield call(data_sync_module->start_sync(sc)); + yield call(data_sync_module->start_sync(dpp, sc)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode)); return set_cr_error(retcode); @@ -1966,9 +1966,9 @@ class RGWDefaultDataSyncModule : public RGWDataSyncModule { public: RGWDefaultDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; }; @@ -1984,7 +1984,7 @@ public: } }; -int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) +int RGWDefaultSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { instance->reset(new RGWDefaultSyncModuleInstance()); return 0; @@ -2453,12 +2453,12 @@ public: } }; -RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) +RGWCoroutine *RGWDefaultDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt, versioned_epoch, zones_trace); } -RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, +RGWCoroutine *RGWDefaultDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { auto sync_env = sc->env; @@ -2467,7 +2467,7 @@ RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bu NULL, NULL, false, &mtime, zones_trace); } -RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, +RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { auto sync_env = sc->env; @@ -2480,9 +2480,9 @@ class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule { public: RGWArchiveDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; }; @@ -2501,13 +2501,13 @@ public: } }; -int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) +int RGWArchiveSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { instance->reset(new RGWArchiveSyncModuleInstance()); return 0; } -RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) +RGWCoroutine *RGWArchiveDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { auto sync_env = sc->env; ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; @@ -2535,14 +2535,14 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck return new RGWObjFetchCR(sc, sync_pipe, key, dest_key, versioned_epoch, zones_trace); } -RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, +RGWCoroutine *RGWArchiveDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } -RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, +RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime @@ -3605,19 +3605,19 @@ public: op == CLS_RGW_OP_LINK_OLH) { set_status("syncing obj"); tn->log(5, SSTR("bucket sync: sync obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->sync_object(sc, sync_pipe, key, versioned_epoch, &zones_trace)); + call(data_sync_module->sync_object(dpp, sc, sync_pipe, key, versioned_epoch, &zones_trace)); } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) { set_status("removing obj"); if (op == CLS_RGW_OP_UNLINK_INSTANCE) { versioned = true; } tn->log(10, SSTR("removing obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->remove_object(sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace)); + call(data_sync_module->remove_object(dpp, sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace)); // our copy of the object is more recent, continue as if it succeeded } else if (op == CLS_RGW_OP_LINK_OLH_DM) { set_status("creating delete marker"); tn->log(10, SSTR("creating delete marker: obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->create_delete_marker(sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace)); + call(data_sync_module->create_delete_marker(dpp, sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace)); } tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key)); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index d307dce82f230..43ca47126d4df 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -687,7 +687,7 @@ public: RGWDefaultSyncModule() {} bool supports_writes() override { return true; } bool supports_data_export() override { return true; } - int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; }; class RGWArchiveSyncModule : public RGWDefaultSyncModule { @@ -695,7 +695,7 @@ public: RGWArchiveSyncModule() {} bool supports_writes() override { return true; } bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; }; #endif diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 3cb888cdf7b2c..54d185516eda7 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -31,7 +31,7 @@ int RGWHTTPSimpleRequest::handle_header(const string& name, const string& val) string err; long len = strict_strtol(val.c_str(), 10, &err); if (!err.empty()) { - ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl; + ldpp_dout(this, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl; return -EINVAL; } @@ -49,7 +49,7 @@ int RGWHTTPSimpleRequest::receive_header(void *ptr, size_t len) char *s = (char *)ptr, *end = (char *)ptr + len; char *p = line; - ldout(cct, 10) << "receive_http_header" << dendl; + ldpp_dout(this, 10) << "receive_http_header" << dendl; while (s != end) { if (*s == '\r') { @@ -58,7 +58,7 @@ int RGWHTTPSimpleRequest::receive_header(void *ptr, size_t len) } if (*s == '\n') { *p = '\0'; - ldout(cct, 10) << "received header:" << line << dendl; + ldpp_dout(this, 10) << "received header:" << line << dendl; // TODO: fill whatever data required here char *l = line; char *tok = strsep(&l, " \t:"); @@ -285,13 +285,14 @@ static string extract_region_name(string&& s) } -static bool identify_scope(CephContext *cct, +static bool identify_scope(const DoutPrefixProvider *dpp, + CephContext *cct, const string& host, string *region, string *service) { if (!boost::algorithm::ends_with(host, "amazonaws.com")) { - ldout(cct, 20) << "NOTICE: cannot identify region for connection to: " << host << dendl; + ldpp_dout(dpp, 20) << "NOTICE: cannot identify region for connection to: " << host << dendl; return false; } @@ -310,7 +311,7 @@ static bool identify_scope(CephContext *cct, } ++iter; if (iter == vec.end()) { - ldout(cct, 0) << "WARNING: cannot identify region name from host name: " << host << dendl; + ldpp_dout(dpp, 0) << "WARNING: cannot identify region name from host name: " << host << dendl; return false; } auto& next = *iter; @@ -329,7 +330,8 @@ static bool identify_scope(CephContext *cct, return false; } -static void scope_from_api_name(CephContext *cct, +static void scope_from_api_name(const DoutPrefixProvider *dpp, + CephContext *cct, const string& host, std::optional api_name, string *region, @@ -341,7 +343,7 @@ static void scope_from_api_name(CephContext *cct, return; } - if (!identify_scope(cct, host, region, service)) { + if (!identify_scope(dpp, cct, host, region, service)) { *region = cct->_conf->rgw_zonegroup; *service = "s3"; return; @@ -381,7 +383,7 @@ int RGWRESTSimpleRequest::forward_request(const DoutPrefixProvider *dpp, RGWAcce string region; string service; - scope_from_api_name(cct, host, api_name, ®ion, &service); + scope_from_api_name(dpp, cct, host, api_name, ®ion, &service); const char *maybe_payload_hash = info.env->get("HTTP_X_AMZ_CONTENT_SHA256"); if (maybe_payload_hash) { @@ -557,7 +559,7 @@ void RGWRESTGenerateHTTPHeaders::init(const string& _method, const string& host, const string& resource, const param_vec_t& params, std::optional api_name) { - scope_from_api_name(cct, host, api_name, ®ion, &service); + scope_from_api_name(this, cct, host, api_name, ®ion, &service); string params_str; map& args = new_info->args.get_params(); @@ -746,7 +748,7 @@ void set_str_from_headers(map& out_headers, const string& header } } -static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt) +static int parse_rgwx_mtime(const DoutPrefixProvider *dpp, CephContext *cct, const string& s, ceph::real_time *rt) { string err; vector vec; @@ -760,14 +762,14 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time * long secs = strict_strtol(vec[0].c_str(), 10, &err); long nsecs = 0; if (!err.empty()) { - ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl; + ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl; return -EINVAL; } if (vec.size() > 1) { nsecs = strict_strtol(vec[1].c_str(), 10, &err); if (!err.empty()) { - ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl; + ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl; return -EINVAL; } } @@ -937,7 +939,7 @@ int RGWHTTPStreamRWRequest::complete_request(optional_yield y, string mtime_str; set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str); if (!mtime_str.empty()) { - int ret = parse_rgwx_mtime(cct, mtime_str, mtime); + int ret = parse_rgwx_mtime(this, cct, mtime_str, mtime); if (ret < 0) { return ret; } @@ -951,7 +953,7 @@ int RGWHTTPStreamRWRequest::complete_request(optional_yield y, string err; *psize = strict_strtoll(size_str.c_str(), 10, &err); if (!err.empty()) { - ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl; + ldpp_dout(this, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl; return -EIO; } } @@ -990,7 +992,7 @@ int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val) string err; long len = strict_strtol(val.c_str(), 10, &err); if (!err.empty()) { - ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl; + ldpp_dout(this, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl; return -EINVAL; } diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index 19292b3d80995..135495b493440 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -23,17 +23,17 @@ public: virtual void init(RGWDataSyncCtx *sync_env, uint64_t instance_id) {} - virtual RGWCoroutine *init_sync(RGWDataSyncCtx *sc) { + virtual RGWCoroutine *init_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) { return nullptr; } - virtual RGWCoroutine *start_sync(RGWDataSyncCtx *sc) { + virtual RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) { return nullptr; } - virtual RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) = 0; - virtual RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, + virtual RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) = 0; + virtual RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; - virtual RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, + virtual RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; }; @@ -76,7 +76,7 @@ public: return false; } virtual bool supports_data_export() = 0; - virtual int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) = 0; + virtual int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) = 0; }; typedef std::shared_ptr RGWSyncModuleRef; @@ -119,13 +119,13 @@ public: return module->supports_data_export(); } - int create_instance(CephContext *cct, const std::string& name, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { + int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const std::string& name, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { RGWSyncModuleRef module; if (!get_module(name, &module)) { return -ENOENT; } - return module.get()->create_instance(cct, config, instance); + return module.get()->create_instance(dpp, cct, config, instance); } std::vector get_registered_module_names() const { diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index e74d289458f84..3ea5b4d8a63c1 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -274,14 +274,14 @@ struct AWSSyncConfig_Connection { } }; -static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval) +static int conf_to_uint64(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval) { string sval; if (config.find(key, &sval)) { string err; uint64_t val = strict_strtoll(sval.c_str(), 10, &err); if (!err.empty()) { - ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl; + ldpp_dout(dpp, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl; return -EINVAL; } *pval = val; @@ -293,13 +293,13 @@ struct AWSSyncConfig_S3 { uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE}; uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE}; - int init(CephContext *cct, const JSONFormattable& config) { - int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold); + int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) { + int r = conf_to_uint64(dpp, cct, config, "multipart_sync_threshold", &multipart_sync_threshold); if (r < 0) { return r; } - r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size); + r = conf_to_uint64(dpp, cct, config, "multipart_min_part_size", &multipart_min_part_size); if (r < 0) { return r; } @@ -404,15 +404,15 @@ struct AWSSyncConfig { AWSSyncConfig_S3 s3; - int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile, + int init_profile(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile, bool connection_must_exist) { if (!profile.connection_id.empty()) { if (profile.conn_conf) { - ldout(cct, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl; + ldpp_dout(dpp, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl; return -EINVAL; } if (connections.find(profile.connection_id) == connections.end()) { - ldout(cct, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl; + ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl; return -EINVAL; } profile.conn_conf = connections[profile.connection_id]; @@ -425,7 +425,7 @@ struct AWSSyncConfig { } if (connection_must_exist && !profile.conn_conf) { - ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl; + ldpp_dout(dpp, 0) << "ERROR: remote connection undefined for sync profile" << dendl; return -EINVAL; } @@ -445,7 +445,7 @@ struct AWSSyncConfig { if (!profile.acls_id.empty()) { if (!acl_profiles.find(profile.acls_id, &acl_mappings)) { - ldout(cct, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl; + ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl; return -EINVAL; } profile.acls = acl_profiles.acl_profiles[profile.acls_id]; @@ -466,12 +466,12 @@ struct AWSSyncConfig { return 0; } - int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr *ptarget) { + int init_target(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr *ptarget) { std::shared_ptr profile; profile.reset(new AWSSyncConfig_Profile); profile->init(profile_conf); - int ret = init_profile(cct, profile_conf, *profile, true); + int ret = init_profile(dpp, cct, profile_conf, *profile, true); if (ret < 0) { return ret; } @@ -479,7 +479,7 @@ struct AWSSyncConfig { auto& sb = profile->source_bucket; if (explicit_profiles.find(sb) != explicit_profiles.end()) { - ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl; + ldpp_dout(dpp, 0) << "WARNING: duplicate target configuration in sync module" << dendl; } explicit_profiles[sb] = profile; @@ -523,12 +523,12 @@ struct AWSSyncConfig { AWSSyncConfig() {} - int init(CephContext *cct, const JSONFormattable& config) { + int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) { auto& default_conf = config["default"]; if (config.exists("default")) { default_profile.init(default_conf); - init_profile(cct, default_conf, default_profile, false); + init_profile(dpp, cct, default_conf, default_profile, false); } for (auto& conn : config["connections"].array()) { @@ -542,20 +542,20 @@ struct AWSSyncConfig { acl_profiles.init(config["acl_profiles"]); - int r = s3.init(cct, config["s3"]); + int r = s3.init(dpp, cct, config["s3"]); if (r < 0) { return r; } auto new_root_conf = config; - r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */ + r = init_target(dpp, cct, new_root_conf, &root_profile); /* the root profile config */ if (r < 0) { return r; } for (auto target_conf : config["profiles"].array()) { - int r = init_target(cct, target_conf, nullptr); + int r = init_target(dpp, cct, target_conf, nullptr); if (r < 0) { return r; } @@ -566,7 +566,7 @@ struct AWSSyncConfig { stringstream ss; jf.flush(ss); - ldout(cct, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl; + ldpp_dout(dpp, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl; return 0; } @@ -583,12 +583,12 @@ struct AWSSyncConfig { apply_meta_param(path, "zone_id", zone.id, dest); } - void update_config(RGWDataSyncCtx *sc, const string& sid) { + void update_config(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, const string& sid) { expand_target(sc, sid, root_profile->target_path, &root_profile->target_path); - ldout(sc->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl; + ldpp_dout(dpp, 20) << "updated target: (root) -> " << root_profile->target_path << dendl; for (auto& t : explicit_profiles) { expand_target(sc, sid, t.second->target_path, &t.second->target_path); - ldout(sc->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl; + ldpp_dout(dpp, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl; } } @@ -650,7 +650,7 @@ struct AWSSyncConfig { void init_conns(RGWDataSyncCtx *sc, const string& id) { auto sync_env = sc->env; - update_config(sc, id); + update_config(sync_env->dpp, sc, id); auto& root_conf = root_profile->conn_conf; @@ -697,7 +697,7 @@ struct AWSSyncInstanceEnv { } }; -static int do_decode_rest_obj(CephContext *cct, map& attrs, map& headers, rgw_rest_obj *info) +static int do_decode_rest_obj(const DoutPrefixProvider *dpp, CephContext *cct, map& attrs, map& headers, rgw_rest_obj *info) { for (auto header : headers) { const string& val = header.second; @@ -716,11 +716,11 @@ static int do_decode_rest_obj(CephContext *cct, map& attrs, try { info->acls.decode(bliter); } catch (buffer::error& err) { - ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl; + ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl; return -EIO; } } else { - ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl; + ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl; } return 0; @@ -777,21 +777,21 @@ public: return RGWStreamReadHTTPResourceCRF::init(dpp); } - int decode_rest_obj(map& headers, bufferlist& extra_data) override { + int decode_rest_obj(const DoutPrefixProvider *dpp, map& headers, bufferlist& extra_data) override { map src_attrs; - ldout(sc->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; + ldpp_dout(dpp, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl; if (extra_data.length() > 0) { JSONParser jp; if (!jp.parse(extra_data.c_str(), extra_data.length())) { - ldout(sc->cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl; + ldpp_dout(dpp, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl; return -EIO; } JSONDecoder::decode_json("attrs", src_attrs, &jp); } - return do_decode_rest_obj(sc->cct, src_attrs, headers, &rest_obj); + return do_decode_rest_obj(dpp, sc->cct, src_attrs, headers, &rest_obj); } bool need_extra_data() override { @@ -847,7 +847,8 @@ public: boost::algorithm::starts_with(h, "X_AMZ_")); } - static void init_send_attrs(CephContext *cct, + static void init_send_attrs(const DoutPrefixProvider *dpp, + CephContext *cct, const rgw_rest_obj& rest_obj, const rgw_sync_aws_src_obj_properties& src_properties, const AWSSyncConfig_Profile *target, @@ -877,7 +878,7 @@ public: auto iter = am.find(orig_grantee); if (iter == am.end()) { - ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; + ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; continue; } @@ -947,7 +948,7 @@ public: s.append(viter); } - ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl; + ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl; new_attrs[header_str] = s; } @@ -974,7 +975,7 @@ public: map new_attrs; if (!multipart.is_multipart) { - init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs); + init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs); } r->set_send_length(rest_obj.content_len); @@ -1470,7 +1471,7 @@ public: } if (retcode == -ENOENT) { - RGWAWSStreamPutCRF::init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs); + RGWAWSStreamPutCRF::init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs); yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id)); if (retcode < 0) { @@ -1697,7 +1698,7 @@ public: } else { rgw_rest_obj rest_obj; rest_obj.init(key); - if (do_decode_rest_obj(sc->cct, attrs, headers, &rest_obj)) { + if (do_decode_rest_obj(dpp, sc->cct, attrs, headers, &rest_obj)) { ldpp_dout(dpp, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl; return set_cr_error(-EINVAL); } @@ -1788,18 +1789,18 @@ public: ~RGWAWSDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, + RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0)); } - RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, + RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance); } - RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime @@ -1817,10 +1818,10 @@ public: } }; -int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){ +int RGWAWSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){ AWSSyncConfig conf; - int r = conf.init(cct, config); + int r = conf.init(dpp, cct, config); if (r < 0) { return r; } diff --git a/src/rgw/rgw_sync_module_aws.h b/src/rgw/rgw_sync_module_aws.h index 7799fb8cb8ef6..48f0145fdf92e 100644 --- a/src/rgw/rgw_sync_module_aws.h +++ b/src/rgw/rgw_sync_module_aws.h @@ -105,7 +105,7 @@ class RGWAWSSyncModule : public RGWSyncModule { public: RGWAWSSyncModule() {} bool supports_data_export() override { return false;} - int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; }; #endif /* RGW_SYNC_MODULE_AWS_H */ diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 42c45104e327e..7f65637b3e950 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -445,6 +445,7 @@ static size_t attr_len(const bufferlist& val) } struct es_obj_metadata { + const DoutPrefixProvider *dpp; CephContext *cct; ElasticConfigRef es_conf; RGWBucketInfo bucket_info; @@ -494,7 +495,7 @@ struct es_obj_metadata { auto i = val.cbegin(); decode(policy, i); } catch (buffer::error& err) { - ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl; + ldpp_dout(dpp, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl; continue; } @@ -516,7 +517,7 @@ struct es_obj_metadata { auto tags_bl = val.cbegin(); decode(obj_tags, tags_bl); } catch (buffer::error& err) { - ldout(cct,0) << "ERROR: failed to decode obj tags for " + ldpp_dout(dpp, 0) << "ERROR: failed to decode obj tags for " << bucket_info.bucket << "/" << key << dendl; continue; } @@ -526,7 +527,7 @@ struct es_obj_metadata { auto vals_bl = val.cbegin(); decode(cs_info, vals_bl); } catch (buffer::error& err) { - ldout(cct,0) << "ERROR: failed to decode compression attr for " + ldpp_dout(dpp, 0) << "ERROR: failed to decode compression attr for " << bucket_info.bucket << "/" << key << dendl; continue; } @@ -567,7 +568,7 @@ struct es_obj_metadata { /* default custom meta is of type string */ custom_str[i.first] = i.second; } else { - ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; + ldpp_dout(dpp, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl; } continue; } @@ -613,7 +614,7 @@ struct es_obj_metadata { real_time t; int r = parse_time(i.second.c_str(), &t); if (r < 0) { - ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl; continue; } @@ -865,7 +866,7 @@ public: class RGWElasticDataSyncModule : public RGWDataSyncModule { ElasticConfigRef conf; public: - RGWElasticDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared()) { + RGWElasticDataSyncModule(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) : conf(std::make_shared()) { conf->init(cct, config); } ~RGWElasticDataSyncModule() override {} @@ -874,39 +875,39 @@ public: conf->init_instance(sc->env->svc->zone->get_realm(), instance_id); } - RGWCoroutine *init_sync(RGWDataSyncCtx *sc) override { - ldout(sc->cct, 5) << conf->id << ": init" << dendl; + RGWCoroutine *init_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override { + ldpp_dout(dpp, 5) << conf->id << ": init" << dendl; return new RGWElasticInitConfigCBCR(sc, conf); } - RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { - ldout(sc->cct, 5) << conf->id << ": start_sync" << dendl; + RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override { + ldpp_dout(dpp, 5) << conf->id << ": start_sync" << dendl; // try to get elastic search version return new RGWElasticGetESInfoCBCR(sc, conf); } - RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { + ldpp_dout(dpp, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { - ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; + ldpp_dout(dpp, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; } return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0)); } - RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ - ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + ldpp_dout(dpp, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { - ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; + ldpp_dout(dpp, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; } return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf); } - RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldpp_dout(dpp, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; + ldpp_dout(dpp, 10) << conf->id << ": skipping operation (not handled)" << dendl; return NULL; } RGWRESTConn *get_rest_conn() { @@ -922,9 +923,9 @@ public: } }; -RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config) +RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) { - data_handler = std::unique_ptr(new RGWElasticDataSyncModule(cct, config)); + data_handler = std::unique_ptr(new RGWElasticDataSyncModule(dpp, cct, config)); } RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler() @@ -953,9 +954,9 @@ RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMg return new RGWRESTMgr_MDSearch_S3(); } -int RGWElasticSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { +int RGWElasticSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { string endpoint = config["endpoint"]; - instance->reset(new RGWElasticSyncModuleInstance(cct, config)); + instance->reset(new RGWElasticSyncModuleInstance(dpp, cct, config)); return 0; } diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h index d8c088a9aa245..6c0c422c39ccb 100644 --- a/src/rgw/rgw_sync_module_es.h +++ b/src/rgw/rgw_sync_module_es.h @@ -39,7 +39,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; }; class RGWElasticDataSyncModule; @@ -48,7 +48,7 @@ class RGWRESTConn; class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance { std::unique_ptr data_handler; public: - RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config); + RGWElasticSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config); RGWDataSyncModule *get_data_handler() override; RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override; RGWRESTConn *get_rest_conn(); diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index 9c02818d329d4..a21604cc228ed 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -43,17 +43,17 @@ class RGWLogDataSyncModule : public RGWDataSyncModule { public: explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {} - RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { + ldpp_dout(dpp, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; return new RGWLogStatRemoteObjCR(sc, sync_pipe.info.source_bs.bucket, key); } - RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + ldpp_dout(dpp, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } - RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime + ldpp_dout(dpp, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } @@ -68,7 +68,7 @@ public: } }; -int RGWLogSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { +int RGWLogSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { string prefix = config["prefix"]; instance->reset(new RGWLogSyncModuleInstance(prefix)); return 0; diff --git a/src/rgw/rgw_sync_module_log.h b/src/rgw/rgw_sync_module_log.h index 416b3f51c18ad..ecf3bb78911ef 100644 --- a/src/rgw/rgw_sync_module_log.h +++ b/src/rgw/rgw_sync_module_log.h @@ -12,7 +12,7 @@ public: bool supports_data_export() override { return false; } - int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; }; #endif diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index c14e795fc57e8..d256111922d73 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -386,10 +386,10 @@ class RGWSingletonCR : public RGWCoroutine { WaiterInfoRef waiter; while (get_next_waiter(&waiter)) { - ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl; waiter->cr->set_retcode(retcode); waiter->cr->set_sleeping(false); - return_result(waiter->result); + return_result(dpp, waiter->result); put(); } @@ -398,29 +398,29 @@ class RGWSingletonCR : public RGWCoroutine { return 0; } - virtual void return_result(T *result) {} + virtual void return_result(const DoutPrefixProvider *dpp, T *result) {} public: RGWSingletonCR(CephContext *_cct) : RGWCoroutine(_cct) {} - int execute(RGWCoroutine *caller, T *result = nullptr) { + int execute(const DoutPrefixProvider *dpp, RGWCoroutine *caller, T *result = nullptr) { if (!started) { - ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): singleton not started, starting" << dendl; started = true; caller->call(this); return 0; } else if (!is_done()) { - ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl; get(); add_waiter(caller, result); caller->set_sleeping(true); return 0; } - ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl; caller->set_retcode(retcode); - return_result(result); + return_result(dpp, result); return retcode; } }; @@ -713,8 +713,8 @@ public: return sub; } - int call_init_cr(RGWCoroutine *caller) { - return init_cr->execute(caller); + int call_init_cr(const DoutPrefixProvider *dpp, RGWCoroutine *caller) { + return init_cr->execute(dpp, caller); } template @@ -791,7 +791,7 @@ class PSManager *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf); } - yield (*ref)->call_init_cr(this); + yield (*ref)->call_init_cr(dpp, this); if (retcode < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl; mgr->remove_get_sub(owner, sub_name); @@ -805,8 +805,8 @@ class PSManager return 0; } - void return_result(PSSubscriptionRef *result) override { - ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl; + void return_result(const DoutPrefixProvider *dpp, PSSubscriptionRef *result) override { + ldpp_dout(dpp, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl; if (retcode >= 0) { *result = *ref; } @@ -851,19 +851,19 @@ public: return std::shared_ptr(new PSManager(_sc, _env)); } - static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr, + static int call_get_subscription_cr(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, PSManagerRef& mgr, RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { if (mgr->find_sub_instance(owner, sub_name, ref)) { /* found it! nothing to execute */ - ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): found sub instance" << dendl; } auto& gs = mgr->get_get_subs(owner, sub_name); if (!gs) { - ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl; + ldpp_dout(dpp, 20) << __func__ << "(): first get subs" << dendl; gs = new GetSubCR(sc, mgr, owner, sub_name, ref); } - ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl; - return gs->execute(caller, ref); + ldpp_dout(dpp, 20) << __func__ << "(): executing get subs" << dendl; + return gs->execute(dpp, caller, ref); } friend class GetSubCR; @@ -1063,7 +1063,7 @@ public: ldpp_dout(dpp, 20) << ": subscription: " << *siter << dendl; has_subscriptions = true; // try to read subscription configuration - yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub); + yield PSManager::call_get_subscription_cr(dpp, sc, env->manager, this, owner, *siter, &sub); if (retcode < 0) { if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf); ldpp_dout(dpp, 1) << "ERROR: failed to find subscription config for subscription=" << *siter @@ -1337,28 +1337,28 @@ public: env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr); } - RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { - ldout(sc->cct, 5) << conf->id << ": start" << dendl; + RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override { + ldpp_dout(dpp, 5) << conf->id << ": start" << dendl; return new RGWPSInitEnvCBCR(sc, env); } - RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, + RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << + ldpp_dout(dpp, 10) << conf->id << ": sync_object: b=" << sync_pipe << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch); } - RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, + RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << + ldpp_dout(dpp, 10) << conf->id << ": rm_object: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete); } - RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, + RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << + ldpp_dout(dpp, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); } @@ -1366,13 +1366,13 @@ public: PSConfigRef& get_conf() { return conf; } }; -RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config) +RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) { data_handler = std::unique_ptr(new RGWPSDataSyncModule(cct, config)); const std::string jconf = json_str("conf", *data_handler->get_conf()); JSONParser p; if (!p.parse(jconf.c_str(), jconf.size())) { - ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; effective_conf = config; } else { effective_conf.decode_json(&p); @@ -1395,8 +1395,8 @@ bool RGWPSSyncModuleInstance::should_full_sync() const { return data_handler->get_conf()->start_with_full_sync; } -int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { - instance->reset(new RGWPSSyncModuleInstance(cct, config)); +int RGWPSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { + instance->reset(new RGWPSSyncModuleInstance(dpp, cct, config)); return 0; } diff --git a/src/rgw/rgw_sync_module_pubsub.h b/src/rgw/rgw_sync_module_pubsub.h index 8acc1a626f9ae..e94772cab8ad1 100644 --- a/src/rgw/rgw_sync_module_pubsub.h +++ b/src/rgw/rgw_sync_module_pubsub.h @@ -15,7 +15,7 @@ public: bool supports_writes() override { return true; } - int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; + int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override; }; class RGWPSDataSyncModule; @@ -25,7 +25,7 @@ class RGWPSSyncModuleInstance : public RGWSyncModuleInstance { std::unique_ptr data_handler; JSONFormattable effective_conf; public: - RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config); + RGWPSSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config); ~RGWPSSyncModuleInstance() = default; RGWDataSyncModule *get_data_handler() override; RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override; diff --git a/src/rgw/services/svc_sync_modules.cc b/src/rgw/services/svc_sync_modules.cc index 175df08a97bb5..ce9c1e8ba5990 100644 --- a/src/rgw/services/svc_sync_modules.cc +++ b/src/rgw/services/svc_sync_modules.cc @@ -20,7 +20,7 @@ int RGWSI_SyncModules::do_start(optional_yield, const DoutPrefixProvider *dpp) { auto& zone_public_config = svc.zone->get_zone(); - int ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module); + int ret = sync_modules_manager->create_instance(dpp, cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module); if (ret < 0) { ldpp_dout(dpp, -1) << "ERROR: failed to start sync module instance, ret=" << ret << dendl; if (ret == -ENOENT) { -- 2.39.5