]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: rest_client: work towards throttling of http read requests
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 3 Nov 2017 22:57:56 +0000 (15:57 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
Adjust the interfaces to provide the ability for the read callback
to pause the reads. While doing that, define a new class interface
for this instead of RGWGetDataCB. This had a butterfly effect that
required modifications to the obj read filters, but the end result
is a bit cleaner.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
17 files changed:
src/rgw/rgw_compression.cc
src/rgw/rgw_compression.h
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_crypt.cc
src/rgw/rgw_crypt.h
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index 9e3c57be93505edc11cd7babc15e1b3bc4276722..3ed47492c9f6e0eb424646da567441f17bdcb90f 100644 (file)
@@ -53,7 +53,7 @@ int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, r
 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, 
                                            RGWCompressionInfo* cs_info_, 
                                            bool partial_content_,
-                                           RGWGetDataCB* next): RGWGetObj_Filter(next),
+                                           RGWGetObj_Filter* next): RGWGetObj_Filter(next),
                                                                 cct(cct_),
                                                                 cs_info(cs_info_),
                                                                 partial_content(partial_content_),
index 721c510f02ab538be96bdb36ff30ea19ae644ecf..b95f91954a297a24e281954c2c89c4556e19929c 100644 (file)
@@ -23,7 +23,7 @@ public:
   RGWGetObj_Decompress(CephContext* cct_, 
                        RGWCompressionInfo* cs_info_, 
                        bool partial_content_,
-                       RGWGetDataCB* next);
+                       RGWGetObj_Filter* next);
   ~RGWGetObj_Decompress() override {}
 
   int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
index 3d5fae456da86ee5659e779b214322ebfd8c0d57..da812a9bd5c3728d53b77d1e6e5262cd3c25188a 100644 (file)
@@ -10,7 +10,7 @@
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-class RGWCRHTTPGetDataCB : public RGWGetDataCB {
+class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
   Mutex lock;
   RGWCoroutinesEnv *env;
   RGWCoroutine *cr;
@@ -21,12 +21,14 @@ class RGWCRHTTPGetDataCB : public RGWGetDataCB {
 public:
   RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
 
-  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+  int handle_data(bufferlist& bl, bool *pause) override {
     {
+      uint64_t bl_len = bl.length();
+
       Mutex::Locker l(lock);
 
       if (!got_all_extra_data) {
-        off_t max = extra_data_len - extra_data.length();
+        uint64_t max = extra_data_len - extra_data.length();
         if (max > bl_len) {
           max = bl_len;
         }
@@ -35,15 +37,11 @@ public:
         got_all_extra_data = extra_data.length() == extra_data_len;
       }
 
-      if (bl_len == bl.length()) {
-        data.append(bl);
-      } else {
-        bl.splice(0, bl_len, &data);
-      }
+      data.append(bl);
     }
 
 #define GET_DATA_WINDOW_SIZE 1 * 1024 * 1024
-    if (bl.length() >= GET_DATA_WINDOW_SIZE) {
+    if (data.length() >= GET_DATA_WINDOW_SIZE) {
       env->manager->io_complete(cr, io_id);
     }
     return 0;
index f8d6f1095e4151930506aea60549113719d8b4bf..03dcf6df49dfb0c1cdfe7fa1af447ed4a82484b8 100644 (file)
@@ -516,7 +516,7 @@ bool AES_256_ECB_encrypt(CephContext* cct,
 
 
 RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(CephContext* cct,
-                                               RGWGetDataCB* next,
+                                               RGWGetObj_Filter* next,
                                                std::unique_ptr<BlockCrypt> crypt):
     RGWGetObj_Filter(next),
     cct(cct),
index 77f07f8f2fe2ead5dea00d57296c1068ac7bd3fb..db88d835ec056814e6bbd07ad42360dcd4ef2941 100644 (file)
@@ -96,7 +96,7 @@ class RGWGetObj_BlockDecrypt : public RGWGetObj_Filter {
   std::vector<size_t> parts_len; /**< size of parts of multipart object, parsed from manifest */
 public:
   RGWGetObj_BlockDecrypt(CephContext* cct,
-                         RGWGetDataCB* next,
+                         RGWGetObj_Filter* next,
                          std::unique_ptr<BlockCrypt> crypt);
   virtual ~RGWGetObj_BlockDecrypt();
 
index 757c6a5c7bffb243ad6bdab6d217638e1fb89125..fc884668dd126d734cb5f7f50b50718421a02199 100644 (file)
@@ -275,63 +275,6 @@ void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
   }
 }
 
-/*
- * the simple set of callbacks will be called on RGWHTTPClient::process()
- */
-/* Static methods - callbacks for libcurl. */
-size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
-                                                 const size_t size,
-                                                 const size_t nmemb,
-                                                 void * const _info)
-{
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  const size_t len = size * nmemb;
-  int ret = client->receive_header(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->receive_header() returned ret="
-            << ret << dendl;
-  }
-
-  return len;
-}
-
-size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
-                                               const size_t size,
-                                               const size_t nmemb,
-                                               void * const _info)
-{
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  const size_t len = size * nmemb;
-  int ret = client->receive_data(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->receive_data() returned ret="
-            << ret << dendl;
-  }
-
-  return len;
-}
-
-size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
-                                            const size_t size,
-                                            const size_t nmemb,
-                                            void * const _info)
-{
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  bool pause = false;
-  int ret = client->send_data(ptr, size * nmemb, &pause);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->send_data() returned ret="
-            << ret << dendl;
-  }
-
-  if (ret == 0 &&
-      pause) {
-    return CURL_READFUNC_PAUSE;
-  }
-
-  return ret;
-}
-
 /*
  * the following set of callbacks will be called either on RGWHTTPManager::process(),
  * or via the RGWHTTPManager async processing.
@@ -366,17 +309,34 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr,
   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
   size_t len = size * nmemb;
 
+  bool pause = false;
+
+  size_t skip_bytes = req_data->client->receive_pause_skip;
+
+  if (skip_bytes >= len) {
+    skip_bytes -= len;
+    return len;
+  }
+
   Mutex::Locker l(req_data->lock);
   
   if (!req_data->registered) {
     return len;
   }
   
-  int ret = req_data->client->receive_data(ptr, size * nmemb);
+  int ret = req_data->client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
   if (ret < 0) {
     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
   }
 
+  skip_bytes = 0;
+  if (pause) {
+    skip_bytes = len;
+  } else {
+    skip_bytes = 0;
+  }
+
   return len;
 }
 
index ad1301531fa5ac406d9c3a886d99d5020923c5a7..e276d4d6fe074618e7eacf3eb0344bed014f4129 100644 (file)
@@ -68,6 +68,8 @@ class RGWHTTPClient : public RGWIOProvider
   size_t send_len;
   bool has_send_len;
   long http_status;
+  size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
+                                   due to being paused */
 
   void *user_info{nullptr};
 
@@ -93,7 +95,7 @@ protected:
   virtual int receive_header(void *ptr, size_t len) {
     return 0;
   }
-  virtual int receive_data(void *ptr, size_t len) {
+  virtual int receive_data(void *ptr, size_t len, bool *pause) {
     return 0;
   }
   virtual int send_data(void *ptr, size_t len, bool *pause) {
@@ -104,28 +106,16 @@ protected:
   }
 
   /* Callbacks for libcurl. */
-  static size_t simple_receive_http_header(void *ptr,
-                                           size_t size,
-                                           size_t nmemb,
-                                           void *_info);
   static size_t receive_http_header(void *ptr,
                                     size_t size,
                                     size_t nmemb,
                                     void *_info);
 
-  static size_t simple_receive_http_data(void *ptr,
-                                         size_t size,
-                                         size_t nmemb,
-                                         void *_info);
   static size_t receive_http_data(void *ptr,
                                   size_t size,
                                   size_t nmemb,
                                   void *_info);
 
-  static size_t simple_send_http_data(void *ptr,
-                                      size_t size,
-                                      size_t nmemb,
-                                      void *_info);
   static size_t send_http_data(void *ptr,
                                size_t size,
                                size_t nmemb,
@@ -231,14 +221,6 @@ public:
 protected:
   int receive_header(void *ptr, size_t len) override;
 
-  int receive_data(void *ptr, size_t len) override {
-    return 0;
-  }
-
-  int send_data(void *ptr, size_t len) override {
-    return 0;
-  }
-
 private:
   const std::set<header_name_t, ltstr_nocase> relevant_headers;
   std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
@@ -280,7 +262,7 @@ public:
 protected:
   int send_data(void* ptr, size_t len) override;
 
-  int receive_data(void *ptr, size_t len) override {
+  int receive_data(void *ptr, size_t len, bool *pause) override {
     read_bl->append((char *)ptr, len);
     return 0;
   }
index 4028079fa0779b3d42c28f24247036b2611aceb0..bf06110bb8d4cf39ba201e49c89d61bb288eaf6b 100644 (file)
@@ -1135,7 +1135,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket,
 {
   ldout(s->cct, 20) << "user manifest obj=" << ent.key.name << "[" << ent.key.instance << "]" << dendl;
   RGWGetObj_CB cb(this);
-  RGWGetDataCB* filter = &cb;
+  RGWGetObj_Filter* filter = &cb;
   boost::optional<RGWGetObj_Decompress> decompress;
 
   int64_t cur_ofs = start_ofs;
@@ -1719,9 +1719,9 @@ void RGWGetObj::execute()
   int64_t ofs_x, end_x;
 
   RGWGetObj_CB cb(this);
-  RGWGetDataCB* filter = (RGWGetDataCB*)&cb;
+  RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb;
   boost::optional<RGWGetObj_Decompress> decompress;
-  std::unique_ptr<RGWGetDataCB> decrypt;
+  std::unique_ptr<RGWGetObj_Filter> decrypt;
   map<string, bufferlist>::iterator attr_iter;
 
   perfcounter->inc(l_rgw_get);
@@ -3309,7 +3309,7 @@ void RGWPutObj::pre_exec()
   rgw_bucket_object_pre_exec(s);
 }
 
-class RGWPutObj_CB : public RGWGetDataCB
+class RGWPutObj_CB : public RGWGetObj_Filter
 {
   RGWPutObj *op;
 public:
@@ -3334,9 +3334,9 @@ int RGWPutObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 int RGWPutObj::get_data(const off_t fst, const off_t lst, bufferlist& bl)
 {
   RGWPutObj_CB cb(this);
-  RGWGetDataCB* filter = &cb;
+  RGWGetObj_Filter* filter = &cb;
   boost::optional<RGWGetObj_Decompress> decompress;
-  std::unique_ptr<RGWGetDataCB> decrypt;
+  std::unique_ptr<RGWGetObj_Filter> decrypt;
   RGWCompressionInfo cs_info;
   map<string, bufferlist> attrs;
   map<string, bufferlist>::iterator attr_iter;
index 952fe36d1643b4db49a05c3d9f046b9873710731..875004fb391d5b6af1efbdc15db2895df86c013a 100644 (file)
@@ -180,6 +180,46 @@ public:
   virtual int error_handler(int err_no, string *error_content);
 };
 
+class RGWGetObj_Filter : public RGWGetDataCB
+{
+protected:
+  RGWGetObj_Filter *next{nullptr};
+public:
+  RGWGetObj_Filter() {}
+  RGWGetObj_Filter(RGWGetObj_Filter *next): next(next) {}
+  ~RGWGetObj_Filter() override {}
+  /**
+   * Passes data through filter.
+   * Filter can modify content of bl.
+   * When bl_len == 0 , it means 'flush
+   */
+  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+    if (next) {
+      return next->handle_data(bl, bl_ofs, bl_len);
+    }
+    return 0;
+  }
+  /**
+   * Flushes any cached data. Used by RGWGetObjFilter.
+   * Return logic same as handle_data.
+   */
+  virtual int flush() {
+    if (next) {
+      return next->flush();
+    }
+    return 0;
+  }
+  /**
+   * Allows filter to extend range required for successful filtering
+   */
+  virtual int fixup_range(off_t& ofs, off_t& end) {
+    if (next) {
+      return next->fixup_range(ofs, end);
+    }
+    return 0;
+  }
+};
+
 class RGWGetObj : public RGWOp {
 protected:
   seed torrent; // get torrent
@@ -281,13 +321,13 @@ public:
   /**
    * calculates filter used to decrypt RGW objects data
    */
-  virtual int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter, RGWGetDataCB* cb, bufferlist* manifest_bl) {
+  virtual int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter, RGWGetObj_Filter* cb, bufferlist* manifest_bl) {
     *filter = nullptr;
     return 0;
   }
 };
 
-class RGWGetObj_CB : public RGWGetDataCB
+class RGWGetObj_CB : public RGWGetObj_Filter
 {
   RGWGetObj *op;
 public:
@@ -299,36 +339,6 @@ public:
   }
 };
 
-class RGWGetObj_Filter : public RGWGetDataCB
-{
-protected:
-  RGWGetDataCB* next;
-public:
-  RGWGetObj_Filter(RGWGetDataCB* next): next(next) {}
-  ~RGWGetObj_Filter() override {}
-  /**
-   * Passes data through filter.
-   * Filter can modify content of bl.
-   * When bl_len == 0 , it means 'flush
-   */
-  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
-    return next->handle_data(bl, bl_ofs, bl_len);
-  }
-  /**
-   * Flushes any cached data. Used by RGWGetObjFilter.
-   * Return logic same as handle_data.
-   */
-  int flush() override {
-    return next->flush();
-  }
-  /**
-   * Allows filter to extend range required for successful filtering
-   */
-  int fixup_range(off_t& ofs, off_t& end) override {
-    return next->fixup_range(ofs, end);
-  }
-};
-
 class RGWGetObjTags : public RGWOp {
  protected:
   bufferlist tags_bl;
@@ -1041,8 +1051,8 @@ public:
   void execute() override;
 
   /* this is for cases when copying data from other object */
-  virtual int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter,
-                                 RGWGetDataCB* cb,
+  virtual int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
+                                 RGWGetObj_Filter* cb,
                                  map<string, bufferlist>& attrs,
                                  bufferlist* manifest_bl) {
     *filter = nullptr;
index 43908c85f2ba4c81434402c869ef53cbda10080e..63438c0b87fe285a00d9a66358873f2780bb1a02 100644 (file)
@@ -7346,7 +7346,7 @@ bool RGWRados::aio_completed(void *handle)
   return c->is_safe();
 }
 
-class RGWRadosPutObj : public RGWGetDataCB
+class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
 {
   CephContext* cct;
   rgw_obj obj;
@@ -7361,6 +7361,7 @@ class RGWRadosPutObj : public RGWGetDataCB
   uint64_t extra_data_left;
   uint64_t data_len;
   map<string, bufferlist> src_attrs;
+  off_t ofs{0};
 public:
   RGWRadosPutObj(CephContext* cct,
                  CompressorRef& plugin,
@@ -7402,7 +7403,7 @@ public:
     return 0;
   }
 
-  int handle_data(bufferlist& bl, off_t ofs, off_t len) override {
+  int handle_data(bufferlist& bl, bool *pause) override {
     if (progress_cb) {
       progress_cb(ofs, progress_data);
     }
@@ -7444,6 +7445,8 @@ public:
       if (ret < 0)
         return ret;
 
+      ofs += size;
+
       if (need_opstate && opstate) {
         /* need to update opstate repository with new state. This is ratelimited, so we're not
          * really doing it every time
@@ -7475,7 +7478,7 @@ public:
 
   void set_extra_data_len(uint64_t len) override {
     extra_data_left = len;
-    RGWGetDataCB::set_extra_data_len(len);
+    RGWHTTPStreamRWRequest::ReceiveCB::set_extra_data_len(len);
   }
 
   uint64_t get_data_len() {
@@ -7625,11 +7628,12 @@ inline ostream& operator<<(ostream& out, const obj_time_weight &o) {
   return out;
 }
 
-class RGWGetExtraDataCB : public RGWGetDataCB {
+class RGWGetExtraDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
   bufferlist extra_data;
 public:
   RGWGetExtraDataCB() {}
-  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
+  int handle_data(bufferlist& bl, bool *pause) override {
+    int bl_len = (int)bl.length();
     if (extra_data.length() < extra_data_len) {
       off_t max = extra_data_len - extra_data.length();
       if (max > bl_len) {
index 8601600e39469b173c3941347b4021ba77810f0a..003c09e6ab1d1f936e010d659f6f1faf605e6c78 100644 (file)
@@ -296,28 +296,10 @@ struct RGWUsageIter {
 };
 
 class RGWGetDataCB {
-protected:
-  uint64_t extra_data_len;
 public:
   virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
-  RGWGetDataCB() : extra_data_len(0) {}
+  RGWGetDataCB() {}
   virtual ~RGWGetDataCB() {}
-  virtual void set_extra_data_len(uint64_t len) {
-    extra_data_len = len;
-  }
-  /**
-   * Flushes any cached data. Used by RGWGetObjFilter.
-   * Return logic same as handle_data.
-   */
-  virtual int flush() {
-    return 0;
-  }
-  /**
-   * Allows to extend fetch range of RGW object. Used by RGWGetObjFilter.
-   */
-  virtual int fixup_range(off_t& bl_ofs, off_t& bl_end) {
-    return 0;
-  }
 };
 
 class RGWAccessListFilter {
index ca63e09fb1d4bb843c67d41f37c85a6b2d236149..c802e81bee4e2de5bf31888e2dfd659defd163c4 100644 (file)
@@ -162,7 +162,7 @@ int RGWHTTPSimpleRequest::send_data(void *ptr, size_t len)
   return len;
 }
 
-int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPSimpleRequest::receive_data(void *ptr, size_t len, bool *pause)
 {
   size_t cp_len, left_len;
 
@@ -747,18 +747,27 @@ int RGWHTTPStreamRWRequest::handle_header(const string& name, const string& val)
   return 0;
 }
 
-int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len)
+int RGWHTTPStreamRWRequest::receive_data(void *ptr, size_t len, bool *pause)
 {
+  size_t orig_len = len;
+
   if (cb) {
     bufferptr bp((const char *)ptr, len);
-    bufferlist bl;
-    bl.append(bp);
-    int ret = cb->handle_data(bl, ofs, len);
+    in_data.append(bp);
+    int ret = cb->handle_data(in_data, pause);
     if (ret < 0)
       return ret;
+    if (ret == 0) {
+      in_data.clear();
+    } else {
+      /* partial read */
+      len = ret;
+      bufferlist bl;
+      in_data.splice(0, len, &bl);
+    }
   }
   ofs += len;
-  return len;
+  return orig_len;
 }
 
 void RGWHTTPStreamRWRequest::set_stream_write(bool s) {
index 964aa0dff7c3ffed9ab4511390a5a92474977a99..816c87474eddb66d723ddb180765c64b8d83902e 100644 (file)
@@ -49,7 +49,7 @@ public:
   }
 
   int receive_header(void *ptr, size_t len) override;
-  int receive_data(void *ptr, size_t len) override;
+  int receive_data(void *ptr, size_t len, bool *pause) override;
   int send_data(void *ptr, size_t len) override;
 
   bufferlist& get_response() { return response; }
@@ -78,15 +78,20 @@ public:
 
 
 class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
+public:
+    class ReceiveCB;
+
+private:
   Mutex lock;
   Mutex write_lock;
-  RGWGetDataCB *cb{nullptr};
+  ReceiveCB *cb{nullptr};
   RGWWriteDrainCB *write_drain_cb{nullptr};
   bufferlist outbl;
   bufferlist in_data;
   size_t chunk_ofs{0};
   size_t ofs{0};
   uint64_t write_ofs{0};
+  bool read_paused{false};
   bool send_paused{false};
   bool stream_writes{false};
   bool write_stream_complete{false};
@@ -94,13 +99,25 @@ protected:
   int handle_header(const string& name, const string& val) override;
 public:
   int send_data(void *ptr, size_t len, bool *pause) override;
-  int receive_data(void *ptr, size_t len) override;
+  int receive_data(void *ptr, size_t len, bool *pause) override;
+
+  class ReceiveCB {
+    protected:
+      uint64_t extra_data_len{0};
+    public:
+      ReceiveCB() = default;
+      virtual ~ReceiveCB() = default;
+      virtual int handle_data(bufferlist& bl, bool *pause = nullptr) = 0;
+      virtual void set_extra_data_len(uint64_t len) {
+        extra_data_len = len;
+      }
+  };
 
   RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url,
                          param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
                                                                         lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock") {
   }
-  RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
+  RGWHTTPStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, ReceiveCB *_cb,
                          param_vec_t *_headers, param_vec_t *_params) : RGWHTTPSimpleRequest(_cct, _method, _url, _headers, _params),
                                                                         lock("RGWHTTPStreamRWRequest"), write_lock("RGWHTTPStreamRWRequest::write_lock"), cb(_cb) {
   }
@@ -110,7 +127,7 @@ public:
     outbl.swap(_outbl);
   }
 
-  void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+  void set_in_cb(ReceiveCB *_cb) { cb = _cb; }
   void set_write_drain_cb(RGWWriteDrainCB *_cb) { write_drain_cb = _cb; }
 
   void add_send_data(bufferlist& bl);
@@ -126,7 +143,7 @@ public:
 class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
   bool send_data_hint{false};
 public:
-  RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
+  RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWHTTPStreamRWRequest::ReceiveCB *_cb,
                param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
   }
   virtual ~RGWRESTStreamRWRequest() override {}
@@ -145,13 +162,13 @@ public:
 
 class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest {
 public:
-  RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers,
+  RGWRESTStreamReadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers,
                param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "GET", _url, _cb, _headers, _params) {}
 };
 
 class RGWRESTStreamHeadRequest : public RGWRESTStreamRWRequest {
 public:
-  RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, RGWGetDataCB *_cb, param_vec_t *_headers,
+  RGWRESTStreamHeadRequest(CephContext *_cct, const string& _url, ReceiveCB *_cb, param_vec_t *_headers,
                param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {}
 };
 
index 0bfce1af303557e089e3e10e70f9b3c680ccb4eb..14586bcd452f42bdee8d4ddb57e25f29365a46ea 100644 (file)
@@ -198,7 +198,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, con
                          uint32_t mod_zone_id, uint64_t mod_pg_ver,
                          bool prepend_metadata, bool get_op, bool rgwx_stat,
                          bool sync_manifest, bool skip_decrypt,
-                         bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req)
+                         bool send, RGWHTTPStreamRWRequest::ReceiveCB *cb, RGWRESTStreamRWRequest **req)
 {
   get_obj_params params;
   params.uid = uid;
index 537077bb83f75d49df2dde23388b61d927b0a580..e4c9d3dbb1fb4185e69d49730a3e6a82ae16d056 100644 (file)
@@ -118,7 +118,7 @@ public:
     bool sync_manifest{false};
 
     bool skip_decrypt{true};
-    RGWGetDataCB *cb{nullptr};
+    RGWHTTPStreamRWRequest::ReceiveCB *cb{nullptr};
 
     bool range_is_set{false};
     uint64_t range_start{0};
@@ -131,7 +131,7 @@ public:
               const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
               uint32_t mod_zone_id, uint64_t mod_pg_ver,
               bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest,
-              bool skip_decrypt, bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req);
+              bool skip_decrypt, bool send, RGWHTTPStreamRWRequest::ReceiveCB *cb, RGWRESTStreamRWRequest **req);
   int complete_request(RGWRESTStreamRWRequest *req, string& etag, ceph::real_time *mtime, uint64_t *psize, map<string, string>& attrs);
 
   int get_resource(const string& resource,
@@ -180,13 +180,13 @@ int RGWRESTConn::get_json_resource(const string& resource,  const rgw_http_param
   return get_json_resource(resource, &params, t);
 }
 
-class RGWStreamIntoBufferlist : public RGWGetDataCB {
+class RGWStreamIntoBufferlist : public RGWHTTPStreamRWRequest::ReceiveCB {
   bufferlist& bl;
 public:
   RGWStreamIntoBufferlist(bufferlist& _bl) : bl(_bl) {}
-  int handle_data(bufferlist& inbl, off_t bl_ofs, off_t bl_len) override {
+  int handle_data(bufferlist& inbl, bool *pause) override {
     bl.claim_append(inbl);
-    return bl_len;
+    return inbl.length();
   }
 };
 
index 7d327b19a05834970aab02ffec1fffac30774d7d..ef67fcbbbcf5668014bb783245f36b1da7e7a56c 100644 (file)
@@ -339,7 +339,7 @@ send_data:
   return 0;
 }
 
-int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetDataCB> *filter, RGWGetDataCB* cb, bufferlist* manifest_bl)
+int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter> *filter, RGWGetObj_Filter* cb, bufferlist* manifest_bl)
 {
   if (skip_decrypt) { // bypass decryption for multisite sync requests
     return 0;
@@ -1466,8 +1466,8 @@ static inline void set_attr(map<string, bufferlist>& attrs, const char* key, con
 }
 
 int RGWPutObj_ObjStore_S3::get_decrypt_filter(
-    std::unique_ptr<RGWGetDataCB>* filter,
-    RGWGetDataCB* cb,
+    std::unique_ptr<RGWGetObj_Filter>* filter,
+    RGWGetObj_Filter* cb,
     map<string, bufferlist>& attrs,
     bufferlist* manifest_bl)
 {
index 3b15afa0b8db5efff97715d2120b25f894c1970c..9e5ef4316aeb00d77cb614e3f7145b738ebd137b 100644 (file)
@@ -50,8 +50,8 @@ public:
   int send_response_data_error() override;
   int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
   void set_custom_http_response(int http_ret) { custom_http_ret = http_ret; }
-  int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter,
-                         RGWGetDataCB* cb,
+  int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
+                         RGWGetObj_Filter* cb,
                          bufferlist* manifest_bl) override;
 };
 
@@ -215,8 +215,8 @@ public:
 
   int get_encrypt_filter(std::unique_ptr<RGWPutObjDataProcessor>* filter,
                          RGWPutObjDataProcessor* cb) override;
-  int get_decrypt_filter(std::unique_ptr<RGWGetDataCB>* filter,
-                         RGWGetDataCB* cb,
+  int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
+                         RGWGetObj_Filter* cb,
                          map<string, bufferlist>& attrs,
                          bufferlist* manifest_bl) override;
 };