#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+class TestSpliceCR : public RGWCoroutine {
+ CephContext *cct;
+ RGWHTTPManager *http_manager;
+ RGWHTTPStreamRWRequest *in_req{nullptr};
+ RGWHTTPStreamRWRequest *out_req{nullptr};
+ std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+ std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+public:
+ TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
+ RGWHTTPStreamRWRequest *_in_req,
+ RGWHTTPStreamRWRequest *_out_req);
+
+ int operate();
+};
+
class RGWCRHTTPGetDataCB : public RGWGetDataCB {
Mutex lock;
RGWCoroutinesEnv *env;
return 0;
}
-int RGWStreamWriteHTTPResourceCRF::init()
+int RGWStreamWriteHTTPResourceCRF::send()
{
env->stack->init_new_io(req);
*attrs = req->get_out_headers();
}
+int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) {
+ /* basic generic implementation */
+ for (auto header : headers) {
+ const string& val = header.second;
+
+ rest_obj.attrs[header.first] = val;
+ }
+
+ return 0;
+}
+
int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
{
reenter(&read_state) {
continue;
}
extra_data.claim_append(in_cb->get_extra_data());
+ map<string, string> attrs = req->get_out_headers();
+ int ret = decode_rest_obj(attrs, extra_data, &rest_obj);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl;
+ return ret;
+ }
got_extra_data = true;
}
*io_pending = false;
in_cb->claim_data(out, max_size);
+ if (out->length() == 0) {
+ /* this may happen if we just read the prepended extra_data and didn't have any data
+ * after. In that case, retry reading, so that caller doesn't assume it's EOF.
+ */
+ continue;
+ }
if (!req->is_done()) {
yield;
}
return 0;
}
-void RGWStreamWriteHTTPResourceCRF::send_ready(const map<string, string>& attrs)
+void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj& rest_obj)
{
- for (auto h : attrs) {
- if (h.first == "CONTENT_LENGTH") {
- req->set_send_length(atoi(h.second.c_str()));
- } else {
- req->append_header(h.first, h.second);
- }
+ req->set_send_length(rest_obj.content_len);
+ for (auto h : rest_obj.attrs) {
+ req->append_header(h.first, h.second);
}
}
}
if (!sent_attrs) {
- map<string, string> attrs;
- in_crf->get_attrs(&attrs);
- out_crf->send_ready(attrs);
int ret = out_crf->init();
if (ret < 0) {
return set_cr_error(ret);
}
+ out_crf->send_ready(in_crf->get_rest_obj());
+ ret = out_crf->send();
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
sent_attrs = true;
}
#include "rgw_coroutine.h"
#include "rgw_rest_conn.h"
-class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine{
+
+struct rgw_rest_obj {
+ rgw_obj_key key;
+ uint64_t content_len;
+ std::map<string, string> attrs;
+ std::map<string, string> custom_attrs;
+ RGWAccessControlPolicy acls;
+};
+
+class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine {
bufferlist *result;
protected:
RGWRESTConn *conn;
public:
virtual int init() = 0;
virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
+ virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) = 0;
virtual bool has_attrs() = 0;
virtual void get_attrs(std::map<string, string> *attrs) = 0;
};
public:
virtual int init() = 0;
- virtual void send_ready(const std::map<string, string>& attrs) = 0;
+ virtual void send_ready(const rgw_rest_obj& rest_obj) = 0;
+ virtual int send() = 0;
virtual int write(bufferlist& data) = 0; /* reentrant */
virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
};
class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
+ CephContext *cct;
RGWCoroutinesEnv *env;
RGWCoroutine *caller;
RGWHTTPManager *http_manager;
bool got_attrs{false};
bool got_extra_data{false};
+protected:
+ rgw_rest_obj rest_obj;
+
public:
RGWStreamReadHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager) : env(_env),
- caller(_caller),
- http_manager(_http_manager) {}
+ RGWHTTPManager *_http_manager) : cct(_cct),
+ env(_env),
+ caller(_caller),
+ http_manager(_http_manager) {}
virtual ~RGWStreamReadHTTPResourceCRF();
int init() override;
int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info);
bool has_attrs() override;
- void get_attrs(std::map<string, string> *pattrs) override;
+ void get_attrs(std::map<string, string> *attrs);
virtual bool need_extra_data() { return false; }
void set_req(RGWHTTPStreamRWRequest *r) {
req = r;
}
+
+ rgw_rest_obj& get_rest_obj() {
+ return rest_obj;
+ }
};
class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
http_manager(_http_manager) {}
virtual ~RGWStreamWriteHTTPResourceCRF() {}
- int init() override;
- void send_ready(const std::map<string, string>& attrs) override;
+ int init() override {
+ return 0;
+ }
+ void send_ready(const rgw_rest_obj& rest_obj) override;
+ int send() override;
int write(bufferlist& data) override; /* reentrant */
int drain_writes(bool *need_retry) override; /* reentrant */
int operate();
};
-class TestSpliceCR : public RGWCoroutine {
- CephContext *cct;
- RGWHTTPManager *http_manager;
- RGWHTTPStreamRWRequest *in_req{nullptr};
- RGWHTTPStreamRWRequest *out_req{nullptr};
- std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
- std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-public:
- TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
- RGWHTTPStreamRWRequest *_in_req,
- RGWHTTPStreamRWRequest *_out_req);
-
- int operate();
-};
-
#endif
return 0;
}
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+static void send_prepare_convert(const rgw_obj& obj, string *resource)
{
string urlsafe_bucket, urlsafe_object;
url_encode(obj.bucket.get_key(':', 0), urlsafe_bucket);
url_encode(obj.key.name, urlsafe_object);
- string resource = urlsafe_bucket + "/" + urlsafe_object;
+ *resource = urlsafe_bucket + "/" + urlsafe_object;
+}
+
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+{
+ string resource;
+ send_prepare_convert(obj, &resource);
return send_request(&key, extra_headers, resource, mgr);
}
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
- RGWHTTPManager *mgr, bufferlist *send_data)
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj)
+{
+ string resource;
+ send_prepare_convert(obj, &resource);
+
+ return send_prepare(&key, extra_headers, resource);
+}
+
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ bufferlist *send_data)
{
string new_url = url;
if (new_url[new_url.size() - 1] != '/')
headers.emplace_back(kv);
}
- bool send_data_hint = false;
if (send_data) {
set_outbl(*send_data);
send_data_hint = true;
method = new_info.method;
url = new_url;
+ return 0;
+}
+
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource,
+ RGWHTTPManager *mgr, bufferlist *send_data)
+{
+ int ret = send_prepare(key, extra_headers, resource, send_data);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return send(mgr);
+}
+
+
+int RGWRESTStreamRWRequest::send(RGWHTTPManager *mgr)
+{
if (!mgr) {
return RGWHTTP::send(this);
}
};
class RGWRESTStreamRWRequest : public RGWHTTPStreamRWRequest {
+ bool send_data_hint{false};
public:
RGWRESTStreamRWRequest(CephContext *_cct, const string& _method, const string& _url, RGWGetDataCB *_cb,
param_vec_t *_headers, param_vec_t *_params) : RGWHTTPStreamRWRequest(_cct, _method, _url, _cb, _headers, _params) {
}
virtual ~RGWRESTStreamRWRequest() override {}
+
+ int send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = nullptr /* optional input data */);
+ int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
+ int send(RGWHTTPManager *mgr);
+
int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr);
int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
+
int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
};
param_vec_t params;
populate_params(params, &uid, self_zone_group);
if (prepend_metadata) {
- params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", self_zone_group));
+ params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", "true"));
}
if (rgwx_stat) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "stat", "true"));
set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
}
+ int r = (*req)->send_prepare(key, extra_headers, obj);
+ if (r < 0) {
+ goto done_err;
+ }
+
if (!send) {
return 0;
}
- int r = (*req)->send_request(key, extra_headers, obj, nullptr);
+ r = (*req)->send(nullptr);
if (r < 0) {
- delete *req;
- *req = nullptr;
- return r;
+ goto done_err;
}
-
return 0;
+done_err:
+ delete *req;
+ *req = nullptr;
+ return r;
}
int RGWRESTConn::complete_request(RGWRESTStreamRWRequest *req, string& etag, real_time *mtime,
sync_env(_sync_env), conn(_conn), src_obj(_src_obj) {
}
- int init(RGWBucketInfo& bucket_info, rgw_obj_key& key) {
+ int init() override {
/* init input connection */
RGWRESTStreamRWRequest *in_req;
int ret = conn->get_obj(rgw_user(), nullptr, src_obj,
nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
- true /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */,
+ true /* prepend_metadata */, true /* get_op */, false /*rgwx_stat */,
false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
nullptr /* cb */, &in_req);
if (ret < 0) {
set_req(in_req);
+ return RGWStreamReadHTTPResourceCRF::init();
+ }
+
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override {
+ for (auto header : headers) {
+ const string& val = header.second;
+ if (header.first == "RGWX_OBJECT_SIZE") {
+ rest_obj.content_len = atoi(val.c_str());
+ } else {
+ rest_obj.attrs[header.first] = val;
+ }
+ }
+
+ ldout(sync_env->cct, 20) << __func << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
+
return 0;
+
}
bool need_extra_data() override {
set_req(out_req);
- return 0;
+ return RGWStreamWriteHTTPResourceCRF::init();
}
- void send_ready(const std::map<string, string>& attrs) override {
+ void send_ready(const rgw_rest_obj& rest_obj) override {
RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
+ /* here we need to convert rest_obj.attrs to cloud specific representation */
+
map<string, bufferlist> new_attrs;
- for (auto attr : attrs) {
- const string& val = attr.second;
- new_attrs[attr.first].append(bufferptr(val.c_str(), val.size() - 1));
+ for (auto attr : rest_obj.attrs) {
+ new_attrs[attr.first].append(attr.second);
}
RGWAccessControlPolicy policy;
::encode(policy, new_attrs[RGW_ATTR_ACL]);
+ r->set_send_length(rest_obj.content_len);
+
r->send_ready(conn->get_key(), new_attrs, false);
}
};
string target_bucket_name;
std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
-
+ rgw_rest_obj rest_obj;
string obj_path;
int ret{0};