]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: use adapting window for put obj
authorYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 21 Jun 2011 18:22:47 +0000 (11:22 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 21 Jun 2011 18:22:47 +0000 (11:22 -0700)
src/rgw/rgw_access.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 95edf0e7c625a5ff49ce48d4c8a51ecaf808145f..93a1b0bb8543534d3d9936a0b01aab8692bd432f 100644 (file)
@@ -69,6 +69,8 @@ public:
   }
   virtual int aio_wait(void *handle) { return -ENOTSUP; }
 
+  virtual bool aio_completed(void *handle) { return false; }
+
   /**
    * Copy an object.
    * id: unused (well, it's passed to put_obj)
index 508fc528058dfa072167404dc4b8a8cb2dc76790..dd2a9b89e977fe1e8039fc17a1788e6b1a69e7c3 100644 (file)
@@ -437,27 +437,50 @@ struct put_obj_aio_info {
   void *handle;
 };
 
+static struct put_obj_aio_info pop_pending(std::list<struct put_obj_aio_info>& pending)
+{
+  struct put_obj_aio_info info;
+  info = pending.front();
+  pending.pop_front();
+  return info;
+}
+
+static int wait_pending_front(std::list<struct put_obj_aio_info>& pending)
+{
+  struct put_obj_aio_info info = pop_pending(pending);
+  int ret = rgwstore->aio_wait(info.handle);
+  free(info.data);
+  return ret;
+}
+
+static bool pending_has_completed(std::list<struct put_obj_aio_info>& pending)
+{
+  if (pending.size() == 0)
+    return false;
+
+  struct put_obj_aio_info& info = pending.front();
+  return rgwstore->aio_completed(info.handle);
+}
+
 static int drain_pending(std::list<struct put_obj_aio_info>& pending)
 {
   int ret = 0;
   while (!pending.empty()) {
-   struct put_obj_aio_info info = pending.front();
-   int r = rgwstore->aio_wait(info.handle);
-   free(info.data);
-   if (r < 0)
-     ret = r;
-
-   pending.pop_front();
+    int r = wait_pending_front(pending);
+    if (r < 0)
+      ret = r;
   }
   return ret;
 }
 
+
 void RGWPutObj::execute()
 {
   bool multipart;
   string multipart_meta_obj;
   string part_num;
   list<struct put_obj_aio_info> pending;
+  size_t max_chunks = RGW_MAX_PENDING_CHUNKS;
 
   ret = -EINVAL;
   if (!s->object) {
@@ -525,6 +548,7 @@ void RGWPutObj::execute()
       get_data();
       if (len > 0) {
         struct put_obj_aio_info info;
+        size_t orig_size;
        // For the first call to put_obj_data, pass -1 as the offset to
        // do a write_full.
         void *handle;
@@ -538,11 +562,20 @@ void RGWPutObj::execute()
         info.handle = handle;
         info.data = data;
         pending.push_back(info);
-        if (pending.size() > RGW_MAX_PENDING_CHUNKS) {
-          info = pending.front();
-          pending.pop_front();
-          ret = rgwstore->aio_wait(info.handle);
-          free(info.data);
+        orig_size = pending.size();
+        while (pending_has_completed(pending)) {
+          ret = wait_pending_front(pending);
+          if (ret < 0)
+            goto done;
+
+        }
+
+        /* resize window in case messages are draining too fast */
+        if (orig_size - pending.size() >= max_chunks)
+          max_chunks++;
+
+        if (pending.size() > max_chunks) {
+          ret = wait_pending_front(pending);
           if (ret < 0)
             goto done;
         }
index 1627b4baeef509d81b859841fcffbfe9dcf4cea4..5a971c5f6f9ac4c0da392604a9dfa5c7ece19006 100644 (file)
@@ -398,6 +398,11 @@ int RGWRados::aio_wait(void *handle)
   return ret;
 }
 
+bool RGWRados::aio_completed(void *handle)
+{
+  AioCompletion *c = (AioCompletion *)handle;
+  return c->is_complete();
+}
 /**
  * Copy an object.
  * id: unused (well, it's passed to put_obj)
index a6e2d5795d6ddd50d344dfe4425811ecb2ab5bf6..4ea23367905baf20c07e296412f52f7349d8468c 100644 (file)
@@ -51,6 +51,7 @@ public:
   virtual int aio_put_obj_data(std::string& id, rgw_obj& obj, const char *data,
                                off_t ofs, size_t len, void **handle);
   virtual int aio_wait(void *handle);
+  virtual bool aio_completed(void *handle);
   virtual int clone_range(rgw_obj& dst_obj, off_t dst_ofs,
                           rgw_obj& src_obj, off_t src_ofs, size_t size);
   /** Copy an object, with many extra options */