rgw/d4n: removing lock from code path that writes to the cache
author Pritha Srivastava <prsrivas@redhat.com>
Mon, 19 Feb 2024 07:17:35 +0000 (12:47 +0530)
committer Pritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:52 +0000 (21:24 +0530)
rgw/d4n: removing lock from code path that writes to the cache backend, and ensuring that write to the cache is atomic.

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
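
The atomicity mentioned in the commit message comes from writing each cache block under a temporary name and rename()ing it to its final name only after the data is fully on disk (see the rgw_ssd_driver.cc hunk below). A minimal, self-contained sketch of that pattern, assuming a hypothetical write_block_atomically() helper and a cache_dir prefix (neither name is part of the RGW code):

// Sketch of the write-temp-then-rename pattern; names are illustrative only.
#include <atomic>
#include <cstdint>
#include <cstdio>      // std::rename
#include <fstream>
#include <string>

static std::atomic<uint64_t> temp_index{0};

// Write data under a unique temporary name, then rename() it into place.
// On POSIX, rename() within one filesystem is atomic, so a concurrent reader
// sees either no file or the complete block, never a partial write.
bool write_block_atomically(const std::string& cache_dir,
                            const std::string& key,
                            const std::string& data)
{
  const std::string temp_path  = cache_dir + key + "_" + std::to_string(temp_index++);
  const std::string final_path = cache_dir + key;

  std::ofstream out(temp_path, std::ios::binary | std::ios::trunc);
  if (!out.write(data.data(), data.size())) {
    return false;
  }
  out.close();

  // Publish the block under its final name in a single atomic step.
  return std::rename(temp_path.c_str(), final_path.c_str()) == 0;
}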
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_ssd_driver.cc
src/rgw/rgw_ssd_driver.h

src/rgw/driver/d4n/rgw_sal_d4n.cc
index 4c33b9011f224ef05a5e1ea78864bf6e20729aed..691d589bf2ae5d2b763fbbb16df856c82d935a44 100644
@@ -701,7 +701,6 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 
   //Accumulating data from backend store into rgw_get_obj_max_req_size sized chunks and then writing to cache
   if (write_to_cache) {
-    const std::lock_guard l(d4n_get_data_lock);
     rgw::d4n::CacheBlock block, existing_block;
     rgw::d4n::BlockDirectory* blockDir = source->driver->get_block_dir();
     block.hostsList.push_back(blockDir->cct->_conf->rgw_local_cache_address); 
@@ -726,13 +725,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 
     if (bl.length() > 0 && last_part) { // if bl = bl_rem has data and this is the last part, write it to cache
       std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_len);
-      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
         block.blockID = ofs;
         block.size = bl.length();
         block.version = version;
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
-          //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
             filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
@@ -759,8 +757,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
               }
             }
           } else {
-            if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
-             ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+            ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
           }
         }
       }
@@ -770,7 +767,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       block.blockID = ofs;
       block.size = bl.length();
       block.version = version;
-      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+      if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
@@ -798,8 +795,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
               }
             }
           } else {
-            if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_local_cache_address, *y) < 0)
-             ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+            ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
           }
         }
       }
@@ -813,7 +809,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
 
       if (bl_rem.length() == rgw_get_obj_max_req_size) {
         std::string oid = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(bl_rem.length());
-          if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
+          if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
           ofs += bl_rem.length();
           block.blockID = ofs;
           block.size = bl_rem.length();
@@ -846,7 +842,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
                }
              }
             } else {
-              ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured while caching oid: " << oid << " error: " << ret << dendl;
+              ldpp_dout(dpp, 0) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): put() to cache backend failed with error: " << ret << dendl;
             }
           } else {
             ldpp_dout(dpp, 20) << "D4N Filter: " << __func__ << " An error occured during eviction: " << " error: " << ret << dendl;
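
With the d4n_get_data_lock gone, correctness in this path rests on the exist_key() check plus the atomicity of put(): a concurrent read of the same object either skips a block that is already cached or rewrites the same, fully formed block. A self-contained sketch of that per-block flow, using toy Policy and CacheDriver stand-ins rather than the real D4N interfaces:

// Toy model of the per-block caching flow from the hunks above; Policy and
// CacheDriver are illustrative stand-ins, not the actual D4N classes.
#include <cstdint>
#include <set>
#include <string>
#include <vector>

struct Policy {
  std::set<std::string> cached;                 // keys already in the cache
  bool exist_key(const std::string& oid) { return cached.count(oid) > 0; }
  int eviction(uint64_t /*size*/) { return 0; } // pretend space is always available
  void update(const std::string& oid) { cached.insert(oid); }
};

struct CacheDriver {
  // Stands in for an atomic write (temp file + rename in the real SSD driver).
  int put(const std::string& /*oid*/, const std::vector<char>& /*data*/) { return 0; }
};

// No lock is taken: a concurrent reader either skips the block via exist_key()
// or redundantly writes the same complete block again.
void cache_block(Policy& policy, CacheDriver& cache,
                 const std::string& oid, const std::vector<char>& data)
{
  if (policy.exist_key(oid)) {
    return;                                     // already cached by another read
  }
  if (policy.eviction(data.size()) != 0) {
    return;                                     // could not free enough space
  }
  if (cache.put(oid, data) == 0) {
    policy.update(oid);                         // record the block only after a successful write
  }
}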
src/rgw/driver/d4n/rgw_sal_d4n.h
index 8c6cdca147057fa4ff4fe93e8cfecd799b033e63..0893371a190bcd414716bf53c9fb13de35bf8687 100644
@@ -114,7 +114,6 @@ class D4NFilterObject : public FilterObject {
            uint64_t ofs = 0, len = 0;
            bufferlist bl_rem;
            bool last_part{false};
-           std::mutex d4n_get_data_lock;
            bool write_to_cache{true};
            const DoutPrefixProvider* dpp;
            optional_yield* y;
src/rgw/rgw_ssd_driver.cc
index 4e703a5d4b93cfd2b489daa6b85b30f067022223..86c1973d5e1db379e97d82a5f7aa4e5de75821fb 100644
@@ -202,16 +202,18 @@ void SSDDriver::put_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx,
 
     int r = 0;
     bufferlist src = bl;
-    r = op.prepare_libaio_write_op(dpp, src, len, key, partition_info.location);
+    std::string temp_key = key + "_" + std::to_string(index++);
+    ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): temp key=" << temp_key << dendl;
+    r = op.prepare_libaio_write_op(dpp, src, len, temp_key, partition_info.location);
     op.cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
     op.cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
     op.cb->aio_sigevent.sigev_notify_attributes = nullptr;
     op.cb->aio_sigevent.sigev_value.sival_ptr = (void*)p.get();
     op.key = key;
+    op.temp_key = temp_key;
     op.dpp = dpp;
     op.priv_data = this;
     op.attrs = std::move(attrs);
-
     if (r >= 0) {
         r = ::aio_write(op.cb.get());
     } else {
@@ -322,13 +324,23 @@ void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) {
     auto p = std::unique_ptr<Completion>{static_cast<Completion*>(sigval.sival_ptr)};
     auto op = std::move(p->user_data);
     ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_cb: key: " << op.key << dendl;
+    int ret = -aio_error(op.cb.get());
+    boost::system::error_code ec;
+    if (ret < 0) {
+        ec.assign(-ret, boost::system::system_category());
+        ceph::async::dispatch(std::move(p), ec);
+        return;
+    }
     int attr_ret = 0;
     if (op.attrs.size() > 0) {
         //TODO - fix yield_context
         optional_yield y{null_yield};
-        attr_ret = op.priv_data->set_attrs(op.dpp, op.key, op.attrs, y);
+        attr_ret = op.priv_data->set_attrs(op.dpp, op.temp_key, op.attrs, y);
         if (attr_ret < 0) {
-            ldpp_dout(op.dpp, 0) << "ERROR: put::set_attrs: failed to set attrs, ret = " << attr_ret << dendl;
+            ldpp_dout(op.dpp, 0) << "ERROR: AsyncWriteRequest::libaio_write_yield_cb::set_attrs: failed to set attrs, ret = " << attr_ret << dendl;
+            ec.assign(-ret, boost::system::system_category());
+            ceph::async::dispatch(std::move(p), ec);
+            return;
         }
     }
 
@@ -336,12 +348,16 @@ void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) {
     efs::space_info space = efs::space(partition_info.location);
     op.priv_data->set_free_space(op.dpp, space.available);
 
-    const int ret = -aio_error(op.cb.get());
-    boost::system::error_code ec;
+    std::string new_path = partition_info.location + op.key;
+    std::string old_path = partition_info.location + op.temp_key;
+
+    ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_yield_cb: temp_key: " << op.temp_key << dendl;
+
+    ret = rename(old_path.c_str(), new_path.c_str());
     if (ret < 0) {
+        ret = errno;
+        ldpp_dout(op.dpp, 0) << "ERROR: put::rename: failed to rename file: " << ret << dendl;
         ec.assign(-ret, boost::system::system_category());
-    } else if (attr_ret < 0) {
-        ec.assign(-attr_ret, boost::system::system_category());
     }
     ceph::async::dispatch(std::move(p), ec);
 }
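
The temporary name is made unique by a process-wide atomic counter (added to SSDDriver in the rgw_ssd_driver.h hunk below), so concurrent writes of the same key land in distinct temporary files before the final rename(). A small sketch of that naming scheme; make_temp_key() is an illustrative helper, not the actual SSDDriver API:

// Unique temporary names from an atomic counter; illustrative helper only.
#include <atomic>
#include <cstdint>
#include <string>

std::string make_temp_key(const std::string& key)
{
  static std::atomic<uint64_t> counter{0};
  // fetch-and-increment is atomic, so concurrent writers of the same key
  // always get distinct temporary names and never clobber each other's files.
  return key + "_" + std::to_string(counter++);
}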
src/rgw/rgw_ssd_driver.h
index de0110eb584d33cde9a2c156cf5ad36e77d08d47..d142467b8a79e188dcb9d8e884de24e8474bdef6 100644
@@ -36,6 +36,7 @@ private:
   Partition partition_info;
   uint64_t free_space;
   CephContext* cct;
+  inline static std::atomic<uint64_t> index{0};
 
   struct libaio_read_handler {
     rgw::Aio* throttle = nullptr;
@@ -101,6 +102,7 @@ private:
   struct AsyncWriteRequest {
     const DoutPrefixProvider* dpp;
          std::string key;
+    std::string temp_key;
          void *data;
          int fd;
          unique_aio_cb_ptr cb;