]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librgw: partial implementation of continued request framework
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 15 Dec 2015 00:19:41 +0000 (19:19 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Fri, 12 Feb 2016 17:06:43 +0000 (12:06 -0500)
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/rgw/librgw.cc
src/rgw/rgw_file.h
src/rgw/rgw_lib.h
src/rgw/rgw_op.h

index 820ca67b1e1bc95b747bf74960f7e48ed8910215..091d18a020e3dc37091afb09d1d59253d3d59702 100644 (file)
@@ -271,6 +271,108 @@ done:
   return (ret < 0 ? ret : s->err.ret);
 } /* process_request */
 
+int start_continued_request(RGWLibContinuedReq* req)
+{
+  int ret = 0;
+
+  dout(1) << "====== " << __func__
+         << " starting new continued request req=" << hex << req << dec
+         << " ======" << dendl;
+
+  /*
+   * invariant: valid requests are derived from RGWOp--well-formed
+   * requests should have assigned RGWRequest::op in their descendant
+   * constructor--if not, the compiler can find it, at the cost of
+   * a runtime check
+   */
+  RGWOp *op = (req->op) ? req->op : dynamic_cast<RGWOp*>(req);
+  if (! op) {
+    dout(1) << "failed to derive cognate RGWOp (invalid op?)" << dendl;
+    return -EINVAL;
+  }
+
+  struct req_state* s = req->get_state();
+
+  /* XXXX the below stanza can be completely internalized--req has
+   * all these objects */
+
+#if 0
+  /* XXX and -then- stash req_state pointers everywhere they are needed */
+  ret = req->init(rgw_env, &rados_ctx, io, s);
+  if (ret < 0) {
+    dout(10) << "failed to initialize request" << dendl;
+    abort_req(s, op, ret);
+    goto done;
+  }
+#endif
+
+  /* req is-a RGWOp, currently initialized separately */
+  ret = req->op_init();
+  if (ret < 0) {
+    dout(10) << "failed to initialize RGWOp" << dendl;
+    abort_req(s, op, ret);
+    goto done;
+  }
+
+  /* XXX authorize does less here then in the REST path, e.g.,
+   * the user's info is cached, but still incomplete */
+  req->log(s, "authorizing");
+  ret = req->authorize();
+  if (ret < 0) {
+    dout(10) << "failed to authorize request" << dendl;
+    abort_req(s, op, ret);
+    goto done;
+  }
+
+  req->log(s, "reading op permissions");
+  ret = req->read_permissions(op);
+  if (ret < 0) {
+    abort_req(s, op, ret);
+    goto done;
+  }
+
+  req->log(s, "init op");
+  ret = op->init_processing();
+  if (ret < 0) {
+    abort_req(s, op, ret);
+    goto done;
+  }
+
+  req->log(s, "verifying op mask");
+  ret = op->verify_op_mask();
+  if (ret < 0) {
+    abort_req(s, op, ret);
+    goto done;
+  }
+
+  req->log(s, "verifying op permissions");
+  ret = op->verify_permission();
+  if (ret < 0) {
+    if (s->system_request) {
+      dout(2) << "overriding permissions due to system operation" << dendl;
+    } else {
+      abort_req(s, op, ret);
+      goto done;
+    }
+  }
+
+  req->log(s, "verifying op params");
+  ret = op->verify_params();
+  if (ret < 0) {
+    abort_req(s, op, ret);
+    goto done;
+  }
+
+  op->pre_exec();
+
+done:
+  return ret;
+}
+
+int finish_continued_request(RGWLibContinuedReq* req)
+{
+}
+
 int RGWLibFrontend::init()
 {
   pprocess = new RGWLibProcess(g_ceph_context, &env,
index d6cbbe1951c10f73a66a6be7d4834a82ee0ae831..d0e477876fb12bea0e18f906a4153aac5f4ca8bf 100644 (file)
@@ -1444,6 +1444,115 @@ public:
   }
 }; /* RGWStatLeafRequest */
 
+/*
+  put object
+*/
+
+class RGWWriteRequest : public RGWLibContinuedReq,
+                       public RGWPutObj /* RGWOp */
+{
+public:
+  const std::string& bucket_name;
+  const std::string& obj_name;
+  buffer::list& bl; /* XXX */
+  size_t bytes_written;
+
+  RGWWriteRequest(CephContext* _cct, RGWUserInfo *_user,
+                 const std::string& _bname, const std::string& _oname,
+                 buffer::list& _bl)
+    : RGWLibContinuedReq(_cct, _user), bucket_name(_bname), obj_name(_oname),
+      bl(_bl), bytes_written(0) {
+    magic = 81;
+    op = this;
+  }
+
+  virtual bool only_bucket() { return true; }
+
+  virtual int op_init() {
+    // assign store, s, and dialect_handler
+    RGWObjectCtx* rados_ctx
+      = static_cast<RGWObjectCtx*>(get_state()->obj_ctx);
+    // framework promises to call op_init after parent init
+    assert(rados_ctx);
+    RGWOp::init(rados_ctx->store, get_state(), this);
+    op = this; // assign self as op: REQUIRED
+    return 0;
+  }
+
+  virtual int header_init() {
+
+    struct req_state* s = get_state();
+    s->info.method = "PUT";
+    s->op = OP_PUT;
+
+    /* XXX derp derp derp */
+    std::string uri = make_uri(bucket_name, obj_name);
+    s->relative_uri = uri;
+    s->info.request_uri = uri; // XXX
+    s->info.effective_uri = uri;
+    s->info.request_params = "";
+    s->info.domain = ""; /* XXX ? */
+
+    /* XXX required in RGWOp::execute() */
+    s->content_length = bl.length();
+
+    // woo
+    s->user = user;
+
+    return 0;
+  }
+
+  virtual RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx,
+                                              bool *is_multipart) {
+    struct req_state* s = get_state();
+    uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
+    RGWPutObjProcessor_Atomic *processor =
+      new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket,
+                                   s->object.name, part_size, s->req_id,
+                                   s->bucket_info.versioning_enabled());
+    processor->set_olh_epoch(olh_epoch);
+    processor->set_version_id(version_id);
+    return processor;
+  }
+
+  virtual int get_params() {
+    struct req_state* s = get_state();
+    RGWAccessControlPolicy_S3 s3policy(s->cct);
+    /* we don't have (any) headers, so just create canned ACLs */
+    int ret = s3policy.create_canned(s->owner, s->bucket_owner, s->canned_acl);
+    policy = s3policy;
+    return ret;
+  }
+
+  virtual int get_data(buffer::list& _bl) {
+    /* XXX for now, use sharing semantics */
+    _bl.claim(bl);
+    uint32_t len = _bl.length();
+    bytes_written += len;
+    return len;
+  }
+
+  virtual int exec_start() {
+    return 0;
+  }
+
+  virtual int exec_continue() {
+    return 0;
+  }
+
+  virtual int exec_finish() {
+    return 0;
+  }
+
+  virtual void send_response() {}
+
+  virtual int verify_params() {
+    if (bl.length() > cct->_conf->rgw_max_put_size)
+      return -ERR_TOO_LARGE;
+    return 0;
+  }
+}; /* RGWWriteRequest */
+
 } /* namespace rgw */
 
 #endif /* RGW_FILE_H */
index 546d3b395cb691f48a2505fd501831e758fbfedf..3d570c64af923bc7da73807ea02b301579145aba 100644 (file)
@@ -38,6 +38,8 @@ public:
   int stop();
 };
 
+extern RGWLib librgw;
+
 /* request interface */
 
 class RGWLibIO : public RGWClientIO
@@ -146,6 +148,30 @@ public:
 
 }; /* RGWLibRequest */
 
+class RGWLibContinuedReq : public RGWLibRequest {
+  RGWLibIO io_ctx;
+  struct req_state rstate;
+  RGWObjectCtx rados_ctx;
+public:
+
+RGWLibContinuedReq(CephContext* _cct, RGWUserInfo* _user)
+  :  RGWLibRequest(_cct, _user), rstate(_cct, &io_ctx.get_env(), _user),
+     rados_ctx(librgw.get_store(), &rstate)
+    {
+      io_ctx.init(_cct);
+
+      /* XXX for now, use "";  could be a legit hostname, or, in future,
+       * perhaps a tenant (Yehuda) */
+      io_ctx.get_env().set("HTTP_HOST", "");
+    }
+
+  virtual int execute() final { abort(); }
+  virtual int exec_start() = 0;
+  virtual int exec_continue() = 0;
+  virtual int exec_finish() = 0;
+
+}; /* RGWLibContinuedReq */
+
 class RGWLibProcess : public RGWProcess {
     RGWAccessKey access_key;
 public:
@@ -165,10 +191,15 @@ public:
     req_wq.queue(req);
   } /* enqueue_req */
 
+  /* "regular" requests */
   void handle_request(RGWRequest* req); // async handler, deletes req
   int process_request(RGWLibRequest* req);
   int process_request(RGWLibRequest* req, RGWLibIO* io);
   void set_access_key(RGWAccessKey& key) { access_key = key; }
+
+  /* requests w/continue semantics */
+  int start_continued_request(RGWLibContinuedReq* req);
+  int finish_continued_request(RGWLibContinuedReq* req);
 }; /* RGWLibProcess */
 
 class RGWLibFrontend : public RGWProcessFrontend {
index e302e001a513d00f8548c5261bd19d20b4907c9a..c4d414d15d5afd846f9d5caf6f4ab86ea2dc8331 100644 (file)
@@ -666,7 +666,7 @@ public:
     policy.set_ctx(s->cct);
   }
 
-  RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart);
+  virtual RGWPutObjProcessor *select_processor(RGWObjectCtx& obj_ctx, bool *is_multipart);
   void dispose_processor(RGWPutObjProcessor *processor);
 
   int verify_permission();