]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: enable data sending via http client
authorYehuda Sadeh <yehuda@inktank.com>
Fri, 24 May 2013 21:24:03 +0000 (14:24 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 24 May 2013 21:27:42 +0000 (14:27 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_rest_s3.cc

index 877d0e034a3c2635c9b6eb8435f4797d6baae12d..1f8f3d636b2f8eb129a1b0ac58bf2f29a34aa134 100644 (file)
@@ -6,30 +6,41 @@
 
 #define dout_subsys ceph_subsys_rgw
 
-static size_t read_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
+static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info)
 {
   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
   size_t len = size * nmemb;
-  int ret = client->read_header(ptr, size * nmemb);
+  int ret = client->receive_header(ptr, size * nmemb);
   if (ret < 0) {
-    dout(0) << "WARNING: client->read_header() returned ret=" << ret << dendl;
+    dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
   }
 
   return len;
 }
 
-static size_t read_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
 {
   RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
   size_t len = size * nmemb;
-  int ret = client->read_data(ptr, size * nmemb);
+  int ret = client->receive_data(ptr, size * nmemb);
   if (ret < 0) {
-    dout(0) << "WARNING: client->read_data() returned ret=" << ret << dendl;
+    dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
   }
 
   return len;
 }
 
+static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info)
+{
+  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
+  int ret = client->send_data(ptr, size * nmemb);
+  if (ret < 0) {
+    dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
+  }
+
+  return ret;
+}
+
 int RGWHTTPClient::process(const char *method, const char *url)
 {
   int ret = 0;
@@ -60,14 +71,20 @@ int RGWHTTPClient::process(const char *method, const char *url)
   curl_easy_setopt(curl_handle, CURLOPT_URL, url);
   curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
   curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, read_http_header);
+  curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
   curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, read_http_data);
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
   curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
   curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
   if (h) {
     curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
   }
+  curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, send_http_data);
+  curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
+  curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); 
+  if (has_send_len) {
+    curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); 
+  }
   CURLcode status = curl_easy_perform(curl_handle);
   if (status) {
     dout(0) << "curl_easy_performed returned error: " << error_buf << dendl;
index 5e8b55a61b63891666af7ded1ceaa928b6e5e04a..da7ab875beb5246a3cca8c3d8e4a49bc7d8d93fe 100644 (file)
@@ -5,18 +5,28 @@
 
 class RGWHTTPClient
 {
+  bufferlist send_bl;
+  bufferlist::iterator send_iter;
+  size_t send_len;
+  bool has_send_len;
 protected:
   list<pair<string, string> > headers;
 public:
   virtual ~RGWHTTPClient() {}
-  RGWHTTPClient() {}
+  RGWHTTPClient(): send_len (0), has_send_len(false) {}
 
   void append_header(const string& name, const string& val) {
     headers.push_back(pair<string, string>(name, val));
   }
 
-  virtual int read_header(void *ptr, size_t len) { return 0; }
-  virtual int read_data(void *ptr, size_t len) { return 0; }
+  virtual int receive_header(void *ptr, size_t len) { return 0; }
+  virtual int receive_data(void *ptr, size_t len) { return 0; }
+  virtual int send_data(void *ptr, size_t len) { return 0; }
+
+  void set_send_length(size_t len) {
+    send_len = len;
+    has_send_len = true;
+  }
 
   int process(const char *method, const char *url);
   int process(const char *url) { return process("GET", url); }
index 20a65631e63a24ecfcca6a8e80955b99e1b248fd..74a4b99b134084543fab26d3404b9faf51141ad2 100644 (file)
@@ -866,7 +866,7 @@ void RGWCreateBucket::execute()
     }
 
     ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl;
-    ret = store->rest_conn->forward(s->user.user_id, s->info);
+    ret = store->rest_conn->forward(s->user.user_id, s->info, &in_data);
     if (ret < 0)
       return;
   }
index f678f3be6b026caf07aa2075ecb99087c0fb78cb..ec802d7893e26f2ef70c43ced4df324f7c919400 100644 (file)
@@ -234,6 +234,8 @@ protected:
   RGWAccessControlPolicy policy;
   string location_constraint;
 
+  bufferlist in_data;
+
 public:
   RGWCreateBucket() : ret(0) {}
 
index fd92c6e20fe23e966786949ac421cd056314af90..f84950e282730b12fa700e7cbfe1c65da13a5a6f 100644 (file)
@@ -8,13 +8,13 @@
 
 #define dout_subsys ceph_subsys_rgw
 
-int RGWRESTClient::read_header(void *ptr, size_t len)
+int RGWRESTClient::receive_header(void *ptr, size_t len)
 {
   char line[len + 1];
 
   char *s = (char *)ptr, *end = (char *)ptr + len;
   char *p = line;
-  ldout(cct, 10) << "read_http_header" << dendl;
+  ldout(cct, 10) << "receive_http_header" << dendl;
 
   while (s != end) {
     if (*s == '\r') {
@@ -102,7 +102,20 @@ int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *re
   return rgw_http_error_to_errno(status);
 }
 
-int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info)
+int RGWRESTClient::send_data(void *ptr, size_t len)
+{
+  if (!send_iter)
+    return 0;
+
+  if (len > send_iter->get_remaining())
+    len = send_iter->get_remaining();
+
+  send_iter->copy(len, (char *)ptr);
+
+  return len;
+}
+
+int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl)
 {
 
   string date_str;
@@ -156,9 +169,19 @@ int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info)
     new_resource.append(resource);
   }
   new_url.append(new_resource);
-  
+
+  bufferlist::iterator bliter;
+
+  if (inbl) {
+    bliter = inbl->begin();
+    send_iter = &bliter;
+
+    set_send_length(inbl->length());
+  }
+
   int r = process(new_info.method, new_url.c_str());
   if (r < 0)
     return r;
 
-  return rgw_http_error_to_errno(status);}
+  return rgw_http_error_to_errno(status);
+}
index a117bb12a7794e0f49903faa826ccc9d1614b949..216399b26b7b3119d620944d0922b7b3bc307e9e 100644 (file)
@@ -6,18 +6,20 @@
 #include "rgw_http_client.h"
 
 class RGWRESTClient : public RGWHTTPClient {
+protected:
   CephContext *cct;
 
-protected:
   int status;
 
   string url;
 
   map<string, string> out_headers;
   list<pair<string, string> > params;
+
+  bufferlist::iterator *send_iter;
 public:
   RGWRESTClient(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
-                list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url) {
+                list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL) {
     if (_headers)
       headers = *_headers;
 
@@ -25,10 +27,11 @@ public:
       params = *_params;
   }
 
-  int read_header(void *ptr, size_t len);
+  int receive_header(void *ptr, size_t len);
+  int send_data(void *ptr, size_t len);
 
   int execute(RGWAccessKey& key, const char *method, const char *resource);
-  int forward_request(RGWAccessKey& key, req_info& info);
+  int forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl);
 };
 
 
index 3c1d6c2234d76cbc5d68d578cd427673b9f5e49d..82fc07c1ec46b904407584ae4df9fcde3334fdeb 100644 (file)
@@ -26,15 +26,16 @@ int RGWRegionConnection::get_url(string& endpoint)
   return 0;
 }
 
-int RGWRegionConnection::forward(const string& uid, req_info& info)
+int RGWRegionConnection::forward(const string& uid, req_info& info, bufferlist *inbl)
 {
   string url;
   int ret = get_url(url);
   if (ret < 0)
     return ret;
   list<pair<string, string> > params;
+  params.push_back(make_pair<string, string>("uid", uid));
   RGWRESTClient client(cct, url, NULL, &params);
-  return client.forward_request(key, info);
+  return client.forward_request(key, info, inbl);
 }
 
 int RGWRegionConnection::create_bucket(const string& uid, const string& bucket)
index 06657bf91e7d722c32893a3a949c7da12f6e458c..af0e74964b9f08685fbb2b968f79078bb5cb93ea 100644 (file)
@@ -18,7 +18,7 @@ public:
   RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream);
   int get_url(string& endpoint);
 
-  int forward(const string& uid, req_info& info);
+  int forward(const string& uid, req_info& info, bufferlist *inbl);
   int create_bucket(const string& uid, const string& bucket);
 
 };
index f170953f8c407d15917de7d30f6b0347a4f81980..9902dfc2e13b3574df047efd145e9ba29ccf6fdd 100644 (file)
@@ -369,6 +369,9 @@ int RGWCreateBucket_ObjStore_S3::get_params()
   if (ret < 0)
     return ret;
 
+  bufferptr in_ptr(data, len);
+  in_data.append(in_ptr);
+
   if (len) {
     RGWCreateBucketParser parser;