git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: stream obj into http request
author Yehuda Sadeh <yehuda@inktank.com>
Thu, 6 Jun 2013 04:06:52 +0000 (21:06 -0700)
committer Yehuda Sadeh <yehuda@inktank.com>
Mon, 10 Jun 2013 21:28:03 +0000 (14:28 -0700)
Still need to figure out curl handle polling and to handle client
errors correctly.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h

index 1f8f3d636b2f8eb129a1b0ac58bf2f29a34aa134..a3a8c42bdeb6b7184a208015e372ba93240ab5ee 100644 (file)
@@ -1,5 +1,6 @@
 #include <curl/curl.h>
 #include <curl/easy.h>
+#include <curl/multi.h>
 
 #include "rgw_common.h"
 #include "rgw_http_client.h"
@@ -96,4 +97,171 @@ int RGWHTTPClient::process(const char *method, const char *url)
   return ret;
 }
 
+/*
+ * State of one asynchronous transfer driven through the curl multi
+ * interface. Everything libcurl keeps a pointer to for the duration of the
+ * transfer (header list, error buffer) lives here so that it stays valid
+ * until the request is completed and this object is deleted.
+ */
+struct multi_req_data {
+  CURL *easy_handle;
+  CURLM *multi_handle;
+  curl_slist *h;
+  bool added; /* true once easy_handle has been attached to multi_handle */
+  char error_buf[CURL_ERROR_SIZE]; /* CURLOPT_ERRORBUFFER target */
 
+  multi_req_data() : easy_handle(NULL), multi_handle(NULL), h(NULL), added(false) {
+    error_buf[0] = '\0';
+  }
+  ~multi_req_data() {
+    /* detach the easy handle first, then clean up easy before multi */
+    if (added)
+      curl_multi_remove_handle(multi_handle, easy_handle);
+
+    if (easy_handle)
+      curl_easy_cleanup(easy_handle);
+
+    if (multi_handle)
+      curl_multi_cleanup(multi_handle);
+
+    /* the header list may only go away after the easy handle is gone */
+    if (h)
+      curl_slist_free_all(h);
+  }
+};
+
+/*
+ * Prepare an upload request without running it.
+ *
+ * method: HTTP method, passed to CURLOPT_CUSTOMREQUEST
+ * url:    target url
+ * handle: out; on success receives the opaque request state that has to be
+ *         passed to process_request() / complete_request(). On failure it is
+ *         set to NULL and nothing is left allocated.
+ *
+ * Returns 0 on success, -ENOMEM if a curl handle or header entry could not
+ * be allocated, -EIO if the easy handle could not be added to the multi
+ * handle.
+ *
+ * The curl callbacks are registered with `this` as their user pointer, so
+ * this client object has to stay alive until the request is completed.
+ */
+int RGWHTTPClient::init_async(const char *method, const char *url, void **handle)
+{
+  *handle = NULL;
+
+  multi_req_data *req_data = new multi_req_data;
+
+  req_data->multi_handle = curl_multi_init();
+  req_data->easy_handle = curl_easy_init();
+  if (!req_data->multi_handle || !req_data->easy_handle) {
+    dout(0) << "ERROR: failed to allocate curl handles" << dendl;
+    delete req_data;
+    return -ENOMEM;
+  }
+
+  CURL *easy_handle = req_data->easy_handle;
+
+  dout(20) << "sending request to " << url << dendl;
+
+  list<pair<string, string> >::iterator iter;
+  for (iter = headers.begin(); iter != headers.end(); ++iter) {
+    pair<string, string>& p = *iter;
+    string val = p.first;
+
+    if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
+      val = val.substr(5);
+    }
+    val.append(": ");
+    val.append(p.second);
+
+    /* curl_slist_append() returns NULL on failure; keep the old list so it is still freed */
+    curl_slist *nh = curl_slist_append(req_data->h, val.c_str());
+    if (!nh) {
+      dout(0) << "ERROR: failed on curl_slist_append" << dendl;
+      delete req_data;
+      return -ENOMEM;
+    }
+    req_data->h = nh;
+  }
+
+  curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
+  curl_easy_setopt(easy_handle, CURLOPT_URL, url);
+  curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
+  curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
+  curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
+  curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)this);
+  curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
+  curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)this);
+  /* the buffer is used while the transfer runs, so it cannot be a local of this function */
+  curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
+  if (req_data->h) {
+    curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)req_data->h);
+  }
+  curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
+  curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)this);
+  curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
+  if (has_send_len) {
+    /* the _LARGE variant takes a curl_off_t, so big objects are not truncated */
+    curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE_LARGE, (curl_off_t)send_len);
+  }
+
+  /* attach only once the easy handle is fully configured */
+  CURLMcode mstatus = curl_multi_add_handle(req_data->multi_handle, easy_handle);
+  if (mstatus) {
+    dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
+    delete req_data;
+    return -EIO;
+  }
+  req_data->added = true;
+
+  *handle = (void *)req_data;
+
+  return 0;
+}
+
+
+/*
+ * Drive the transfer behind `handle` one step forward.
+ *
+ * handle: request state produced by init_async()
+ * done:   out; set to true once curl reports no running transfers. It is
+ *         always written, also when an error is returned.
+ *
+ * Returns 0 on success, -EINVAL if handle is NULL or curl_multi_perform()
+ * reported an error, -EIO if the transfer finished with a curl error.
+ *
+ * The socket wait below is still compiled out, so callers that loop on this
+ * function poll curl without blocking.
+ */
+int RGWHTTPClient::process_request(void *handle, bool *done)
+{
+  multi_req_data *req_data = (multi_req_data *)handle;
+  int still_running = 0;
+  int mstatus;
+  int result = 0;
+
+  *done = false;
+
+  if (!req_data)
+    return -EINVAL;
+
+  do {
+#if 0
+    struct timeval timeout;
+    fd_set fdread;
+    fd_set fdwrite;
+    fd_set fdexcep;
+    int maxfd = -1;
+    long curl_timeo = -1;
+    FD_ZERO(&fdread);
+    FD_ZERO(&fdwrite);
+    FD_ZERO(&fdexcep);
+#if 0
+    /* set a suitable timeout to play around with */
+    timeout.tv_sec = 1;
+    timeout.tv_usec = 0;
+    curl_multi_timeout(multi_handle, &curl_timeo);
+    if(curl_timeo >= 0) {
+      timeout.tv_sec = curl_timeo / 1000;
+      if(timeout.tv_sec > 1)
+        timeout.tv_sec = 1;
+      else
+        timeout.tv_usec = (curl_timeo % 1000) * 1000;
+    }
+#endif
+    /* get file descriptors from the transfers */
+    int ret = curl_multi_fdset(req_data->multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd);
+    if (ret) {
+      dout(0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
+      return -EIO;
+    }
+
+#warning FIXME: replace select with poll
+    ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, NULL);
+    if (ret < 0) {
+      ret = -errno;
+      dout(0) << "ERROR: select returned " << ret << dendl;
+      return ret;
+    }
+#endif
+    mstatus = curl_multi_perform(req_data->multi_handle, &still_running);
+    dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
+    switch (mstatus) {
+      case CURLM_OK:
+      case CURLM_CALL_MULTI_PERFORM:
+        break;
+      default:
+        return -EINVAL;
+    }
+    int msgs_left;
+    CURLMsg *msg;
+    while ((msg = curl_multi_info_read(req_data->multi_handle, &msgs_left))) {
+      if (msg->msg == CURLMSG_DONE) {
+        dout(20) << "msg->data.result=" << msg->data.result << dendl;
+        /* a finished transfer is not necessarily a successful one */
+        if (msg->data.result != CURLE_OK) {
+          dout(0) << "ERROR: curl transfer failed, result=" << msg->data.result
+                  << " (" << curl_easy_strerror(msg->data.result) << ")" << dendl;
+          result = -EIO;
+        }
+      }
+    }
+  } while (mstatus == CURLM_CALL_MULTI_PERFORM);
+
+  *done = (still_running == 0);
+
+  return result;
+}
+
+/*
+ * Run the request behind `handle` until it finishes or fails, then free the
+ * request state. The handle must not be used again afterwards.
+ *
+ * Returns -EINVAL for a NULL handle, otherwise the last value returned by
+ * process_request().
+ *
+ * NOTE: this loops on process_request() back to back; nothing here waits
+ * between iterations.
+ */
+int RGWHTTPClient::complete_request(void *handle)
+{
+  if (!handle)
+    return -EINVAL;
+
+  /* process_request() can return an error before writing *done, so give it a defined start value */
+  bool done = false;
+  int ret;
+  do {
+    ret = process_request(handle, &done);
+  } while (!done && !ret);
+  multi_req_data *req_data = (multi_req_data *)handle;
+  delete req_data;
+
+  return ret;
+}
index da7ab875beb5246a3cca8c3d8e4a49bc7d8d93fe..937dfe76ab606ff3a87c5a0a5b29b3b38b78506b 100644 (file)
@@ -30,6 +30,10 @@ public:
 
   int process(const char *method, const char *url);
   int process(const char *url) { return process("GET", url); }
+
+  int init_async(const char *method, const char *url, void **handle);
+  int process_request(void *handle,bool *done);
+  int complete_request(void *handle);
 };
 
 #endif
index 4ae126e83480294c5b511dbe973a7b73f3b40888..b0ede4ff9df52d364de7f32d76d16b6eff9de43c 100644 (file)
@@ -1950,10 +1950,22 @@ int RGWRados::copy_obj(void *ctx,
     /* dest is in a different region, copy it there */
 
     map<string, bufferlist> src_attrs;
+
+    RGWRESTStreamRequest *out_stream_req;
   
-    int ret = rest_conn->put_obj(user_id, dest_obj, astate->size, NULL);
+    int ret = rest_conn->put_obj_init(user_id, dest_obj, astate->size, &out_stream_req);
     if (ret < 0)
       return ret;
+
+    ret = get_obj_iterate(ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb());
+    if (ret < 0)
+      return ret;
+
+    ret = rest_conn->complete_request(out_stream_req);
+    if (ret < 0)
+      return ret;
+
+    return 0;
   }
       
   if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
index 6702237b7450c26e3f9a23a12783d1408e6c239a..1a85aa5c764e835d4434dafd88983d63363f679e 100644 (file)
@@ -2,13 +2,14 @@
 #include "rgw_rest_client.h"
 #include "rgw_auth_s3.h"
 #include "rgw_http_errors.h"
+#include "rgw_rados.h"
 
 #include "common/ceph_crypto_cms.h"
 #include "common/armor.h"
 
 #define dout_subsys ceph_subsys_rgw
 
-int RGWRESTClient::receive_header(void *ptr, size_t len)
+int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
 {
   char line[len + 1];
 
@@ -60,7 +61,7 @@ static void get_new_date_str(CephContext *cct, string& date_str)
   date_str = s.str();
 }
 
-int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *resource)
+int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
 {
   string new_url = url;
   string new_resource = resource;
@@ -102,7 +103,7 @@ int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *re
   return rgw_http_error_to_errno(status);
 }
 
-int RGWRESTClient::send_data(void *ptr, size_t len)
+int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
 {
   if (!send_iter)
     return 0;
@@ -115,7 +116,7 @@ int RGWRESTClient::send_data(void *ptr, size_t len)
   return len;
 }
 
-int RGWRESTClient::receive_data(void *ptr, size_t len)
+int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
 {
   if (response.length() > max_response)
     return 0; /* don't read extra data */
@@ -127,7 +128,8 @@ int RGWRESTClient::receive_data(void *ptr, size_t len)
   return 0;
 
 }
-void RGWRESTClient::append_param(string& dest, const string& name, const string& val)
+
+void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
 {
   if (dest.empty()) {
     dest.append("?");
@@ -142,7 +144,7 @@ void RGWRESTClient::append_param(string& dest, const string& name, const string&
   }
 }
 
-void RGWRESTClient::get_params_str(map<string, string>& extra_args, string& dest)
+void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
 {
   map<string, string>::iterator miter;
   for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
@@ -154,7 +156,7 @@ void RGWRESTClient::get_params_str(map<string, string>& extra_args, string& dest
   }
 }
 
-int RGWRESTClient::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
+int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
 {
   map<string, string>& m = env.get_map();
 
@@ -185,7 +187,7 @@ int RGWRESTClient::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
   return 0;
 }
 
-int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
+int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
 {
 
   string date_str;
@@ -246,7 +248,43 @@ int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max
   return rgw_http_error_to_errno(status);
 }
 
-int RGWRESTClient::put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *))
+/*
+ * Object-iteration callback that forwards every chunk it is handed to a
+ * streaming REST request. It does not own the request.
+ */
+class RGWRESTStreamOutCB : public RGWGetDataCB {
+  RGWRESTStreamRequest *req;
+public:
+  /* explicit: a request pointer should never silently convert into a callback */
+  explicit RGWRESTStreamOutCB(RGWRESTStreamRequest *_req) : req(_req) {}
+  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len); /* callback for object iteration when sending data */
+};
+
+/*
+ * Queue [bl_ofs, bl_ofs + bl_len) of bl on the request for sending.
+ * When the range covers the whole buffer it is passed on as is; otherwise
+ * the requested range is wrapped in a new bufferlist first.
+ * Returns whatever add_output_data() returns.
+ */
+int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+  dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
+
+  const bool whole_buffer = (bl_ofs == 0 && bl_len == bl.length());
+  if (!whole_buffer) {
+    bufferptr piece(bl.c_str() + bl_ofs, bl_len);
+    bufferlist slice;
+    slice.push_back(piece);
+    return req->add_output_data(slice);
+  }
+
+  return req->add_output_data(bl);
+}
+
+/* Releases the output callback; the request owns it (see get_out_cb()). */
+RGWRESTStreamRequest::~RGWRESTStreamRequest()
+{
+  /* cb stays NULL until put_obj_init() ran; delete on NULL is a no-op */
+  delete cb;
+}
+
+/*
+ * Queue bl for sending and give the transfer a chance to consume it.
+ * Returns the result of process_request().
+ */
+int RGWRESTStreamRequest::add_output_data(bufferlist& bl)
+{
+  {
+    /* scoped so the lock is dropped even if the copy throws, and before
+     * process_request() calls back into send_data(), which takes it again */
+    Mutex::Locker l(lock);
+    pending_send.push_back(bl);
+  }
+
+  bool done;
+  return process_request(handle, &done);
+}
+
+int RGWRESTStreamRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size)
 {
   string resource = obj.bucket.name + "/" + obj.object;
   string new_url = url;
@@ -285,10 +323,61 @@ int RGWRESTClient::put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, v
     headers.push_back(make_pair<string, string>(iter->first, iter->second));
   }
 
-  int r = process(new_info.method, new_url.c_str());
+  cb = new RGWRESTStreamOutCB(this);
+
+  set_send_length(obj_size);
+
+  int r = init_async(new_info.method, new_url.c_str(), &handle);
   if (r < 0)
     return r;
 
   return 0;
 }
 
+/*
+ * Fill the buffer provided by the HTTP client with queued output.
+ *
+ * ptr: destination buffer
+ * len: capacity of ptr in bytes
+ *
+ * Consumes buffers from the front of pending_send until either the queue is
+ * empty or len bytes were copied. A buffer that only fits partially is
+ * replaced by its unsent remainder. Returns the number of bytes copied,
+ * 0 if nothing was queued.
+ */
+int RGWRESTStreamRequest::send_data(void *ptr, size_t len)
+{
+  uint64_t sent = 0;
+  char *out = (char *)ptr;
+
+  dout(20) << "RGWRESTStreamRequest::send_data()" << dendl;
+
+  /* held for the whole copy so the front element cannot change under us */
+  lock.Lock();
+  while (!pending_send.empty() && len > 0) {
+    bufferlist& bl = pending_send.front();
+
+    size_t chunk = min(len, (size_t)bl.length());
+
+    /* append behind what was already copied rather than overwriting it */
+    memcpy(out + sent, bl.c_str(), chunk);
+
+    len -= chunk;
+    sent += chunk;
+
+    if (bl.length() > chunk) {
+      /* build the remainder while bl is still alive; pop_front() destroys it */
+      bufferptr bp(bl.c_str() + chunk, bl.length() - chunk);
+      bufferlist rest;
+      rest.append(bp);
+      pending_send.pop_front();
+      pending_send.push_front(rest);
+    } else {
+      pending_send.pop_front();
+    }
+  }
+  lock.Unlock();
+
+  return sent;
+}
+
+
+/*
+ * Finish the streamed request: hands the stored handle to the inherited
+ * complete_request() and returns its result.
+ */
+int RGWRESTStreamRequest::complete()
+{
+  const int r = complete_request(handle);
+  return r;
+}
index fee9c1d38b4d4298a0cdde65c18076f6b722abba..48a0bf04a1d6cba6161995c12196702e668bbf71 100644 (file)
@@ -5,7 +5,9 @@
 
 #include "rgw_http_client.h"
 
-class RGWRESTClient : public RGWHTTPClient {
+class RGWGetDataCB;
+
+class RGWRESTSimpleRequest : public RGWHTTPClient {
 protected:
   CephContext *cct;
 
@@ -26,7 +28,7 @@ protected:
 
   int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info);
 public:
-  RGWRESTClient(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
+  RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
                 list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL),
                                                         max_response(0) {
     if (_headers)
@@ -38,16 +40,33 @@ public:
 
   int receive_header(void *ptr, size_t len);
   virtual int receive_data(void *ptr, size_t len);
-  int send_data(void *ptr, size_t len);
+  virtual int send_data(void *ptr, size_t len);
 
   bufferlist& get_response() { return response; }
 
   int execute(RGWAccessKey& key, const char *method, const char *resource);
   int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
-
-  int put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *));
 };
 
 
+/*
+ * REST request whose body is produced incrementally: data is queued with
+ * add_output_data() and pulled out again through send_data().
+ */
+class RGWRESTStreamRequest : public RGWRESTSimpleRequest {
+  Mutex lock;                     /* taken around accesses to pending_send */
+  list<bufferlist> pending_send;  /* queued output that was not sent yet */
+  void *handle;                   /* opaque async request state, NULL until put_obj_init() */
+  RGWGetDataCB *cb;               /* output callback, deleted in the destructor */
+public:
+  RGWRESTStreamRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
+                list<pair<string, string> > *_params)
+    : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
+      lock("RGWRESTStreamRequest"), handle(NULL), cb(NULL) {}
+  ~RGWRESTStreamRequest();
+
+  /* request life cycle */
+  int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size);
+  int complete();
+
+  /* data path */
+  int add_output_data(bufferlist& bl);
+  int send_data(void *ptr, size_t len);
+
+  RGWGetDataCB *get_out_cb() { return cb; }
+};
+
 #endif
 
index ddcad57f4e8b303833a3707c5f4a94205a365b95..cba8548d3b4f6e6e03d123cdb77be75caaa1bfb3 100644 (file)
@@ -36,11 +36,17 @@ int RGWRegionConnection::forward(const string& uid, req_info& info, size_t max_r
   list<pair<string, string> > params;
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
-  RGWRESTClient client(cct, url, NULL, &params);
-  return client.forward_request(key, info, max_response, inbl, outbl);
+  RGWRESTSimpleRequest req(cct, url, NULL, &params);
+  return req.forward_request(key, info, max_response, inbl, outbl);
 }
 
-int RGWRegionConnection::put_obj(const string& uid, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *))
+/* Data callback type that keeps its own copy of the object it was created for. */
+class StreamObjData : public RGWGetDataCB {
+  rgw_obj obj;
+
+public:
+  StreamObjData(rgw_obj& _obj)
+    : obj(_obj) {}
+};
+
+int RGWRegionConnection::put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, RGWRESTStreamRequest **req)
 {
   string url;
   int ret = get_url(url);
@@ -50,7 +56,14 @@ int RGWRegionConnection::put_obj(const string& uid, rgw_obj& obj, uint64_t obj_s
   list<pair<string, string> > params;
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
-  RGWRESTClient client(cct, url, NULL, &params);
-  return client.put_obj(key, obj, obj_size, get_data);
+  *req = new RGWRESTStreamRequest(cct, url, NULL, &params);
+  return (*req)->put_obj_init(key, obj, obj_size);
 }
 
+/*
+ * Finish a request started with put_obj_init() and dispose of it.
+ * The request is deleted regardless of the outcome; returns the result of
+ * req->complete().
+ */
+int RGWRegionConnection::complete_request(RGWRESTStreamRequest *req)
+{
+  const int result = req->complete();
+
+  delete req;
+  return result;
+}
index 00bb286b1302989728e746bface343a151ac7e96..1554e513db144a56f8aeeceea7ae1e9932f4e740 100644 (file)
@@ -1,11 +1,12 @@
-#ifndef CEPH_RGW_REST_REQ_H
-#define CEPH_RGW_REST_REQ_H
+#ifndef CEPH_RGW_REST_CONN_H
+#define CEPH_RGW_REST_CONN_H
 
 #include "rgw_rest_client.h"
 
 class CephContext;
 class RGWRados;
 class RGWRegion;
+class RGWGetObjData;
 
 class RGWRegionConnection
 {
@@ -19,8 +20,12 @@ public:
   RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream);
   int get_url(string& endpoint);
 
+  /* sync request */
   int forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
-  int put_obj(const string& uid, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *));
+
+  /* async request */
+  int put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, RGWRESTStreamRequest **req);
+  int complete_request(RGWRESTStreamRequest *req);
 };
 
 #endif