]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: http client, streaming writes api
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 15 Aug 2017 15:36:19 +0000 (08:36 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:03:11 +0000 (08:03 -0700)
Add a new class that enables streaming writes, that is --
doesn't require having all the data before the start of the
send.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h

index f96b7d5c51cfcc8057144b04fff0ddb635394fec..68a426c41a6328535ea0966b0ce3993aca7b46ae 100644 (file)
 #define dout_subsys ceph_subsys_rgw
 
 struct rgw_http_req_data : public RefCountedObject {
-  CURL *easy_handle;
-  curl_slist *h;
+  CURL *easy_handle{nullptr};
+  curl_slist *h{nullptr};
   uint64_t id;
-  int ret;
+  int ret{0};
   std::atomic<bool> done = { false };
-  RGWHTTPClient *client;
-  void *user_info;
-  bool registered;
-  RGWHTTPManager *mgr;
+  RGWHTTPClient *client{nullptr};
+  void *user_info{nullptr};
+  bool registered{false};
+  RGWHTTPManager *mgr{nullptr};
   char error_buf[CURL_ERROR_SIZE];
+  bool write_paused{false};
 
   Mutex lock;
   Cond cond;
 
-  rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
-                        client(nullptr), user_info(nullptr), registered(false),
-                        mgr(NULL), lock("rgw_http_req_data::lock") {
+  rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
     memset(error_buf, 0, sizeof(error_buf));
   }
 
@@ -48,6 +47,27 @@ struct rgw_http_req_data : public RefCountedObject {
     return ret;
   }
 
+  void set_state(RGWHTTPRequestSetState state) {
+    Mutex::Locker l(lock);
+    CURLcode rc;
+    int bitmask;
+    switch (state) {
+      case SET_WRITE_PAUSED:
+        bitmask = CURLPAUSE_SEND;
+        break;
+      case SET_WRITE_RESUME:
+        bitmask = CURLPAUSE_CONT;
+        break;
+      default:
+        /* shouldn't really be here */
+        return;
+    }
+    rc = curl_easy_pause(easy_handle, bitmask);
+    if (rc != CURLE_OK) {
+      dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
+    }
+  }
+
 
   void finish(int r) {
     Mutex::Locker l(lock);
@@ -250,12 +270,18 @@ size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
                                             void * const _info)
 {
   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  int ret = client->send_data(ptr, size * nmemb);
+  bool pause = false;
+  int ret = client->send_data(ptr, size * nmemb, &pause);
   if (ret < 0) {
     dout(0) << "WARNING: client->send_data() returned ret="
             << ret << dendl;
   }
 
+  if (ret == 0 &&
+      pause) {
+    return CURL_READFUNC_PAUSE;
+  }
+
   return ret;
 }
 
@@ -320,14 +346,42 @@ size_t RGWHTTPClient::send_http_data(void * const ptr,
     return 0;
   }
 
-  int ret = req_data->client->send_data(ptr, size * nmemb);
+  bool pause;
+
+  int ret = req_data->client->send_data(ptr, size * nmemb, &pause);
   if (ret < 0) {
     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
   }
 
+  if (ret == 0 &&
+      pause) {
+    req_data->write_paused = true;
+    return CURL_READFUNC_PAUSE;
+  }
+
   return ret;
 }
 
+Mutex& RGWHTTPClient::get_req_lock()
+{
+  return req_data->lock;
+}
+
+void RGWHTTPClient::_set_write_paused(bool pause)
+{
+  assert(req_data->lock.is_locked());
+  
+  RGWHTTPManager *mgr = req_data->mgr;
+  if (pause == req_data->write_paused) {
+    return;
+  }
+  if (pause) {
+    mgr->set_request_state(this, SET_WRITE_PAUSED);
+  } else {
+    mgr->set_request_state(this, SET_WRITE_RESUME);
+  }
+}
+
 static curl_slist *headers_to_slist(param_vec_t& headers)
 {
   curl_slist *h = NULL;
@@ -512,7 +566,7 @@ int RGWHTTPClient::wait()
 RGWHTTPClient::~RGWHTTPClient()
 {
   if (req_data) {
-    RGWHTTPManager *http_manager = req_data->get_manager();
+    RGWHTTPManager *http_manager = req_data->mgr;
     if (http_manager) {
       http_manager->remove_request(this);
     }
@@ -806,6 +860,10 @@ void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
   _complete_request(req_data);
 }
 
+void RGWHTTPManager::_set_req_state(set_state& ss)
+{
+  ss.req->set_state(ss.state);
+}
 /*
  * hook request to the curl multi handle
  */
@@ -843,7 +901,9 @@ void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
 void RGWHTTPManager::manage_pending_requests()
 {
   reqs_lock.get_read();
-  if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
+  if (max_threaded_req == num_reqs &&
+      unregistered_reqs.empty() &&
+      reqs_change_state.empty()) {
     reqs_lock.unlock();
     return;
   }
@@ -875,6 +935,13 @@ void RGWHTTPManager::manage_pending_requests()
     }
   }
 
+  if (!reqs_change_state.empty()) {
+    for (auto siter : reqs_change_state) {
+      _set_req_state(siter);
+    }
+    reqs_change_state.clear();
+  }
+
   for (auto piter : remove_reqs) {
     rgw_http_req_data *req_data = piter.first;
     int r = piter.second;
@@ -933,6 +1000,44 @@ int RGWHTTPManager::remove_request(RGWHTTPClient *client)
   return 0;
 }
 
+int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
+{
+  rgw_http_req_data *req_data = client->get_req_data();
+
+  assert(req_data->lock.is_locked());
+
+  /* can only do that if threaded */
+  if (!is_threaded) {
+    return -EINVAL;
+  }
+
+  bool suggested_paused;
+  switch (state) {
+    case SET_WRITE_PAUSED:
+      suggested_paused = true;
+      break;
+    case SET_WRITE_RESUME:
+      suggested_paused = false;
+      break;
+    default:
+      /* shouldn't really be here */
+      return -EIO;
+  }
+  if (suggested_paused == req_data->write_paused) {
+    return 0;
+  }
+
+  req_data->write_paused = suggested_paused;
+
+  reqs_change_state.push_back(set_state(req_data, state));
+  int ret = signal_thread();
+  if (ret < 0) {
+    return ret;
+  }
+
+  return 0;
+}
+
 /*
  * the synchronous, non-threaded request processing method.
  */
index 53bb2905c79db4fc90b8e79554abb0d1e089fa7d..40c4bf15a084ef5a2526903ceedaf793ede60c99 100644 (file)
@@ -15,6 +15,7 @@ using param_pair_t = pair<string, string>;
 using param_vec_t = vector<param_pair_t>;
 
 struct rgw_http_req_data;
+class RGWHTTPManager;
 
 class RGWHTTPClient
 {
@@ -40,6 +41,8 @@ protected:
   CephContext *cct;
   param_vec_t headers;
 
+  RGWHTTPManager *get_manager();
+
   int init_request(const char *method,
                    const char *url,
                    rgw_http_req_data *req_data,
@@ -51,6 +54,9 @@ protected:
   virtual int receive_data(void *ptr, size_t len) {
     return 0;
   }
+  virtual int send_data(void *ptr, size_t len, bool *pause) {
+    return send_data(ptr, len);
+  }
   virtual int send_data(void *ptr, size_t len) {
     return 0;
   }
@@ -82,6 +88,11 @@ protected:
                                size_t size,
                                size_t nmemb,
                                void *_info);
+
+  Mutex& get_req_lock();
+
+  /* needs to be called under req_lock() */
+  void _set_write_paused(bool pause);
 public:
   static const long HTTP_STATUS_NOSTATUS     = 0;
   static const long HTTP_STATUS_UNAUTHORIZED = 401;
@@ -216,7 +227,19 @@ typedef RGWHTTPTransceiver RGWPostHTTPData;
 
 class RGWCompletionManager;
 
+enum RGWHTTPRequestSetState {
+  SET_NOP = 0,
+  SET_WRITE_PAUSED = 1,
+  SET_WRITE_RESUME = 2,
+};
+
 class RGWHTTPManager {
+  struct set_state {
+    rgw_http_req_data *req;
+    RGWHTTPRequestSetState state;
+
+    set_state(rgw_http_req_data *_req, RGWHTTPRequestSetState _state) : req(_req), state(_state) {}
+  };
   CephContext *cct;
   RGWCompletionManager *completion_mgr;
   void *multi_handle;
@@ -227,6 +250,7 @@ class RGWHTTPManager {
   RWLock reqs_lock;
   map<uint64_t, rgw_http_req_data *> reqs;
   list<rgw_http_req_data *> unregistered_reqs;
+  list<set_state> reqs_change_state;
   map<uint64_t, rgw_http_req_data *> complete_reqs;
   int64_t num_reqs;
   int64_t max_threaded_req;
@@ -240,6 +264,7 @@ class RGWHTTPManager {
   void unlink_request(rgw_http_req_data *req_data);
   void finish_request(rgw_http_req_data *req_data, int r);
   void _finish_request(rgw_http_req_data *req_data, int r);
+  void _set_req_state(set_state& ss);
   int link_request(rgw_http_req_data *req_data);
 
   void manage_pending_requests();
@@ -268,6 +293,7 @@ public:
   int add_request(RGWHTTPClient *client, const char *method, const char *url,
                   bool send_data_hint = false);
   int remove_request(RGWHTTPClient *client);
+  int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
 
   /* only for non threaded case */
   int process_requests(bool wait_for_data, bool *done);
index 92111725e56c64676150128b70e46fb0076725fb..20e614adf61a85a95c7c5722baa5079dffc14820 100644 (file)
@@ -786,15 +786,30 @@ int RGWRESTStreamRWRequest::receive_data(void *ptr, size_t len)
   return len;
 }
 
-int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len)
+void RGWRESTStreamRWRequest::add_send_data(bufferlist& bl)
 {
+  Mutex::Locker req_locker(get_req_lock());
+  Mutex::Locker wl(write_lock);
+  outbl.claim_append(bl);
+  _set_write_paused(false);
+}
+
+int RGWRESTStreamRWRequest::send_data(void *ptr, size_t len, bool *pause)
+{
+  Mutex::Locker wl(write_lock);
+
   if (outbl.length() == 0) {
+    *pause = true;
     return 0;
   }
 
-  uint64_t send_size = min(len, (size_t)(outbl.length() - write_ofs));
+  len = std::min(len, (size_t)outbl.length());
+
+  bufferlist bl;
+  outbl.splice(0, len, &bl);
+  uint64_t send_size = bl.length();
   if (send_size > 0) {
-    memcpy(ptr, outbl.c_str() + write_ofs, send_size);
+    memcpy(ptr, bl.c_str(), send_size);
     write_ofs += send_size;
   }
   return send_size;
index 09393f8ea4af851e0fb26a804220e9aab5b7574a..bf64a03f39ad3a6417fe0aad2885337b62f586a3 100644 (file)
@@ -84,24 +84,26 @@ public:
 
 class RGWRESTStreamRWRequest : public RGWRESTSimpleRequest {
   Mutex lock;
+  Mutex write_lock;
   RGWGetDataCB *cb;
   bufferlist outbl;
   bufferlist in_data;
-  size_t chunk_ofs;
-  size_t ofs;
+  size_t chunk_ofs{0};
+  size_t ofs{0};
   RGWHTTPManager http_manager;
   const char *method;
-  uint64_t write_ofs;
+  uint64_t write_ofs{0};
+  bool send_paused{false};
 protected:
   int handle_header(const string& name, const string& val) override;
 public:
-  int send_data(void *ptr, size_t len) override;
+  int send_data(void *ptr, size_t len, bool *pause) override;
   int receive_data(void *ptr, size_t len) override;
 
   RGWRESTStreamRWRequest(CephContext *_cct, const char *_method, const string& _url, RGWGetDataCB *_cb,
                param_vec_t *_headers, param_vec_t *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
-                lock("RGWRESTStreamReadRequest"), cb(_cb),
-                chunk_ofs(0), ofs(0), http_manager(_cct), method(_method), write_ofs(0) {
+                lock("RGWRESTStreamRWRequest"), write_lock("RGWRESTStreamRWRequest::write_lock"), cb(_cb),
+                http_manager(_cct), method(_method) {
   }
   virtual ~RGWRESTStreamRWRequest() override {}
   int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL);
@@ -113,6 +115,8 @@ public:
   }
 
   void set_in_cb(RGWGetDataCB *_cb) { cb = _cb; }
+
+  void add_send_data(bufferlist& bl);
 };
 
 class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest {