From: Yehuda Sadeh Date: Fri, 24 May 2013 21:24:03 +0000 (-0700) Subject: rgw: enable data sending via http client X-Git-Tag: v0.67-rc1~128^2~100 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=51876499e00a7021057dce8d04ed27fb8369c7b5;p=ceph.git rgw: enable data sending via http client Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 877d0e034a3c..1f8f3d636b2f 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -6,30 +6,41 @@ #define dout_subsys ceph_subsys_rgw -static size_t read_http_header(void *ptr, size_t size, size_t nmemb, void *_info) +static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info) { RGWHTTPClient *client = static_cast(_info); size_t len = size * nmemb; - int ret = client->read_header(ptr, size * nmemb); + int ret = client->receive_header(ptr, size * nmemb); if (ret < 0) { - dout(0) << "WARNING: client->read_header() returned ret=" << ret << dendl; + dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl; } return len; } -static size_t read_http_data(void *ptr, size_t size, size_t nmemb, void *_info) +static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info) { RGWHTTPClient *client = static_cast(_info); size_t len = size * nmemb; - int ret = client->read_data(ptr, size * nmemb); + int ret = client->receive_data(ptr, size * nmemb); if (ret < 0) { - dout(0) << "WARNING: client->read_data() returned ret=" << ret << dendl; + dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; } return len; } +static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info) +{ + RGWHTTPClient *client = static_cast(_info); + int ret = client->send_data(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; + } + + return ret; +} + int RGWHTTPClient::process(const char *method, const char *url) { int ret = 0; @@ -60,14 +71,20 @@ int RGWHTTPClient::process(const char *method, const char *url) curl_easy_setopt(curl_handle, CURLOPT_URL, url); curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L); curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L); - curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, read_http_header); + curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, receive_http_header); curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this); - curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, read_http_data); + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, receive_http_data); curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this); curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf); if (h) { curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h); } + curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, send_http_data); + curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this); + curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); + if (has_send_len) { + curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); + } CURLcode status = curl_easy_perform(curl_handle); if (status) { dout(0) << "curl_easy_performed returned error: " << error_buf << dendl; diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index 5e8b55a61b63..da7ab875beb5 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -5,18 +5,28 @@ class RGWHTTPClient { + bufferlist send_bl; + bufferlist::iterator send_iter; + size_t send_len; + bool has_send_len; protected: list > headers; public: virtual ~RGWHTTPClient() {} - RGWHTTPClient() {} + RGWHTTPClient(): send_len (0), has_send_len(false) {} void append_header(const string& name, const string& val) { headers.push_back(pair(name, val)); } - virtual int read_header(void *ptr, size_t len) { return 0; } - virtual int read_data(void *ptr, size_t len) { return 0; } + virtual int receive_header(void *ptr, size_t len) { return 0; } + virtual int receive_data(void *ptr, size_t len) { return 0; } + virtual int send_data(void *ptr, size_t len) { return 0; } + + void set_send_length(size_t len) { + send_len = len; + has_send_len = true; + } int process(const char *method, const char *url); int process(const char *url) { return process("GET", url); } diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 20a65631e63a..74a4b99b1340 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -866,7 +866,7 @@ void RGWCreateBucket::execute() } ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl; - ret = store->rest_conn->forward(s->user.user_id, s->info); + ret = store->rest_conn->forward(s->user.user_id, s->info, &in_data); if (ret < 0) return; } diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index f678f3be6b02..ec802d7893e2 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -234,6 +234,8 @@ protected: RGWAccessControlPolicy policy; string location_constraint; + bufferlist in_data; + public: RGWCreateBucket() : ret(0) {} diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index fd92c6e20fe2..f84950e28273 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -8,13 +8,13 @@ #define dout_subsys ceph_subsys_rgw -int RGWRESTClient::read_header(void *ptr, size_t len) +int RGWRESTClient::receive_header(void *ptr, size_t len) { char line[len + 1]; char *s = (char *)ptr, *end = (char *)ptr + len; char *p = line; - ldout(cct, 10) << "read_http_header" << dendl; + ldout(cct, 10) << "receive_http_header" << dendl; while (s != end) { if (*s == '\r') { @@ -102,7 +102,20 @@ int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *re return rgw_http_error_to_errno(status); } -int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info) +int RGWRESTClient::send_data(void *ptr, size_t len) +{ + if (!send_iter) + return 0; + + if (len > send_iter->get_remaining()) + len = send_iter->get_remaining(); + + send_iter->copy(len, (char *)ptr); + + return len; +} + +int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl) { string date_str; @@ -156,9 +169,19 @@ int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info) new_resource.append(resource); } new_url.append(new_resource); - + + bufferlist::iterator bliter; + + if (inbl) { + bliter = inbl->begin(); + send_iter = &bliter; + + set_send_length(inbl->length()); + } + int r = process(new_info.method, new_url.c_str()); if (r < 0) return r; - return rgw_http_error_to_errno(status);} + return rgw_http_error_to_errno(status); +} diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index a117bb12a779..216399b26b7b 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -6,18 +6,20 @@ #include "rgw_http_client.h" class RGWRESTClient : public RGWHTTPClient { +protected: CephContext *cct; -protected: int status; string url; map out_headers; list > params; + + bufferlist::iterator *send_iter; public: RGWRESTClient(CephContext *_cct, string& _url, list > *_headers, - list > *_params) : cct(_cct), status(0), url(_url) { + list > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL) { if (_headers) headers = *_headers; @@ -25,10 +27,11 @@ public: params = *_params; } - int read_header(void *ptr, size_t len); + int receive_header(void *ptr, size_t len); + int send_data(void *ptr, size_t len); int execute(RGWAccessKey& key, const char *method, const char *resource); - int forward_request(RGWAccessKey& key, req_info& info); + int forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl); }; diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 3c1d6c2234d7..82fc07c1ec46 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -26,15 +26,16 @@ int RGWRegionConnection::get_url(string& endpoint) return 0; } -int RGWRegionConnection::forward(const string& uid, req_info& info) +int RGWRegionConnection::forward(const string& uid, req_info& info, bufferlist *inbl) { string url; int ret = get_url(url); if (ret < 0) return ret; list > params; + params.push_back(make_pair("uid", uid)); RGWRESTClient client(cct, url, NULL, ¶ms); - return client.forward_request(key, info); + return client.forward_request(key, info, inbl); } int RGWRegionConnection::create_bucket(const string& uid, const string& bucket) diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 06657bf91e7d..af0e74964b9f 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -18,7 +18,7 @@ public: RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream); int get_url(string& endpoint); - int forward(const string& uid, req_info& info); + int forward(const string& uid, req_info& info, bufferlist *inbl); int create_bucket(const string& uid, const string& bucket); }; diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index f170953f8c40..9902dfc2e13b 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -369,6 +369,9 @@ int RGWCreateBucket_ObjStore_S3::get_params() if (ret < 0) return ret; + bufferptr in_ptr(data, len); + in_data.append(in_ptr); + if (len) { RGWCreateBucketParser parser;