template <class T>
class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
- const DoutPrefixProvider *dpp;
- RGWAsyncRadosProcessor *async_rados;
- RGWSI_SysObj *svc;
- bufferlist bl;
+ const DoutPrefixProvider* dpp;
+ rgw::sal::RadosStore* const store;
rgw_raw_obj obj;
- RGWObjVersionTracker *objv_tracker;
+ RGWObjVersionTracker* objv_tracker;
bool exclusive;
- RGWAsyncPutSystemObj *req{nullptr};
+
+ bufferlist bl;
+ rgw_rados_ref ref;
+ std::map<std::string, bufferlist> unfiltered_attrs;
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
public:
- RGWSimpleRadosWriteCR(const DoutPrefixProvider *_dpp,
- RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
- const rgw_raw_obj& _obj, const T& _data,
- RGWObjVersionTracker *objv_tracker = nullptr,
+ RGWSimpleRadosWriteCR(const DoutPrefixProvider* dpp,
+ rgw::sal::RadosStore* const store,
+ rgw_raw_obj obj, const T& data,
+ RGWObjVersionTracker* objv_tracker = nullptr,
bool exclusive = false)
- : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados),
- svc(_svc), obj(_obj), objv_tracker(objv_tracker), exclusive(exclusive) {
- encode(_data, bl);
+ : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store),
+ obj(std::move(obj)), objv_tracker(objv_tracker), exclusive(exclusive) {
+ encode(data, bl);
}
- ~RGWSimpleRadosWriteCR() override {
- request_cleanup();
- }
+ int send_request(const DoutPrefixProvider *dpp) override {
+ int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret="
+ << r << dendl;
+ return r;
+ }
- void request_cleanup() override {
- if (req) {
- req->finish();
- req = NULL;
+ set_status() << "sending request";
+
+ librados::ObjectWriteOperation op;
+ if (exclusive) {
+ op.create(true);
}
- }
+ if (objv_tracker) {
+ objv_tracker->prepare_op_for_write(&op);
+ }
+ op.write_full(bl);
- int send_request(const DoutPrefixProvider *dpp) override {
- req = new RGWAsyncPutSystemObj(dpp, this, stack->create_completion_notifier(),
- svc, objv_tracker, obj, exclusive, std::move(bl));
- async_rados->queue(req);
- return 0;
+ cn = stack->create_completion_notifier();
+ return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
}
int request_complete() override {
- if (objv_tracker) { // copy the updated version
- *objv_tracker = req->objv_tracker;
+ int ret = cn->completion()->get_return_value();
+ set_status() << "request complete; ret=" << ret;
+ if (ret >= 0 && objv_tracker) {
+ objv_tracker->apply_write();
}
- return req->get_ret_status();
+ return ret;
}
};
}
using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
- yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ yield call(new WriteInfoCR(dpp, sync_env->driver,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info, &objv_tracker));
if (retcode < 0) {
auto& objv = objvs[i];
objv.generate_new_write_ver(cct);
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
- spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ spawn(new WriteMarkerCR(dpp, sync_env->driver,
rgw_raw_obj{pool, oid}, marker, &objv), true);
}
}
}
status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
- yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ yield call(new WriteInfoCR(dpp, sync_env->driver,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info, &objv_tracker));
if (retcode < 0) {
rgw_data_sync_marker& marker = iter->second;
marker.total_entries = entries_index->get_total_entries(shard_id);
spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
- dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ dpp, sync_env->driver,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
RGWDataSyncStatusManager::shard_obj_name(
sc->source_zone, shard_id)),
tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
- return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
sync_marker, &objv);
}
sync_marker.marker = sync_marker.next_step_marker;
sync_marker.next_step_marker.clear();
yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
- sc->env->dpp,sc->env->async_rados, sc->env->svc->sysobj,
+ sc->env->dpp, sc->env->driver,
rgw_raw_obj(pool, status_oid), sync_marker, &objv));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
}
RGWCoroutine *set_sync_info_cr() {
- return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->driver,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
sync_status.sync_info, &obj_version);
}
// write bucket sync status
using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
- yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ yield call(new CR(dpp, sync_env->driver,
status_obj, status, &objv, false));
if (retcode < 0) {
ldout(cct, 20) << "failed to write bucket shard status: "
tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker));
return new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
- sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ sync_env->dpp, sync_env->driver,
status_obj, sync_status, &objv_tracker);
}
sync_status.state = BucketSyncState::Incremental;
tn->log(5, SSTR("set bucket state=" << sync_status.state));
yield call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
- dpp, sync_env->async_rados, sync_env->svc->sysobj,
- status_obj, sync_status, &objv));
+ dpp, sync_env->driver, status_obj, sync_status, &objv));
tn->log(5, SSTR("bucket status objv=" << objv));
} else {
tn->log(10, SSTR("backing out with sync_status=" << sync_result));
}
ldpp_dout(dpp, 20) << "bucket status incremental gen is " << bucket_status.incremental_gen << dendl;
using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
- call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ call(new WriteCR(dpp, sync_env->driver,
bucket_status_obj, bucket_status, &objv_tracker, false));
}
if (retcode < 0 && retcode != -ECANCELED) {
if (retcode == -ENOENT) {
// use exclusive create to set state=Init
objv.generate_new_write_ver(cct);
- yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
- status_obj, bucket_status, &objv, true));
+ yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status, &objv, true));
tn->log(20, "bucket status object does not exist, create a new one");
if (retcode == -EEXIST) {
// raced with another create, read its status
if (bucket_status.state != BucketSyncState::Stopped) {
// make sure that state is changed to stopped localy
bucket_status.state = BucketSyncState::Stopped;
- yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
- status_obj, bucket_status, &objv, false));
+ yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status,
+ &objv, false));
if (retcode < 0) {
tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode));
RELEASE_LOCK(bucket_lease_cr);
pretty_print(source.sc.env, "Initializing sync state of bucket {} with zone {}.\n",
source.info.bucket.name, source.zone_name);
stack->call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
- dpp, source.sc.env->async_rados, source.sc.env->svc->sysobj,
+ dpp, source.sc.env->driver,
{sync_env.svc->zone->get_zone_params().log_pool,
full_status_oid(source.sc.source_zone,
source.info.bucket,
yield {
set_status("writing sync status");
rgw::sal::RadosStore* store = sync_env->store;
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, store,
rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
status));
}
marker.timestamp = info.last_update;
rgw::sal::RadosStore* store = sync_env->store;
spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp,
- sync_env->async_rados,
- store->svc()->sysobj,
+ store,
rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
marker), true);
}
set_status("changing sync state: build full sync maps");
status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
rgw::sal::RadosStore* store = sync_env->store;
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, store,
rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
status));
}
int shard_id = (int)iter->first;
rgw_meta_sync_marker& marker = iter->second;
marker.total_entries = entries_index->get_total_entries(shard_id);
- spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
+ spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->store,
rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
marker), true);
}
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
tn->log(20, SSTR("new marker=" << new_marker));
rgw::sal::RadosStore* store = sync_env->store;
- return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados,
- store->svc()->sysobj,
+ return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, store,
rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
sync_marker);
}
ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
- yield call(new WriteMarkerCR(sync_env->dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
+ yield call(new WriteMarkerCR(sync_env->dpp, sync_env->store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
*temp_marker));
}
// write the updated sync info
sync_status.sync_info.period = cursor.get_period().get_id();
sync_status.sync_info.realm_epoch = cursor.get_epoch();
- yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados,
- sync_env->store->svc()->sysobj,
+ yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->store,
rgw_raw_obj(pool, sync_env->status_oid()),
sync_status.sync_info));
}
int RGWRemoteMetaLog::store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info)
{
tn->log(20, "store sync info");
- return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, async_rados, store->svc()->sysobj,
+ return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, store,
rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
sync_info));
}
return set_cr_error(ret_err);
}
- yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, status));
+ yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->driver, status_obj, status));
if (retcode < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
/* continue with upload anyway */
status.marker = std::move(last_cold_marker);
ldpp_dout(dpp, 20) << "writing bucket trim marker=" << status.marker << dendl;
using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
- yield call(new WriteStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
- status, &objv));
+ yield call(new WriteStatus(dpp, store, obj, status, &objv));
if (retcode < 0) {
ldpp_dout(dpp, 4) << "failed to write updated trim status: "
<< cpp_strerror(retcode) << dendl;
return 0;
}
};
+
+template <class T>
+class SysObjWriteCR : public RGWSimpleCoroutine {
+ const DoutPrefixProvider *dpp;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWSI_SysObj *svc;
+ bufferlist bl;
+ rgw_raw_obj obj;
+ RGWObjVersionTracker *objv_tracker;
+ bool exclusive;
+ RGWAsyncPutSystemObj *req{nullptr};
+
+public:
+ SysObjWriteCR(const DoutPrefixProvider *_dpp,
+ RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
+ const rgw_raw_obj& _obj, const T& _data,
+ RGWObjVersionTracker *objv_tracker = nullptr,
+ bool exclusive = false)
+ : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados),
+ svc(_svc), obj(_obj), objv_tracker(objv_tracker), exclusive(exclusive) {
+ encode(_data, bl);
+ }
+
+ ~SysObjWriteCR() override {
+ request_cleanup();
+ }
+
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request(const DoutPrefixProvider *dpp) override {
+ req = new RGWAsyncPutSystemObj(dpp, this, stack->create_completion_notifier(),
+ svc, objv_tracker, obj, exclusive, std::move(bl));
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() override {
+ if (objv_tracker) { // copy the updated version
+ *objv_tracker = req->objv_tracker;
+ }
+ return req->get_ret_status();
+ }
+};
}
/// read the mdlog history and use it to initialize the given cursor
rgw_raw_obj obj{svc.zone->get_zone_params().log_pool,
RGWMetadataLogHistory::oid};
- using WriteCR = RGWSimpleRadosWriteCR<RGWMetadataLogHistory>;
+ using WriteCR = SysObjWriteCR<RGWMetadataLogHistory>;
call(new WriteCR(dpp, async_processor, svc.sysobj, obj, state, objv));
}
if (retcode < 0) {
ASSERT_EQ(wobjv.read_version, robjv.read_version);
}
+TEST(Write, Exclusive) {
+ TempPool pool;
+ auto oid = "object"s;
+ {
+ bufferlist bl;
+ bl.append("I'm some data!"s);
+ librados::IoCtx ioctx(pool);
+ auto r = ioctx.write_full(oid, bl);
+ ASSERT_EQ(0, r);
+ }
+ auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+ "I am some DIFFERENT data!"s, nullptr,
+ true));
+ ASSERT_EQ(-EEXIST, r);
+}
+
+TEST(Write, Write) {
+ TempPool pool;
+ auto oid = "object"s;
+ auto data = "I'm some data!"s;
+ auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+ data, nullptr, true));
+ ASSERT_EQ(0, r);
+ bufferlist bl;
+ librados::IoCtx ioctx(pool);
+ ioctx.read(oid, bl, 0, 0);
+ ASSERT_EQ(0, r);
+ std::string result;
+ decode(result, bl);
+ ASSERT_EQ(data, result);
+}
+
+TEST(Write, ObjV) {
+ TempPool pool;
+ auto oid = "object"s;
+ RGWObjVersionTracker objv;
+ objv.generate_new_write_ver(store->ctx());
+ auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+ "I'm some data!"s, &objv,
+ true));
+ RGWObjVersionTracker interfering_objv(objv);
+ r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+ "I'm some newer, better data!"s,
+ &interfering_objv, false));
+ ASSERT_EQ(0, r);
+ r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+ "I'm some treacherous, obsolete data!"s,
+ &objv, false));
+ ASSERT_EQ(-ECANCELED, r);
+}
+
int main(int argc, const char **argv)
{
auto args = argv_to_vec(argc, argv);