]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/d4n: modifications to the filter driver:
authorPritha Srivastava <prsrivas@redhat.com>
Tue, 30 Jan 2024 10:04:42 +0000 (15:34 +0530)
committerPritha Srivastava <prsrivas@redhat.com>
Tue, 2 Apr 2024 15:54:52 +0000 (21:24 +0530)
1. replaced put_async() with put() in handle_data()
2. moved calls to update() from iterate() to flush()

Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h

index 02123a26598a018062621407d64bcce7440424b1..4c33b9011f224ef05a5e1ea78864bf6e20729aed 100644 (file)
@@ -460,20 +460,20 @@ void D4NFilterObject::D4NFilterReadOp::cancel() {
   aio->drain();
 }
 
-int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp) {
+int D4NFilterObject::D4NFilterReadOp::drain(const DoutPrefixProvider* dpp, optional_yield y) {
   auto c = aio->wait();
   while (!c.empty()) {
-    int r = flush(dpp, std::move(c));
+    int r = flush(dpp, std::move(c), y);
     if (r < 0) {
       cancel();
       return r;
     }
     c = aio->wait();
   }
-  return flush(dpp, std::move(c));
+  return flush(dpp, std::move(c), y);
 }
 
-int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results) {
+int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y) {
   int r = rgw::check_for_errors(results);
 
   if (r < 0) {
@@ -494,11 +494,29 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
     ldpp_dout(dpp, 20) << "D4NFilterObject::flush:: calling handle_data for offset: " << offset << " bufferlist length: " << bl.length() << dendl;
 
     bl_list.push_back(bl);
-    offset += bl.length();
     int r = client_cb->handle_data(bl, 0, bl.length());
     if (r < 0) {
       return r;
     }
+    auto it = blocks_info.find(offset);
+    if (it != blocks_info.end()) {
+      std::string version = source->get_object_version();
+      std::string prefix = source->get_prefix();
+      if (version.empty()) {
+        version = source->get_instance();
+      }
+      std::pair<uint64_t, uint64_t> ofs_len_pair = it->second;
+      uint64_t ofs = ofs_len_pair.first;
+      uint64_t len = ofs_len_pair.second;
+      std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
+      ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs  << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
+      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y);
+      blocks_info.erase(it);
+    } else {
+      ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
+    }
+  
+    offset += bl.length();
     completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
   }
 
@@ -523,7 +541,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
   this->client_cb = cb;
   this->cb->set_client_cb(cb, dpp, &y);
-  this->cb->set_prefix(prefix);
+  source->set_prefix(prefix);
 
   /* This algorithm stores chunks for ranged requests also in the cache, which might be smaller than obj_max_req_size
      One simplification could be to overwrite the smaller chunks with a bigger chunk of obj_max_req_size, and to serve requests for smaller
@@ -547,6 +565,10 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
   this->offset = ofs;
 
+  if (version.empty()) {
+    version = source->get_instance();
+  }
+
   do {
     uint64_t id = adjusted_start_ofs, read_ofs = 0; //read_ofs is the actual offset to start reading from the current part/ chunk
     if (start_part_num == (num_parts - 1)) {
@@ -566,9 +588,6 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
     ceph::bufferlist bl;
     std::string oid_in_cache = prefix + "_" + std::to_string(adjusted_start_ofs) + "_" + std::to_string(part_len);
-    if (version.empty()) {
-      version = source->get_instance();
-    }
 
     ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << "version stored in update method is: " << version << dendl;
 
@@ -579,14 +598,13 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       // Read From Cache
       auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
 
-      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, part_len, version, y);
+      this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
 
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-
-      auto r = flush(dpp, std::move(completed));
+      auto r = flush(dpp, std::move(completed), y);
 
       if (r < 0) {
-        drain(dpp);
+        drain(dpp, y);
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
         return r;
       }
@@ -600,14 +618,13 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
         // Read From Cache
         auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
 
-       source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, adjusted_start_ofs, obj_max_req_size, version, y);
+        this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
 
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: flushing data for oid: " << oid_in_cache << dendl;
-
-        auto r = flush(dpp, std::move(completed));
+        auto r = flush(dpp, std::move(completed), y);
 
         if (r < 0) {
-          drain(dpp);
+          drain(dpp, y);
           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
           return r;
         }
@@ -615,7 +632,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       } else {
         ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
 
-        auto r = drain(dpp);
+        auto r = drain(dpp, y);
 
         if (r < 0) {
           ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to drain, r= " << r << dendl;
@@ -628,7 +645,7 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
 
     if (start_part_num == (num_parts - 1)) {
       ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Info: draining data for oid: " << oid_in_cache << dendl;
-      return drain(dpp);
+      return drain(dpp, y);
     } else {
       adjusted_start_ofs += obj_max_req_size;
     }
@@ -701,6 +718,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
     Attrs attrs; // empty attrs for cache sets
     std::string version = source->get_object_version();
+    std::string prefix = source->get_prefix();
     if (version.empty()) {
       version = source->get_instance();
     }
@@ -715,7 +733,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
           //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
-          ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+          ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
             filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
 
@@ -755,7 +773,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) { //In case of concurrent reads for the same object, the block is already cached
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
-          ret = filter->get_cache_driver()->put_async(dpp, oid, bl, bl.length(), attrs);
+          ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
             filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
 
@@ -802,7 +820,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           block.version = version;
           auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
           if (ret == 0) {
-            ret = filter->get_cache_driver()->put_async(dpp, oid, bl_rem, bl_rem.length(), attrs);
+            ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
               filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);
 
index e43c0105265acec513aa571fa14021cce1d711c0..8c6cdca147057fa4ff4fe93e8cfecd799b033e63 100644 (file)
@@ -101,6 +101,7 @@ class D4NFilterObject : public FilterObject {
   private:
     D4NFilterDriver* driver;
     std::string version;
+    std::string prefix;
 
   public:
     struct D4NFilterReadOp : FilterReadOp {
@@ -108,7 +109,6 @@ class D4NFilterObject : public FilterObject {
        class D4NFilterGetCB: public RGWGetDataCB {
          private:
            D4NFilterDriver* filter;
-           std::string prefix;
            D4NFilterObject* source;
            RGWGetDataCB* client_cb;
            uint64_t ofs = 0, len = 0;
@@ -130,7 +130,6 @@ class D4NFilterObject : public FilterObject {
               this->y = y;
             }
            void set_ofs(uint64_t ofs) { this->ofs = ofs; }
-      void set_prefix(const std::string& prefix) { this->prefix = prefix; }
            int flush_last_part();
            void bypass_cache_write() { this->write_to_cache = false; }
        };
@@ -154,10 +153,11 @@ class D4NFilterObject : public FilterObject {
         std::unique_ptr<rgw::Aio> aio;
        uint64_t offset = 0; // next offset to write to client
         rgw::AioResultList completed; // completed read results, sorted by offset
+      std::unordered_map<uint64_t, std::pair<uint64_t,uint64_t>> blocks_info;
 
-       int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results);
+       int flush(const DoutPrefixProvider* dpp, rgw::AioResultList&& results, optional_yield y);
        void cancel();
-       int drain(const DoutPrefixProvider* dpp);
+       int drain(const DoutPrefixProvider* dpp, optional_yield y);
     };
 
     struct D4NFilterDeleteOp : FilterDeleteOp {
@@ -209,6 +209,9 @@ class D4NFilterObject : public FilterObject {
 
     void set_object_version(const std::string& version) { this->version = version; }
     const std::string get_object_version() { return this->version; }
+
+    void set_prefix(const std::string& prefix) { this->prefix = prefix; }
+    const std::string get_prefix() { return this->prefix; }
 };
 
 class D4NFilterWriter : public FilterWriter {