nullptr, /* const char *if_match, */
nullptr, /* const char *if_nomatch, */
pattrs,
+ pheaders,
nullptr,
nullptr, /* string *ptag, */
petag); /* string *petag, */
uint64_t *psize;
string *petag;
map<string, bufferlist> *pattrs;
+ map<string, string> *pheaders;
protected:
int _send_request() override;
ceph::real_time *_pmtime,
uint64_t *_psize,
string *_petag,
- map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
+ map<string, bufferlist> *_pattrs,
+ map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
key(_key),
pmtime(_pmtime),
psize(_psize),
petag(_petag),
- pattrs(_pattrs) {}
+ pattrs(_pattrs),
+ pheaders(_pheaders) {}
};
class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
uint64_t *psize;
string *petag;
map<string, bufferlist> *pattrs;
+ map<string, string> *pheaders;
RGWAsyncStatRemoteObj *req;
ceph::real_time *_pmtime,
uint64_t *_psize,
string *_petag,
- map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ map<string, bufferlist> *_pattrs,
+ map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
psize(_psize),
petag(_petag),
pattrs(_pattrs),
+ pheaders(_pheaders),
req(NULL) {}
int send_request() override {
req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
- bucket_info, key, pmtime, psize, petag, pattrs);
+ bucket_info, key, pmtime, psize, petag, pattrs, pheaders);
async_rados->queue(req);
return 0;
}
req->get_out_headers(attrs);
}
-int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) {
+int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) {
/* basic generic implementation */
for (auto header : headers) {
const string& val = header.second;
extra_data.claim_append(in_cb->get_extra_data());
map<string, string> attrs;
req->get_out_headers(&attrs);
- int ret = decode_rest_obj(attrs, extra_data, &rest_obj);
+ int ret = decode_rest_obj(attrs, extra_data);
if (ret < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl;
return ret;
string method;
string path;
param_vec_t params;
+ param_vec_t headers;
T *result;
bufferlist input_bl;
bool send_content_length=false;
RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _method, const string& _path,
- rgw_http_param_pair *_params, bufferlist& _input, T *_result, bool _send_content_length)
+ rgw_http_param_pair *_params,
+ map<string, string> *_headers,
+ bufferlist& _input, T *_result, bool _send_content_length)
: RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
- method(_method), path(_path), params(make_param_list(_params)), result(_result),
+ method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_headers)), result(_result),
input_bl(_input), send_content_length(_send_content_length)
{}
RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _method, const string& _path,
- rgw_http_param_pair *_params, T *_result)
+ rgw_http_param_pair *_params, map<string, string> *_headers,
+ T *_result)
: RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
- method(_method), path(_path), params(make_param_list(_params)), result(_result)
+ method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_headers)), result(_result)
{}
int send_request() override {
auto op = boost::intrusive_ptr<RGWRESTSendResource>(
- new RGWRESTSendResource(conn, method, path, params, nullptr, http_manager));
+ new RGWRESTSendResource(conn, method, path, params, &headers, http_manager));
init_new_io(op.get());
RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _method, const string& _path,
- rgw_http_param_pair *_params,S& _input, T *_result)
- : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, _method, _path, _params, _result){
+ rgw_http_param_pair *_params, map<string, string> *_headers,
+ S& _input, T *_result)
+ : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, _method, _path, _params, _headers, _result) {
JSONFormatter jf;
encode_json("data", _input, &jf);
rgw_http_param_pair *_params, S& _input, T *_result)
: RGWSendRESTResourceCR<S, T>(_cct, _conn, _http_manager,
"POST", _path,
- _params, _input, _result) {}
+ _params, nullptr, _input, _result) {}
};
template <class T>
RGWHTTPManager *_http_manager,
const string& _path,
rgw_http_param_pair *_params, bufferlist& _input, T *_result)
- : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "PUT", _path, _params, _input, _result, true){}
+ : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "PUT", _path, _params, nullptr, _input, _result, true){}
};
RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _path,
- rgw_http_param_pair *_params, bufferlist& _input, T *_result)
- : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "POST", _path, _params, _input, _result, true){}
+ rgw_http_param_pair *_params,
+ map<string, string> * _headers,
+ bufferlist& _input, T *_result)
+ : RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "POST", _path, _params, _headers, _input, _result, true){}
};
rgw_http_param_pair *_params, S& _input, T *_result)
: RGWSendRESTResourceCR<S, T>(_cct, _conn, _http_manager,
"PUT", _path,
- _params, _input, _result) {}
+ _params, nullptr, _input, _result) {}
};
class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine {
public:
virtual int init() = 0;
virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
- virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) = 0;
+ virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) = 0;
virtual bool has_attrs() = 0;
virtual void get_attrs(std::map<string, string> *attrs) = 0;
virtual ~RGWStreamReadResourceCRF() = default;
int init() override;
int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
- int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override;
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override;
bool has_attrs() override;
void get_attrs(std::map<string, string> *attrs);
bool is_done();
const char *if_match,
const char *if_nomatch,
map<string, bufferlist> *pattrs,
+ map<string, string> *pheaders,
string *version_id,
string *ptag,
string *petag)
}
if (pattrs) {
- *pattrs = src_attrs;
+ *pattrs = std::move(src_attrs);
+ }
+
+ if (pheaders) {
+ *pheaders = std::move(req_headers);
}
return 0;
const char *if_match,
const char *if_nomatch,
map<string, bufferlist> *pattrs,
+ map<string, string> *pheaders,
string *version_id,
string *ptag,
string *petag);
return params;
}
+inline param_vec_t make_param_list(const map<string, string> *pp)
+{
+ param_vec_t params;
+ if (!pp) {
+ return params;
+ }
+ for (auto iter : *pp) {
+ params.emplace_back(make_pair(iter.first, iter.second));
+ }
+ return params;
+}
+
class RGWRESTConn
{
CephContext *cct;
yield {
call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store,
sync_env->source_zone,
- bucket_info, key, &mtime, &size, &etag, &attrs));
+ bucket_info, key, &mtime, &size, &etag, &attrs, &headers));
}
if (retcode < 0) {
ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() returned " << retcode << dendl;
}
ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
- << " attrs=" << attrs << dendl;
+ << " attrs=" << attrs << " headers=" << headers << dendl;
yield {
RGWStatRemoteObjCBCR *cb = allocate_callback();
if (cb) {
- cb->set_result(mtime, size, etag, std::move(attrs));
+ cb->set_result(mtime, size, etag, std::move(attrs), std::move(headers));
call(cb);
}
}
uint64_t size = 0;
string etag;
map<string, bufferlist> attrs;
+ map<string, string> headers;
public:
RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
void set_result(ceph::real_time& _mtime,
uint64_t _size,
const string& _etag,
- map<string, bufferlist>&& _attrs) {
+ map<string, bufferlist>&& _attrs,
+ map<string, string>&& _headers) {
mtime = _mtime;
size = _size;
etag = _etag;
uint64_t size{0};
string etag;
map<string, bufferlist> attrs;
+ map<string, string> headers;
protected:
RGWDataSyncEnv *sync_env;
}
};
+int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
+{
+ for (auto header : headers) {
+ const string& val = header.second;
+ if (header.first == "RGWX_OBJECT_SIZE") {
+ info->content_len = atoi(val.c_str());
+ } else {
+ info->attrs[header.first] = val;
+ }
+ }
+
+ info->acls.set_ctx(cct);
+ auto aiter = attrs.find(RGW_ATTR_ACL);
+ if (aiter != attrs.end()) {
+ bufferlist& bl = aiter->second;
+ bufferlist::iterator bliter = bl.begin();
+ try {
+ info->acls.decode(bliter);
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+ return -EIO;
+ }
+ } else {
+ ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
+ }
+
+ return 0;
+}
+
class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
{
RGWDataSyncEnv *sync_env;
return RGWStreamReadHTTPResourceCRF::init();
}
- int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data, rgw_rest_obj *info) override {
- for (auto header : headers) {
- const string& val = header.second;
- if (header.first == "RGWX_OBJECT_SIZE") {
- rest_obj.content_len = atoi(val.c_str());
- } else {
- rest_obj.attrs[header.first] = val;
- }
- }
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
+ map<string, bufferlist> src_attrs;
ldout(sync_env->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
return -EIO;
}
- map<string, bufferlist> src_attrs;
-
JSONDecoder::decode_json("attrs", src_attrs, &jp);
-
- info->acls.set_ctx(sync_env->cct);
- auto aiter = src_attrs.find(RGW_ATTR_ACL);
- if (aiter != src_attrs.end()) {
- bufferlist& bl = aiter->second;
- bufferlist::iterator bliter = bl.begin();
- try {
- info->acls.decode(bliter);
- } catch (buffer::error& err) {
- ldout(sync_env->cct, 0) << "ERROR: failed to decode policy off extra data" << dendl;
- return -EIO;
- }
- } else {
- ldout(sync_env->cct, 0) << "WARNING: acl attrs not provided in extra data" << dendl;
- }
}
-
- return 0;
-
+ return do_decode_rest_obj(sync_env->cct, src_attrs, headers, &rest_obj);
}
bool need_extra_data() override {
return RGWStreamWriteHTTPResourceCRF::init();
}
- void send_ready(const rgw_rest_obj& rest_obj) override {
- RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
+ static void init_send_attrs(CephContext *cct,
+ const rgw_rest_obj& rest_obj,
+ const rgw_sync_aws_src_obj_properties& src_properties,
+ const AWSSyncConfig_Profile *target,
+ map<string, string> *attrs) {
+ auto& new_attrs = *attrs;
- map<string, string> new_attrs = rest_obj.attrs;
+ new_attrs = rest_obj.attrs;
auto acl = rest_obj.acls.get_acl();
auto iter = am.find(orig_grantee);
if (iter == am.end()) {
- ldout(sync_env->cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
+ ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
continue;
}
s.append(viter);
}
- ldout(sync_env->cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
+ ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
new_attrs[header_str] = s;
}
if (!rest_obj.key.instance.empty()) {
new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
}
+ }
+
+ void send_ready(const rgw_rest_obj& rest_obj) override {
+ RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
+
+ map<string, string> new_attrs;
+ if (!multipart.is_multipart) {
+ init_send_attrs(sync_env->cct, rest_obj, src_properties, target.get(), &new_attrs);
+ } else {
+ new_attrs = rest_obj.attrs;
+ }
r->set_send_length(rest_obj.content_len);
rgw_obj dest_obj;
uint64_t obj_size;
+ map<string, string> obj_headers;
bufferlist out_bl;
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
uint64_t _obj_size,
+ const map<string, string>& _obj_headers,
string *_upload_id) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
dest_conn(_dest_conn),
dest_obj(_dest_obj),
obj_size(_obj_size),
+ obj_headers(_obj_headers),
upload_id(_upload_id) {}
int operate() {
rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
bufferlist bl;
call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
- obj_to_aws_path(dest_obj), params, bl, &out_bl));
+ obj_to_aws_path(dest_obj), params, &obj_headers, bl, &out_bl));
}
if (retcode < 0) {
bl.append(ss.str());
call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
- obj_to_aws_path(dest_obj), params, bl, &out_bl));
+ obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
}
if (retcode < 0) {
uint64_t obj_size;
string src_etag;
rgw_sync_aws_src_obj_properties src_properties;
+ rgw_rest_obj rest_obj;
rgw_sync_aws_multipart_upload_info status;
+ map<string, string> obj_headers;
+
rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
int ret_err{0};
std::shared_ptr<AWSSyncConfig_Profile>& _target,
const rgw_obj& _dest_obj,
uint64_t _obj_size,
- const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct),
+ const rgw_sync_aws_src_obj_properties& _src_properties,
+ const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
conf(_conf),
source_conn(_source_conn),
dest_obj(_dest_obj),
obj_size(_obj_size),
src_properties(_src_properties),
+ rest_obj(_rest_obj),
status_obj(sync_env->store->get_zone_params().log_pool,
RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) {
}
}
if (retcode == -ENOENT) {
- yield call(new RGWAWSInitMultipartCR(sync_env, target->conn.get(), dest_obj, status.obj_size, &status.upload_id));
+ RGWAWSStreamPutCRF::init_send_attrs(sync_env->cct, rest_obj, src_properties, target.get(), &obj_headers);
+
+ yield call(new RGWAWSInitMultipartCR(sync_env, target->conn.get(), dest_obj, status.obj_size, std::move(obj_headers), &status.upload_id));
if (retcode < 0) {
return set_cr_error(retcode);
}
target,
dest_obj));
} else {
+ rgw_rest_obj rest_obj;
+ if (do_decode_rest_obj(sync_env->cct, attrs, headers, &rest_obj)) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
+ return set_cr_error(-EINVAL);
+ }
call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj,
- target, dest_obj, size, src_properties));
+ target, dest_obj, size, src_properties, rest_obj));
}
}
if (retcode < 0) {