]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/d4n: squashing all commits related to initial implementation
authormosayyebzadeh <mosayyeb@bu.edu>
Mon, 26 Feb 2024 15:43:01 +0000 (15:43 +0000)
committerPritha Srivastava <prsrivas@redhat.com>
Mon, 21 Apr 2025 04:04:07 +0000 (09:34 +0530)
of write-back cache for non-multipart objects (writing objects to
cache and then to backend store in the cleaning process):

1. combining write cache with latest D4N code
2. cleaning the code
3. removing some bugs on bigger objects
4. updating iterate function to check the dirty flag
5. Updating write cache based on the pull request comments.
Read process needs to be updated based on write process. It needs to check where is the data and if it is dirty or clean.
If it is in the cache and dirty, we need to put D_ in the oid of the object before reading it from cache.
If it is clean, there is nothing to do.
6. updating flush functions and comments.

Signed-off-by: mosayyebzadeh <mosayyeb@bu.edu>
src/common/options/rgw.yaml.in
src/rgw/driver/d4n/d4n_directory.cc
src/rgw/driver/d4n/d4n_directory.h
src/rgw/driver/d4n/d4n_policy.cc
src/rgw/driver/d4n/d4n_policy.h
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/rgw_cache_driver.h
src/rgw/rgw_ssd_driver.cc
src/rgw/rgw_ssd_driver.h

index ac945fdc52028b4b27dcc699276d7df5a8a09c09..0d6d1894f363e06f936afed44bf49804a41508ac 100644 (file)
@@ -3857,6 +3857,18 @@ options:
   default: 60
   services:
   - rgw
+  flags:
+  - startup
+  with_legacy: true
+- name: d4n_writecache_enabled
+  type: bool
+  level: advanced
+  desc: The d4n write cache
+  default: false
+  services: 
+  - rgw
+  flags:
+  - startup
   with_legacy: true
 - name: rgw_backend_store
   type: str
@@ -4121,6 +4133,16 @@ options:
   flags:
   - startup
   with_legacy: true
+- name: rgw_d4n_cache_cleaning_interval
+  type: int
+  level: advanced
+  desc: This is the interval for invoking write cache cleaning process
+  default: 1000
+  services: 
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
 - name: rgw_topic_persistency_time_to_live
   type: uint
   level: advanced
@@ -4302,4 +4324,14 @@ options:
   flags:
   - startup
   see_also:
-  with_legacy: true
\ No newline at end of file
+  with_legacy: true
+- name: rgw_d4n_backend_address
+  type: str
+  level: advanced
+  desc: The backend address used by D4N cache 
+  default: 127.0.0.1:6379
+  services: 
+  - rgw
+  flags:
+  - startup
+  with_legacy: true
index 20f81ffd7da39ab3c98a613179f09633d710e25d..0be1d1784670ec874fde13d8a9907adb8743000b 100644 (file)
@@ -455,6 +455,7 @@ int BlockDirectory::get(CacheBlock* block, optional_yield y)
       block->cacheObj.bucketName = std::get<0>(resp).value()[6];
       block->cacheObj.creationTime = std::get<0>(resp).value()[7];
       block->cacheObj.dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
+      block->dirty = boost::lexical_cast<bool>(std::get<0>(resp).value()[8]);
 
       {
         std::stringstream ss(boost::lexical_cast<std::string>(std::get<0>(resp).value()[9]));
index cf562dd27301a45095243cf43bdd4ea67f3e25ef..be7bde82691d19f6b8fbe7014ee3b1f92176d51f 100644 (file)
@@ -26,6 +26,7 @@ struct CacheBlock {
   CacheObj cacheObj;
   uint64_t blockID;
   std::string version;
+  bool dirty;
   uint64_t size; /* Block size in bytes */
   int globalWeight = 0; /* LFUDA policy variable */
   std::vector<std::string> hostsList; /* List of hostnames <ip:port> of block locations */
index b42228078d0d9e0654ca92631826259284b75566..b75e1c6a423dc74b26f7a63bc3afe177fb27abfb 100644 (file)
@@ -48,8 +48,12 @@ void redis_exec(std::shared_ptr<connection> conn,
   }
 }
 
-int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) {
+int LFUDAPolicy::init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) {
+  this->cct = cct;
   dir->init(cct);
+  driver = _driver;
+  tc = std::thread(&CachePolicy::cleaning, this, dpp);
+  tc.detach();
   int result = 0;
   response<int, int, int, int> resp;
 
@@ -345,7 +349,7 @@ int LFUDAPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional
   return 0;
 }
 
-void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
+void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
 {
   using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
   const std::lock_guard l(lfuda_lock);
@@ -357,17 +361,31 @@ void LFUDAPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64
 
   erase(dpp, key, y);
   
-  LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, localWeight);
+  LFUDAEntry *e = new LFUDAEntry(key, offset, len, version, dirty, creationTime, user, localWeight);
   handle_type handle = entries_heap.push(e);
   e->set_handle(handle);
   entries_map.emplace(key, e);
 
-  if (cacheDriver->set_attr(dpp, key, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) 
+  std::string oid_in_cache = key;
+  if (dirty == true)
+    oid_in_cache = "D_"+key;
+
+  if (cacheDriver->set_attr(dpp, oid_in_cache, "user.rgw.localWeight", std::to_string(localWeight), y) < 0) 
     ldpp_dout(dpp, 10) << "LFUDAPolicy::" << __func__ << "(): CacheDriver set_attr method failed." << dendl;
 
   weightSum += ((localWeight < 0) ? 0 : localWeight);
 }
 
+void LFUDAPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+{
+  eraseObj(dpp, key, y);
+  
+  const std::lock_guard l(lfuda_lock);
+  LFUDAObjEntry *e = new LFUDAObjEntry(key, version, dirty, size, creationTime, user, etag);
+  o_entries_map.emplace(key, e);
+}
+
+
 bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
 {
   auto p = entries_map.find(key);
@@ -383,6 +401,150 @@ bool LFUDAPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, o
   return true;
 }
 
+bool LFUDAPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+{
+  const std::lock_guard l(lfuda_lock);
+  auto p = o_entries_map.find(key);
+  if (p == o_entries_map.end()) {
+    return false;
+  }
+
+  o_entries_map.erase(p);
+
+  return true;
+}
+
+void LFUDAPolicy::cleaning(const DoutPrefixProvider* dpp)
+{
+  const int interval = cct->_conf->rgw_d4n_cache_cleaning_interval;
+  while(true){
+    ldpp_dout(dpp, 20) << __func__ << " : " << " Cache cleaning!" << dendl;
+    std::string name = ""; 
+    std::string b_name = ""; 
+    std::string key = ""; 
+    uint64_t len = 0;
+    rgw::sal::Attrs obj_attrs;
+    int count = 0;
+
+    for (auto it = o_entries_map.begin(); it != o_entries_map.end(); it++){
+      if ((it->second->dirty == true) && (std::difftime(time(NULL), it->second->creationTime) > interval)){ //if block is dirty and written more than interval seconds ago
+       name = it->first;
+       rgw_user c_rgw_user = it->second->user;
+
+       size_t pos = 0;
+       std::string delimiter = "_";
+       while ((pos = name.find(delimiter)) != std::string::npos) {
+         if (count == 0){
+           b_name = name.substr(0, pos);
+           name.erase(0, pos + delimiter.length());
+         }
+         count ++;
+       }
+       key = name;
+
+       //writing data to the backend
+       //we need to create an atomic_writer
+       rgw_obj_key c_obj_key = rgw_obj_key(key);               
+       std::unique_ptr<rgw::sal::User> c_user = driver->get_user(c_rgw_user);
+
+       std::unique_ptr<rgw::sal::Bucket> c_bucket;
+        rgw_bucket c_rgw_bucket = rgw_bucket(c_rgw_user.tenant, b_name, "");
+
+       RGWBucketInfo c_bucketinfo;
+       c_bucketinfo.bucket = c_rgw_bucket;
+       c_bucketinfo.owner = c_rgw_user;
+       
+       
+       int ret = driver->load_bucket(dpp, c_rgw_bucket, &c_bucket, null_yield);
+       if (ret < 0) {
+         ldpp_dout(dpp, 10) << __func__ << "(): load_bucket() returned ret=" << ret << dendl;
+         break;
+        }
+
+       std::unique_ptr<rgw::sal::Object> c_obj = c_bucket->get_object(c_obj_key);
+
+  ACLOwner owner{c_user->get_id(), c_user->get_display_name()};
+
+       std::unique_ptr<rgw::sal::Writer> processor =  driver->get_atomic_writer(dpp,
+                                 null_yield,
+                                 c_obj.get(),
+          owner,
+                                 NULL,
+                                 0,
+                                 "");
+
+       int op_ret = processor->prepare(null_yield);
+       if (op_ret < 0) {
+         ldpp_dout(dpp, 20) << "processor->prepare() returned ret=" << op_ret << dendl;
+         break;
+       }
+
+       std::string prefix = b_name+"_"+key;
+       off_t lst = it->second->size;
+       off_t fst = 0;
+       off_t ofs = 0;
+
+       
+       rgw::sal::DataProcessor *filter = processor.get();
+       do {
+         ceph::bufferlist data;
+         if (fst >= lst){
+           break;
+         }
+         off_t cur_size = std::min<off_t>(fst + cct->_conf->rgw_max_chunk_size, lst);
+         off_t cur_len = cur_size - fst;
+         std::string oid_in_cache = "D_" + prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+         std::string new_oid_in_cache = prefix + "_" + std::to_string(fst) + "_" + std::to_string(cur_len);
+         cacheDriver->get(dpp, oid_in_cache, 0, cur_len, data, obj_attrs, null_yield);
+         len = data.length();
+         fst += len;
+
+         if (len == 0) {
+           break;
+         }
+
+         op_ret = filter->process(std::move(data), ofs);
+         if (op_ret < 0) {
+           ldpp_dout(dpp, 20) << "processor->process() returned ret="
+               << op_ret << dendl;
+           return;
+         }
+
+         rgw::d4n::CacheBlock block;
+         block.cacheObj.bucketName = c_obj->get_bucket()->get_name();
+         block.cacheObj.objName = c_obj->get_key().get_oid();
+         block.size = len;
+         block.blockID = ofs;
+         op_ret = dir->update_field(&block, "dirty", "false", null_yield); 
+         if (op_ret < 0) {
+           ldpp_dout(dpp, 20) << "updating dirty flag in Block directory failed!" << dendl;
+           return;
+         }
+
+         cacheDriver->rename(dpp, oid_in_cache, new_oid_in_cache, null_yield);
+
+         ofs += len;
+       } while (len > 0);
+
+       op_ret = filter->process({}, ofs);
+       
+       const req_context rctx{dpp, null_yield, nullptr};
+       ceph::real_time mtime = ceph::real_clock::from_time_t(it->second->creationTime);
+        op_ret = processor->complete(lst, it->second->etag, &mtime, ceph::real_clock::from_time_t(it->second->creationTime), obj_attrs,
+                               std::nullopt, ceph::real_time(), nullptr, nullptr,
+                               nullptr, nullptr, nullptr,
+                               rctx, rgw::sal::FLAG_LOG_OP);
+
+       //data is clean now, updating in-memory metadata
+       it->second->dirty = false;
+      }
+    }
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+  }
+}
+
+
 int LRUPolicy::exist_key(std::string key)
 {
   const std::lock_guard l(lru_lock);
@@ -413,15 +575,25 @@ int LRUPolicy::eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_y
   return 0;
 }
 
-void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y)
+void LRUPolicy::update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y)
 {
   const std::lock_guard l(lru_lock);
   _erase(dpp, key, y);
-  Entry *e = new Entry(key, offset, len, version);
+  Entry *e = new Entry(key, offset, len, version, dirty, creationTime, user);
   entries_lru_list.push_back(*e);
   entries_map.emplace(key, e);
 }
 
+void LRUPolicy::updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y)
+{
+  eraseObj(dpp, key, y);
+  const std::lock_guard l(lru_lock);
+  ObjEntry *e = new ObjEntry(key, version, dirty, size, creationTime, user, etag);
+  o_entries_map.emplace(key, e);
+  return;
+}
+
+
 bool LRUPolicy::erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
 {
   const std::lock_guard l(lru_lock);
@@ -439,4 +611,15 @@ bool LRUPolicy::_erase(const DoutPrefixProvider* dpp, const std::string& key, op
   return true;
 }
 
+bool LRUPolicy::eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
+{
+  const std::lock_guard l(lru_lock);
+  auto p = o_entries_map.find(key);
+  if (p == o_entries_map.end()) {
+    return false;
+  }
+  o_entries_map.erase(p);
+  return true;
+}
+
 } } // namespace rgw::d4n
index 3740f5f047265ca7680a1ca8d4dd024519768f7e..61a8743a172ba0fbc5e03fe8dff95d4fef6e4087 100644 (file)
@@ -26,10 +26,14 @@ class CachePolicy {
       uint64_t offset;
       uint64_t len;
       std::string version;
-      Entry(std::string& key, uint64_t offset, uint64_t len, std::string version) : key(key), offset(offset), 
-                                                                                     len(len), version(version) {}
+      bool dirty;
+      time_t creationTime;
+      rgw_user user;
+      Entry(std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, rgw_user user) : key(key), offset(offset), 
+                                                                                     len(len), version(version), dirty(dirty), creationTime(creationTime), user(user) {}
     };
-    
+
+   
     //The disposer object function
     struct Entry_delete_disposer {
       void operator()(Entry *e) {
@@ -37,15 +41,35 @@ class CachePolicy {
       }
     };
 
+    struct ObjEntry : public boost::intrusive::list_base_hook<> {
+      std::string key;
+      std::string version;
+      bool dirty;
+      uint64_t size;
+      time_t creationTime;
+      rgw_user user;
+      std::string etag;
+      ObjEntry(std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : key(key), version(version), dirty(dirty), size(size), creationTime(creationTime), user(user), etag(etag) {}
+    };
+
+    struct ObjEntry_delete_disposer {
+      void operator()(ObjEntry *e) {
+        delete e;
+      }
+    };
+
   public:
     CachePolicy() {}
     virtual ~CachePolicy() = default; 
 
-    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) = 0; 
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver) = 0; 
     virtual int exist_key(std::string key) = 0;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) = 0;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) = 0;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) = 0;
+    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) = 0;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
+    virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) = 0;
+    virtual void cleaning(const DoutPrefixProvider* dpp) = 0;
 };
 
 class LFUDAPolicy : public CachePolicy {
@@ -62,16 +86,20 @@ class LFUDAPolicy : public CachePolicy {
       using handle_type = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>::handle_type;
       handle_type handle;
 
-      LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, int localWeight) : Entry(key, offset, len, version),
-                                                                                                           localWeight(localWeight) {}
+      LFUDAEntry(std::string& key, uint64_t offset, uint64_t len, std::string& version, bool dirty, time_t creationTime, rgw_user user, int localWeight) : Entry(key, offset, len, version, dirty, creationTime, user), localWeight(localWeight) {}
       
       void set_handle(handle_type handle_) { handle = handle_; } 
     };
 
+    struct LFUDAObjEntry : public ObjEntry {
+      LFUDAObjEntry(std::string& key, std::string& version, bool dirty, uint64_t size, time_t creationTime, rgw_user user, std::string& etag) : ObjEntry(key, version, dirty, size, creationTime, user, etag) {}
+    };
+
     using Heap = boost::heap::fibonacci_heap<LFUDAEntry*, boost::heap::compare<EntryComparator<LFUDAEntry>>>;
     Heap entries_heap;
     std::unordered_map<std::string, LFUDAEntry*> entries_map;
-    std::mutex lfuda_lock; 
+    std::unordered_map<std::string, LFUDAObjEntry*> o_entries_map;
+    std::mutex lfuda_lock;
 
     int age = 1, weightSum = 0, postedSum = 0;
     optional_yield y = null_yield;
@@ -79,6 +107,9 @@ class LFUDAPolicy : public CachePolicy {
     BlockDirectory* dir;
     rgw::cache::CacheDriver* cacheDriver;
     std::optional<asio::steady_timer> rthread_timer;
+    rgw::sal::Driver *driver;
+    std::thread tc;
+    CephContext* cct;
 
     CacheBlock* get_victim_block(const DoutPrefixProvider* dpp, optional_yield y);
     int age_sync(const DoutPrefixProvider* dpp, optional_yield y); 
@@ -110,12 +141,21 @@ class LFUDAPolicy : public CachePolicy {
       delete dir;
     } 
 
-    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context);
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver *_driver);
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
     void save_y(optional_yield y) { this->y = y; }
+    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
+    virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+    virtual void cleaning(const DoutPrefixProvider* dpp) override;
+    LFUDAObjEntry* find_obj_entry(const std::string& key) {
+      auto it = o_entries_map.find(key);
+      if (it == o_entries_map.end())
+        return nullptr;
+      return it->second;
+    }
 };
 
 class LRUPolicy : public CachePolicy {
@@ -123,6 +163,7 @@ class LRUPolicy : public CachePolicy {
     typedef boost::intrusive::list<Entry> List;
 
     std::unordered_map<std::string, Entry*> entries_map;
+    std::unordered_map<std::string, ObjEntry*> o_entries_map;
     std::mutex lru_lock;
     List entries_lru_list;
     rgw::cache::CacheDriver* cacheDriver;
@@ -132,11 +173,14 @@ class LRUPolicy : public CachePolicy {
   public:
     LRUPolicy(rgw::cache::CacheDriver* cacheDriver) : cacheDriver{cacheDriver} {}
 
-    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context) { return 0; } 
+    virtual int init(CephContext *cct, const DoutPrefixProvider* dpp, asio::io_context& io_context, rgw::sal::Driver* _driver) { return 0; } 
     virtual int exist_key(std::string key) override;
     virtual int eviction(const DoutPrefixProvider* dpp, uint64_t size, optional_yield y) override;
-    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, optional_yield y) override;
+    virtual void update(const DoutPrefixProvider* dpp, std::string& key, uint64_t offset, uint64_t len, std::string version, bool dirty, time_t creationTime, const rgw_user user, optional_yield y) override;
+    virtual void updateObj(const DoutPrefixProvider* dpp, std::string& key, std::string version, bool dirty, uint64_t size, time_t creationTime, const rgw_user user, std::string& etag, optional_yield y) override;
     virtual bool erase(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+    virtual bool eraseObj(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
+    virtual void cleaning(const DoutPrefixProvider* dpp) override {}
 };
 
 class PolicyDriver {
index 88c10b552babb70e43f8e798dbea847e3873aa72..5803ce2ca9b27c41bb8e4120d3a1b4720052e1c7 100644 (file)
@@ -76,7 +76,7 @@ int D4NFilterDriver::initialize(CephContext *cct, const DoutPrefixProvider *dpp)
   cacheDriver->initialize(dpp);
   objDir->init(cct);
   blockDir->init(cct);
-  policyDriver->get_cache_policy()->init(cct, dpp, io_context);
+  policyDriver->get_cache_policy()->init(cct, dpp, io_context, next);
 
   return 0;
 }
@@ -317,7 +317,8 @@ int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* d
     ret = this->driver->get_cache_driver()->set_attrs(dpp, head_oid_in_cache, attrs, y);
     if (ret == 0) {
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->get_object_version() << dendl;
-      this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, y);
+      time_t creationTime = ceph::real_clock::to_time_t(this->get_mtime());
+      this->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, 0, version, false, creationTime, get<rgw_user>(this->get_bucket()->get_owner()), y);
       ret = set_head_obj_dir_entry(dpp, y);
       if (ret < 0) {
         ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
@@ -446,7 +447,8 @@ int D4NFilterObject::D4NFilterReadOp::prepare(optional_yield y, const DoutPrefix
       ret = source->driver->get_cache_driver()->put(dpp, head_oid_in_cache, bl, 0, attrs, y);
       if (ret == 0) {
         ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << this->source->get_object_version() << dendl;
-        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, y);
+        time_t creationTime = ceph::real_clock::to_time_t(this->source->get_mtime());
+        source->driver->get_policy_driver()->get_cache_policy()->update(dpp, head_oid_in_cache, 0, bl.length(), version, false, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
         ret = source->set_head_obj_dir_entry(dpp, y);
         if (ret < 0) {
           ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): BlockDirectory set method failed for head object with ret: " << ret << dendl;
@@ -511,10 +513,31 @@ int D4NFilterObject::D4NFilterReadOp::flush(const DoutPrefixProvider* dpp, rgw::
       std::pair<uint64_t, uint64_t> ofs_len_pair = it->second;
       uint64_t ofs = ofs_len_pair.first;
       uint64_t len = ofs_len_pair.second;
+      bool dirty = false;
+      /*
+      std::stringstream s;
+      utime_t ut(source->get_mtime());
+      ut.gmtime(s);
+      */
+      time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
+
+      rgw::d4n::CacheBlock block;
+      block.cacheObj.objName = source->get_key().get_oid();
+      block.cacheObj.bucketName = source->get_bucket()->get_name();
+      block.blockID = ofs;
+      block.size = len;
+
       std::string oid_in_cache = prefix + "_" + std::to_string(ofs) + "_" + std::to_string(len);
+
+      if (source->driver->get_block_dir()->get(&block, y) == 0){
+        if (block.dirty == true){ 
+          dirty = true;
+        }
+      }
+
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " calling update for offset: " << offset << " adjusted offset: " << ofs  << " length: " << len << " oid_in_cache: " << oid_in_cache << dendl;
       ldpp_dout(dpp, 20) << "D4NFilterObject::" << __func__ << " version stored in update method is: " << version << " " << source->get_object_version() << dendl;
-      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, y);
+      source->driver->get_policy_driver()->get_cache_policy()->update(dpp, oid_in_cache, ofs, len, version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), y);
       blocks_info.erase(it);
     } else {
       ldpp_dout(dpp, 0) << "D4NFilterObject::" << __func__ << " offset not found: " << offset << dendl;
@@ -607,11 +630,18 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
       auto it = find(block.hostsList.begin(), block.hostsList.end(), source->driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address);
       if (it != block.hostsList.end()) { /* Local copy */
        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Block found in local cache. " << oid_in_cache << dendl;
-
+        std::string key = oid_in_cache;
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: block is dirty = " << block.dirty << dendl;
+        if (block.dirty == true) { 
+          key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): READ FROM CACHE: key=" << key << " data is Dirty." << dendl;
+        }
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl;
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
        if (block.version == version) {
          if (source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) { 
            // Read From Cache
-           auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id); 
+           auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id); 
 
            this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, part_len)));
 
@@ -631,8 +661,13 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
             " length to read is: " << len_to_read << " part num: " << start_part_num << " read_ofs: " << read_ofs << " part len: " << part_len << dendl;
 
            if ((part_len != obj_max_req_size) && source->driver->get_policy_driver()->get_cache_policy()->exist_key(oid_in_cache) > 0) {
+        if (block.dirty == true){ 
+          key = "D_" + oid_in_cache; //we keep track of dirty data in the cache for the metadata failure case
+        }
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__  << "(): " << __LINE__ << ": READ FROM CACHE: block dirty =" << block.dirty << dendl;
+        ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): " << __LINE__ << ": READ FROM CACHE: key=" << key << dendl;
              // Read From Cache
-             auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), oid_in_cache, read_ofs, len_to_read, cost, id);  
+             auto completed = source->driver->get_cache_driver()->get_async(dpp, y, aio.get(), key, read_ofs, len_to_read, cost, id);  
 
              this->blocks_info.insert(std::make_pair(id, std::make_pair(adjusted_start_ofs, obj_max_req_size)));
 
@@ -640,9 +675,9 @@ int D4NFilterObject::D4NFilterReadOp::iterate(const DoutPrefixProvider* dpp, int
              auto r = flush(dpp, std::move(completed), y);
 
              if (r < 0) {
-               drain(dpp, y);
-               ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
-               return r;
+          drain(dpp, y);
+          ldpp_dout(dpp, 20) << "D4NFilterObject::iterate:: " << __func__ << "(): Error: failed to flush, r= " << r << dendl;
+          return r;
              }
            }
          }
@@ -744,10 +779,10 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
     block.cacheObj.objName = source->get_key().get_oid();
     block.cacheObj.bucketName = source->get_bucket()->get_name();
     std::stringstream s;
-    utime_t ut(source->get_mtime());
-    ut.gmtime(s);
-    block.cacheObj.creationTime = s.str(); 
+    block.cacheObj.creationTime = std::to_string(ceph::real_clock::to_time_t(source->get_mtime()));
     block.cacheObj.dirty = false;
+    bool dirty = false;
+    time_t creationTime = ceph::real_clock::to_time_t(source->get_mtime());
 
     //populating fields needed for building directory index
     existing_block.cacheObj.objName = block.cacheObj.objName;
@@ -764,11 +799,14 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
         block.blockID = ofs;
         block.size = bl.length();
         block.version = version;
+        block.dirty = false; //Reading from the backend, data is clean
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+           std::string objEtag = "";
+           filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime,  std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
+           filter->get_policy_driver()->get_cache_policy()->updateObj(dpp, prefix, version, dirty, source->get_size(), creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), objEtag, *y);
 
            /* Store block in directory */
             if (!blockDir->exist_key(&block, *y)) {
@@ -777,6 +815,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
             } else {
               existing_block.blockID = block.blockID;
               existing_block.size = block.size;
+             existing_block.dirty = block.dirty;
               if (blockDir->get(&existing_block, *y) < 0) {
                 ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
               } else {
@@ -802,13 +841,13 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
       block.size = bl.length();
       block.version = version;
       ofs += bl_len;
-
+      block.dirty = dirty;
       if (!filter->get_policy_driver()->get_cache_policy()->exist_key(oid)) {
         auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
         if (ret == 0) {
           ret = filter->get_cache_driver()->put(dpp, oid, bl, bl.length(), attrs, *y);
           if (ret == 0) {
-            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, *y);
+            filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
 
             /* Store block in directory */
             if (!blockDir->exist_key(&block, *y)) {
@@ -817,6 +856,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
             } else {
               existing_block.blockID = block.blockID;
               existing_block.size = block.size;
+             existing_block.dirty = block.dirty;
               if (blockDir->get(&existing_block, *y) < 0) {
                 ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
               }
@@ -850,12 +890,12 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
           block.size = bl_rem.length();
           block.version = version;
           ofs += bl_rem.length();
-
+               block.dirty = dirty;
           auto ret = filter->get_policy_driver()->get_cache_policy()->eviction(dpp, block.size, *y);
           if (ret == 0) {
             ret = filter->get_cache_driver()->put(dpp, oid, bl_rem, bl_rem.length(), attrs, *y);
             if (ret == 0) {
-              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, *y);
+              filter->get_policy_driver()->get_cache_policy()->update(dpp, oid, ofs, bl_rem.length(), version, dirty, creationTime, std::get<rgw_user>(source->get_bucket()->get_owner()), *y);
 
               /* Store block in directory */
               if (!blockDir->exist_key(&block, *y)) {
@@ -864,6 +904,7 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
              } else {
                existing_block.blockID = block.blockID;
                existing_block.size = block.size;
+               existing_block.dirty = block.dirty;
                if (blockDir->get(&existing_block, *y) < 0) {
                  ldpp_dout(dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
                } else {
@@ -873,8 +914,8 @@ int D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::handle_data(bufferlist& bl
                    if (blockDir->set(&block, *y) < 0)
                      ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory set method failed." << dendl;
                  } else {
-                 if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
-                   ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
+                   if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, *y) < 0)
+                     ldpp_dout(dpp, 10) << "D4NFilterObject::D4NFilterReadOp::D4NFilterGetCB::" << __func__ << "(): BlockDirectory update_field method failed." << dendl;
                  }
                }
              }
@@ -927,27 +968,129 @@ int D4NFilterObject::D4NFilterDeleteOp::delete_obj(const DoutPrefixProvider* dpp
   return next->delete_obj(dpp, y, flags);
 }
 
+
 int D4NFilterWriter::prepare(optional_yield y) 
 {
+  startTime = time(NULL);
   if (driver->get_cache_driver()->delete_data(save_dpp, obj->get_key().get_oid(), y) < 0) 
     ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): CacheDriver delete_data method failed." << dendl;
-
-  return next->prepare(y);
+  d4n_writecache = g_conf()->d4n_writecache_enabled;
+  if (!d4n_writecache){
+    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next iterate" << dendl;
+    return next->prepare(y);
+  }
+  else
+    return 0;
 }
 
 int D4NFilterWriter::process(bufferlist&& data, uint64_t offset)
 {
-  /*
-  int append_dataReturn = driver->get_cache_driver()->append_data(save_dpp, obj->get_key().get_oid(), 
-                                                                   data, y);
+    bufferlist bl = data;
+    off_t bl_len = bl.length();
+    off_t ofs = offset;
+    bool dirty = true;
+    rgw::d4n::CacheBlock block, existing_block;
+    auto creationTime = startTime;
 
-  if (append_dataReturn < 0) {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation failed." << dendl;
-  } else {
-    ldpp_dout(save_dpp, 20) << "D4N Filter: Cache append data operation succeeded." << dendl;
-  }*/
+    auto version = obj->get_instance();
 
-  return next->process(std::move(data), offset);
+    std::string prefix;
+    if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
+      prefix =obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
+    } else {
+      prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
+    }
+
+    rgw::d4n::BlockDirectory* blockDir = driver->get_block_dir();
+
+    block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address); 
+    block.cacheObj.bucketName = obj->get_bucket()->get_name();
+    block.cacheObj.objName = obj->get_key().get_oid();
+    block.cacheObj.dirty = dirty;
+    existing_block.cacheObj.objName = block.cacheObj.objName;
+    existing_block.cacheObj.bucketName = block.cacheObj.bucketName;
+
+
+    int ret = 0;
+
+    if (!d4n_writecache){
+      std::string oid = prefix + "_" + std::to_string(ofs)+ "_" + std::to_string(bl_len);
+      block.size = bl.length();
+      block.blockID = ofs;
+      block.dirty = false; //writing to the backend, hence the data is clean
+      block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_backend_address);
+       
+      ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): calling next process" << dendl;
+      ret = next->process(std::move(data), offset);
+      if (ret == 0){
+        if (!blockDir->exist_key(&block, y)) {
+          if (blockDir->set(&block, y) < 0) 
+            ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+          } else {
+            existing_block.blockID = block.blockID;
+            existing_block.size = block.size;
+            if (blockDir->get(&existing_block, y) < 0) {
+              ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+            } else {
+              if (existing_block.version != block.version) {
+                if (blockDir->del(&existing_block, y) < 0) //delete existing block
+                  ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+                  ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+              } else {
+            if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
+              ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+            }
+          }
+        }
+      } else{
+          ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writing data to the backend failed!" << dendl;
+         return ret;
+      }
+    } else {
+      std::string oid = prefix + "_" + std::to_string(ofs);
+      std::string key = "D_" + oid + "_" + std::to_string(bl_len);
+      std::string oid_in_cache = oid + "_" + std::to_string(bl_len);
+      block.size = bl.length();
+      block.blockID = ofs;
+      block.dirty = true;
+      block.hostsList.push_back(blockDir->cct->_conf->rgw_d4n_l1_datacache_address);
+      dirty = true;
+      ret = driver->get_policy_driver()->get_cache_policy()->eviction(save_dpp, block.size, y);
+      if (ret == 0) {
+        //Should we replace each put_async with put, to ensure data is actually written to the cache before updating the data structures and before the lock is released?
+       if (bl.length() > 0) {          
+          ret = driver->get_cache_driver()->put(save_dpp, key, bl, bl.length(), obj->get_attrs(), y);
+          if (ret == 0) {
+           driver->get_policy_driver()->get_cache_policy()->update(save_dpp, oid_in_cache, ofs, bl.length(), version, dirty, creationTime,  std::get<rgw_user>(obj->get_bucket()->get_owner()), y);
+            if (!blockDir->exist_key(&block, y)) {
+              if (blockDir->set(&block, y) < 0) //should we revert previous steps if this step fails?
+               ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+            } else {
+              existing_block.blockID = block.blockID;
+              existing_block.size = block.size;
+              if (blockDir->get(&existing_block, y) < 0) {
+                ldpp_dout(save_dpp, 10) << "Failed to fetch existing block for: " << existing_block.cacheObj.objName << " blockID: " << existing_block.blockID << " block size: " << existing_block.size << dendl;
+              } else {
+                if (existing_block.version != block.version) {
+                  if (blockDir->del(&existing_block, y) < 0) //delete existing block
+                    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory del method failed." << dendl;
+                  if (blockDir->set(&block, y) < 0) //new versioned block will have new version, hostsList etc, how about globalWeight?
+                    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory set method failed." << dendl;
+                } else {
+                  if (blockDir->update_field(&block, "blockHosts", blockDir->cct->_conf->rgw_d4n_l1_datacache_address, y) < 0)
+                    ldpp_dout(save_dpp, 10) << "D4NFilterObject::D4NFilterWriteOp::" << __func__ << "(): BlockDirectory update_field method failed for hostsList." << dendl;
+                }
+              }
+           }
+          } else {
+            ldpp_dout(save_dpp, 1) << "D4NFilterObject::D4NFilterWriteOp::process" << __func__ << "(): ERROR: writting data to the cache failed!" << dendl;
+           return ret;
+         }
+       }
+      }
+    } 
+    return 0;
 }
 
 int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
@@ -961,17 +1104,37 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
                        const req_context& rctx,
                        uint32_t flags)
 {
-  rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
+  bool dirty = true;
+  std::vector<std::string> hostsList = {};
+  auto creationTime = startTime;
+  std::string objEtag = etag;
+
+  auto version = obj->get_instance();
+
+  std::string prefix;
+  if (version.empty()) { //for versioned objects, get_oid() returns an oid with versionId added
+    prefix = obj->get_bucket()->get_name() + "_" + obj->get_key().get_oid();
+  } else {
+    prefix = obj->get_bucket()->get_name() + "_" + version + "_" + obj->get_key().get_oid();
+  }
+
+
+  if (d4n_writecache){
+    driver->get_policy_driver()->get_cache_policy()->updateObj(save_dpp, prefix, version, dirty, accounted_size, creationTime, std::get<rgw_user>(obj->get_bucket()->get_owner()), objEtag, y);
+    hostsList = { driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address };
+  
+    rgw::d4n::CacheObj object = rgw::d4n::CacheObj{
                                 .objName = obj->get_key().get_oid(), 
                                 .bucketName = obj->get_bucket()->get_name(),
-                                .creationTime = to_iso_8601(*mtime), 
-                                .dirty = false,
-                                .hostsList = { /*driver->get_block_dir()->cct->_conf->rgw_d4n_l1_datacache_address*/ } //TODO: Object is not currently being cached 
+                                .creationTime = std::to_string(creationTime), 
+                                .dirty = dirty,
+                                .hostsList = hostsList
                                };
 
-  if (driver->get_obj_dir()->set(&object, y) < 0) 
-    ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed." << dendl;
-   
+    if (driver->get_obj_dir()->set(&object, y) < 0) 
+      ldpp_dout(save_dpp, 10) << "D4NFilterWriter::" << __func__ << "(): ObjectDirectory set method failed." << dendl;
+    return 0;
+  }
   /* Retrieve complete set of attrs */
   int ret = next->complete(accounted_size, etag, mtime, set_mtime, attrs, cksum,
                        delete_at, if_match, if_nomatch, user_data, zones_trace,
@@ -984,7 +1147,7 @@ int D4NFilterWriter::complete(size_t accounted_size, const std::string& etag,
   buffer::list bl;
   obj->load_obj_state(save_dpp, rctx.y);
 
-  bl.append(to_iso_8601(obj->get_mtime()));
+  bl.append(std::to_string(creationTime));
   baseAttrs.insert({"mtime", bl});
   bl.clear();
 
index 7011f2a64bf123742995c9609aa50cd1907dc57c..27f4aa582fd17dcb5ae8b6388f6d72603b148c49 100644 (file)
@@ -212,6 +212,8 @@ class D4NFilterWriter : public FilterWriter {
     const DoutPrefixProvider* save_dpp;
     bool atomic;
     optional_yield y;
+    bool d4n_writecache;
+    time_t startTime;
 
   public:
     D4NFilterWriter(std::unique_ptr<Writer> _next, D4NFilterDriver* _driver, Object* _obj, 
index 9940d1a6d0e6a7d471250314c172ceba1d5b3c46..50886e51d430ac52f7c7b3f10534fb5add4e23f5 100644 (file)
@@ -25,6 +25,7 @@ class CacheDriver {
     virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) = 0;
     virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) = 0;
     virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) = 0;
+    virtual int rename(const DoutPrefixProvider* dpp, const::std::string& oldKey, const::std::string& newKey, optional_yield y) = 0;
     virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) = 0;
     virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) = 0;
     virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) = 0;
index 5d7a5a97119c252fafd48c824cf7b9a21480f8ff..f46b9aac3cfcc98d96c7c2fd64a63f1d8a8e6cd6 100644 (file)
@@ -336,6 +336,20 @@ int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& ke
     return 0;
 }
 
+int SSDDriver::rename(const DoutPrefixProvider* dpp, const::std::string& oldKey, const::std::string& newKey, optional_yield y)
+{
+    std::string location = partition_info.location;
+
+    int ret = std::rename((location+oldKey).c_str(), (location+newKey).c_str());
+    if (ret < 0) {
+        ldpp_dout(dpp, 0) << "SSDDriver: ERROR: failed to rename the file: " << location+oldKey << dendl;
+        return ret;
+    }
+
+    return 0;
+}
+
+
 int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location)
 {
     std::string location = cache_location + key;
@@ -398,7 +412,7 @@ void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval) {
 
     ldpp_dout(op.dpp, 20) << "INFO: AsyncWriteRequest::libaio_write_yield_cb: temp_key: " << op.temp_key << dendl;
 
-    ret = rename(old_path.c_str(), new_path.c_str());
+    ret = std::rename(old_path.c_str(), new_path.c_str());
     if (ret < 0) {
         ret = errno;
         ldpp_dout(op.dpp, 0) << "ERROR: put::rename: failed to rename file: " << ret << dendl;
index 9b2b4edb26a8feac8f03f3d1f0aad84a457e118c..e12d1646dc60c731923a0779371d26539ae1a1a7 100644 (file)
@@ -19,6 +19,7 @@ public:
   virtual rgw::AioResultList put_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, const bufferlist& bl, uint64_t len, const rgw::sal::Attrs& attrs, uint64_t cost, uint64_t id) override;
   virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, const bufferlist& bl_data, optional_yield y) override;
   virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y) override;
+  virtual int rename(const DoutPrefixProvider* dpp, const::std::string& oldKey, const::std::string& newKey, optional_yield y) override;
   virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs, optional_yield y) override;
   virtual int set_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) override;
   virtual int update_attrs(const DoutPrefixProvider* dpp, const std::string& key, const rgw::sal::Attrs& attrs, optional_yield y) override;