#include "cls_fifo_legacy.h"
namespace rgw::cls::fifo {
-static constexpr auto dout_subsys = ceph_subsys_objclass;
namespace cb = ceph::buffer;
namespace fifo = rados::cls::fifo;
}
-int FIFO::apply_update(fifo::info* info,
+int FIFO::apply_update(const DoutPrefixProvider *dpp,
+ fifo::info* info,
const fifo::objv& objv,
const fifo::update& update,
std::uint64_t tid)
{
- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
if (objv != info->version) {
- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " version mismatch, canceling: tid=" << tid << dendl;
return -ECANCELED;
}
auto err = info->apply_update(update);
if (err) {
- lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " error applying update: " << *err << " tid=" << tid << dendl;
return -ECANCELED;
}
if (r >= 0 || r == -ECANCELED) {
canceled = (r == -ECANCELED);
if (!canceled) {
- r = apply_update(&info, version, update, tid);
+ r = apply_update(dpp, &info, version, update, tid);
if (r < 0) canceled = true;
}
if (canceled) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
if (reread)
- handle_reread(std::move(p), r);
+ handle_reread(dpp, std::move(p), r);
else
handle_update(dpp, std::move(p), r);
}
}
bool canceled = (r == -ECANCELED);
if (!canceled) {
- int r = fifo->apply_update(&fifo->info, version, update, tid);
+ int r = fifo->apply_update(dpp, &fifo->info, version, update, tid);
if (r < 0) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " update failed, marking canceled: r=" << r
complete(std::move(p), 0);
}
- void handle_reread(Ptr&& p, int r) {
- ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ void handle_reread(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " handling async read_meta: tid="
<< tid << dendl;
if (r < 0 && pcanceled) {
*pcanceled = true;
}
if (r < 0) {
- lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " failed dispatching read_meta: r=" << r << " tid="
<< tid << dendl;
} else {
- ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " completing: tid=" << tid << dendl;
}
complete(std::move(p), r);
return 0;
}
-void FIFO::trim_part(int64_t part_num, uint64_t ofs,
+void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
std::optional<std::string_view> tag,
bool exclusive, std::uint64_t tid,
lr::AioCompletion* c)
{
- ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
std::unique_lock l(m);
std::uint64_t tid;
bool new_heading = false;
- void prep_then_push(Ptr&& p, const unsigned successes) {
+ void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) {
std::unique_lock l(f->m);
auto max_part_size = f->info.params.max_part_size;
auto part_entry_overhead = f->part_entry_overhead;
l.unlock();
- ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " preparing push: remaining=" << remaining.size()
<< " batch=" << batch.size() << " i=" << i
<< " tid=" << tid << dendl;
batch.push_back(std::move(remaining.front()));
remaining.pop_front();
}
- ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " prepared push: remaining=" << remaining.size()
<< " batch=" << batch.size() << " i=" << i
<< " batch_len=" << batch_len
return;
}
i = 0; // We've made forward progress, so reset the race counter!
- prep_then_push(std::move(p), r);
+ prep_then_push(dpp, std::move(p), r);
} else {
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
return;
}
new_heading = false;
- handle_new_head(std::move(p), r);
+ handle_new_head(dpp, std::move(p), r);
}
}
- void handle_new_head(Ptr&& p, int r) {
+ void handle_new_head(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
if (r == -ECANCELED) {
if (p->i == MAX_RACE_RETRIES) {
- lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " canceled too many times, giving up: tid=" << tid << dendl;
complete(std::move(p), -ECANCELED);
return;
}
if (p->batch.empty()) {
- prep_then_push(std::move(p), 0);
+ prep_then_push(dpp, std::move(p), 0);
return;
} else {
push(std::move(p));
<< " need new head tid=" << tid << dendl;
p->new_head(dpp, std::move(p));
} else {
- p->prep_then_push(std::move(p), 0);
+ p->prep_then_push(dpp, std::move(p), 0);
}
}
if (pn < part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " pn=" << pn << " tid=" << tid << dendl;
- fifo->trim_part(pn++, max_part_size, std::nullopt,
+ fifo->trim_part(dpp, pn++, max_part_size, std::nullopt,
false, tid, call(std::move(p)));
} else {
update = true;
canceled = tail_part_num < part_num;
- fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
+ fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid,
call(std::move(p)));
}
return;
std::unique_lock l(fifo->m);
const auto max_part_size = fifo->info.params.max_part_size;
l.unlock();
- fifo->trim_part(pn++, max_part_size, std::nullopt,
+ fifo->trim_part(dpp, pn++, max_part_size, std::nullopt,
false, tid, call(std::move(p)));
return;
}
l.unlock();
update = true;
canceled = tail_part_num < part_num;
- fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
+ fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid,
call(std::move(p)));
return;
}
} else {
trimmer->update = true;
}
- trim_part(pn, ofs, std::nullopt, exclusive,
+ trim_part(dpp, pn, ofs, std::nullopt, exclusive,
tid, Trimmer::call(std::move(trimmer)));
}
pp_callback,
} state;
- void create_part(Ptr&& p, int64_t part_num,
+ void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num,
std::string_view tag) {
- ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
state = entry_callback;
lr::ObjectWriteOperation op;
return;
}
- void remove_part(Ptr&& p, int64_t part_num,
+ void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num,
std::string_view tag) {
- ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
state = entry_callback;
lr::ObjectWriteOperation op;
const auto entry = iter->second;
switch (entry.op) {
case fifo::journal_entry::Op::create:
- create_part(std::move(p), entry.part_num, entry.part_tag);
+ create_part(dpp, std::move(p), entry.part_num, entry.part_tag);
return;
case fifo::journal_entry::Op::set_head:
if (entry.part_num > new_head) {
++iter;
continue;
case fifo::journal_entry::Op::remove:
- remove_part(std::move(p), entry.part_num, entry.part_tag);
+ remove_part(dpp, std::move(p), entry.part_num, entry.part_tag);
return;
default:
- lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " unknown journaled op: entry=" << entry << " tid="
<< tid << dendl;
complete(std::move(p), -EIO);
std::string generate_tag() const;
- int apply_update(fifo::info* info,
+ int apply_update(const DoutPrefixProvider *dpp,
+ fifo::info* info,
const fifo::objv& objv,
const fifo::update& update,
std::uint64_t tid);
int trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
std::optional<std::string_view> tag, bool exclusive,
std::uint64_t tid, optional_yield y);
- void trim_part(int64_t part_num, uint64_t ofs,
+ void trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
std::optional<std::string_view> tag, bool exclusive,
std::uint64_t tid, lr::AioCompletion* c);
}
RGWSyncModuleInstanceRef sync_module;
- int ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->sync_modules->get_manager()->create_instance(g_ceph_context, static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->get_zone().tier_type,
+ int ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->sync_modules->get_manager()->create_instance(dpp(), g_ceph_context, static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->get_zone().tier_type,
store->get_zone()->get_params().tier_config, &sync_module);
if (ret < 0) {
ldpp_dout(dpp(), -1) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
req->get_out_headers(attrs);
}
-int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) {
+int RGWStreamReadHTTPResourceCRF::decode_rest_obj(const DoutPrefixProvider *dpp, map<string, string>& headers, bufferlist& extra_data) {
/* basic generic implementation */
for (auto header : headers) {
const string& val = header.second;
return 0;
}
-int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
+int RGWStreamReadHTTPResourceCRF::read(const DoutPrefixProvider *dpp, bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
extra_data.claim_append(in_cb->get_extra_data());
map<string, string> attrs;
req->get_out_headers(&attrs);
- int ret = decode_rest_obj(attrs, extra_data);
+ int ret = decode_rest_obj(dpp, attrs, extra_data);
if (ret < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl;
return ret;
do {
yield {
- ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry);
+ ret = in_crf->read(dpp, &bl, 4 * 1024 * 1024, &need_retry);
if (ret < 0) {
return set_cr_error(ret);
}
public:
virtual int init(const DoutPrefixProvider *dpp) = 0;
- virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
- virtual int decode_rest_obj(std::map<std::string, std::string>& headers, bufferlist& extra_data) = 0;
+ virtual int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+ virtual int decode_rest_obj(const DoutPrefixProvider *dpp, std::map<std::string, std::string>& headers, bufferlist& extra_data) = 0;
virtual bool has_attrs() = 0;
virtual void get_attrs(std::map<std::string, std::string> *attrs) = 0;
virtual ~RGWStreamReadResourceCRF() = default;
~RGWStreamReadHTTPResourceCRF();
int init(const DoutPrefixProvider *dpp) override;
- int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
- int decode_rest_obj(std::map<std::string, std::string>& headers, bufferlist& extra_data) override;
+ int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
+ int decode_rest_obj(const DoutPrefixProvider *dpp, std::map<std::string, std::string>& headers, bufferlist& extra_data) override;
bool has_attrs() override;
void get_attrs(std::map<std::string, std::string> *attrs) override;
bool is_done();
tn->log(10, SSTR("building full sync maps"));
/* call sync module init here */
sync_status.sync_info.num_shards = num_shards;
- yield call(data_sync_module->init_sync(sc));
+ yield call(data_sync_module->init_sync(dpp, sc));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
return set_cr_error(retcode);
*reset_backoff = true;
}
- yield call(data_sync_module->start_sync(sc));
+ yield call(data_sync_module->start_sync(dpp, sc));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
return set_cr_error(retcode);
public:
RGWDefaultDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};
}
};
-int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
+int RGWDefaultSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
instance->reset(new RGWDefaultSyncModuleInstance());
return 0;
}
};
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt, versioned_epoch, zones_trace);
}
-RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
+RGWCoroutine *RGWDefaultDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
NULL, NULL, false, &mtime, zones_trace);
}
-RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
public:
RGWArchiveDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};
}
};
-int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
+int RGWArchiveSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
instance->reset(new RGWArchiveSyncModuleInstance());
return 0;
}
-RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWArchiveDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWObjFetchCR(sc, sync_pipe, key, dest_key, versioned_epoch, zones_trace);
}
-RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
+RGWCoroutine *RGWArchiveDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
-RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
op == CLS_RGW_OP_LINK_OLH) {
set_status("syncing obj");
tn->log(5, SSTR("bucket sync: sync obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
- call(data_sync_module->sync_object(sc, sync_pipe, key, versioned_epoch, &zones_trace));
+ call(data_sync_module->sync_object(dpp, sc, sync_pipe, key, versioned_epoch, &zones_trace));
} else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
set_status("removing obj");
if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
tn->log(10, SSTR("removing obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
- call(data_sync_module->remove_object(sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
+ call(data_sync_module->remove_object(dpp, sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
// our copy of the object is more recent, continue as if it succeeded
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
set_status("creating delete marker");
tn->log(10, SSTR("creating delete marker: obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
- call(data_sync_module->create_delete_marker(sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
+ call(data_sync_module->create_delete_marker(dpp, sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
}
tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key));
}
RGWDefaultSyncModule() {}
bool supports_writes() override { return true; }
bool supports_data_export() override { return true; }
- int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
};
class RGWArchiveSyncModule : public RGWDefaultSyncModule {
RGWArchiveSyncModule() {}
bool supports_writes() override { return true; }
bool supports_data_export() override { return false; }
- int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
};
#endif
string err;
long len = strict_strtol(val.c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed converting content length (" << val << ") to int " << dendl;
return -EINVAL;
}
char *s = (char *)ptr, *end = (char *)ptr + len;
char *p = line;
- ldout(cct, 10) << "receive_http_header" << dendl;
+ ldpp_dout(this, 10) << "receive_http_header" << dendl;
while (s != end) {
if (*s == '\r') {
}
if (*s == '\n') {
*p = '\0';
- ldout(cct, 10) << "received header:" << line << dendl;
+ ldpp_dout(this, 10) << "received header:" << line << dendl;
// TODO: fill whatever data required here
char *l = line;
char *tok = strsep(&l, " \t:");
}
-static bool identify_scope(CephContext *cct,
+static bool identify_scope(const DoutPrefixProvider *dpp,
+ CephContext *cct,
const string& host,
string *region,
string *service)
{
if (!boost::algorithm::ends_with(host, "amazonaws.com")) {
- ldout(cct, 20) << "NOTICE: cannot identify region for connection to: " << host << dendl;
+ ldpp_dout(dpp, 20) << "NOTICE: cannot identify region for connection to: " << host << dendl;
return false;
}
}
++iter;
if (iter == vec.end()) {
- ldout(cct, 0) << "WARNING: cannot identify region name from host name: " << host << dendl;
+ ldpp_dout(dpp, 0) << "WARNING: cannot identify region name from host name: " << host << dendl;
return false;
}
auto& next = *iter;
return false;
}
-static void scope_from_api_name(CephContext *cct,
+static void scope_from_api_name(const DoutPrefixProvider *dpp,
+ CephContext *cct,
const string& host,
std::optional<string> api_name,
string *region,
return;
}
- if (!identify_scope(cct, host, region, service)) {
+ if (!identify_scope(dpp, cct, host, region, service)) {
*region = cct->_conf->rgw_zonegroup;
*service = "s3";
return;
string region;
string service;
- scope_from_api_name(cct, host, api_name, ®ion, &service);
+ scope_from_api_name(dpp, cct, host, api_name, ®ion, &service);
const char *maybe_payload_hash = info.env->get("HTTP_X_AMZ_CONTENT_SHA256");
if (maybe_payload_hash) {
const string& resource, const param_vec_t& params,
std::optional<string> api_name)
{
- scope_from_api_name(cct, host, api_name, ®ion, &service);
+ scope_from_api_name(this, cct, host, api_name, ®ion, &service);
string params_str;
map<string, string>& args = new_info->args.get_params();
}
}
-static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *rt)
+static int parse_rgwx_mtime(const DoutPrefixProvider *dpp, CephContext *cct, const string& s, ceph::real_time *rt)
{
string err;
vector<string> vec;
long secs = strict_strtol(vec[0].c_str(), 10, &err);
long nsecs = 0;
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
return -EINVAL;
}
if (vec.size() > 1) {
nsecs = strict_strtol(vec[1].c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed converting mtime (" << s << ") to real_time " << dendl;
return -EINVAL;
}
}
string mtime_str;
set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
if (!mtime_str.empty()) {
- int ret = parse_rgwx_mtime(cct, mtime_str, mtime);
+ int ret = parse_rgwx_mtime(this, cct, mtime_str, mtime);
if (ret < 0) {
return ret;
}
string err;
*psize = strict_strtoll(size_str.c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed parsing embedded metadata object size (" << size_str << ") to int " << dendl;
return -EIO;
}
}
string err;
long len = strict_strtol(val.c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
+ ldpp_dout(this, 0) << "ERROR: failed converting embedded metadata len (" << val << ") to int " << dendl;
return -EINVAL;
}
virtual void init(RGWDataSyncCtx *sync_env, uint64_t instance_id) {}
- virtual RGWCoroutine *init_sync(RGWDataSyncCtx *sc) {
+ virtual RGWCoroutine *init_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) {
return nullptr;
}
- virtual RGWCoroutine *start_sync(RGWDataSyncCtx *sc) {
+ virtual RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) {
return nullptr;
}
- virtual RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) = 0;
- virtual RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
+ virtual RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) = 0;
+ virtual RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
- virtual RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
+ virtual RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
};
return false;
}
virtual bool supports_data_export() = 0;
- virtual int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) = 0;
+ virtual int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) = 0;
};
typedef std::shared_ptr<RGWSyncModule> RGWSyncModuleRef;
return module->supports_data_export();
}
- int create_instance(CephContext *cct, const std::string& name, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
+ int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const std::string& name, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
RGWSyncModuleRef module;
if (!get_module(name, &module)) {
return -ENOENT;
}
- return module.get()->create_instance(cct, config, instance);
+ return module.get()->create_instance(dpp, cct, config, instance);
}
std::vector<std::string> get_registered_module_names() const {
}
};
-static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
+static int conf_to_uint64(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
{
string sval;
if (config.find(key, &sval)) {
string err;
uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
if (!err.empty()) {
- ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
return -EINVAL;
}
*pval = val;
uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
- int init(CephContext *cct, const JSONFormattable& config) {
- int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
+ int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
+ int r = conf_to_uint64(dpp, cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
if (r < 0) {
return r;
}
- r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size);
+ r = conf_to_uint64(dpp, cct, config, "multipart_min_part_size", &multipart_min_part_size);
if (r < 0) {
return r;
}
AWSSyncConfig_S3 s3;
- int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
+ int init_profile(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
bool connection_must_exist) {
if (!profile.connection_id.empty()) {
if (profile.conn_conf) {
- ldout(cct, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
return -EINVAL;
}
if (connections.find(profile.connection_id) == connections.end()) {
- ldout(cct, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl;
return -EINVAL;
}
profile.conn_conf = connections[profile.connection_id];
}
if (connection_must_exist && !profile.conn_conf) {
- ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
return -EINVAL;
}
if (!profile.acls_id.empty()) {
if (!acl_profiles.find(profile.acls_id, &acl_mappings)) {
- ldout(cct, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl;
return -EINVAL;
}
profile.acls = acl_profiles.acl_profiles[profile.acls_id];
return 0;
}
- int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
+ int init_target(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
std::shared_ptr<AWSSyncConfig_Profile> profile;
profile.reset(new AWSSyncConfig_Profile);
profile->init(profile_conf);
- int ret = init_profile(cct, profile_conf, *profile, true);
+ int ret = init_profile(dpp, cct, profile_conf, *profile, true);
if (ret < 0) {
return ret;
}
auto& sb = profile->source_bucket;
if (explicit_profiles.find(sb) != explicit_profiles.end()) {
- ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
+ ldpp_dout(dpp, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
}
explicit_profiles[sb] = profile;
AWSSyncConfig() {}
- int init(CephContext *cct, const JSONFormattable& config) {
+ int init(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) {
auto& default_conf = config["default"];
if (config.exists("default")) {
default_profile.init(default_conf);
- init_profile(cct, default_conf, default_profile, false);
+ init_profile(dpp, cct, default_conf, default_profile, false);
}
for (auto& conn : config["connections"].array()) {
acl_profiles.init(config["acl_profiles"]);
- int r = s3.init(cct, config["s3"]);
+ int r = s3.init(dpp, cct, config["s3"]);
if (r < 0) {
return r;
}
auto new_root_conf = config;
- r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */
+ r = init_target(dpp, cct, new_root_conf, &root_profile); /* the root profile config */
if (r < 0) {
return r;
}
for (auto target_conf : config["profiles"].array()) {
- int r = init_target(cct, target_conf, nullptr);
+ int r = init_target(dpp, cct, target_conf, nullptr);
if (r < 0) {
return r;
}
stringstream ss;
jf.flush(ss);
- ldout(cct, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl;
+ ldpp_dout(dpp, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl;
return 0;
}
apply_meta_param(path, "zone_id", zone.id, dest);
}
- void update_config(RGWDataSyncCtx *sc, const string& sid) {
+ void update_config(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, const string& sid) {
expand_target(sc, sid, root_profile->target_path, &root_profile->target_path);
- ldout(sc->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
+ ldpp_dout(dpp, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
for (auto& t : explicit_profiles) {
expand_target(sc, sid, t.second->target_path, &t.second->target_path);
- ldout(sc->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
+ ldpp_dout(dpp, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
}
}
void init_conns(RGWDataSyncCtx *sc, const string& id) {
auto sync_env = sc->env;
- update_config(sc, id);
+ update_config(sync_env->dpp, sc, id);
auto& root_conf = root_profile->conn_conf;
}
};
-static int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
+static int do_decode_rest_obj(const DoutPrefixProvider *dpp, CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
{
for (auto header : headers) {
const string& val = header.second;
try {
info->acls.decode(bliter);
} catch (buffer::error& err) {
- ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to decode policy off attrs" << dendl;
return -EIO;
}
} else {
- ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
+ ldpp_dout(dpp, 0) << "WARNING: acl attrs not provided" << dendl;
}
return 0;
return RGWStreamReadHTTPResourceCRF::init(dpp);
}
- int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
+ int decode_rest_obj(const DoutPrefixProvider *dpp, map<string, string>& headers, bufferlist& extra_data) override {
map<string, bufferlist> src_attrs;
- ldout(sc->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
+ ldpp_dout(dpp, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
if (extra_data.length() > 0) {
JSONParser jp;
if (!jp.parse(extra_data.c_str(), extra_data.length())) {
- ldout(sc->cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
return -EIO;
}
JSONDecoder::decode_json("attrs", src_attrs, &jp);
}
- return do_decode_rest_obj(sc->cct, src_attrs, headers, &rest_obj);
+ return do_decode_rest_obj(dpp, sc->cct, src_attrs, headers, &rest_obj);
}
bool need_extra_data() override {
boost::algorithm::starts_with(h, "X_AMZ_"));
}
- static void init_send_attrs(CephContext *cct,
+ static void init_send_attrs(const DoutPrefixProvider *dpp,
+ CephContext *cct,
const rgw_rest_obj& rest_obj,
const rgw_sync_aws_src_obj_properties& src_properties,
const AWSSyncConfig_Profile *target,
auto iter = am.find(orig_grantee);
if (iter == am.end()) {
- ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
+ ldpp_dout(dpp, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
continue;
}
s.append(viter);
}
- ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
+ ldpp_dout(dpp, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
new_attrs[header_str] = s;
}
map<string, string> new_attrs;
if (!multipart.is_multipart) {
- init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
+ init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
}
r->set_send_length(rest_obj.content_len);
}
if (retcode == -ENOENT) {
- RGWAWSStreamPutCRF::init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
+ RGWAWSStreamPutCRF::init_send_attrs(dpp, sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, status.obj_size, std::move(new_attrs), &status.upload_id));
if (retcode < 0) {
} else {
rgw_rest_obj rest_obj;
rest_obj.init(key);
- if (do_decode_rest_obj(sc->cct, attrs, headers, &rest_obj)) {
+ if (do_decode_rest_obj(dpp, sc->cct, attrs, headers, &rest_obj)) {
ldpp_dout(dpp, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
return set_cr_error(-EINVAL);
}
~RGWAWSDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
+ RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
std::optional<uint64_t> versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ ldpp_dout(dpp, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
}
- RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
+ RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldpp_dout(dpp, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ ldpp_dout(dpp, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
}
};
-int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
+int RGWAWSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
AWSSyncConfig conf;
- int r = conf.init(cct, config);
+ int r = conf.init(dpp, cct, config);
if (r < 0) {
return r;
}
public:
RGWAWSSyncModule() {}
bool supports_data_export() override { return false;}
- int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
};
#endif /* RGW_SYNC_MODULE_AWS_H */
}
struct es_obj_metadata {
+ const DoutPrefixProvider *dpp;
CephContext *cct;
ElasticConfigRef es_conf;
RGWBucketInfo bucket_info;
auto i = val.cbegin();
decode(policy, i);
} catch (buffer::error& err) {
- ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
continue;
}
auto tags_bl = val.cbegin();
decode(obj_tags, tags_bl);
} catch (buffer::error& err) {
- ldout(cct,0) << "ERROR: failed to decode obj tags for "
+ ldpp_dout(dpp, 0) << "ERROR: failed to decode obj tags for "
<< bucket_info.bucket << "/" << key << dendl;
continue;
}
auto vals_bl = val.cbegin();
decode(cs_info, vals_bl);
} catch (buffer::error& err) {
- ldout(cct,0) << "ERROR: failed to decode compression attr for "
+ ldpp_dout(dpp, 0) << "ERROR: failed to decode compression attr for "
<< bucket_info.bucket << "/" << key << dendl;
continue;
}
/* default custom meta is of type string */
custom_str[i.first] = i.second;
} else {
- ldout(cct, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
+ ldpp_dout(dpp, 20) << "custom meta entry key=" << i.first << " not found in bucket mdsearch config: " << bucket_info.mdsearch_config << dendl;
}
continue;
}
real_time t;
int r = parse_time(i.second.c_str(), &t);
if (r < 0) {
- ldout(cct, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): failed to parse time (" << i.second << "), skipping encoding of custom date attribute" << dendl;
continue;
}
class RGWElasticDataSyncModule : public RGWDataSyncModule {
ElasticConfigRef conf;
public:
- RGWElasticDataSyncModule(CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) {
+ RGWElasticDataSyncModule(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config) : conf(std::make_shared<ElasticConfig>()) {
conf->init(cct, config);
}
~RGWElasticDataSyncModule() override {}
conf->init_instance(sc->env->svc->zone->get_realm(), instance_id);
}
- RGWCoroutine *init_sync(RGWDataSyncCtx *sc) override {
- ldout(sc->cct, 5) << conf->id << ": init" << dendl;
+ RGWCoroutine *init_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override {
+ ldpp_dout(dpp, 5) << conf->id << ": init" << dendl;
return new RGWElasticInitConfigCBCR(sc, conf);
}
- RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
- ldout(sc->cct, 5) << conf->id << ": start_sync" << dendl;
+ RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override {
+ ldpp_dout(dpp, 5) << conf->id << ": start_sync" << dendl;
// try to get elastic search version
return new RGWElasticGetESInfoCBCR(sc, conf);
}
- RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldpp_dout(dpp, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
- ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+ ldpp_dout(dpp, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0));
}
- RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
- ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldpp_dout(dpp, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
- ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+ ldpp_dout(dpp, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ ldpp_dout(dpp, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
+ ldpp_dout(dpp, 10) << conf->id << ": skipping operation (not handled)" << dendl;
return NULL;
}
RGWRESTConn *get_rest_conn() {
}
};
-RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
+RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config)
{
- data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, config));
+ data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(dpp, cct, config));
}
RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
return new RGWRESTMgr_MDSearch_S3();
}
-int RGWElasticSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
+int RGWElasticSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
string endpoint = config["endpoint"];
- instance->reset(new RGWElasticSyncModuleInstance(cct, config));
+ instance->reset(new RGWElasticSyncModuleInstance(dpp, cct, config));
return 0;
}
bool supports_data_export() override {
return false;
}
- int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
};
class RGWElasticDataSyncModule;
class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance {
std::unique_ptr<RGWElasticDataSyncModule> data_handler;
public:
- RGWElasticSyncModuleInstance(CephContext *cct, const JSONFormattable& config);
+ RGWElasticSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config);
RGWDataSyncModule *get_data_handler() override;
RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
RGWRESTConn *get_rest_conn();
public:
explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
- RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldpp_dout(dpp, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWLogStatRemoteObjCR(sc, sync_pipe.info.source_bs.bucket, key);
}
- RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldpp_dout(dpp, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
- RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ ldpp_dout(dpp, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
}
};
-int RGWLogSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
+int RGWLogSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
string prefix = config["prefix"];
instance->reset(new RGWLogSyncModuleInstance(prefix));
return 0;
bool supports_data_export() override {
return false;
}
- int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
};
#endif
WaiterInfoRef waiter;
while (get_next_waiter(&waiter)) {
- ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
waiter->cr->set_retcode(retcode);
waiter->cr->set_sleeping(false);
- return_result(waiter->result);
+ return_result(dpp, waiter->result);
put();
}
return 0;
}
- virtual void return_result(T *result) {}
+ virtual void return_result(const DoutPrefixProvider *dpp, T *result) {}
public:
RGWSingletonCR(CephContext *_cct)
: RGWCoroutine(_cct) {}
- int execute(RGWCoroutine *caller, T *result = nullptr) {
+ int execute(const DoutPrefixProvider *dpp, RGWCoroutine *caller, T *result = nullptr) {
if (!started) {
- ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): singleton not started, starting" << dendl;
started = true;
caller->call(this);
return 0;
} else if (!is_done()) {
- ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
get();
add_waiter(caller, result);
caller->set_sleeping(true);
return 0;
}
- ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
caller->set_retcode(retcode);
- return_result(result);
+ return_result(dpp, result);
return retcode;
}
};
return sub;
}
- int call_init_cr(RGWCoroutine *caller) {
- return init_cr->execute(caller);
+ int call_init_cr(const DoutPrefixProvider *dpp, RGWCoroutine *caller) {
+ return init_cr->execute(dpp, caller);
}
template<typename EventType>
*ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf);
}
- yield (*ref)->call_init_cr(this);
+ yield (*ref)->call_init_cr(dpp, this);
if (retcode < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl;
mgr->remove_get_sub(owner, sub_name);
return 0;
}
- void return_result(PSSubscriptionRef *result) override {
- ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
+ void return_result(const DoutPrefixProvider *dpp, PSSubscriptionRef *result) override {
+ ldpp_dout(dpp, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
if (retcode >= 0) {
*result = *ref;
}
return std::shared_ptr<PSManager>(new PSManager(_sc, _env));
}
- static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr,
+ static int call_get_subscription_cr(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, PSManagerRef& mgr,
RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
if (mgr->find_sub_instance(owner, sub_name, ref)) {
/* found it! nothing to execute */
- ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): found sub instance" << dendl;
}
auto& gs = mgr->get_get_subs(owner, sub_name);
if (!gs) {
- ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): first get subs" << dendl;
gs = new GetSubCR(sc, mgr, owner, sub_name, ref);
}
- ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl;
- return gs->execute(caller, ref);
+ ldpp_dout(dpp, 20) << __func__ << "(): executing get subs" << dendl;
+ return gs->execute(dpp, caller, ref);
}
friend class GetSubCR;
ldpp_dout(dpp, 20) << ": subscription: " << *siter << dendl;
has_subscriptions = true;
// try to read subscription configuration
- yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub);
+ yield PSManager::call_get_subscription_cr(dpp, sc, env->manager, this, owner, *siter, &sub);
if (retcode < 0) {
if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
ldpp_dout(dpp, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr);
}
- RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
- ldout(sc->cct, 5) << conf->id << ": start" << dendl;
+ RGWCoroutine *start_sync(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc) override {
+ ldpp_dout(dpp, 5) << conf->id << ": start" << dendl;
return new RGWPSInitEnvCBCR(sc, env);
}
- RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
+ RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
+ ldpp_dout(dpp, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
" k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch);
}
- RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
+ RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
+ ldpp_dout(dpp, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
" k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
+ RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe,
rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
+ ldpp_dout(dpp, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
" k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
}
PSConfigRef& get_conf() { return conf; }
};
-RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
+RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config)
{
data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
const std::string jconf = json_str("conf", *data_handler->get_conf());
JSONParser p;
if (!p.parse(jconf.c_str(), jconf.size())) {
- ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
effective_conf = config;
} else {
effective_conf.decode_json(&p);
return data_handler->get_conf()->start_with_full_sync;
}
-int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
- instance->reset(new RGWPSSyncModuleInstance(cct, config));
+int RGWPSSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
+ instance->reset(new RGWPSSyncModuleInstance(dpp, cct, config));
return 0;
}
bool supports_writes() override {
return true;
}
- int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
+ int create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
};
class RGWPSDataSyncModule;
std::unique_ptr<RGWPSDataSyncModule> data_handler;
JSONFormattable effective_conf;
public:
- RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config);
+ RGWPSSyncModuleInstance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config);
~RGWPSSyncModuleInstance() = default;
RGWDataSyncModule *get_data_handler() override;
RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
{
auto& zone_public_config = svc.zone->get_zone();
- int ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module);
+ int ret = sync_modules_manager->create_instance(dpp, cct, zone_public_config.tier_type, svc.zone->get_zone_params().tier_config, &sync_module);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to start sync module instance, ret=" << ret << dendl;
if (ret == -ENOENT) {