]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: `SimpleRadosWriteCR` uses an async RADOS call
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 24 Aug 2022 17:36:55 +0000 (13:36 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Thu, 12 Jan 2023 23:03:43 +0000 (18:03 -0500)
Don't go through the 'system object' cache. This also saves us the use
of the RADOS async completion processor.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_cr_rados.h
src/rgw/driver/rados/rgw_data_sync.cc
src/rgw/driver/rados/rgw_sync.cc
src/rgw/driver/rados/rgw_sync_module_aws.cc
src/rgw/driver/rados/rgw_trim_bilog.cc
src/rgw/services/svc_mdlog.cc
src/test/rgw/rgw_cr_test.cc

index f3220602ed49546ab48f8181eb4ad4f62a1a669d..f4362e6b2f28d916adb5746f8a06a0bb81d53ffa 100644 (file)
@@ -515,49 +515,59 @@ public:
 
 template <class T>
 class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
-  const DoutPrefixProvider *dpp;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWSI_SysObj *svc;
-  bufferlist bl;
+  const DoutPrefixProvider* dpp;
+  rgw::sal::RadosStore* const store;
   rgw_raw_obj obj;
-  RGWObjVersionTracker *objv_tracker;
+  RGWObjVersionTrackerobjv_tracker;
   bool exclusive;
-  RGWAsyncPutSystemObj *req{nullptr};
+
+  bufferlist bl;
+  rgw_rados_ref ref;
+  std::map<std::string, bufferlist> unfiltered_attrs;
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+
 
 public:
-  RGWSimpleRadosWriteCR(const DoutPrefixProvider *_dpp, 
-                       RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
-                       const rgw_raw_obj& _obj, const T& _data,
-                       RGWObjVersionTracker *objv_tracker = nullptr,
+  RGWSimpleRadosWriteCR(const DoutPrefixProvider* dpp,
+                       rgw::sal::RadosStore* const store,
+                       rgw_raw_obj obj, const T& data,
+                       RGWObjVersionTrackerobjv_tracker = nullptr,
                        bool exclusive = false)
-    : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados),
-      svc(_svc), obj(_obj), objv_tracker(objv_tracker), exclusive(exclusive) {
-    encode(_data, bl);
+    : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store),
+      obj(std::move(obj)), objv_tracker(objv_tracker), exclusive(exclusive) {
+    encode(data, bl);
   }
 
-  ~RGWSimpleRadosWriteCR() override {
-    request_cleanup();
-  }
+  int send_request(const DoutPrefixProvider *dpp) override {
+    int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref);
+    if (r < 0) {
+      ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret="
+                        << r << dendl;
+      return r;
+    }
 
-  void request_cleanup() override {
-    if (req) {
-      req->finish();
-      req = NULL;
+    set_status() << "sending request";
+
+    librados::ObjectWriteOperation op;
+    if (exclusive) {
+      op.create(true);
     }
-  }
+    if (objv_tracker) {
+      objv_tracker->prepare_op_for_write(&op);
+    }
+    op.write_full(bl);
 
-  int send_request(const DoutPrefixProvider *dpp) override {
-    req = new RGWAsyncPutSystemObj(dpp, this, stack->create_completion_notifier(),
-                                  svc, objv_tracker, obj, exclusive, std::move(bl));
-    async_rados->queue(req);
-    return 0;
+    cn = stack->create_completion_notifier();
+    return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op);
   }
 
   int request_complete() override {
-    if (objv_tracker) { // copy the updated version
-      *objv_tracker = req->objv_tracker;
+    int ret = cn->completion()->get_return_value();
+    set_status() << "request complete; ret=" << ret;
+    if (ret >= 0 && objv_tracker) {
+      objv_tracker->apply_write();
     }
-    return req->get_ret_status();
+    return ret;
   }
 };
 
index 385801c5789f57e33b06307721b6e2d2a6034fa5..c030d6ddc4d3bf14c0631a1648e96ec23c678232 100644 (file)
@@ -573,7 +573,7 @@ public:
       }
 
       using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
-      yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+      yield call(new WriteInfoCR(dpp, sync_env->driver,
                                  rgw_raw_obj{pool, sync_status_oid},
                                  status->sync_info, &objv_tracker));
       if (retcode < 0) {
@@ -616,7 +616,7 @@ public:
           auto& objv = objvs[i];
           objv.generate_new_write_ver(cct);
           using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
-          spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+          spawn(new WriteMarkerCR(dpp, sync_env->driver,
                                   rgw_raw_obj{pool, oid}, marker, &objv), true);
         }
       }
@@ -629,7 +629,7 @@ public:
       }
 
       status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
-      yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+      yield call(new WriteInfoCR(dpp, sync_env->driver,
                                  rgw_raw_obj{pool, sync_status_oid},
                                  status->sync_info, &objv_tracker));
       if (retcode < 0) {
@@ -1042,7 +1042,7 @@ public:
           rgw_data_sync_marker& marker = iter->second;
           marker.total_entries = entries_index->get_total_entries(shard_id);
           spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
-                 dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                 dpp, sync_env->driver,
                  rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
                              RGWDataSyncStatusManager::shard_obj_name(
                                sc->source_zone, shard_id)),
@@ -1101,7 +1101,7 @@ public:
 
     tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
 
-    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
                                                            rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
                                                            sync_marker, &objv);
   }
@@ -1824,7 +1824,7 @@ public:
       sync_marker.marker = sync_marker.next_step_marker;
       sync_marker.next_step_marker.clear();
       yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
-             sc->env->dpp,sc->env->async_rados, sc->env->svc->sysobj,
+             sc->env->dpp, sc->env->driver,
              rgw_raw_obj(pool, status_oid), sync_marker, &objv));
       if (retcode < 0) {
         tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
@@ -2405,7 +2405,7 @@ public:
   }
 
   RGWCoroutine *set_sync_info_cr() {
-    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->driver,
                                                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
                                                          sync_status.sync_info, &obj_version);
   }
@@ -3628,7 +3628,7 @@ public:
 
       // write bucket sync status
       using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
-      yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+      yield call(new CR(dpp, sync_env->driver,
                        status_obj, status, &objv, false));
       if (retcode < 0) {
         ldout(cct, 20) << "failed to write bucket shard status: "
@@ -4034,7 +4034,7 @@ public:
 
     tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker));
     return new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
-        sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+        sync_env->dpp, sync_env->driver,
        status_obj, sync_status, &objv_tracker);
   }
 
@@ -4547,8 +4547,7 @@ int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
       sync_status.state = BucketSyncState::Incremental;
       tn->log(5, SSTR("set bucket state=" << sync_status.state));
       yield call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
-             dpp, sync_env->async_rados, sync_env->svc->sysobj,
-              status_obj, sync_status, &objv));
+             dpp, sync_env->driver, status_obj, sync_status, &objv));
       tn->log(5, SSTR("bucket status objv=" << objv));
     } else {
       tn->log(10, SSTR("backing out with sync_status=" << sync_result));
@@ -4631,7 +4630,7 @@ public:
           }
           ldpp_dout(dpp, 20) << "bucket status incremental gen is " << bucket_status.incremental_gen << dendl;
           using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
-          call(new WriteCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+          call(new WriteCR(dpp, sync_env->driver,
                             bucket_status_obj, bucket_status, &objv_tracker, false));
         }
         if (retcode < 0 && retcode != -ECANCELED) {
@@ -5632,8 +5631,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
     if (retcode == -ENOENT) {
       // use exclusive create to set state=Init
       objv.generate_new_write_ver(cct);
-      yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
-                             status_obj, bucket_status, &objv, true));
+      yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status, &objv, true));
       tn->log(20, "bucket status object does not exist, create a new one");
       if (retcode == -EEXIST) {
         // raced with another create, read its status
@@ -5699,8 +5697,8 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
           if (bucket_status.state != BucketSyncState::Stopped) {
             // make sure that state is changed to stopped localy
             bucket_status.state = BucketSyncState::Stopped;
-            yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
-                  status_obj, bucket_status, &objv, false));
+            yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status,
+                                  &objv, false));
             if (retcode < 0) {
               tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode));
               RELEASE_LOCK(bucket_lease_cr);
@@ -5943,7 +5941,7 @@ int RGWBucketPipeSyncStatusManager::init_sync_status(
     pretty_print(source.sc.env, "Initializing sync state of bucket {} with zone {}.\n",
                 source.info.bucket.name, source.zone_name);
     stack->call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
-                 dpp, source.sc.env->async_rados, source.sc.env->svc->sysobj,
+                 dpp, source.sc.env->driver,
                  {sync_env.svc->zone->get_zone_params().log_pool,
                    full_status_oid(source.sc.source_zone,
                                   source.info.bucket,
index 7710a2d0ed4fadd2883e60fe830ac0e2372d13cd..4a56595fe386c208571cad5c4e121276ae6af265 100644 (file)
@@ -652,7 +652,7 @@ public:
       yield {
         set_status("writing sync status");
        rgw::sal::RadosStore* store = sync_env->store;
-        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
+        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, store,
                                                            rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                            status));
       }
@@ -683,8 +683,7 @@ public:
          marker.timestamp = info.last_update;
          rgw::sal::RadosStore* store = sync_env->store;
           spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp,
-                                                                sync_env->async_rados,
-                                                                store->svc()->sysobj,
+                                                                store,
                                                                 rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                                 marker), true);
         }
@@ -693,7 +692,7 @@ public:
         set_status("changing sync state: build full sync maps");
        status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
        rgw::sal::RadosStore* store = sync_env->store;
-        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
+        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, store,
                                                            rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                            status));
       }
@@ -971,7 +970,7 @@ public:
           int shard_id = (int)iter->first;
           rgw_meta_sync_marker& marker = iter->second;
           marker.total_entries = entries_index->get_total_entries(shard_id);
-          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
+          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->store,
                                                                 rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                                 marker), true);
         }
@@ -1232,8 +1231,7 @@ public:
     ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
     tn->log(20, SSTR("new marker=" << new_marker));
     rgw::sal::RadosStore* store = sync_env->store;
-    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados,
-                                                           store->svc()->sysobj,
+    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, store,
                                                            rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
                                                            sync_marker);
   }
@@ -1649,7 +1647,7 @@ public:
          ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
 
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
-         yield call(new WriteMarkerCR(sync_env->dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
+         yield call(new WriteMarkerCR(sync_env->dpp, sync_env->store,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
         }
@@ -2016,8 +2014,7 @@ public:
         // write the updated sync info
         sync_status.sync_info.period = cursor.get_period().get_id();
         sync_status.sync_info.realm_epoch = cursor.get_epoch();
-        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados,
-                                                                 sync_env->store->svc()->sysobj,
+        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->store,
                                                                  rgw_raw_obj(pool, sync_env->status_oid()),
                                                                  sync_status.sync_info));
       }
@@ -2094,7 +2091,7 @@ int RGWRemoteMetaLog::init_sync_status(const DoutPrefixProvider *dpp)
 int RGWRemoteMetaLog::store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info)
 {
   tn->log(20, "store sync info");
-  return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, async_rados, store->svc()->sysobj,
+  return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, store,
                                                            rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()),
                                                            sync_info));
 }
index 9f79a79742a08d6adac9074be07bd01bb100df7b..484df96045b6418acee024f79b879c5a226af97e 100644 (file)
@@ -1515,7 +1515,7 @@ public:
           return set_cr_error(ret_err);
         }
 
-        yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, status));
+        yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->driver, status_obj, status));
         if (retcode < 0) {
           ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
           /* continue with upload anyway */
index 71811d4b45d27eee11e3b956b690662cec92466d..200d674230b54713e01988e65eefc5f0229207c5 100644 (file)
@@ -1154,8 +1154,7 @@ int BucketTrimCR::operate(const DoutPrefixProvider *dpp)
       status.marker = std::move(last_cold_marker);
       ldpp_dout(dpp, 20) << "writing bucket trim marker=" << status.marker << dendl;
       using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
-      yield call(new WriteStatus(dpp, store->svc()->rados->get_async_processor(), store->svc()->sysobj, obj,
-                                 status, &objv));
+      yield call(new WriteStatus(dpp, store, obj, status, &objv));
       if (retcode < 0) {
         ldpp_dout(dpp, 4) << "failed to write updated trim status: "
             << cpp_strerror(retcode) << dendl;
index 93eb556e03436dbbd3253a8ab7c9038fb42705fb..3ff306e97635455eefab901a635c3574b7b910bf 100644 (file)
@@ -185,6 +185,54 @@ public:
     return 0;
   }
 };
+
+template <class T>
+class SysObjWriteCR : public RGWSimpleCoroutine {
+  const DoutPrefixProvider *dpp;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWSI_SysObj *svc;
+  bufferlist bl;
+  rgw_raw_obj obj;
+  RGWObjVersionTracker *objv_tracker;
+  bool exclusive;
+  RGWAsyncPutSystemObj *req{nullptr};
+
+public:
+  SysObjWriteCR(const DoutPrefixProvider *_dpp, 
+               RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
+               const rgw_raw_obj& _obj, const T& _data,
+               RGWObjVersionTracker *objv_tracker = nullptr,
+               bool exclusive = false)
+    : RGWSimpleCoroutine(_svc->ctx()), dpp(_dpp), async_rados(_async_rados),
+      svc(_svc), obj(_obj), objv_tracker(objv_tracker), exclusive(exclusive) {
+    encode(_data, bl);
+  }
+
+  ~SysObjWriteCR() override {
+    request_cleanup();
+  }
+
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request(const DoutPrefixProvider *dpp) override {
+    req = new RGWAsyncPutSystemObj(dpp, this, stack->create_completion_notifier(),
+                                  svc, objv_tracker, obj, exclusive, std::move(bl));
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() override {
+    if (objv_tracker) { // copy the updated version
+      *objv_tracker = req->objv_tracker;
+    }
+    return req->get_ret_status();
+  }
+};
 }
 
 /// read the mdlog history and use it to initialize the given cursor
@@ -265,7 +313,7 @@ class WriteHistoryCR : public RGWCoroutine {
         rgw_raw_obj obj{svc.zone->get_zone_params().log_pool,
                         RGWMetadataLogHistory::oid};
 
-        using WriteCR = RGWSimpleRadosWriteCR<RGWMetadataLogHistory>;
+        using WriteCR = SysObjWriteCR<RGWMetadataLogHistory>;
         call(new WriteCR(dpp, async_processor, svc.sysobj, obj, state, objv));
       }
       if (retcode < 0) {
index 34afa93f191f85cfe608a2421ba8ab5841069c85..957c625574f60013967e0fa03301edfa796bdc30 100644 (file)
@@ -216,6 +216,57 @@ TEST(Read, ReadVersion) {
   ASSERT_EQ(wobjv.read_version, robjv.read_version);
 }
 
+TEST(Write, Exclusive) {
+  TempPool pool;
+  auto oid = "object"s;
+  {
+    bufferlist bl;
+    bl.append("I'm some data!"s);
+    librados::IoCtx ioctx(pool);
+    auto r = ioctx.write_full(oid, bl);
+    ASSERT_EQ(0, r);
+  }
+  auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+                                        "I am some DIFFERENT data!"s, nullptr,
+                                        true));
+  ASSERT_EQ(-EEXIST, r);
+}
+
+TEST(Write, Write) {
+  TempPool pool;
+  auto oid = "object"s;
+  auto data = "I'm some data!"s;
+  auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+                                        data, nullptr, true));
+  ASSERT_EQ(0, r);
+  bufferlist bl;
+  librados::IoCtx ioctx(pool);
+  ioctx.read(oid, bl, 0, 0);
+  ASSERT_EQ(0, r);
+  std::string result;
+  decode(result, bl);
+  ASSERT_EQ(data, result);
+}
+
+TEST(Write, ObjV) {
+  TempPool pool;
+  auto oid = "object"s;
+  RGWObjVersionTracker objv;
+  objv.generate_new_write_ver(store->ctx());
+  auto r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+                                        "I'm some data!"s, &objv,
+                                        true));
+  RGWObjVersionTracker interfering_objv(objv);
+  r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+                                   "I'm some newer, better data!"s,
+                                   &interfering_objv, false));
+  ASSERT_EQ(0, r);
+  r = run(new RGWSimpleRadosWriteCR(dpp(), store, {pool, oid},
+                                   "I'm some treacherous, obsolete data!"s,
+                                   &objv, false));
+  ASSERT_EQ(-ECANCELED, r);
+}
+
 int main(int argc, const char **argv)
 {
   auto args = argv_to_vec(argc, argv);