]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: buffer atomic put handler
authorYehuda Sadeh <yehuda@inktank.com>
Fri, 21 Jun 2013 04:29:05 +0000 (21:29 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 21 Jun 2013 04:57:40 +0000 (21:57 -0700)
Since we tied the atomic put handler to libcurl output
data, which uses much smaller chunks, we need to buffer
data, otherwise we'd end up with a huge amount of small
writes.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 9115c6b5e889f92997804f63db339f0bd838351d..1c1194fd22d50d868ac8d8f9d6c359f75231cdf3 100644 (file)
@@ -612,13 +612,21 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
   return 0;
 }
 
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle)
+{
+  if (ofs >= next_part_ofs)
+    prepare_next_part(ofs);
+
+  return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
+}
+
 int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) {
+  *phandle = NULL;
   if (extra_data_len) {
     size_t extra_len = bl.length();
     if (extra_len > extra_data_len)
       extra_len = extra_data_len;
 
-    /* is there a better way to split a bl into two bufferlists? */
     bufferlist extra;
     bl.splice(0, extra_len, &extra);
     extra_data_bl.append(extra);
@@ -629,19 +637,23 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
     }
     ofs = extra_data_bl.length();
   }
-  off_t actual_ofs = ofs - extra_data_bl.length();
-  if (!actual_ofs && !immutable_head()) {
+
+  pending_data_bl.claim_append(bl);
+  if (pending_data_bl.length() < RGW_MAX_CHUNK_SIZE)
+    return 0;
+
+  pending_data_bl.splice(0, RGW_MAX_CHUNK_SIZE, &bl);
+
+  if (!data_ofs && !immutable_head()) {
     first_chunk.claim(bl);
-    *phandle = NULL;
     obj_len = (uint64_t)first_chunk.length();
     prepare_next_part(first_chunk.length());
+    data_ofs = obj_len;
     return 0;
   }
-  if (actual_ofs >= next_part_ofs)
-    prepare_next_part(actual_ofs);
-  int r = RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, actual_ofs - cur_part_ofs, actual_ofs, phandle);
-
-  return r;
+  off_t write_ofs = data_ofs;
+  data_ofs = write_ofs + bl.length();
+  return write_data(bl, write_ofs, phandle);
 }
 
 int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
@@ -700,6 +712,19 @@ void RGWPutObjProcessor_Atomic::complete_parts()
 
 int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs)
 {
+  if (pending_data_bl.length()) {
+    void *handle;
+    int r = write_data(pending_data_bl, data_ofs, &handle);
+    if (r < 0) {
+      ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
+      return r;
+    }
+    r = throttle_data(handle);
+    if (r < 0) {
+      ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
+      return r;
+    }
+  }
   complete_parts();
 
   store->set_atomic(obj_ctx, head_obj);
index 866736fc816c40fa872e9c03fb699ea5b1d4704f..1a78c6d6a9f9c8d4e3239cd69a4e21b028eea954 100644 (file)
@@ -259,9 +259,11 @@ class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
   off_t cur_part_ofs;
   off_t next_part_ofs;
   int cur_part_id;
+  off_t data_ofs;
 
   uint64_t extra_data_len;
   bufferlist extra_data_bl;
+  bufferlist pending_data_bl;
 protected:
   rgw_bucket bucket;
   string obj_str;
@@ -275,6 +277,7 @@ protected:
 
   virtual bool immutable_head() { return false; }
 
+  int write_data(bufferlist& bl, off_t ofs, void **phandle);
   virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 
   void prepare_next_part(off_t ofs);
@@ -286,6 +289,7 @@ public:
                                 cur_part_ofs(0),
                                 next_part_ofs(_p),
                                 cur_part_id(0),
+                                data_ofs(0),
                                 extra_data_len(0),
                                 bucket(_b),
                                 obj_str(_o),