From: Adam C. Emerson Date: Fri, 19 Aug 2022 22:58:21 +0000 (-0400) Subject: rgw: `SimpleRadosReadCR` uses an async RADOS call X-Git-Tag: v18.1.0~499^2~12 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=a366b900b5b09be7061b70a3d91d348658ed436e;p=ceph.git rgw: `SimpleRadosReadCR` uses an async RADOS call Don't go through the 'system object' cache. This also saves us the use of the RADOS async completion processor. Signed-off-by: Adam C. Emerson --- diff --git a/src/rgw/driver/rados/rgw_cr_rados.h b/src/rgw/driver/rados/rgw_cr_rados.h index 8d14be261cc65..f3220602ed495 100644 --- a/src/rgw/driver/rados/rgw_cr_rados.h +++ b/src/rgw/driver/rados/rgw_cr_rados.h @@ -402,86 +402,88 @@ public: template class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { - const DoutPrefixProvider *dpp; - RGWAsyncRadosProcessor *async_rados; - RGWSI_SysObj *svc; - + const DoutPrefixProvider* dpp; + rgw::sal::RadosStore* store; rgw_raw_obj obj; - T *result; + T* result; /// on ENOENT, call handle_data() with an empty object instead of failing const bool empty_on_enoent; - RGWObjVersionTracker *objv_tracker; - RGWAsyncGetSystemObj *req{nullptr}; + RGWObjVersionTracker* objv_tracker; + + T val; + rgw_rados_ref ref; + ceph::buffer::list bl; + boost::intrusive_ptr cn; public: - RGWSimpleRadosReadCR(const DoutPrefixProvider *_dpp, - RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc, - const rgw_raw_obj& _obj, - T *_result, bool empty_on_enoent = true, - RGWObjVersionTracker *objv_tracker = nullptr) - : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados), svc(_svc), - obj(_obj), result(_result), - empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {} - ~RGWSimpleRadosReadCR() override { - request_cleanup(); + RGWSimpleRadosReadCR(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + const rgw_raw_obj& obj, + T* result, bool empty_on_enoent = true, + RGWObjVersionTracker* objv_tracker = nullptr) + : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), + obj(obj), result(result), empty_on_enoent(empty_on_enoent), + objv_tracker(objv_tracker) { + if (!result) { + result = &val; + } } - void request_cleanup() override { - if (req) { - req->finish(); - req = NULL; + int send_request(const DoutPrefixProvider *dpp) { + int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" + << r << dendl; + return r; } - } - int send_request(const DoutPrefixProvider *dpp) override; - int request_complete() override; + set_status() << "sending request"; - virtual int handle_data(T& data) { - return 0; + librados::ObjectReadOperation op; + if (objv_tracker) { + objv_tracker->prepare_op_for_read(&op); + } + + op.read(0, -1, &bl, nullptr); + + cn = stack->create_completion_notifier(); + return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op, + nullptr); } -}; -template -int RGWSimpleRadosReadCR::send_request(const DoutPrefixProvider *dpp) -{ - req = new RGWAsyncGetSystemObj(dpp, this, stack->create_completion_notifier(), svc, - objv_tracker, obj, false, false); - async_rados->queue(req); - return 0; -} + int request_complete() { + int ret = cn->completion()->get_return_value(); + set_status() << "request complete; ret=" << ret; -template -int RGWSimpleRadosReadCR::request_complete() -{ - int ret = req->get_ret_status(); - retcode = ret; - if (ret == -ENOENT && empty_on_enoent) { - *result = T(); - } else { - if (ret < 0) { - return ret; - } - if (objv_tracker) { // copy the updated version - *objv_tracker = req->objv_tracker; - } - try { - auto iter = req->bl.cbegin(); - if (iter.end()) { - // allow successful reads with empty buffers. ReadSyncStatus coroutines - // depend on this to be able to read without locking, because the - // cls lock from InitSyncStatus will create an empty object if it didn't - // exist - *result = T(); - } else { - decode(*result, iter); + if (ret == -ENOENT && empty_on_enoent) { + *result = T(); + } else { + if (ret < 0) { + return ret; + } + try { + auto iter = bl.cbegin(); + if (iter.end()) { + // allow successful reads with empty buffers. ReadSyncStatus coroutines + // depend on this to be able to read without locking, because the + // cls lock from InitSyncStatus will create an empty object if it didn't + // exist + *result = T(); + } else { + decode(*result, iter); + } + } catch (buffer::error& err) { + return -EIO; } - } catch (buffer::error& err) { - return -EIO; } + + return handle_data(*result); } - return handle_data(*result); -} + virtual int handle_data(T& data) { + return 0; + } +}; class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine { const DoutPrefixProvider* dpp; diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc index 2c4b5783729d7..385801c5789f5 100644 --- a/src/rgw/driver/rados/rgw_data_sync.cc +++ b/src/rgw/driver/rados/rgw_data_sync.cc @@ -118,7 +118,7 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next() return false; } using CR = RGWSimpleRadosReadCR; - spawn(new CR(env->dpp, env->async_rados, env->svc->sysobj, + spawn(new CR(env->dpp, env->driver, rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)), &markers[shard_id], true, &objvs[shard_id]), false); @@ -198,7 +198,7 @@ int RGWReadDataSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp) using ReadInfoCR = RGWSimpleRadosReadCR; yield { bool empty_on_enoent = false; // fail on ENOENT - call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + call(new ReadInfoCR(dpp, sync_env->driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)), &sync_status->sync_info, empty_on_enoent, objv_tracker)); } @@ -2119,7 +2119,7 @@ public: tn->log(10, "took lease"); /* Reread data sync status to fech latest marker and objv */ objv.clear(); - yield call(new RGWSimpleRadosReadCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj, + yield call(new RGWSimpleRadosReadCR(sync_env->dpp, sync_env->driver, rgw_raw_obj(pool, status_oid), &sync_marker, true, &objv)); if (retcode < 0) { @@ -2215,7 +2215,7 @@ public: } RGWCoroutine *alloc_finisher_cr() override { - return new RGWSimpleRadosReadCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj, + return new RGWSimpleRadosReadCR(sync_env->dpp, sync_env->driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)), &sync_marker, true, &objv); } @@ -3747,7 +3747,7 @@ int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp) reenter(this){ //read sync status marker using CR = RGWSimpleRadosReadCR; - yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + yield call(new CR(dpp, sync_env->driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), sync_marker)); if (retcode < 0) { @@ -4594,7 +4594,7 @@ public: // read bucket sync status objv_tracker.clear(); using ReadCR = RGWSimpleRadosReadCR; - yield call(new ReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, + yield call(new ReadCR(dpp, sync_env->driver, bucket_status_obj, &bucket_status, false, &objv_tracker)); if (retcode < 0) { ldpp_dout(dpp, 20) << "failed to read bucket shard status: " @@ -5627,7 +5627,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) using WriteCR = RGWSimpleRadosWriteCR; objv.clear(); - yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj, + yield call(new ReadCR(dpp, env->driver, status_obj, &bucket_status, false, &objv)); if (retcode == -ENOENT) { // use exclusive create to set state=Init @@ -5639,7 +5639,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) // raced with another create, read its status tn->log(20, "raced with another create, read its status"); objv.clear(); - yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj, + yield call(new ReadCR(dpp, env->driver, status_obj, &bucket_status, false, &objv)); } } @@ -5689,7 +5689,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) // check if local state is "stopped" objv.clear(); - yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj, + yield call(new ReadCR(dpp, env->driver, status_obj, &bucket_status, false, &objv)); if (retcode < 0) { tn->log(20, SSTR("ERROR: failed to read status before writing 'stopped'. error: " << retcode)); @@ -5738,8 +5738,8 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) // reread the status after acquiring the lock objv.clear(); - yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj, - status_obj, &bucket_status, false, &objv)); + yield call(new ReadCR(dpp, env->driver, status_obj, + &bucket_status, false, &objv)); if (retcode < 0) { RELEASE_LOCK(bucket_lease_cr); tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode)); @@ -6162,8 +6162,7 @@ public: // Get the source's status. In incremental sync, this gives us // the generation and shard count that is next needed to be run. yield call(new RGWSimpleRadosReadCR( - dpp, sc.env->async_rados, sc.env->svc->sysobj, - status_obj, &status)); + dpp, sc.env->driver, status_obj, &status)); if (retcode < 0) { ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone=" << sc.source_zone << " retcode=" @@ -6226,8 +6225,7 @@ public: pretty_print(sc.env, "Completed.\n"); yield call(new RGWSimpleRadosReadCR( - dpp, sc.env->async_rados, sc.env->svc->sysobj, - status_obj, &status)); + dpp, sc.env->driver, status_obj, &status)); if (retcode < 0) { ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone=" << sc.source_zone << " retcode=" diff --git a/src/rgw/driver/rados/rgw_sync.cc b/src/rgw/driver/rados/rgw_sync.cc index 065d20985c4d8..7710a2d0ed4fa 100644 --- a/src/rgw/driver/rados/rgw_sync.cc +++ b/src/rgw/driver/rados/rgw_sync.cc @@ -747,7 +747,7 @@ bool RGWReadSyncStatusMarkersCR::spawn_next() using CR = RGWSimpleRadosReadCR; rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool, env->shard_obj_name(shard_id)}; - spawn(new CR(env->dpp, env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false); + spawn(new CR(env->dpp, env->store, obj, &markers[shard_id]), false); shard_id++; return true; } @@ -773,7 +773,7 @@ int RGWReadSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp) bool empty_on_enoent = false; // fail on ENOENT rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()}; - call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, obj, + call(new ReadInfoCR(dpp, sync_env->store, obj, &sync_status->sync_info, empty_on_enoent)); } if (retcode < 0) { @@ -1903,7 +1903,7 @@ public: RGWCoroutine *alloc_finisher_cr() override { rgw::sal::RadosStore* store = sync_env->store; - return new RGWSimpleRadosReadCR(sync_env->dpp, sync_env->async_rados, store->svc()->sysobj, + return new RGWSimpleRadosReadCR(sync_env->dpp, store, rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)), &sync_marker); } diff --git a/src/rgw/driver/rados/rgw_sync_module_aws.cc b/src/rgw/driver/rados/rgw_sync_module_aws.cc index 6827f7f3a1a80..9f79a79742a08 100644 --- a/src/rgw/driver/rados/rgw_sync_module_aws.cc +++ b/src/rgw/driver/rados/rgw_sync_module_aws.cc @@ -1452,8 +1452,8 @@ public: int operate(const DoutPrefixProvider *dpp) override { reenter(this) { - yield call(new RGWSimpleRadosReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, - status_obj, &status, false)); + yield call(new RGWSimpleRadosReadCR( + dpp, sync_env->driver, status_obj, &status, false)); if (retcode < 0 && retcode != -ENOENT) { ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; diff --git a/src/rgw/driver/rados/rgw_trim_bilog.cc b/src/rgw/driver/rados/rgw_trim_bilog.cc index 6ddda5d6b171f..71811d4b45d27 100644 --- a/src/rgw/driver/rados/rgw_trim_bilog.cc +++ b/src/rgw/driver/rados/rgw_trim_bilog.cc @@ -1096,8 +1096,7 @@ int BucketTrimCR::operate(const DoutPrefixProvider *dpp) // read BucketTrimStatus for marker position set_status("reading trim status"); using ReadStatus = RGWSimpleRadosReadCR; - yield call(new ReadStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj, - &status, true, &objv)); + yield call(new ReadStatus(dpp, store, obj, &status, true, &objv)); if (retcode < 0) { ldpp_dout(dpp, 10) << "failed to read bilog trim status: " << cpp_strerror(retcode) << dendl; diff --git a/src/rgw/services/svc_mdlog.cc b/src/rgw/services/svc_mdlog.cc index e8bba4556a626..93eb556e03436 100644 --- a/src/rgw/services/svc_mdlog.cc +++ b/src/rgw/services/svc_mdlog.cc @@ -110,6 +110,83 @@ namespace mdlog { using Cursor = RGWPeriodHistory::Cursor; +namespace { +template +class SysObjReadCR : public RGWSimpleCoroutine { + const DoutPrefixProvider *dpp; + RGWAsyncRadosProcessor *async_rados; + RGWSI_SysObj *svc; + + rgw_raw_obj obj; + T *result; + /// on ENOENT, call handle_data() with an empty object instead of failing + const bool empty_on_enoent; + RGWObjVersionTracker *objv_tracker; + RGWAsyncGetSystemObj *req{nullptr}; + +public: + SysObjReadCR(const DoutPrefixProvider *_dpp, + RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc, + const rgw_raw_obj& _obj, + T *_result, bool empty_on_enoent = true, + RGWObjVersionTracker *objv_tracker = nullptr) + : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados), svc(_svc), + obj(_obj), result(_result), + empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {} + ~SysObjReadCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) { + req = new RGWAsyncGetSystemObj(dpp, this, stack->create_completion_notifier(), svc, + objv_tracker, obj, false, false); + async_rados->queue(req); + return 0; + } + + int request_complete() { + int ret = req->get_ret_status(); + retcode = ret; + if (ret == -ENOENT && empty_on_enoent) { + *result = T(); + } else { + if (ret < 0) { + return ret; + } + if (objv_tracker) { // copy the updated version + *objv_tracker = req->objv_tracker; + } + try { + auto iter = req->bl.cbegin(); + if (iter.end()) { + // allow successful reads with empty buffers. ReadSyncStatus + // coroutines depend on this to be able to read without + // locking, because the cls lock from InitSyncStatus will + // create an empty object if it didn't exist + *result = T(); + } else { + decode(*result, iter); + } + } catch (buffer::error& err) { + return -EIO; + } + } + return handle_data(*result); + } + + virtual int handle_data(T& data) { + return 0; + } +}; +} + /// read the mdlog history and use it to initialize the given cursor class ReadHistoryCR : public RGWCoroutine { const DoutPrefixProvider *dpp; @@ -137,7 +214,7 @@ class ReadHistoryCR : public RGWCoroutine { RGWMetadataLogHistory::oid}; constexpr bool empty_on_enoent = false; - using ReadCR = RGWSimpleRadosReadCR; + using ReadCR = SysObjReadCR; call(new ReadCR(dpp, async_processor, svc.sysobj, obj, &state, empty_on_enoent, objv_tracker)); } diff --git a/src/test/rgw/rgw_cr_test.cc b/src/test/rgw/rgw_cr_test.cc index e9f7596aab54f..34afa93f191f8 100644 --- a/src/test/rgw/rgw_cr_test.cc +++ b/src/test/rgw/rgw_cr_test.cc @@ -140,6 +140,82 @@ TEST(ReadAttrs, Filtered) { ASSERT_EQ(ref_attrs, attrs); } +TEST(Read, Dne) { + TempPool pool; + std::string result; + auto r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, "doesnotexist"}, + &result, false)); + ASSERT_EQ(-ENOENT, r); +} + +TEST(Read, Read) { + TempPool pool; + auto data = "I am test data!"sv; + auto oid = "object"s; + { + bufferlist bl; + encode(data, bl); + librados::IoCtx ioctx(pool); + auto r = ioctx.write_full(oid, bl); + ASSERT_EQ(0, r); + } + std::string result; + auto r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result, + false)); + ASSERT_EQ(0, r); + ASSERT_EQ(result, data); +} + +TEST(Read, ReadVersion) { + TempPool pool; + auto data = "I am test data!"sv; + auto oid = "object"s; + RGWObjVersionTracker wobjv; + { + bufferlist bl; + encode(data, bl); + librados::IoCtx ioctx(pool); + librados::ObjectWriteOperation op; + wobjv.generate_new_write_ver(store->ctx()); + wobjv.prepare_op_for_write(&op); + op.write_full(bl); + auto r = ioctx.operate(oid, &op); + EXPECT_EQ(0, r); + wobjv.apply_write(); + } + RGWObjVersionTracker robjv; + std::string result; + auto r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result, + false, &robjv)); + ASSERT_EQ(0, r); + ASSERT_EQ(result, data); + data = "I am NEW test data!"; + { + bufferlist bl; + encode(data, bl); + librados::IoCtx ioctx(pool); + librados::ObjectWriteOperation op; + wobjv.generate_new_write_ver(store->ctx()); + wobjv.prepare_op_for_write(&op); + op.write_full(bl); + r = ioctx.operate(oid, &op); + EXPECT_EQ(0, r); + wobjv.apply_write(); + } + result.clear(); + r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result, false, + &robjv)); + ASSERT_EQ(-ECANCELED, r); + ASSERT_TRUE(result.empty()); + + robjv.clear(); + r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result, false, + &robjv)); + ASSERT_EQ(0, r); + ASSERT_EQ(result, data); + ASSERT_EQ(wobjv.read_version, robjv.read_version); +} + int main(int argc, const char **argv) { auto args = argv_to_vec(argc, argv);