From 6e630d692425e47e386813c97e799b89e6247064 Mon Sep 17 00:00:00 2001 From: Matt Benjamin Date: Tue, 15 Dec 2015 16:48:41 -0500 Subject: [PATCH] librgw: block in RGWWriteRequest::exec_start Signed-off-by: Matt Benjamin --- src/rgw/rgw_file.cc | 33 ++++++++++++++++++++++++++ src/rgw/rgw_file.h | 9 +++---- src/rgw/rgw_frontend.cc | 3 +++ src/rgw/rgw_lib.h | 2 ++ src/rgw/rgw_op.cc | 47 ------------------------------------- src/rgw/rgw_op.h | 52 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 95 insertions(+), 51 deletions(-) diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index 0ee06383b20a4..60e25e785b19d 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -137,6 +137,39 @@ RGWFileHandle::file::~file() delete write_req; } +int RGWWriteRequest::exec_start() { + struct req_state* s = get_state(); + + perfcounter->inc(l_rgw_put); + op_ret = -EINVAL; + + // XXX check this + if (s->object.empty()) { + goto done; + } + + op_ret = get_params(); + if (op_ret < 0) + goto done; + + op_ret = get_system_versioning_params(s, &olh_epoch, &version_id); + if (op_ret < 0) { + goto done; + } + + /* user-supplied MD5 check skipped (not supplied) */ + /* early quota check skipped--we don't have size yet */ + /* skipping user-supplied etag--we might have one in future, but + * like data it and other attrs would arrive after open */ + processor = select_processor(*static_cast(s->obj_ctx), + &multipart); + op_ret = processor->prepare(get_store(), NULL); + +done: + return op_ret; +} /* exec_start */ + + /* librgw */ extern "C" { diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index 0d5858590fc32..b626bf37031d8 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -1461,15 +1461,18 @@ class RGWWriteRequest : public RGWLibContinuedReq, public: const std::string& bucket_name; const std::string& obj_name; + RGWPutObjProcessor *processor; buffer::list bl; off_t last_off; off_t next_off; size_t bytes_written; + bool multipart; RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user, const std::string& _bname, const std::string& _oname) : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname), - last_off(0), next_off(0), bytes_written(0) { + processor(nullptr), last_off(0), next_off(0), bytes_written(0), + multipart(false) { magic = 81; op = this; } @@ -1545,9 +1548,7 @@ public: bl.claim(_bl); } - virtual int exec_start() { - return 0; - } + virtual int exec_start(); virtual int exec_continue() { if (next_off != last_off) diff --git a/src/rgw/rgw_frontend.cc b/src/rgw/rgw_frontend.cc index 35f9c2438d2d2..7c25095bc1f51 100644 --- a/src/rgw/rgw_frontend.cc +++ b/src/rgw/rgw_frontend.cc @@ -5,6 +5,9 @@ #include "include/str_list.h" +#include "include/assert.h" + + #define dout_subsys ceph_subsys_rgw int RGWFrontendConfig::parse_config(const string& config, diff --git a/src/rgw/rgw_lib.h b/src/rgw/rgw_lib.h index 54994707055bf..a139fc9cea65a 100644 --- a/src/rgw/rgw_lib.h +++ b/src/rgw/rgw_lib.h @@ -165,6 +165,8 @@ RGWLibContinuedReq(CephContext* _cct, RGWUserInfo* _user) io_ctx.get_env().set("HTTP_HOST", ""); } + inline RGWRados* get_store() { return store; } + virtual int execute() final { abort(); } virtual int exec_start() = 0; virtual int exec_continue() = 0; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index ccd75b41856cb..08eae5f0fb8ad 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -2112,53 +2112,6 @@ void RGWPutObj::pre_exec() rgw_bucket_object_pre_exec(s); } -static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs, - MD5 *hash, bool need_to_wait) -{ - bool again; - - do { - void *handle; - - int ret = processor->handle_data(data, ofs, hash, &handle, &again); - if (ret < 0) - return ret; - - ret = processor->throttle_data(handle, need_to_wait); - if (ret < 0) - return ret; - - need_to_wait = false; /* the need to wait only applies to the first iteration */ - } while (again); - - return 0; -} - -static int get_system_versioning_params(req_state *s, uint64_t *olh_epoch, string *version_id) -{ - if (!s->system_request) { - return 0; - } - - if (olh_epoch) { - string epoch_str = s->info.args.get(RGW_SYS_PARAM_PREFIX "versioned-epoch"); - if (!epoch_str.empty()) { - string err; - *olh_epoch = strict_strtol(epoch_str.c_str(), 10, &err); - if (!err.empty()) { - ldout(s->cct, 0) << "failed to parse versioned-epoch param" << dendl; - return -EINVAL; - } - } - } - - if (version_id) { - *version_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "version-id"); - } - - return 0; -} - static void encode_delete_at_attr(time_t delete_at, map& attrs) { if (delete_at == 0) { diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index c4d414d15d5af..388af73dbc1ac 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1331,4 +1331,56 @@ public: extern int rgw_build_bucket_policies(RGWRados* store, struct req_state* s); +static inline int put_data_and_throttle(RGWPutObjProcessor *processor, + bufferlist& data, off_t ofs, + MD5 *hash, bool need_to_wait) +{ + bool again; + + do { + void *handle; + + int ret = processor->handle_data(data, ofs, hash, &handle, &again); + if (ret < 0) + return ret; + + ret = processor->throttle_data(handle, need_to_wait); + if (ret < 0) + return ret; + + need_to_wait = false; /* the need to wait only applies to the first + * iteration */ + } while (again); + + return 0; +} /* put_data_and_throttle */ + +static inline int get_system_versioning_params(req_state *s, + uint64_t *olh_epoch, + string *version_id) +{ + if (!s->system_request) { + return 0; + } + + if (olh_epoch) { + string epoch_str = s->info.args.get(RGW_SYS_PARAM_PREFIX "versioned-epoch"); + if (!epoch_str.empty()) { + string err; + *olh_epoch = strict_strtol(epoch_str.c_str(), 10, &err); + if (!err.empty()) { + lsubdout(s->cct, rgw, 0) << "failed to parse versioned-epoch param" + << dendl; + return -EINVAL; + } + } + } + + if (version_id) { + *version_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "version-id"); + } + + return 0; +} /* get_system_versioning_params */ + #endif /* CEPH_RGW_OP_H */ -- 2.39.5