]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: http client, simplify interfaces
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 28 Aug 2017 12:56:19 +0000 (05:56 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:37 +0000 (08:05 -0700)
work towards removal of duplicate synchronous api

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/test/rgw/test_http_manager.cc

index 32ca8613329e97bad948858dd616d20c5701d992..7535633d2ec1704b983841e48f3b78c9e5f837a5 100644 (file)
 
 RGWHTTPManager *rgw_http_manager;
 
+struct RGWCurlHandle;
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
+
 struct rgw_http_req_data : public RefCountedObject {
-  CURL *easy_handle{nullptr};
+  RGWCurlHandle *curl_handle{nullptr};
   curl_slist *h{nullptr};
   uint64_t id;
   int ret{0};
@@ -64,7 +68,7 @@ struct rgw_http_req_data : public RefCountedObject {
         /* shouldn't really be here */
         return;
     }
-    rc = curl_easy_pause(easy_handle, bitmask);
+    rc = curl_easy_pause(**curl_handle, bitmask);
     if (rc != CURLE_OK) {
       dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
     }
@@ -74,13 +78,13 @@ struct rgw_http_req_data : public RefCountedObject {
   void finish(int r) {
     Mutex::Locker l(lock);
     ret = r;
-    if (easy_handle)
-      curl_easy_cleanup(easy_handle);
+    if (curl_handle)
+      do_curl_easy_cleanup(curl_handle);
 
     if (h)
       curl_slist_free_all(h);
 
-    easy_handle = NULL;
+    curl_handle = NULL;
     h = NULL;
     done = true;
     cond.Signal();
@@ -99,6 +103,8 @@ struct rgw_http_req_data : public RefCountedObject {
     Mutex::Locker l(lock);
     return mgr;
   }
+
+  CURL *get_easy_handle() const;
 };
 
 struct RGWCurlHandle {
@@ -112,6 +118,16 @@ struct RGWCurlHandle {
   }
 };
 
+void rgw_http_req_data::set_state(int bitmask) {
+  /* no need to lock here, moreover curl_easy_pause() might trigger
+   * the data receive callback :/
+   */
+  CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
+  if (rc != CURLE_OK) {
+    dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
+  }
+}
+
 #define MAXIDLE 5
 class RGWCurlHandles : public Thread {
 public:
@@ -214,7 +230,23 @@ void RGWCurlHandles::flush_curl_handles()
   saved_curl.shrink_to_fit();
 }
 
+CURL *rgw_http_req_data::get_easy_handle() const
+{
+  return **curl_handle;
+}
+
 static RGWCurlHandles *handles;
+
+static RGWCurlHandle *do_curl_easy_init()
+{
+  return handles->get_curl_handle();
+}
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
+{
+  handles->release_curl_handle(curl_handle);
+}
+
 // XXX make this part of the token cache?  (but that's swift-only;
 //     and this especially needs to integrates with s3...)
 
@@ -422,59 +454,11 @@ static bool is_upload_request(const string& method)
 }
 
 /*
- * process a single simple one off request, not going through RGWHTTPManager. Not using
- * req_data.
+ * process a single simple one off request
  */
 int RGWHTTPClient::process()
 {
-  int ret = 0;
-  CURL *curl_handle;
-
-  char error_buf[CURL_ERROR_SIZE];
-
-  auto ca = handles->get_curl_handle();
-  curl_handle = **ca;
-
-  dout(20) << "sending request to " << url << dendl;
-
-  curl_slist *h = headers_to_slist(headers);
-
-  curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
-  curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
-  curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
-  if (h) {
-    curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
-  }
-  curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
-  curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
-  if (is_upload_request(method)) {
-    curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
-  }
-  if (has_send_len) {
-    curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); 
-  }
-  if (!verify_ssl) {
-    curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
-    curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
-    dout(20) << "ssl verification is set to off" << dendl;
-  }
-
-  CURLcode status = curl_easy_perform(curl_handle);
-  if (status) {
-    dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
-    ret = -EINVAL;
-  }
-  curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
-  handles->release_curl_handle(ca);
-  curl_slist_free_all(h);
-
-  return ret;
+  return RGWHTTP::process(this);
 }
 
 string RGWHTTPClient::to_str()
@@ -502,11 +486,9 @@ int RGWHTTPClient::init_request(rgw_http_req_data *_req_data, bool send_data_hin
   _req_data->get();
   req_data = _req_data;
 
-  CURL *easy_handle;
-
-  easy_handle = curl_easy_init();
+  req_data->curl_handle = do_curl_easy_init();
 
-  req_data->easy_handle = easy_handle;
+  CURL *easy_handle = req_data->get_easy_handle();
 
   dout(20) << "sending request to " << url << dendl;
 
@@ -806,7 +788,7 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
   req_data->registered = true;
   reqs[num_reqs] = req_data;
   num_reqs++;
-  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
 }
 
 void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
@@ -815,7 +797,7 @@ void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
   req_data->get();
   req_data->registered = false;
   unregistered_reqs.push_back(req_data);
-  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
 }
 
 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
@@ -862,8 +844,8 @@ void RGWHTTPManager::_set_req_state(set_state& ss)
  */
 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
 {
-  ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
-  CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
+  ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+  CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
   if (mstatus) {
     dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
     return -EIO;
@@ -877,8 +859,8 @@ int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
  */
 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
 {
-  if (req_data->easy_handle) {
-    curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
+  if (req_data->curl_handle) {
+    curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
   }
   if (!req_data->is_done()) {
     _finish_request(req_data, -ECANCELED);
@@ -1031,77 +1013,6 @@ int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetSt
   return 0;
 }
 
-/*
- * the synchronous, non-threaded request processing method.
- */
-int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
-{
-  assert(!is_threaded);
-
-  int still_running;
-  int mstatus;
-
-  do {
-    if (wait_for_data) {
-      int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
-      if (ret < 0) {
-        return ret;
-      }
-    }
-
-    mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
-    switch (mstatus) {
-      case CURLM_OK:
-      case CURLM_CALL_MULTI_PERFORM:
-        break;
-      default:
-        dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
-        return -EINVAL;
-    }
-    int msgs_left;
-    CURLMsg *msg;
-    while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
-      if (msg->msg == CURLMSG_DONE) {
-       CURL *e = msg->easy_handle;
-       rgw_http_req_data *req_data;
-       curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
-
-       long http_status;
-       curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
-
-       int status = rgw_http_error_to_errno(http_status);
-       int result = msg->data.result;
-       finish_request(req_data, status);
-        switch (result) {
-          case CURLE_OK:
-            break;
-          default:
-            dout(20) << "ERROR: msg->data.result=" << result << dendl;
-            return -EIO;
-        }
-      }
-    }
-  } while (mstatus == CURLM_CALL_MULTI_PERFORM);
-
-  *done = (still_running == 0);
-
-  return 0;
-}
-
-/*
- * the synchronous, non-threaded request processing completion method.
- */
-int RGWHTTPManager::complete_requests()
-{
-  bool done = false;
-  int ret;
-  do {
-    ret = process_requests(true, &done);
-  } while (!done && !ret);
-
-  return ret;
-}
-
 int RGWHTTPManager::set_threaded()
 {
   int r = pipe(thread_pipe);
index fbe5d3626c0d9e2d362b4ae15fa1807bd5152128..ac230bfb2a6ae29d4e38a42f4c662e587d2053e8 100644 (file)
@@ -313,11 +313,6 @@ public:
   int add_request(RGWHTTPClient *client, bool send_data_hint = false);
   int remove_request(RGWHTTPClient *client);
   int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
-
-  /* only for non threaded case */
-  int process_requests(bool wait_for_data, bool *done);
-
-  int complete_requests();
 };
 
 class RGWHTTP
index 77697cf58d3e8c855631c23240ce246b5ff72799..fa334ef906a0b98466fbb17b8ee3e30f18b1caae 100644 (file)
@@ -8016,7 +8016,7 @@ int RGWRados::copy_obj_to_remote_dest(RGWObjState *astate,
 {
   string etag;
 
-  RGWRESTStreamWriteRequest *out_stream_req;
+  RGWRESTStreamS3PutObj *out_stream_req;
 
   int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
   if (ret < 0) {
index 6c2d6c5dda5dca0ae23b92efcc13a884216e6c2b..af3ce2667bc56f8195b847c8c7fde9ae11de6ddf 100644 (file)
@@ -316,9 +316,9 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
 }
 
 class RGWRESTStreamOutCB : public RGWGetDataCB {
-  RGWRESTStreamWriteRequest *req;
+  RGWRESTStreamS3PutObj *req;
 public:
-  explicit RGWRESTStreamOutCB(RGWRESTStreamWriteRequest *_req) : req(_req) {}
+  explicit RGWRESTStreamOutCB(RGWRESTStreamS3PutObj *_req) : req(_req) {}
   int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; /* callback for object iteration when sending data */
 };
 
@@ -326,34 +326,21 @@ int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
   if (!bl_ofs && bl_len == bl.length()) {
-    return req->add_output_data(bl);
+    req->add_send_data(bl);
+    return 0;
   }
 
   bufferptr bp(bl.c_str() + bl_ofs, bl_len);
   bufferlist new_bl;
   new_bl.push_back(bp);
 
-  return req->add_output_data(new_bl);
+  req->add_send_data(new_bl);
+  return 0;
 }
 
-RGWRESTStreamWriteRequest::~RGWRESTStreamWriteRequest()
+RGWRESTStreamS3PutObj::~RGWRESTStreamS3PutObj()
 {
-  delete cb;
-}
-
-int RGWRESTStreamWriteRequest::add_output_data(bufferlist& bl)
-{
-  lock.Lock();
-  if (status < 0) {
-    int ret = status;
-    lock.Unlock();
-    return ret;
-  }
-  pending_send.push_back(bl);
-  lock.Unlock();
-
-  bool done;
-  return http_manager.process_requests(false, &done);
+  delete out_cb;
 }
 
 static void grants_by_type_add_one_grant(map<int, string>& grants_by_type, int perm, ACLGrant& grant)
@@ -426,7 +413,7 @@ static void add_grants_headers(map<int, string>& grants, RGWEnv& env, map<string
   }
 }
 
-int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
+int RGWRESTStreamS3PutObj::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs)
 {
   string resource = obj.bucket.name + "/" + obj.get_oid();
   string new_url = url;
@@ -493,66 +480,20 @@ int RGWRESTStreamWriteRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uin
     headers.emplace_back(kv);
   }
 
-  cb = new RGWRESTStreamOutCB(this);
+  out_cb = new RGWRESTStreamOutCB(this);
 
   set_send_length(obj_size);
 
   method = new_info.method;
   url = new_url;
 
-  int r = http_manager.add_request(this);
+  int r = RGWHTTP::send(this);
   if (r < 0)
     return r;
 
   return 0;
 }
 
-int RGWRESTStreamWriteRequest::send_data(void *ptr, size_t len)
-{
-  uint64_t sent = 0;
-
-  dout(20) << "RGWRESTStreamWriteRequest::send_data()" << dendl;
-  lock.Lock();
-  if (pending_send.empty() || status < 0) {
-    lock.Unlock();
-    return status;
-  }
-
-  list<bufferlist>::iterator iter = pending_send.begin();
-  while (iter != pending_send.end() && len > 0) {
-    bufferlist& bl = *iter;
-    
-    list<bufferlist>::iterator next_iter = iter;
-    ++next_iter;
-    lock.Unlock();
-
-    uint64_t send_len = min(len, (size_t)bl.length());
-
-    memcpy(ptr, bl.c_str(), send_len);
-
-    ptr = (char *)ptr + send_len;
-    len -= send_len;
-    sent += send_len;
-
-    lock.Lock();
-
-    bufferlist new_bl;
-    if (bl.length() > send_len) {
-      bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
-      new_bl.append(bp);
-    }
-    pending_send.pop_front(); /* need to do this after we copy data from bl */
-    if (new_bl.length()) {
-      pending_send.push_front(new_bl);
-    }
-    iter = next_iter;
-  }
-  lock.Unlock();
-
-  return sent;
-}
-
-
 void set_str_from_headers(map<string, string>& out_headers, const string& header_name, string& str)
 {
   map<string, string>::iterator iter = out_headers.find(header_name);
@@ -594,26 +535,6 @@ static int parse_rgwx_mtime(CephContext *cct, const string& s, ceph::real_time *
   return 0;
 }
 
-int RGWRESTStreamWriteRequest::complete(string& etag, real_time *mtime)
-{
-  int ret = http_manager.complete_requests();
-  if (ret < 0)
-    return ret;
-
-  set_str_from_headers(out_headers, "ETAG", etag);
-
-  if (mtime) {
-    string mtime_str;
-    set_str_from_headers(out_headers, "RGWX_MTIME", mtime_str);
-
-    ret = parse_rgwx_mtime(cct, mtime_str, mtime);
-    if (ret < 0) {
-      return ret;
-    }
-  }
-  return status;
-}
-
 int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
 {
   string urlsafe_bucket, urlsafe_object;
@@ -621,11 +542,11 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>&
   url_encode(obj.key.name, urlsafe_object);
   string resource = urlsafe_bucket + "/" + urlsafe_object;
 
-  return send_request(&key, extra_headers, resource, nullptr, mgr);
+  return send_request(&key, extra_headers, resource, mgr);
 }
 
 int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
-                                         bufferlist *send_data, RGWHTTPManager *mgr)
+                                         RGWHTTPManager *mgr, bufferlist *send_data)
 {
   string new_url = url;
   if (new_url[new_url.size() - 1] != '/')
@@ -688,11 +609,6 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
     send_data_hint = true;
   }
 
-  RGWHTTPManager *pmanager = &http_manager;
-  if (mgr) {
-    pmanager = mgr;
-  }
-
   // Not sure if this is the place to set a send_size, curl otherwise sets
   // chunked option and doesn't send content length anymore
   uint64_t send_size = (size_t)(outbl.length() - write_ofs);
@@ -705,21 +621,23 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
   method = new_info.method;
   url = new_url;
 
-  int r = pmanager->add_request(this, send_data_hint);
-  if (r < 0)
-    return r;
-
   if (!mgr) {
-    r = pmanager->complete_requests();
-    if (r < 0)
-      return r;
+    return RGWHTTP::send(this);
   }
 
+  int r = mgr->add_request(this, send_data_hint);
+  if (r < 0)
+    return r;
+
   return 0;
 }
 
 int RGWRESTStreamRWRequest::complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs)
 {
+  int ret = wait();
+  if (ret < 0) {
+    return ret;
+  }
   set_str_from_headers(out_headers, "ETAG", etag);
   if (status >= 0) {
     if (mtime) {
index 9c474534e9ceb4068d0f1b8ed01268eccf20ce1b..51a7ec4310b0be5d846473c65b9ac464dc79bda0 100644 (file)
@@ -67,25 +67,6 @@ public:
 };
 
 
-class RGWRESTStreamWriteRequest : public RGWRESTSimpleRequest {
-  Mutex lock;
-  list<bufferlist> pending_send;
-  RGWGetDataCB *cb;
-  RGWHTTPManager http_manager;
-public:
-  int add_output_data(bufferlist& bl);
-  int send_data(void *ptr, size_t len) override;
-
-  RGWRESTStreamWriteRequest(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers,
-               param_vec_t *_params) : RGWRESTSimpleRequest(_cct, _method, _url, _headers, _params),
-                lock("RGWRESTStreamWriteRequest"), cb(NULL), http_manager(_cct) {}
-  ~RGWRESTStreamWriteRequest() override;
-  int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs);
-  int complete(string& etag, real_time *mtime);
-
-  RGWGetDataCB *get_out_cb() { return cb; }
-};
-
 class RGWHTTPStreamRWRequest : public RGWHTTPSimpleRequest {
   Mutex lock;
   Mutex write_lock;
@@ -125,15 +106,13 @@ public:
 };
 
 class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
-  RGWHTTPManager http_manager;
 public:
   RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
-               param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params),
-                http_manager(_cct) {
+               param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
   }
   virtual ~RGWRESTStreamRWRequest() override {}
-  int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr = NULL);
-  int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = NULL /* optional input data */, RGWHTTPManager *mgr = NULL);
+  int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr);
+  int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
   int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
 };
 
@@ -149,5 +128,17 @@ public:
                param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, "HEAD", _url, _cb, _headers, _params) {}
 };
 
+class RGWRESTStreamS3PutObj : public RGWRESTStreamRWRequest {
+  RGWGetDataCB *out_cb;
+public:
+  RGWRESTStreamS3PutObj(CephContext *_cct, const string& _method, const string& _url, param_vec_t *_headers,
+               param_vec_t *_params) : RGWRESTStreamRWRequest(_cct, _method, _url, nullptr, _headers, _params),
+                out_cb(NULL) {}
+  ~RGWRESTStreamS3PutObj() override;
+  int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, map<string, bufferlist>& attrs);
+
+  RGWGetDataCB *get_out_cb() { return out_cb; }
+};
+
 #endif
 
index 0970a252e13077bf782131123e651a71f305dc16..8b168f9a66da7cbaf60f2effd68ec0e640adcaa9 100644 (file)
@@ -114,7 +114,7 @@ int RGWRESTConn::forward(const rgw_user& uid, req_info& info, obj_version *objv,
 }
 
 int RGWRESTConn::put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size,
-                             map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req)
+                              map<string, bufferlist>& attrs, RGWRESTStreamS3PutObj **req)
 {
   string url;
   int ret = get_url(url);
@@ -123,7 +123,7 @@ int RGWRESTConn::put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_si
 
   param_vec_t params;
   populate_params(params, &uid, self_zone_group);
-  RGWRESTStreamWriteRequest *wr = new RGWRESTStreamWriteRequest(cct, "PUT", url, NULL, &params);
+  RGWRESTStreamS3PutObj *wr = new RGWRESTStreamS3PutObj(cct, "PUT", url, NULL, &params);
   ret = wr->put_obj_init(key, obj, obj_size, attrs);
   if (ret < 0) {
     delete wr;
@@ -133,9 +133,10 @@ int RGWRESTConn::put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_si
   return 0;
 }
 
-int RGWRESTConn::complete_request(RGWRESTStreamWriteRequest *req, string& etag, real_time *mtime)
+int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag, real_time *mtime)
 {
-  int ret = req->complete(etag, mtime);
+  map<string, string> attrs;
+  int ret = req->complete_request(etag, mtime, nullptr, attrs);
   delete req;
 
   return ret;
@@ -221,7 +222,7 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
     set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
   }
 
-  int r = (*req)->send_request(key, extra_headers, obj);
+  int r = (*req)->send_request(key, extra_headers, obj, nullptr);
   if (r < 0) {
     delete *req;
     *req = nullptr;
@@ -268,7 +269,7 @@ int RGWRESTConn::get_resource(const string& resource,
     headers.insert(extra_headers->begin(), extra_headers->end());
   }
 
-  ret = req.send_request(&key, headers, resource, send_data, mgr);
+  ret = req.send_request(&key, headers, resource, mgr, send_data);
   if (ret < 0) {
     ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
     return ret;
@@ -315,7 +316,7 @@ void RGWRESTReadResource::init_common(param_vec_t *extra_headers)
 
 int RGWRESTReadResource::read()
 {
-  int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+  int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
   if (ret < 0) {
     ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
     return ret;
@@ -328,7 +329,7 @@ int RGWRESTReadResource::read()
 
 int RGWRESTReadResource::aio_read()
 {
-  int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+  int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
   if (ret < 0) {
     ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
     return ret;
@@ -376,7 +377,7 @@ void RGWRESTSendResource::init_common(param_vec_t *extra_headers)
 int RGWRESTSendResource::send(bufferlist& outbl)
 {
   req.set_outbl(outbl);
-  int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+  int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
   if (ret < 0) {
     ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
     return ret;
@@ -390,7 +391,7 @@ int RGWRESTSendResource::send(bufferlist& outbl)
 int RGWRESTSendResource::aio_send(bufferlist& outbl)
 {
   req.set_outbl(outbl);
-  int ret = req.send_request(&conn->get_key(), headers, resource, nullptr, mgr);
+  int ret = req.send_request(&conn->get_key(), headers, resource, mgr);
   if (ret < 0) {
     ldout(cct, 5) << __func__ << ": send_request() resource=" << resource << " returned ret=" << ret << dendl;
     return ret;
index e0d448777b54e29f12a6fa6d6629d8cd9836ff61..7d7556f1689d9c3b2cccc100cc5809ab4f986dec 100644 (file)
@@ -89,8 +89,8 @@ public:
 
   /* async request */
   int put_obj_init(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size,
-                   map<string, bufferlist>& attrs, RGWRESTStreamWriteRequest **req);
-  int complete_request(RGWRESTStreamWriteRequest *req, string& etag, ceph::real_time *mtime);
+                   map<string, bufferlist>& attrs, RGWRESTStreamS3PutObj **req);
+  int complete_request(RGWRESTStreamS3PutObj *req, string& etag, ceph::real_time *mtime);
 
   int get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj,
               const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
index bfbcad7299911a38b5a62ab1881290988b871995..73626a7d9cf711c5117638af120e4a3848bf736b 100644 (file)
@@ -38,8 +38,8 @@ TEST(HTTPManager, SignalThread)
   constexpr size_t num_requests = max_requests + 1;
 
   for (size_t i = 0; i < num_requests; i++) {
-    RGWHTTPClient client{cct};
-    http.add_request(&client, "PUT", "http://127.0.0.1:80");
+    RGWHTTPClient client{cct, "PUT", "http://127.0.0.1:80"};
+    http.add_request(&client);
   }
 }