From aea643ada625f064c28764c9c530890d370e1cf3 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Wed, 24 Aug 2022 13:36:55 -0400 Subject: [PATCH] rgw: `SimpleRadosWriteCR` uses an async RADOS call Don't go through the 'system object' cache. This also saves us the use of the RADOS async completion processor. Signed-off-by: Adam C. Emerson --- src/rgw/driver/rados/rgw_cr_rados.h | 68 ++++++++++++--------- src/rgw/driver/rados/rgw_data_sync.cc | 32 +++++----- src/rgw/driver/rados/rgw_sync.cc | 19 +++--- src/rgw/driver/rados/rgw_sync_module_aws.cc | 2 +- src/rgw/driver/rados/rgw_trim_bilog.cc | 3 +- src/rgw/services/svc_mdlog.cc | 50 ++++++++++++++- src/test/rgw/rgw_cr_test.cc | 51 ++++++++++++++++ 7 files changed, 164 insertions(+), 61 deletions(-) diff --git a/src/rgw/driver/rados/rgw_cr_rados.h b/src/rgw/driver/rados/rgw_cr_rados.h index f3220602ed4..f4362e6b2f2 100644 --- a/src/rgw/driver/rados/rgw_cr_rados.h +++ b/src/rgw/driver/rados/rgw_cr_rados.h @@ -515,49 +515,59 @@ public: template class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine { - const DoutPrefixProvider *dpp; - RGWAsyncRadosProcessor *async_rados; - RGWSI_SysObj *svc; - bufferlist bl; + const DoutPrefixProvider* dpp; + rgw::sal::RadosStore* const store; rgw_raw_obj obj; - RGWObjVersionTracker *objv_tracker; + RGWObjVersionTracker* objv_tracker; bool exclusive; - RGWAsyncPutSystemObj *req{nullptr}; + + bufferlist bl; + rgw_rados_ref ref; + std::map unfiltered_attrs; + boost::intrusive_ptr cn; + public: - RGWSimpleRadosWriteCR(const DoutPrefixProvider *_dpp, - RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc, - const rgw_raw_obj& _obj, const T& _data, - RGWObjVersionTracker *objv_tracker = nullptr, + RGWSimpleRadosWriteCR(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* const store, + rgw_raw_obj obj, const T& data, + RGWObjVersionTracker* objv_tracker = nullptr, bool exclusive = false) - : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados), - svc(_svc), 
obj(_obj), objv_tracker(objv_tracker), exclusive(exclusive) { - encode(_data, bl); + : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), + obj(std::move(obj)), objv_tracker(objv_tracker), exclusive(exclusive) { + encode(data, bl); } - ~RGWSimpleRadosWriteCR() override { - request_cleanup(); - } + int send_request(const DoutPrefixProvider *dpp) override { + int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" + << r << dendl; + return r; + } - void request_cleanup() override { - if (req) { - req->finish(); - req = NULL; + set_status() << "sending request"; + + librados::ObjectWriteOperation op; + if (exclusive) { + op.create(true); } - } + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + op.write_full(bl); - int send_request(const DoutPrefixProvider *dpp) override { - req = new RGWAsyncPutSystemObj(dpp, this, stack->create_completion_notifier(), - svc, objv_tracker, obj, exclusive, std::move(bl)); - async_rados->queue(req); - return 0; + cn = stack->create_completion_notifier(); + return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op); } int request_complete() override { - if (objv_tracker) { // copy the updated version - *objv_tracker = req->objv_tracker; + int ret = cn->completion()->get_return_value(); + set_status() << "request complete; ret=" << ret; + if (ret >= 0 && objv_tracker) { + objv_tracker->apply_write(); } - return req->get_ret_status(); + return ret; } }; diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc index 385801c5789..c030d6ddc4d 100644 --- a/src/rgw/driver/rados/rgw_data_sync.cc +++ b/src/rgw/driver/rados/rgw_data_sync.cc @@ -573,7 +573,7 @@ public: } using WriteInfoCR = RGWSimpleRadosWriteCR; - yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + yield call(new WriteInfoCR(dpp, sync_env->driver, rgw_raw_obj{pool, sync_status_oid}, 
status->sync_info, &objv_tracker)); if (retcode < 0) { @@ -616,7 +616,7 @@ public: auto& objv = objvs[i]; objv.generate_new_write_ver(cct); using WriteMarkerCR = RGWSimpleRadosWriteCR; - spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + spawn(new WriteMarkerCR(dpp, sync_env->driver, rgw_raw_obj{pool, oid}, marker, &objv), true); } } @@ -629,7 +629,7 @@ public: } status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps; - yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + yield call(new WriteInfoCR(dpp, sync_env->driver, rgw_raw_obj{pool, sync_status_oid}, status->sync_info, &objv_tracker)); if (retcode < 0) { @@ -1042,7 +1042,7 @@ public: rgw_data_sync_marker& marker = iter->second; marker.total_entries = entries_index->get_total_entries(shard_id); spawn(new RGWSimpleRadosWriteCR( - dpp, sync_env->async_rados, sync_env->svc->sysobj, + dpp, sync_env->driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name( sc->source_zone, shard_id)), @@ -1101,7 +1101,7 @@ public: tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); - return new RGWSimpleRadosWriteCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj, + return new RGWSimpleRadosWriteCR(sync_env->dpp, sync_env->driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid), sync_marker, &objv); } @@ -1824,7 +1824,7 @@ public: sync_marker.marker = sync_marker.next_step_marker; sync_marker.next_step_marker.clear(); yield call(new RGWSimpleRadosWriteCR( - sc->env->dpp,sc->env->async_rados, sc->env->svc->sysobj, + sc->env->dpp, sc->env->driver, rgw_raw_obj(pool, status_oid), sync_marker, &objv)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode)); @@ -2405,7 +2405,7 @@ public: } RGWCoroutine *set_sync_info_cr() { - return new RGWSimpleRadosWriteCR(sync_env->dpp, sync_env->async_rados, 
sync_env->svc->sysobj, + return new RGWSimpleRadosWriteCR(sync_env->dpp, sync_env->driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)), sync_status.sync_info, &obj_version); } @@ -3628,7 +3628,7 @@ public: // write bucket sync status using CR = RGWSimpleRadosWriteCR; - yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + yield call(new CR(dpp, sync_env->driver, status_obj, status, &objv, false)); if (retcode < 0) { ldout(cct, 20) << "failed to write bucket shard status: " @@ -4034,7 +4034,7 @@ public: tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker)); return new RGWSimpleRadosWriteCR( - sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj, + sync_env->dpp, sync_env->driver, status_obj, sync_status, &objv_tracker); } @@ -4547,8 +4547,7 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp) sync_status.state = BucketSyncState::Incremental; tn->log(5, SSTR("set bucket state=" << sync_status.state)); yield call(new RGWSimpleRadosWriteCR( - dpp, sync_env->async_rados, sync_env->svc->sysobj, - status_obj, sync_status, &objv)); + dpp, sync_env->driver, status_obj, sync_status, &objv)); tn->log(5, SSTR("bucket status objv=" << objv)); } else { tn->log(10, SSTR("backing out with sync_status=" << sync_result)); @@ -4631,7 +4630,7 @@ public: } ldpp_dout(dpp, 20) << "bucket status incremental gen is " << bucket_status.incremental_gen << dendl; using WriteCR = RGWSimpleRadosWriteCR; - call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + call(new WriteCR(dpp, sync_env->driver, bucket_status_obj, bucket_status, &objv_tracker, false)); } if (retcode < 0 && retcode != -ECANCELED) { @@ -5632,8 +5631,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) if (retcode == -ENOENT) { // use exclusive create to set state=Init objv.generate_new_write_ver(cct); - yield call(new WriteCR(dpp, env->async_rados, 
env->svc->sysobj, - status_obj, bucket_status, &objv, true)); + yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status, &objv, true)); tn->log(20, "bucket status object does not exist, create a new one"); if (retcode == -EEXIST) { // raced with another create, read its status @@ -5699,8 +5697,8 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) if (bucket_status.state != BucketSyncState::Stopped) { // make sure that state is changed to stopped localy bucket_status.state = BucketSyncState::Stopped; - yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj, - status_obj, bucket_status, &objv, false)); + yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status, + &objv, false)); if (retcode < 0) { tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode)); RELEASE_LOCK(bucket_lease_cr); @@ -5943,7 +5941,7 @@ int RGWBucketPipeSyncStatusManager::init_sync_status( pretty_print(source.sc.env, "Initializing sync state of bucket {} with zone {}.\n", source.info.bucket.name, source.zone_name); stack->call(new RGWSimpleRadosWriteCR( - dpp, source.sc.env->async_rados, source.sc.env->svc->sysobj, + dpp, source.sc.env->driver, {sync_env.svc->zone->get_zone_params().log_pool, full_status_oid(source.sc.source_zone, source.info.bucket, diff --git a/src/rgw/driver/rados/rgw_sync.cc b/src/rgw/driver/rados/rgw_sync.cc index 7710a2d0ed4..4a56595fe38 100644 --- a/src/rgw/driver/rados/rgw_sync.cc +++ b/src/rgw/driver/rados/rgw_sync.cc @@ -652,7 +652,7 @@ public: yield { set_status("writing sync status"); rgw::sal::RadosStore* store = sync_env->store; - call(new RGWSimpleRadosWriteCR(dpp, sync_env->async_rados, store->svc()->sysobj, + call(new RGWSimpleRadosWriteCR(dpp, store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()), status)); } @@ -683,8 +683,7 @@ public: marker.timestamp = info.last_update; rgw::sal::RadosStore* store = sync_env->store; spawn(new RGWSimpleRadosWriteCR(dpp, - 
sync_env->async_rados, - store->svc()->sysobj, + store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)), marker), true); } @@ -693,7 +692,7 @@ public: set_status("changing sync state: build full sync maps"); status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps; rgw::sal::RadosStore* store = sync_env->store; - call(new RGWSimpleRadosWriteCR(dpp, sync_env->async_rados, store->svc()->sysobj, + call(new RGWSimpleRadosWriteCR(dpp, store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()), status)); } @@ -971,7 +970,7 @@ public: int shard_id = (int)iter->first; rgw_meta_sync_marker& marker = iter->second; marker.total_entries = entries_index->get_total_entries(shard_id); - spawn(new RGWSimpleRadosWriteCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, + spawn(new RGWSimpleRadosWriteCR(dpp, sync_env->store, rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)), marker), true); } @@ -1232,8 +1231,7 @@ public: ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl; tn->log(20, SSTR("new marker=" << new_marker)); rgw::sal::RadosStore* store = sync_env->store; - return new RGWSimpleRadosWriteCR(sync_env->dpp, sync_env->async_rados, - store->svc()->sysobj, + return new RGWSimpleRadosWriteCR(sync_env->dpp, store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid), sync_marker); } @@ -1649,7 +1647,7 @@ public: ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl; using WriteMarkerCR = RGWSimpleRadosWriteCR; - yield call(new WriteMarkerCR(sync_env->dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, + yield call(new WriteMarkerCR(sync_env->dpp, sync_env->store, rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)), 
*temp_marker)); } @@ -2016,8 +2014,7 @@ public: // write the updated sync info sync_status.sync_info.period = cursor.get_period().get_id(); sync_status.sync_info.realm_epoch = cursor.get_epoch(); - yield call(new RGWSimpleRadosWriteCR(dpp, sync_env->async_rados, - sync_env->store->svc()->sysobj, + yield call(new RGWSimpleRadosWriteCR(dpp, sync_env->store, rgw_raw_obj(pool, sync_env->status_oid()), sync_status.sync_info)); } @@ -2094,7 +2091,7 @@ int RGWRemoteMetaLog::init_sync_status(const DoutPrefixProvider *dpp) int RGWRemoteMetaLog::store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info) { tn->log(20, "store sync info"); - return run(dpp, new RGWSimpleRadosWriteCR(dpp, async_rados, store->svc()->sysobj, + return run(dpp, new RGWSimpleRadosWriteCR(dpp, store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()), sync_info)); } diff --git a/src/rgw/driver/rados/rgw_sync_module_aws.cc b/src/rgw/driver/rados/rgw_sync_module_aws.cc index 9f79a79742a..484df96045b 100644 --- a/src/rgw/driver/rados/rgw_sync_module_aws.cc +++ b/src/rgw/driver/rados/rgw_sync_module_aws.cc @@ -1515,7 +1515,7 @@ public: return set_cr_error(ret_err); } - yield call(new RGWSimpleRadosWriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, status)); + yield call(new RGWSimpleRadosWriteCR(dpp, sync_env->driver, status_obj, status)); if (retcode < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; /* continue with upload anyway */ diff --git a/src/rgw/driver/rados/rgw_trim_bilog.cc b/src/rgw/driver/rados/rgw_trim_bilog.cc index 71811d4b45d..200d674230b 100644 --- a/src/rgw/driver/rados/rgw_trim_bilog.cc +++ b/src/rgw/driver/rados/rgw_trim_bilog.cc @@ -1154,8 +1154,7 @@ int BucketTrimCR::operate(const DoutPrefixProvider *dpp) status.marker = std::move(last_cold_marker); ldpp_dout(dpp, 20) << "writing bucket trim marker=" << status.marker << dendl; using WriteStatus = 
RGWSimpleRadosWriteCR; - yield call(new WriteStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj, - status, &objv)); + yield call(new WriteStatus(dpp, store, obj, status, &objv)); if (retcode < 0) { ldpp_dout(dpp, 4) << "failed to write updated trim status: " << cpp_strerror(retcode) << dendl; diff --git a/src/rgw/services/svc_mdlog.cc b/src/rgw/services/svc_mdlog.cc index 93eb556e034..3ff306e9763 100644 --- a/src/rgw/services/svc_mdlog.cc +++ b/src/rgw/services/svc_mdlog.cc @@ -185,6 +185,54 @@ public: return 0; } }; + +template +class SysObjWriteCR : public RGWSimpleCoroutine { + const DoutPrefixProvider *dpp; + RGWAsyncRadosProcessor *async_rados; + RGWSI_SysObj *svc; + bufferlist bl; + rgw_raw_obj obj; + RGWObjVersionTracker *objv_tracker; + bool exclusive; + RGWAsyncPutSystemObj *req{nullptr}; + +public: + SysObjWriteCR(const DoutPrefixProvider *_dpp, + RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc, + const rgw_raw_obj& _obj, const T& _data, + RGWObjVersionTracker *objv_tracker = nullptr, + bool exclusive = false) + : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados), + svc(_svc), obj(_obj), objv_tracker(objv_tracker), exclusive(exclusive) { + encode(_data, bl); + } + + ~SysObjWriteCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new RGWAsyncPutSystemObj(dpp, this, stack->create_completion_notifier(), + svc, objv_tracker, obj, exclusive, std::move(bl)); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + if (objv_tracker) { // copy the updated version + *objv_tracker = req->objv_tracker; + } + return req->get_ret_status(); + } +}; } /// read the mdlog history and use it to initialize the given cursor @@ -265,7 +313,7 @@ class WriteHistoryCR : public RGWCoroutine { rgw_raw_obj obj{svc.zone->get_zone_params().log_pool, 
RGWMetadataLogHistory::oid}; - using WriteCR = RGWSimpleRadosWriteCR; + using WriteCR = SysObjWriteCR; call(new WriteCR(dpp, async_processor, svc.sysobj, obj, state, objv)); } if (retcode < 0) { diff --git a/src/test/rgw/rgw_cr_test.cc b/src/test/rgw/rgw_cr_test.cc index 34afa93f191..957c625574f 100644 --- a/src/test/rgw/rgw_cr_test.cc +++ b/src/test/rgw/rgw_cr_test.cc @@ -216,6 +216,57 @@ TEST(Read, ReadVersion) { ASSERT_EQ(wobjv.read_version, robjv.read_version); } +TEST(Write, Exclusive) { + TempPool pool; + auto oid = "object"s; + { + bufferlist bl; + bl.append("I'm some data!"s); + librados::IoCtx ioctx(pool); + auto r = ioctx.write_full(oid, bl); + ASSERT_EQ(0, r); + } + auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid}, + "I am some DIFFERENT data!"s, nullptr, + true)); + ASSERT_EQ(-EEXIST, r); +} + +TEST(Write, Write) { + TempPool pool; + auto oid = "object"s; + auto data = "I'm some data!"s; + auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid}, + data, nullptr, true)); + ASSERT_EQ(0, r); + bufferlist bl; + librados::IoCtx ioctx(pool); + r = ioctx.read(oid, bl, 0, 0); // bytes read on success, negative errno on failure + ASSERT_LE(0, r); + std::string result; + decode(result, bl); + ASSERT_EQ(data, result); +} + +TEST(Write, ObjV) { + TempPool pool; + auto oid = "object"s; + RGWObjVersionTracker objv; + objv.generate_new_write_ver(store->ctx()); + auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid}, + "I'm some data!"s, &objv, + true)); + RGWObjVersionTracker interfering_objv(objv); + r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid}, + "I'm some newer, better data!"s, + &interfering_objv, false)); + ASSERT_EQ(0, r); + r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid}, + "I'm some treacherous, obsolete data!"s, + &objv, false)); + ASSERT_EQ(-ECANCELED, r); +} + int main(int argc, const char **argv) { auto args = argv_to_vec(argc, argv); -- 2.39.5