From af31e2ac24e11a72326de791cc96a1690a0e73c4 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 10 Oct 2017 16:26:14 -0700 Subject: [PATCH] rgw: aws sync, in_crf init abstraction Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_cr_rest.cc | 37 +++++++- src/rgw/rgw_cr_rest.h | 28 ++++-- src/rgw/rgw_sync_module_aws.cc | 167 +++++++++++++++------------------ 3 files changed, 129 insertions(+), 103 deletions(-) diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc index 4ecebf8a7fb81..65ad60b9aabba 100644 --- a/src/rgw/rgw_cr_rest.cc +++ b/src/rgw/rgw_cr_rest.cc @@ -16,12 +16,23 @@ class RGWCRHTTPGetDataCB : public RGWGetDataCB { RGWCoroutine *cr; int64_t io_id; bufferlist data; + bufferlist extra_data; public: RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {} int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override { { Mutex::Locker l(lock); + + if (!has_all_extra_data()) { + off_t max = extra_data_len - extra_data.length(); + if (max > bl_len) { + max = bl_len; + } + bl.splice(0, max, &extra_data); + bl_len -= max; + } + if (bl_len == bl.length()) { data.claim_append(bl); } else { @@ -47,9 +58,17 @@ public: data.splice(0, max, dest); } + bufferlist& get_extra_data() { + return extra_data; + } + bool has_data() { return (data.length() > 0); } + + bool has_all_extra_data() { + return (extra_data.length() == extra_data_len); + } }; @@ -106,6 +125,13 @@ int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool yield caller->io_block(0, req->get_io_id()); } got_attrs = true; + if (need_extra_data() && !got_extra_data) { + if (!in_cb->has_all_extra_data()) { + continue; + } + extra_data.claim_append(in_cb->get_extra_data()); + got_extra_data = true; + } *io_pending = false; in_cb->claim_data(out, max_size); if (!req->is_done()) { @@ -243,12 +269,15 @@ int RGWStreamSpliceCR::operate() { TestSpliceCR::TestSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, RGWHTTPStreamRWRequest *_in_req, RGWHTTPStreamRWRequest *_out_req) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), - in_req(_in_req), out_req(_out_req) {} + in_req(_in_req), out_req(_out_req) { + in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager)); + in_crf->set_req(in_req); + out_crf.reset(new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager)); + out_crf->set_req(out_req); +} + int TestSpliceCR::operate() { reenter(this) { - in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, http_manager, in_req)); - out_crf.reset(new RGWStreamWriteHTTPResourceCRF(cct, get_env(), this, http_manager, out_req)); - yield call(new RGWStreamSpliceCR(cct, http_manager, in_crf, out_crf)); if (retcode < 0) { diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 40374f651630e..1338682549e68 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -336,27 +336,33 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { RGWCoroutine *caller; RGWHTTPManager *http_manager; - RGWHTTPStreamRWRequest *req; + RGWHTTPStreamRWRequest *req{nullptr}; RGWCRHTTPGetDataCB *in_cb{nullptr}; + bufferlist extra_data; + bool got_attrs{false}; + bool got_extra_data{false}; public: RGWStreamReadHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, - RGWHTTPManager *_http_manager, - RGWHTTPStreamRWRequest *_req) : env(_env), + RGWHTTPManager *_http_manager) : env(_env), caller(_caller), - http_manager(_http_manager), - req(_req) {} + http_manager(_http_manager) {} virtual ~RGWStreamReadHTTPResourceCRF(); int init() override; int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ bool has_attrs() override; void get_attrs(std::map *pattrs) override; + virtual bool need_extra_data() { return false; } + + void set_req(RGWHTTPStreamRWRequest *r) { + req = r; + } }; class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { @@ -365,23 +371,25 @@ protected: RGWCoroutine *caller; RGWHTTPManager *http_manager; - RGWHTTPStreamRWRequest *req; + RGWHTTPStreamRWRequest *req{nullptr}; public: RGWStreamWriteHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, - RGWHTTPManager *_http_manager, - RGWHTTPStreamRWRequest *_req) : env(_env), + RGWHTTPManager *_http_manager) : env(_env), caller(_caller), - http_manager(_http_manager), - req(_req) {} + http_manager(_http_manager) {} virtual ~RGWStreamWriteHTTPResourceCRF() {} int init() override; void send_ready(const std::map& attrs) override; int write(bufferlist& data) override; /* reentrant */ int drain_writes(bool *need_retry) override; /* reentrant */ + + void set_req(RGWHTTPStreamRWRequest *r) { + req = r; + } }; class RGWStreamSpliceCR : public RGWCoroutine { diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index 63cf07f9bb899..d9723a1a14429 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -37,16 +37,69 @@ struct AWSConfig { std::unique_ptr conn; }; +class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF +{ + RGWDataSyncEnv *sync_env; + RGWRESTConn *conn; + rgw_obj src_obj; +public: + RGWRESTStreamGetCRF(CephContext *_cct, + RGWCoroutinesEnv *_env, + RGWCoroutine *_caller, + RGWDataSyncEnv *_sync_env, + RGWRESTConn *_conn, + rgw_obj& _src_obj) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager), + sync_env(_sync_env), conn(_conn), src_obj(_src_obj) { + } + + int init(RGWBucketInfo& bucket_info, rgw_obj_key& key) { + /* init input connection */ + RGWRESTStreamRWRequest *in_req; + int ret = conn->get_obj(rgw_user(), nullptr, src_obj, + nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */, + true /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */, + false /* sync_manifest */, true /* skip_descrypt */, false /* send */, + nullptr /* cb */, &in_req); + if (ret < 0) { + ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; + return ret; + } + + set_req(in_req); + + return 0; + } + + bool need_extra_data() override { + return true; + } +}; + class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF { - RGWAccessKey access_key; + RGWDataSyncEnv *sync_env; + RGWRESTConn *conn; + rgw_obj dest_obj; public: RGWAWSStreamPutCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, - RGWHTTPManager *_http_manager, - RGWAccessKey& _key, - RGWRESTStreamS3PutObj *_req) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _http_manager, _req), access_key(_key) {} + RGWDataSyncEnv *_sync_env, + RGWRESTConn* _conn, + rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager), + sync_env(_sync_env), conn(_conn), dest_obj(_dest_obj) { + } + + int init() { + /* init output connection */ + RGWRESTStreamS3PutObj *out_req{nullptr}; + + conn->put_obj_send_init(dest_obj, &out_req); + + set_req(out_req); + + return 0; + } void send_ready(const std::map& attrs) override { RGWRESTStreamS3PutObj *r = (RGWRESTStreamS3PutObj *)req; @@ -61,20 +114,19 @@ public: RGWAccessControlPolicy policy; ::encode(policy, new_attrs[RGW_ATTR_ACL]); - r->send_ready(access_key, new_attrs, false); + r->send_ready(conn->get_key(), new_attrs, false); } }; // maybe use Fetch Remote Obj instead? class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { const AWSConfig& conf; + RGWRESTConn *source_conn; bufferlist res; unordered_map bucket_created; string target_bucket_name; std::shared_ptr in_crf; std::shared_ptr out_crf; - RGWRESTStreamRWRequest *in_req{nullptr}; - RGWRESTStreamS3PutObj *out_req{nullptr}; string obj_path; int ret{0}; @@ -90,9 +142,7 @@ public: ~RGWAWSHandleRemoteObjCBCR(){ } -#if 0 - int operate () override { - + int operate() override { reenter(this) { ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone @@ -100,69 +150,12 @@ public: << " mtime=" << mtime << " attrs=" << attrs << dendl; - yield { - string obj_path = bucket_info.bucket.name + "/" + key.name; - - // TODO-future: And we should do a part by part get and initiate mp on the aws side - call(new RGWReadRawRESTResourceCR(sync_env->cct, - sync_env->store->rest_master_conn, - sync_env->http_manager, - obj_path, - nullptr, - &res)); - - } - if (retcode < 0) { - return set_cr_error(retcode); - } - - bucket_name=aws_bucket_name(bucket_info); - if (bucket_created.find(bucket_name) == bucket_created.end()){ - // // TODO: maybe do a head request for subsequent tries & make it configurable - yield { - //string bucket_name = aws_bucket_name(bucket_info); - ldout(sync_env->cct,0) << "AWS: creating bucket" << bucket_name << dendl; - bufferlist bl; - call(new RGWPutRawRESTResourceCR (sync_env->cct, conf.conn.get(), - sync_env->http_manager, - bucket_name, nullptr, bl, nullptr)); - } - if (retcode < 0) { - return set_cr_error(retcode); - } - - bucket_created[bucket_name]=true; - } - - yield { - string path=aws_object_name(bucket_info, key); - ldout(sync_env->cct,0) << "AWS: creating object at path" << path << dendl; - call(new RGWPutRawRESTResourceCR (sync_env->cct, conf.conn.get(), - sync_env->http_manager, - path, nullptr, - res, nullptr)); - } - if (retcode < 0) { - return set_cr_error(retcode); + source_conn = sync_env->store->get_zone_conn_by_id(sync_env->source_zone); + if (!source_conn) { + ldout(sync_env->cct, 0) << "ERROR: cannot find http connection to zone " << sync_env->source_zone << dendl; + return set_cr_error(-EINVAL); } - - return set_cr_done(); - } - - return 0; - } -#endif - - int operate() override { - - reenter(this) { - - ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " size=" << size - << " mtime=" << mtime << " attrs=" << attrs - << dendl; - obj_path = bucket_info.bucket.name + "/" + key.name; target_bucket_name = aws_bucket_name(bucket_info); @@ -181,32 +174,28 @@ public: bucket_created[target_bucket_name] = true; } -#warning FIXME conn { - /* init input connection */ - rgw_obj source_obj(bucket_info.bucket, key); - ret = sync_env->store->rest_master_conn->get_obj(rgw_user(), nullptr, source_obj, - nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */, - false /* prepend_metadata */, true /* get_op */, true /*rgwx_stat */, - false /* sync_manifest */, true /* skip_descrypt */, false /* send */, - nullptr /* cb */, &in_req); + rgw_obj src_obj(bucket_info.bucket, key); + /* init input */ + in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj)); + + ret = in_crf->init(); if (ret < 0) { return set_cr_error(ret); } - /* init output connection */ + /* init output */ rgw_bucket target_bucket; target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for - uri resolution */ - rgw_obj target_obj(target_bucket, aws_object_name(bucket_info, key)); - in_crf.reset(new RGWStreamReadHTTPResourceCRF(cct, get_env(), this, sync_env->http_manager, in_req)); + uri resolution */ + rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key)); - map attrs; - RGWAccessControlPolicy empty_policy; - ::encode(empty_policy, attrs[RGW_ATTR_ACL]); - conf.conn->put_obj_send_init(target_obj, &out_req); - - out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env->http_manager, conf.conn->get_key(), out_req)); + out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, conf.conn.get(), + dest_obj)); + ret = out_crf->init(); + if (ret < 0) { + return set_cr_error(ret); + } } yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf)); -- 2.39.5