template <class T>
class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
- const DoutPrefixProvider *dpp;
- RGWAsyncRadosProcessor *async_rados;
- RGWSI_SysObj *svc;
-
+ const DoutPrefixProvider* dpp;
+ rgw::sal::RadosStore* store;
rgw_raw_obj obj;
- T *result;
+ T* result;
/// on ENOENT, call handle_data() with an empty object instead of failing
const bool empty_on_enoent;
- RGWObjVersionTracker *objv_tracker;
- RGWAsyncGetSystemObj *req{nullptr};
+ RGWObjVersionTracker* objv_tracker;
+
+ T val;
+ rgw_rados_ref ref;
+ ceph::buffer::list bl;
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
public:
- RGWSimpleRadosReadCR(const DoutPrefixProvider *_dpp,
- RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
- const rgw_raw_obj& _obj,
- T *_result, bool empty_on_enoent = true,
- RGWObjVersionTracker *objv_tracker = nullptr)
- : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados), svc(_svc),
- obj(_obj), result(_result),
- empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
- ~RGWSimpleRadosReadCR() override {
- request_cleanup();
+ RGWSimpleRadosReadCR(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* store,
+ const rgw_raw_obj& obj,
+ T* result, bool empty_on_enoent = true,
+ RGWObjVersionTracker* objv_tracker = nullptr)
+ : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store),
+ obj(obj), result(result), empty_on_enoent(empty_on_enoent),
+ objv_tracker(objv_tracker) {
+ if (!result) {
+ result = &val;
+ }
}
- void request_cleanup() override {
- if (req) {
- req->finish();
- req = NULL;
+ int send_request(const DoutPrefixProvider *dpp) {
+ int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret="
+ << r << dendl;
+ return r;
}
- }
- int send_request(const DoutPrefixProvider *dpp) override;
- int request_complete() override;
+ set_status() << "sending request";
- virtual int handle_data(T& data) {
- return 0;
+ librados::ObjectReadOperation op;
+ if (objv_tracker) {
+ objv_tracker->prepare_op_for_read(&op);
+ }
+
+ op.read(0, -1, &bl, nullptr);
+
+ cn = stack->create_completion_notifier();
+ return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op,
+ nullptr);
}
-};
-template <class T>
-int RGWSimpleRadosReadCR<T>::send_request(const DoutPrefixProvider *dpp)
-{
- req = new RGWAsyncGetSystemObj(dpp, this, stack->create_completion_notifier(), svc,
- objv_tracker, obj, false, false);
- async_rados->queue(req);
- return 0;
-}
+ int request_complete() {
+ int ret = cn->completion()->get_return_value();
+ set_status() << "request complete; ret=" << ret;
-template <class T>
-int RGWSimpleRadosReadCR<T>::request_complete()
-{
- int ret = req->get_ret_status();
- retcode = ret;
- if (ret == -ENOENT && empty_on_enoent) {
- *result = T();
- } else {
- if (ret < 0) {
- return ret;
- }
- if (objv_tracker) { // copy the updated version
- *objv_tracker = req->objv_tracker;
- }
- try {
- auto iter = req->bl.cbegin();
- if (iter.end()) {
- // allow successful reads with empty buffers. ReadSyncStatus coroutines
- // depend on this to be able to read without locking, because the
- // cls lock from InitSyncStatus will create an empty object if it didn't
- // exist
- *result = T();
- } else {
- decode(*result, iter);
+ if (ret == -ENOENT && empty_on_enoent) {
+ *result = T();
+ } else {
+ if (ret < 0) {
+ return ret;
+ }
+ try {
+ auto iter = bl.cbegin();
+ if (iter.end()) {
+ // allow successful reads with empty buffers. ReadSyncStatus coroutines
+ // depend on this to be able to read without locking, because the
+ // cls lock from InitSyncStatus will create an empty object if it didn't
+ // exist
+ *result = T();
+ } else {
+ decode(*result, iter);
+ }
+ } catch (buffer::error& err) {
+ return -EIO;
}
- } catch (buffer::error& err) {
- return -EIO;
}
+
+ return handle_data(*result);
}
- return handle_data(*result);
-}
+ virtual int handle_data(T& data) {
+ return 0;
+ }
+};
class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
const DoutPrefixProvider* dpp;
return false;
}
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
- spawn(new CR(env->dpp, env->async_rados, env->svc->sysobj,
+ spawn(new CR(env->dpp, env->driver,
rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
&markers[shard_id], true, &objvs[shard_id]),
false);
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
yield {
bool empty_on_enoent = false; // fail on ENOENT
- call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ call(new ReadInfoCR(dpp, sync_env->driver,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
&sync_status->sync_info, empty_on_enoent, objv_tracker));
}
tn->log(10, "took lease");
/* Reread data sync status to fech latest marker and objv */
objv.clear();
- yield call(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ yield call(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
rgw_raw_obj(pool, status_oid),
&sync_marker, true, &objv));
if (retcode < 0) {
}
RGWCoroutine *alloc_finisher_cr() override {
- return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
&sync_marker, true, &objv);
}
reenter(this){
//read sync status marker
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
- yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ yield call(new CR(dpp, sync_env->driver,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
sync_marker));
if (retcode < 0) {
// read bucket sync status
objv_tracker.clear();
using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
- yield call(new ReadCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ yield call(new ReadCR(dpp, sync_env->driver,
bucket_status_obj, &bucket_status, false, &objv_tracker));
if (retcode < 0) {
ldpp_dout(dpp, 20) << "failed to read bucket shard status: "
using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
objv.clear();
- yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
+ yield call(new ReadCR(dpp, env->driver,
status_obj, &bucket_status, false, &objv));
if (retcode == -ENOENT) {
// use exclusive create to set state=Init
// raced with another create, read its status
tn->log(20, "raced with another create, read its status");
objv.clear();
- yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
+ yield call(new ReadCR(dpp, env->driver,
status_obj, &bucket_status, false, &objv));
}
}
// check if local state is "stopped"
objv.clear();
- yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
+ yield call(new ReadCR(dpp, env->driver,
status_obj, &bucket_status, false, &objv));
if (retcode < 0) {
tn->log(20, SSTR("ERROR: failed to read status before writing 'stopped'. error: " << retcode));
// reread the status after acquiring the lock
objv.clear();
- yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
- status_obj, &bucket_status, false, &objv));
+ yield call(new ReadCR(dpp, env->driver, status_obj,
+ &bucket_status, false, &objv));
if (retcode < 0) {
RELEASE_LOCK(bucket_lease_cr);
tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
// Get the source's status. In incremental sync, this gives us
// the generation and shard count that is next needed to be run.
yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
- dpp, sc.env->async_rados, sc.env->svc->sysobj,
- status_obj, &status));
+ dpp, sc.env->driver, status_obj, &status));
if (retcode < 0) {
ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
<< sc.source_zone << " retcode="
pretty_print(sc.env, "Completed.\n");
yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
- dpp, sc.env->async_rados, sc.env->svc->sysobj,
- status_obj, &status));
+ dpp, sc.env->driver, status_obj, &status));
if (retcode < 0) {
ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
<< sc.source_zone << " retcode="
using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
env->shard_obj_name(shard_id)};
- spawn(new CR(env->dpp, env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
+ spawn(new CR(env->dpp, env->store, obj, &markers[shard_id]), false);
shard_id++;
return true;
}
bool empty_on_enoent = false; // fail on ENOENT
rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
sync_env->status_oid()};
- call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
+ call(new ReadInfoCR(dpp, sync_env->store, obj,
&sync_status->sync_info, empty_on_enoent));
}
if (retcode < 0) {
RGWCoroutine *alloc_finisher_cr() override {
rgw::sal::RadosStore* store = sync_env->store;
- return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados, store->svc()->sysobj,
+ return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->dpp, store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
&sync_marker);
}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
- status_obj, &status, false));
+ yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(
+ dpp, sync_env->driver, status_obj, &status, false));
if (retcode < 0 && retcode != -ENOENT) {
ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
// read BucketTrimStatus for marker position
set_status("reading trim status");
using ReadStatus = RGWSimpleRadosReadCR<BucketTrimStatus>;
- yield call(new ReadStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
- &status, true, &objv));
+ yield call(new ReadStatus(dpp, store, obj, &status, true, &objv));
if (retcode < 0) {
ldpp_dout(dpp, 10) << "failed to read bilog trim status: "
<< cpp_strerror(retcode) << dendl;
using Cursor = RGWPeriodHistory::Cursor;
+namespace {
+template <class T>
+class SysObjReadCR : public RGWSimpleCoroutine {
+ const DoutPrefixProvider *dpp;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWSI_SysObj *svc;
+
+ rgw_raw_obj obj;
+ T *result;
+ /// on ENOENT, call handle_data() with an empty object instead of failing
+ const bool empty_on_enoent;
+ RGWObjVersionTracker *objv_tracker;
+ RGWAsyncGetSystemObj *req{nullptr};
+
+public:
+ SysObjReadCR(const DoutPrefixProvider *_dpp,
+ RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
+ const rgw_raw_obj& _obj,
+ T *_result, bool empty_on_enoent = true,
+ RGWObjVersionTracker *objv_tracker = nullptr)
+ : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados), svc(_svc),
+ obj(_obj), result(_result),
+ empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
+ ~SysObjReadCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request(const DoutPrefixProvider *dpp) {
+ req = new RGWAsyncGetSystemObj(dpp, this, stack->create_completion_notifier(), svc,
+ objv_tracker, obj, false, false);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ int ret = req->get_ret_status();
+ retcode = ret;
+ if (ret == -ENOENT && empty_on_enoent) {
+ *result = T();
+ } else {
+ if (ret < 0) {
+ return ret;
+ }
+ if (objv_tracker) { // copy the updated version
+ *objv_tracker = req->objv_tracker;
+ }
+ try {
+ auto iter = req->bl.cbegin();
+ if (iter.end()) {
+ // allow successful reads with empty buffers. ReadSyncStatus
+ // coroutines depend on this to be able to read without
+ // locking, because the cls lock from InitSyncStatus will
+ // create an empty object if it didn't exist
+ *result = T();
+ } else {
+ decode(*result, iter);
+ }
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ }
+ return handle_data(*result);
+ }
+
+ virtual int handle_data(T& data) {
+ return 0;
+ }
+};
+}
+
/// read the mdlog history and use it to initialize the given cursor
class ReadHistoryCR : public RGWCoroutine {
const DoutPrefixProvider *dpp;
RGWMetadataLogHistory::oid};
constexpr bool empty_on_enoent = false;
- using ReadCR = RGWSimpleRadosReadCR<RGWMetadataLogHistory>;
+ using ReadCR = SysObjReadCR<RGWMetadataLogHistory>;
call(new ReadCR(dpp, async_processor, svc.sysobj, obj,
&state, empty_on_enoent, objv_tracker));
}
ASSERT_EQ(ref_attrs, attrs);
}
+TEST(Read, Dne) {
+ TempPool pool;
+ std::string result;
+ auto r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, "doesnotexist"},
+ &result, false));
+ ASSERT_EQ(-ENOENT, r);
+}
+
+TEST(Read, Read) {
+ TempPool pool;
+ auto data = "I am test data!"sv;
+ auto oid = "object"s;
+ {
+ bufferlist bl;
+ encode(data, bl);
+ librados::IoCtx ioctx(pool);
+ auto r = ioctx.write_full(oid, bl);
+ ASSERT_EQ(0, r);
+ }
+ std::string result;
+ auto r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result,
+ false));
+ ASSERT_EQ(0, r);
+ ASSERT_EQ(result, data);
+}
+
+TEST(Read, ReadVersion) {
+ TempPool pool;
+ auto data = "I am test data!"sv;
+ auto oid = "object"s;
+ RGWObjVersionTracker wobjv;
+ {
+ bufferlist bl;
+ encode(data, bl);
+ librados::IoCtx ioctx(pool);
+ librados::ObjectWriteOperation op;
+ wobjv.generate_new_write_ver(store->ctx());
+ wobjv.prepare_op_for_write(&op);
+ op.write_full(bl);
+ auto r = ioctx.operate(oid, &op);
+ EXPECT_EQ(0, r);
+ wobjv.apply_write();
+ }
+ RGWObjVersionTracker robjv;
+ std::string result;
+ auto r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result,
+ false, &robjv));
+ ASSERT_EQ(0, r);
+ ASSERT_EQ(result, data);
+ data = "I am NEW test data!";
+ {
+ bufferlist bl;
+ encode(data, bl);
+ librados::IoCtx ioctx(pool);
+ librados::ObjectWriteOperation op;
+ wobjv.generate_new_write_ver(store->ctx());
+ wobjv.prepare_op_for_write(&op);
+ op.write_full(bl);
+ r = ioctx.operate(oid, &op);
+ EXPECT_EQ(0, r);
+ wobjv.apply_write();
+ }
+ result.clear();
+ r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result, false,
+ &robjv));
+ ASSERT_EQ(-ECANCELED, r);
+ ASSERT_TRUE(result.empty());
+
+ robjv.clear();
+ r = run(new RGWSimpleRadosReadCR(dpp(), store, {pool, oid}, &result, false,
+ &robjv));
+ ASSERT_EQ(0, r);
+ ASSERT_EQ(result, data);
+ ASSERT_EQ(wobjv.read_version, robjv.read_version);
+}
+
int main(int argc, const char **argv)
{
auto args = argv_to_vec(argc, argv);