]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: configurable write obj window size
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 20 Jan 2017 22:40:58 +0000 (14:40 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 6 Feb 2017 21:08:00 +0000 (13:08 -0800)
Fixes: http://tracker.ceph.com/issues/18623
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/common/config_opts.h
src/rgw/rgw_common.h
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 41cfa7dd4abcff8354e97487fdbae9c9955a3884..d194d1b5bfe1ceb9b6ec8523cf910f839fd2a805 100644 (file)
@@ -1366,6 +1366,8 @@ OPTION(nss_db_path, OPT_STR, "") // path to nss db
 
 
 OPTION(rgw_max_chunk_size, OPT_INT, 4 * 1024 * 1024)
+OPTION(rgw_put_obj_min_window_size, OPT_INT, 16 * 1024 * 1024)
+OPTION(rgw_put_obj_max_window_size, OPT_INT, 64 * 1024 * 1024)
 OPTION(rgw_max_put_size, OPT_U64, 5ULL*1024*1024*1024)
 
 /**
index b852520e4c6ecb7b1f34486b1aae4b7d0f5910e8..e3acf3c2059e0a4a5feec0e1ffc0ee35397732e9 100644 (file)
@@ -117,8 +117,6 @@ using ceph::crypto::MD5;
 
 #define RGW_BUCKETS_OBJ_SUFFIX ".buckets"
 
-#define RGW_MAX_PENDING_CHUNKS  16
-
 #define RGW_FORMAT_PLAIN        0
 #define RGW_FORMAT_XML          1
 #define RGW_FORMAT_JSON         2
index 44a693c10bb805abddb49fcde8ab9efba5fae79a..d1098c341e752ad658310cc0548b6825d61bef29 100644 (file)
@@ -779,8 +779,8 @@ public:
   virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override {
     return next->handle_data(bl, ofs, phandle, pobj, again);
   }
-  virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) override {
-    return next->throttle_data(handle, obj, need_to_wait);
+  virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) override {
+    return next->throttle_data(handle, obj, size, need_to_wait);
   }
 }; /* RGWPutObj_Filter */
 
@@ -1606,11 +1606,13 @@ static inline int put_data_and_throttle(RGWPutObjDataProcessor *processor,
     void *handle;
     rgw_obj obj;
 
+    uint64_t size = data.length();
+
     int ret = processor->handle_data(data, ofs, &handle, &obj, &again);
     if (ret < 0)
       return ret;
 
-    ret = processor->throttle_data(handle, obj, need_to_wait);
+    ret = processor->throttle_data(handle, obj, size, need_to_wait);
     if (ret < 0)
       return ret;
 
index 8757baf1c57864a34744a161cb9b38949fc3e614..7c9f6b881f3798f67867cef9c4e66c7a18faafee 100644 (file)
@@ -2287,6 +2287,7 @@ struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending()
   struct put_obj_aio_info info;
   info = pending.front();
   pending.pop_front();
+  pending_size -= info.size;
   return info;
 }
 
@@ -2325,7 +2326,7 @@ int RGWPutObjProcessor_Aio::drain_pending()
   return ret;
 }
 
-int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait)
 {
   bool _wait = need_to_wait;
 
@@ -2333,9 +2334,11 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool
     struct put_obj_aio_info info;
     info.handle = handle;
     info.obj = obj;
+    info.size = size;
+    pending_size += size;
     pending.push_back(info);
   }
-  size_t orig_size = pending.size();
+  size_t orig_size = pending_size;
 
   /* first drain complete IOs */
   while (pending_has_completed()) {
@@ -2347,12 +2350,16 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool
   }
 
   /* resize window in case messages are draining too fast */
-  if (orig_size - pending.size() >= max_chunks) {
-    max_chunks++;
+  if (orig_size - pending_size >= window_size) {
+    window_size += store->ctx()->_conf->rgw_max_chunk_size;
+    uint64_t max_window_size = store->ctx()->_conf->rgw_put_obj_max_window_size;
+    if (window_size > max_window_size) {
+      window_size = max_window_size;
+    }
   }
 
   /* now throttle. Note that need_to_wait should only affect the first IO operation */
-  if (pending.size() > max_chunks || _wait) {
+  if (pending_size > window_size || _wait) {
     int r = wait_pending_front();
     if (r < 0)
       return r;
@@ -2374,6 +2381,15 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
   return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
 
+int RGWPutObjProcessor_Aio::prepare(RGWRados *store, string *oid_rand)
+{
+  RGWPutObjProcessor::prepare(store, oid_rand);
+
+  window_size = store->ctx()->_conf->rgw_put_obj_min_window_size;
+
+  return 0;
+}
+
 int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again)
 {
   *phandle = NULL;
@@ -2415,7 +2431,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
 
 int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, string *oid_rand)
 {
-  RGWPutObjProcessor::prepare(store, oid_rand);
+  RGWPutObjProcessor_Aio::prepare(store, oid_rand);
 
   int r = store->get_max_chunk_size(bucket, &max_chunk_size);
   if (r < 0) {
@@ -2492,13 +2508,14 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
     }
     bufferlist bl;
     pending_data_bl.splice(0, max_write_size, &bl);
+    uint64_t write_len = bl.length();
     int r = write_data(bl, data_ofs, &handle, &obj, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
       return r;
     }
-    data_ofs += bl.length();
-    r = throttle_data(handle, obj, false);
+    data_ofs += write_len;
+    r = throttle_data(handle, obj, write_len, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
       return r;
@@ -6713,6 +6730,7 @@ public:
     do {
       void *handle = NULL;
       rgw_obj obj;
+      uint64_t size = bl.length();
       int ret = filter->handle_data(bl, ofs, &handle, &obj, &again);
       if (ret < 0)
         return ret;
@@ -6724,7 +6742,7 @@ public:
         ret = opstate->renew_state();
         if (ret < 0) {
           ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
-          int r = filter->throttle_data(handle, obj, false);
+          int r = filter->throttle_data(handle, obj, size, false);
           if (r < 0) {
             ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
           }
@@ -6735,7 +6753,7 @@ public:
         need_opstate = false;
       }
 
-      ret = filter->throttle_data(handle, obj, false);
+      ret = filter->throttle_data(handle, obj, size, false);
       if (ret < 0)
         return ret;
     } while (again);
@@ -7648,7 +7666,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
       if (ret < 0) {
         return ret;
       }
-      ret = processor.throttle_data(handle, obj, false);
+      ret = processor.throttle_data(handle, obj, end - ofs + 1, false);
       if (ret < 0)
         return ret;
     } while (again);
index 9d820ff7f00dcc45cab4b08abeddb427e2f6b022..8cea3ad63e0c7d1f385b40a8cb9b5a8b190fbf5d 100644 (file)
@@ -3331,7 +3331,7 @@ public:
   RGWPutObjDataProcessor(){}
   virtual ~RGWPutObjDataProcessor(){}
   virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) = 0;
-  virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
+  virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) = 0;
 }; /* RGWPutObjDataProcessor */
 
 
@@ -3374,12 +3374,16 @@ public:
 struct put_obj_aio_info {
   void *handle;
   rgw_obj obj;
+  uint64_t size;
 };
 
+#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024)
+
 class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
 {
   list<struct put_obj_aio_info> pending;
-  size_t max_chunks;
+  uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT};
+  uint64_t pending_size{0};
 
   struct put_obj_aio_info pop_pending();
   int wait_pending_front();
@@ -3388,7 +3392,7 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
   rgw_obj last_written_obj;
 
 protected:
-  uint64_t obj_len;
+  uint64_t obj_len{0};
 
   set<rgw_obj> written_objs;
 
@@ -3400,9 +3404,10 @@ protected:
   int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
 
 public:
-  int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait);
+  int prepare(RGWRados *store, string *oid_rand);
+  int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait);
 
-  RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
+  RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {}
   virtual ~RGWPutObjProcessor_Aio();
 }; /* RGWPutObjProcessor_Aio */