]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: call processor->handle_data() again if needed
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 26 Jul 2014 03:33:52 +0000 (20:33 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 4 Aug 2014 16:52:51 +0000 (09:52 -0700)
Fixes: #8937
Following the fix to #8928 we end up accumulating pending data that
needs to be written. Beforehand it was working fine because we were
feeding it with the exact amount of bytes we were writing.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 0553890e79b43414cc0ef97ceb694c1cb5f06bbb)

src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 02d7c504f73a76523757b7fa69c0a5c908bf09f9..0bc6f71725ba0c19440f01c68a8e89aa706fc221 100644 (file)
@@ -1564,6 +1564,36 @@ int RGWPutObj::user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWA
   return 0;
 }
 
+static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs,
+                                 MD5 *hash, bool need_to_wait)
+{
+  const unsigned char *data_ptr = (hash ? (const unsigned char *)data.c_str() : NULL);
+  bool again;
+  uint64_t len = data.length();
+
+  do {
+    void *handle;
+
+    int ret = processor->handle_data(data, ofs, &handle, &again);
+    if (ret < 0)
+      return ret;
+
+    if (hash) {
+      hash->Update(data_ptr, len);
+      hash = NULL; /* only calculate hash once */
+    }
+
+    ret = processor->throttle_data(handle, false);
+    if (ret < 0)
+      return ret;
+
+    need_to_wait = false; /* the need to wait only applies to the first iteration */
+  } while (again);
+
+  return 0;
+}
+
+
 void RGWPutObj::execute()
 {
   RGWPutObjProcessor *processor = NULL;
@@ -1637,23 +1667,12 @@ void RGWPutObj::execute()
     if (!len)
       break;
 
-    void *handle;
-    const unsigned char *data_ptr = (const unsigned char *)data.c_str();
-
-    ret = processor->handle_data(data, ofs, &handle);
-    if (ret < 0)
-      goto done;
-
-    if (need_calc_md5) {
-      hash.Update(data_ptr, len);
-    }
-
     /* do we need this operation to be synchronous? if we're dealing with an object with immutable
      * head, e.g., multipart object we need to make sure we're the first one writing to this object
      */
     bool need_to_wait = (ofs == 0) && multipart;
 
-    ret = processor->throttle_data(handle, need_to_wait);
+    ret = put_data_and_throttle(processor, data, ofs, (need_calc_md5 ? &hash : NULL), need_to_wait);
     if (ret < 0) {
       if (!need_to_wait || ret != -EEXIST) {
         ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
@@ -1678,15 +1697,8 @@ void RGWPutObj::execute()
         goto done;
       }
 
-      ret = processor->handle_data(data, ofs, &handle);
+      ret = put_data_and_throttle(processor, data, ofs, NULL, false);
       if (ret < 0) {
-        ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
-        goto done;
-      }
-
-      ret = processor->throttle_data(handle, false);
-      if (ret < 0) {
-        ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
         goto done;
       }
     }
@@ -1850,18 +1862,7 @@ void RGWPostObj::execute()
      if (!len)
        break;
 
-     void *handle;
-     const unsigned char *data_ptr = (const unsigned char *)data.c_str();
-
-     ret = processor->handle_data(data, ofs, &handle);
-     if (ret < 0)
-       goto done;
-
-     hash.Update(data_ptr, len);
-
-     ret = processor->throttle_data(handle, false);
-     if (ret < 0)
-       goto done;
+     ret = put_data_and_throttle(processor, data, ofs, &hash, false);
 
      ofs += len;
 
index c5b558b22cbe79979edfa7019ba90117a95819e0..65e5de923460905b4673e68ed27fa6b0c0ee87eb 100644 (file)
@@ -900,8 +900,10 @@ int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oi
   return 0;
 }
 
-int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
+int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle, bool *again)
 {
+  *again = false;
+
   if (ofs != _ofs)
     return -EINVAL;
 
@@ -1026,8 +1028,10 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
   return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
 
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again)
 {
+  *again = false;
+
   *phandle = NULL;
   if (extra_data_len) {
     size_t extra_len = bl.length();
@@ -1052,6 +1056,9 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
 
   pending_data_bl.splice(0, max_write_size, &bl);
 
+  /* do we have enough data pending accumulated that needs to be written? */
+  *again = (pending_data_bl.length() >= max_chunk_size);
+
   if (!data_ofs && !immutable_head()) {
     first_chunk.claim(bl);
     obj_len = (uint64_t)first_chunk.length();
@@ -3023,25 +3030,33 @@ public:
   int handle_data(bufferlist& bl, off_t ofs, off_t len) {
     progress_cb(ofs, progress_data);
 
-    void *handle;
-    int ret = processor->handle_data(bl, ofs, &handle);
-    if (ret < 0)
-      return ret;
+    bool again;
 
-    if (opstate) {
-      /* need to update opstate repository with new state. This is ratelimited, so we're not
-       * really doing it every time
-       */
-      ret = opstate->renew_state();
-      if (ret < 0) {
-        /* could not renew state! might have been marked as cancelled */
+    bool need_opstate = true;
+
+    do {
+      void *handle;
+      int ret = processor->handle_data(bl, ofs, &handle, &again);
+      if (ret < 0)
         return ret;
+
+      if (need_opstate && opstate) {
+        /* need to update opstate repository with new state. This is ratelimited, so we're not
+         * really doing it every time
+         */
+        ret = opstate->renew_state();
+        if (ret < 0) {
+          /* could not renew state! might have been marked as cancelled */
+          return ret;
+        }
+
+        need_opstate = false;
       }
-    }
 
-    ret = processor->throttle_data(handle, false);
-    if (ret < 0)
-      return ret;
+      ret = processor->throttle_data(handle, false);
+      if (ret < 0)
+        return ret;
+    } while (again);
 
     return 0;
   }
index 6b93b98f9b2e46322183eab169eebda334d40192..2792352b8e80e88f3ce2aa870819a15a09abbe8d 100644 (file)
@@ -556,7 +556,7 @@ public:
     obj_ctx = _o;
     return 0;
   }
-  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) = 0;
   virtual int throttle_data(void *handle, bool need_to_wait) = 0;
   virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 };
@@ -572,7 +572,7 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
 
 protected:
   int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
   int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 
 public:
@@ -662,7 +662,7 @@ public:
   void set_extra_data_len(uint64_t len) {
     extra_data_len = len;
   }
-  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
   bufferlist& get_extra_data() { return extra_data_bl; }
 };