]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: call processor->handle_data() again if needed
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 26 Jul 2014 03:33:52 +0000 (20:33 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Sat, 2 Aug 2014 20:18:25 +0000 (13:18 -0700)
Fixes: #8937
Following the fix to #8928 we end up accumulating pending data that
needs to be written. Beforehand it was working fine because we were
feeding it with the exact amount of bytes we were writing.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 0553890e79b43414cc0ef97ceb694c1cb5f06bbb)

Conflicts:
src/rgw/rgw_rados.h

src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 502929ea87ba9fcb5699618436ffa290c10e2501..c43ca146701f773b1aa2a511d376787930ec2669 100644 (file)
@@ -1563,6 +1563,36 @@ int RGWPutObj::user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWA
   return 0;
 }
 
+static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs,
+                                 MD5 *hash, bool need_to_wait)
+{
+  const unsigned char *data_ptr = (hash ? (const unsigned char *)data.c_str() : NULL);
+  bool again;
+  uint64_t len = data.length();
+
+  do {
+    void *handle;
+
+    int ret = processor->handle_data(data, ofs, &handle, &again);
+    if (ret < 0)
+      return ret;
+
+    if (hash) {
+      hash->Update(data_ptr, len);
+      hash = NULL; /* only calculate hash once */
+    }
+
+    ret = processor->throttle_data(handle, false);
+    if (ret < 0)
+      return ret;
+
+    need_to_wait = false; /* the need to wait only applies to the first iteration */
+  } while (again);
+
+  return 0;
+}
+
+
 void RGWPutObj::execute()
 {
   RGWPutObjProcessor *processor = NULL;
@@ -1636,23 +1666,12 @@ void RGWPutObj::execute()
     if (!len)
       break;
 
-    void *handle;
-    const unsigned char *data_ptr = (const unsigned char *)data.c_str();
-
-    ret = processor->handle_data(data, ofs, &handle);
-    if (ret < 0)
-      goto done;
-
-    if (need_calc_md5) {
-      hash.Update(data_ptr, len);
-    }
-
     /* do we need this operation to be synchronous? if we're dealing with an object with immutable
      * head, e.g., multipart object we need to make sure we're the first one writing to this object
      */
     bool need_to_wait = (ofs == 0) && multipart;
 
-    ret = processor->throttle_data(handle, need_to_wait);
+    ret = put_data_and_throttle(processor, data, ofs, (need_calc_md5 ? &hash : NULL), need_to_wait);
     if (ret < 0) {
       if (!need_to_wait || ret != -EEXIST) {
         ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
@@ -1677,15 +1696,8 @@ void RGWPutObj::execute()
         goto done;
       }
 
-      ret = processor->handle_data(data, ofs, &handle);
+      ret = put_data_and_throttle(processor, data, ofs, NULL, false);
       if (ret < 0) {
-        ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
-        goto done;
-      }
-
-      ret = processor->throttle_data(handle, false);
-      if (ret < 0) {
-        ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
         goto done;
       }
     }
@@ -1849,18 +1861,7 @@ void RGWPostObj::execute()
      if (!len)
        break;
 
-     void *handle;
-     const unsigned char *data_ptr = (const unsigned char *)data.c_str();
-
-     ret = processor->handle_data(data, ofs, &handle);
-     if (ret < 0)
-       goto done;
-
-     hash.Update(data_ptr, len);
-
-     ret = processor->throttle_data(handle, false);
-     if (ret < 0)
-       goto done;
+     ret = put_data_and_throttle(processor, data, ofs, &hash, false);
 
      ofs += len;
 
index 5a62e738caec19f71ffbfe22ea394d34381309ee..e22bef0be04dbd3464567a5e52dd88546f2d5d17 100644 (file)
@@ -900,8 +900,10 @@ int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oi
   return 0;
 };
 
-int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
+int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle, bool *again)
 {
+  *again = false;
+
   if (ofs != _ofs)
     return -EINVAL;
 
@@ -1026,8 +1028,10 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
   return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
 
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again)
 {
+  *again = false;
+
   *phandle = NULL;
   if (extra_data_len) {
     size_t extra_len = bl.length();
@@ -1052,6 +1056,9 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
 
   pending_data_bl.splice(0, max_write_size, &bl);
 
+  /* do we have enough data pending accumulated that needs to be written? */
+  *again = (pending_data_bl.length() >= max_chunk_size);
+
   if (!data_ofs && !immutable_head()) {
     first_chunk.claim(bl);
     obj_len = (uint64_t)first_chunk.length();
@@ -3006,25 +3013,33 @@ public:
   int handle_data(bufferlist& bl, off_t ofs, off_t len) {
     progress_cb(ofs, progress_data);
 
-    void *handle;
-    int ret = processor->handle_data(bl, ofs, &handle);
-    if (ret < 0)
-      return ret;
+    bool again;
 
-    if (opstate) {
-      /* need to update opstate repository with new state. This is ratelimited, so we're not
-       * really doing it every time
-       */
-      ret = opstate->renew_state();
-      if (ret < 0) {
-        /* could not renew state! might have been marked as cancelled */
+    bool need_opstate = true;
+
+    do {
+      void *handle;
+      int ret = processor->handle_data(bl, ofs, &handle, &again);
+      if (ret < 0)
         return ret;
+
+      if (need_opstate && opstate) {
+        /* need to update opstate repository with new state. This is ratelimited, so we're not
+         * really doing it every time
+         */
+        ret = opstate->renew_state();
+        if (ret < 0) {
+          /* could not renew state! might have been marked as cancelled */
+          return ret;
+        }
+
+        need_opstate = false;
       }
-    }
 
-    ret = processor->throttle_data(handle, false);
-    if (ret < 0)
-      return ret;
+      ret = processor->throttle_data(handle, false);
+      if (ret < 0)
+        return ret;
+    } while (again);
 
     return 0;
   }
index ba7e3eea2e8970d4e7c044a70d851266e673933c..d811b4996c0ce15a8b1959e53c5798edd1291547 100644 (file)
@@ -548,7 +548,7 @@ public:
     obj_ctx = _o;
     return 0;
   };
-  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) = 0;
   virtual int throttle_data(void *handle, bool need_to_wait) = 0;
   virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 };
@@ -564,7 +564,7 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
 
 protected:
   int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+  int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
   int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 
 public:
@@ -654,7 +654,7 @@ public:
   void set_extra_data_len(uint64_t len) {
     extra_data_len = len;
   }
-  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
   bufferlist& get_extra_data() { return extra_data_bl; }
 };