RGWCoroutine *cr;
int64_t io_id;
bufferlist data;
+ bufferlist extra_data;
public:
RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
{
Mutex::Locker l(lock);
+
+ if (!has_all_extra_data()) {
+ off_t max = extra_data_len - extra_data.length();
+ if (max > bl_len) {
+ max = bl_len;
+ }
+ bl.splice(0, max, &extra_data);
+ bl_len -= max;
+ }
+
if (bl_len == bl.length()) {
data.claim_append(bl);
} else {
data.splice(0, max, dest);
}
+ bufferlist& get_extra_data() {
+ return extra_data;
+ }
+
bool has_data() {
return (data.length() > 0);
}
+
+ bool has_all_extra_data() {
+ return (extra_data.length() == extra_data_len);
+ }
};
yield caller->io_block(0, req->get_io_id());
}
got_attrs = true;
+ if (need_extra_data() && !got_extra_data) {
+ if (!in_cb->has_all_extra_data()) {
+ continue;
+ }
+ extra_data.claim_append(in_cb->get_extra_data());
+ got_extra_data = true;
+ }
*io_pending = false;
in_cb->claim_data(out, max_size);
if (!req->is_done()) {
TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
RGWHTTPStreamRWRequest *_in_req,
RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
- in_req(_in_req), out_req(_out_req) {}
+ in_req(_in_req), out_req(_out_req) {
+ in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager));
+ in_crf->set_req(in_req);
+ out_crf.reset(new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager));
+ out_crf->set_req(out_req);
+}
+
int TestSpliceCR::operate() {
reenter(this) {
- in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req));
- out_crf.reset(new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req));
-
yield call(new RGWStreamSpliceCR(cct, http_manager, in_crf, out_crf));
if (retcode < 0) {
RGWCoroutine *caller;
RGWHTTPManager *http_manager;
- RGWHTTPStreamRWRequest *req;
+ RGWHTTPStreamRWRequest *req{nullptr};
RGWCRHTTPGetDataCB *in_cb{nullptr};
+ bufferlist extra_data;
+
bool got_attrs{false};
+ bool got_extra_data{false};
public:
RGWStreamReadHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager,
- RGWHTTPStreamRWRequest *_req) : env(_env),
+ RGWHTTPManager *_http_manager) : env(_env),
caller(_caller),
- http_manager(_http_manager),
- req(_req) {}
+ http_manager(_http_manager) {}
virtual ~RGWStreamReadHTTPResourceCRF();
int init() override;
int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
bool has_attrs() override;
void get_attrs(std::map<string, string> *pattrs) override;
+ virtual bool need_extra_data() { return false; }
+
+ void set_req(RGWHTTPStreamRWRequest *r) {
+ req = r;
+ }
};
class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
RGWCoroutine *caller;
RGWHTTPManager *http_manager;
- RGWHTTPStreamRWRequest *req;
+ RGWHTTPStreamRWRequest *req{nullptr};
public:
RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager,
- RGWHTTPStreamRWRequest *_req) : env(_env),
+ RGWHTTPManager *_http_manager) : env(_env),
caller(_caller),
- http_manager(_http_manager),
- req(_req) {}
+ http_manager(_http_manager) {}
virtual ~RGWStreamWriteHTTPResourceCRF() {}
int init() override;
void send_ready(const std::map<string, string>& attrs) override;
int write(bufferlist& data) override; /* reentrant */
int drain_writes(bool *need_retry) override; /* reentrant */
+
+ void set_req(RGWHTTPStreamRWRequest *r) {
+ req = r;
+ }
};
class RGWStreamSpliceCR : public RGWCoroutine {
std::unique_ptr<RGWRESTConn> conn;
};
+class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
+{
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *conn;
+ rgw_obj src_obj;
+public:
+ RGWRESTStreamGetCRF(CephContext *_cct,
+ RGWCoroutinesEnv *_env,
+ RGWCoroutine *_caller,
+ RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_conn,
+ rgw_obj& _src_obj) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
+ sync_env(_sync_env), conn(_conn), src_obj(_src_obj) {
+ }
+
+ int init(RGWBucketInfo& bucket_info, rgw_obj_key& key) {
+ /* init input connection */
+ RGWRESTStreamRWRequest *in_req;
+ int ret = conn->get_obj(rgw_user(), nullptr, src_obj,
+ nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
+ true /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */,
+ false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
+ nullptr /* cb */, &in_req);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ set_req(in_req);
+
+ return 0;
+ }
+
+ bool need_extra_data() override {
+ return true;
+ }
+};
+
class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
{
- RGWAccessKey access_key;
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *conn;
+ rgw_obj dest_obj;
public:
RGWAWSStreamPutCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
- RGWHTTPManager *_http_manager,
- RGWAccessKey& _key,
- RGWRESTStreamS3PutObj *_req) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager, _req), access_key(_key) {}
+ RGWDataSyncEnv *_sync_env,
+ RGWRESTConn* _conn,
+ rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
+ sync_env(_sync_env), conn(_conn), dest_obj(_dest_obj) {
+ }
+
+ int init() {
+ /* init output connection */
+ RGWRESTStreamS3PutObj *out_req{nullptr};
+
+ conn->put_obj_send_init(dest_obj, &out_req);
+
+ set_req(out_req);
+
+ return 0;
+ }
void send_ready(const std::map<string, string>& attrs) override {
RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req;
RGWAccessControlPolicy policy;
::encode(policy, new_attrs[RGW_ATTR_ACL]);
- r->send_ready(access_key, new_attrs, false);
+ r->send_ready(conn->get_key(), new_attrs, false);
}
};
// maybe use Fetch Remote Obj instead?
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
const AWSConfig& conf;
+ RGWRESTConn *source_conn;
bufferlist res;
unordered_map <string, bool> bucket_created;
string target_bucket_name;
std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
- RGWRESTStreamRWRequest *in_req{nullptr};
- RGWRESTStreamS3PutObj *out_req{nullptr};
string obj_path;
int ret{0};
~RGWAWSHandleRemoteObjCBCR(){
}
-#if 0
- int operate () override {
-
+ int operate() override {
reenter(this) {
ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone
<< " mtime=" << mtime << " attrs=" << attrs
<< dendl;
- yield {
- string obj_path = bucket_info.bucket.name + "/" + key.name;
-
- // TODO-future: And we should do a part by part get and initiate mp on the aws side
- call(new RGWReadRawRESTResourceCR(sync_env->cct,
- sync_env->store->rest_master_conn,
- sync_env->http_manager,
- obj_path,
- nullptr,
- &res));
-
- }
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
-
- bucket_name=aws_bucket_name(bucket_info);
- if (bucket_created.find(bucket_name) == bucket_created.end()){
- // // TODO: maybe do a head request for subsequent tries & make it configurable
- yield {
- //string bucket_name = aws_bucket_name(bucket_info);
- ldout(sync_env->cct,0) << "AWS: creating bucket" << bucket_name << dendl;
- bufferlist bl;
- call(new RGWPutRawRESTResourceCR <int> (sync_env->cct, conf.conn.get(),
- sync_env->http_manager,
- bucket_name, nullptr, bl, nullptr));
- }
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
-
- bucket_created[bucket_name]=true;
- }
-
- yield {
- string path=aws_object_name(bucket_info, key);
- ldout(sync_env->cct,0) << "AWS: creating object at path" << path << dendl;
- call(new RGWPutRawRESTResourceCR<int> (sync_env->cct, conf.conn.get(),
- sync_env->http_manager,
- path, nullptr,
- res, nullptr));
- }
- if (retcode < 0) {
- return set_cr_error(retcode);
+ source_conn = sync_env->store->get_zone_conn_by_id(sync_env->source_zone);
+ if (!source_conn) {
+ ldout(sync_env->cct, 0) << "ERROR: cannot find http connection to zone " << sync_env->source_zone << dendl;
+ return set_cr_error(-EINVAL);
}
-
- return set_cr_done();
- }
-
- return 0;
- }
-#endif
-
- int operate() override {
-
- reenter(this) {
-
- ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size
- << " mtime=" << mtime << " attrs=" << attrs
- << dendl;
-
obj_path = bucket_info.bucket.name + "/" + key.name;
target_bucket_name = aws_bucket_name(bucket_info);
bucket_created[target_bucket_name] = true;
}
-#warning FIXME conn
{
- /* init input connection */
- rgw_obj source_obj(bucket_info.bucket, key);
- ret = sync_env->store->rest_master_conn->get_obj(rgw_user(), nullptr, source_obj,
- nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
- false /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */,
- false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
- nullptr /* cb */, &in_req);
+ rgw_obj src_obj(bucket_info.bucket, key);
+ /* init input */
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+ ret = in_crf->init();
if (ret < 0) {
return set_cr_error(ret);
}
- /* init output connection */
+ /* init output */
rgw_bucket target_bucket;
target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
- uri resolution */
- rgw_obj target_obj(target_bucket, aws_object_name(bucket_info, key));
- in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, sync_env->http_manager, in_req));
+ uri resolution */
+ rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key));
- map<string, bufferlist> attrs;
- RGWAccessControlPolicy empty_policy;
- ::encode(empty_policy, attrs[RGW_ATTR_ACL]);
- conf.conn->put_obj_send_init(target_obj, &out_req);
-
- out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env->http_manager, conf.conn->get_key(), out_req));
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, conf.conn.get(),
+ dest_obj));
+ ret = out_crf->init();
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
}
yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));