return (ret < 0 ? ret : s->err.ret);
} /* process_request */
+int start_continued_request(RGWLibContinuedReq* req)
+{
+ int ret = 0;
+
+ dout(1) << "====== " << __func__
+ << " starting new continued request req=" << hex << req << dec
+ << " ======" << dendl;
+
+ /*
+ * invariant: valid requests are derived from RGWOp--well-formed
+ * requests should have assigned RGWRequest::op in their descendant
+ * constructor--if not, the compiler can find it, at the cost of
+ * a runtime check
+ */
+ RGWOp *op = (req->op) ? req->op : dynamic_cast<RGWOp*>(req);
+ if (! op) {
+ dout(1) << "failed to derive cognate RGWOp (invalid op?)" << dendl;
+ return -EINVAL;
+ }
+
+ struct req_state* s = req->get_state();
+
+ /* XXXX the below stanza can be completely internalized--req has
+ * all these objects */
+
+#if 0
+ /* XXX and -then- stash req_state pointers everywhere they are needed */
+ ret = req->init(rgw_env, &rados_ctx, io, s);
+ if (ret < 0) {
+ dout(10) << "failed to initialize request" << dendl;
+ abort_req(s, op, ret);
+ goto done;
+ }
+#endif
+
+ /* req is-a RGWOp, currently initialized separately */
+ ret = req->op_init();
+ if (ret < 0) {
+ dout(10) << "failed to initialize RGWOp" << dendl;
+ abort_req(s, op, ret);
+ goto done;
+ }
+
+ /* XXX authorize does less here then in the REST path, e.g.,
+ * the user's info is cached, but still incomplete */
+ req->log(s, "authorizing");
+ ret = req->authorize();
+ if (ret < 0) {
+ dout(10) << "failed to authorize request" << dendl;
+ abort_req(s, op, ret);
+ goto done;
+ }
+
+ req->log(s, "reading op permissions");
+ ret = req->read_permissions(op);
+ if (ret < 0) {
+ abort_req(s, op, ret);
+ goto done;
+ }
+
+ req->log(s, "init op");
+ ret = op->init_processing();
+ if (ret < 0) {
+ abort_req(s, op, ret);
+ goto done;
+ }
+
+ req->log(s, "verifying op mask");
+ ret = op->verify_op_mask();
+ if (ret < 0) {
+ abort_req(s, op, ret);
+ goto done;
+ }
+
+ req->log(s, "verifying op permissions");
+ ret = op->verify_permission();
+ if (ret < 0) {
+ if (s->system_request) {
+ dout(2) << "overriding permissions due to system operation" << dendl;
+ } else {
+ abort_req(s, op, ret);
+ goto done;
+ }
+ }
+
+ req->log(s, "verifying op params");
+ ret = op->verify_params();
+ if (ret < 0) {
+ abort_req(s, op, ret);
+ goto done;
+ }
+
+ op->pre_exec();
+
+done:
+ return ret;
+}
+
+int finish_continued_request(RGWLibContinuedReq* req)
+{
+}
+
int RGWLibFrontend::init()
{
pprocess = new RGWLibProcess(g_ceph_context, &env,
}
}; /* RGWStatLeafRequest */
+/*
+ put object
+*/
+
+class RGWWriteRequest : public RGWLibContinuedReq,
+ public RGWPutObj /* RGWOp */
+{
+public:
+ const std::string& bucket_name;
+ const std::string& obj_name;
+ buffer::list& bl; /* XXX */
+ size_t bytes_written;
+
+ RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user,
+ const std::string& _bname, const std::string& _oname,
+ buffer::list& _bl)
+ : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
+ bl(_bl), bytes_written(0) {
+ magic = 81;
+ op = this;
+ }
+
+ virtual bool only_bucket() { return true; }
+
+ virtual int op_init() {
+ // assign store, s, and dialect_handler
+ RGWObjectCtx* rados_ctx
+ = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
+ // framework promises to call op_init after parent init
+ assert(rados_ctx);
+ RGWOp::init(rados_ctx->store, get_state(), this);
+ op = this; // assign self as op: REQUIRED
+ return 0;
+ }
+
+ virtual int header_init() {
+
+ struct req_state* s = get_state();
+ s->info.method = "PUT";
+ s->op = OP_PUT;
+
+ /* XXX derp derp derp */
+ std::string uri = make_uri(bucket_name, obj_name);
+ s->relative_uri = uri;
+ s->info.request_uri = uri; // XXX
+ s->info.effective_uri = uri;
+ s->info.request_params = "";
+ s->info.domain = ""; /* XXX ? */
+
+ /* XXX required in RGWOp::execute() */
+ s->content_length = bl.length();
+
+ // woo
+ s->user = user;
+
+ return 0;
+ }
+
+ virtual RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx,
+ bool *is_multipart) {
+ struct req_state* s = get_state();
+ uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
+ RGWPutObjProcessor_Atomic *processor =
+ new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket,
+ s->object.name, part_size, s->req_id,
+ s->bucket_info.versioning_enabled());
+ processor->set_olh_epoch(olh_epoch);
+ processor->set_version_id(version_id);
+ return processor;
+ }
+
+ virtual int get_params() {
+ struct req_state* s = get_state();
+ RGWAccessControlPolicy_S3 s3policy(s->cct);
+ /* we don't have (any) headers, so just create canned ACLs */
+ int ret = s3policy.create_canned(s->owner, s->bucket_owner, s->canned_acl);
+ policy = s3policy;
+ return ret;
+ }
+
+ virtual int get_data(buffer::list& _bl) {
+ /* XXX for now, use sharing semantics */
+ _bl.claim(bl);
+ uint32_t len = _bl.length();
+ bytes_written += len;
+ return len;
+ }
+
+ virtual int exec_start() {
+ return 0;
+ }
+
+ virtual int exec_continue() {
+ return 0;
+ }
+
+ virtual int exec_finish() {
+ return 0;
+ }
+
+ virtual void send_response() {}
+
+ virtual int verify_params() {
+ if (bl.length() > cct->_conf->rgw_max_put_size)
+ return -ERR_TOO_LARGE;
+ return 0;
+ }
+}; /* RGWWriteRequest */
+
} /* namespace rgw */
#endif /* RGW_FILE_H */
int stop();
};
+extern RGWLib librgw;
+
/* request interface */
class RGWLibIO : public RGWClientIO
}; /* RGWLibRequest */
+class RGWLibContinuedReq : public RGWLibRequest {
+ RGWLibIO io_ctx;
+ struct req_state rstate;
+ RGWObjectCtx rados_ctx;
+public:
+
+RGWLibContinuedReq(CephContext* _cct, RGWUserInfo* _user)
+ : RGWLibRequest(_cct, _user), rstate(_cct, &io_ctx.get_env(), _user),
+ rados_ctx(librgw.get_store(), &rstate)
+ {
+ io_ctx.init(_cct);
+
+ /* XXX for now, use ""; could be a legit hostname, or, in future,
+ * perhaps a tenant (Yehuda) */
+ io_ctx.get_env().set("HTTP_HOST", "");
+ }
+
+ virtual int execute() final { abort(); }
+ virtual int exec_start() = 0;
+ virtual int exec_continue() = 0;
+ virtual int exec_finish() = 0;
+
+}; /* RGWLibContinuedReq */
+
class RGWLibProcess : public RGWProcess {
RGWAccessKey access_key;
public:
req_wq.queue(req);
} /* enqueue_req */
+ /* "regular" requests */
void handle_request(RGWRequest* req); // async handler, deletes req
int process_request(RGWLibRequest* req);
int process_request(RGWLibRequest* req, RGWLibIO* io);
void set_access_key(RGWAccessKey& key) { access_key = key; }
+
+ /* requests w/continue semantics */
+ int start_continued_request(RGWLibContinuedReq* req);
+ int finish_continued_request(RGWLibContinuedReq* req);
}; /* RGWLibProcess */
class RGWLibFrontend : public RGWProcessFrontend {
policy.set_ctx(s->cct);
}
- RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart);
+ virtual RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart);
void dispose_processor(RGWPutObjProcessor *processor);
int verify_permission();