]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: http interfaces take optional_yield
authorCasey Bodley <cbodley@redhat.com>
Sat, 24 Nov 2018 01:46:47 +0000 (20:46 -0500)
committerCasey Bodley <cbodley@redhat.com>
Fri, 29 Mar 2019 03:39:42 +0000 (23:39 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
14 files changed:
src/rgw/rgw_auth_keystone.cc
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_crypt.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_keystone.cc
src/rgw/rgw_opa.cc
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_rest_sts.cc
src/rgw/rgw_swift_auth.cc
src/rgw/rgw_sync.cc

index 173c948251ef4581500f1e91ec7d1abe8ae0d27a..5249600d85b73a22937520507ac6ad314ef10ea3 100644 (file)
@@ -95,7 +95,7 @@ TokenEngine::get_from_keystone(const DoutPrefixProvider* dpp, const std::string&
 
   validate.set_url(url);
 
-  int ret = validate.process();
+  int ret = validate.process(null_yield);
   if (ret < 0) {
     throw ret;
   }
@@ -352,7 +352,7 @@ EC2Engine::get_from_keystone(const DoutPrefixProvider* dpp, const boost::string_
   validate.set_send_length(os.str().length());
 
   /* send request */
-  ret = validate.process();
+  ret = validate.process(null_yield);
   if (ret < 0) {
     ldpp_dout(dpp, 2) << "s3 keystone: token validation ERROR: "
                   << token_body_bl.c_str() << dendl;
@@ -428,7 +428,7 @@ std::pair<boost::optional<std::string>, int> EC2Engine::get_secret_from_keystone
   secret.set_verify_ssl(cct->_conf->rgw_keystone_verify_ssl);
 
   /* send request */
-  ret = secret.process();
+  ret = secret.process(null_yield);
   if (ret < 0) {
     ldpp_dout(dpp, 2) << "s3 keystone: secret fetching error: "
                   << token_body_bl.c_str() << dendl;
index 6a5e38a23184dae2aee1a63a27441df06799c134..9c2b4f0f5f0ad7dd84939f7dabd4eb74aa09a158 100644 (file)
@@ -82,7 +82,7 @@ RGWStreamReadHTTPResourceCRF::~RGWStreamReadHTTPResourceCRF()
 {
   if (req) {
     req->cancel();
-    req->wait();
+    req->wait(null_yield);
     delete req;
   }
 }
@@ -186,7 +186,7 @@ RGWStreamWriteHTTPResourceCRF::~RGWStreamWriteHTTPResourceCRF()
 {
   if (req) {
     req->cancel();
-    req->wait();
+    req->wait(null_yield);
     delete req;
   }
 }
index a73828b3f22cf9e4f139995bbd05f743fd8eb853..48106d7e3a84ffb8cd3194df5ea0e502b34c9186 100644 (file)
@@ -90,7 +90,7 @@ public:
 
 
   virtual int wait_result() {
-    return http_op->wait(result);
+    return http_op->wait(result, null_yield);
   }
 
   int request_complete() override {
@@ -138,7 +138,7 @@ class RGWReadRESTResourceCR : public RGWReadRawRESTResourceCR {
   {}
 
   int wait_result() override {
-    return http_op->wait(result);
+    return http_op->wait(result, null_yield);
   }
 
 };
@@ -206,10 +206,10 @@ class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
   int request_complete() override {
     int ret;
     if (result || err_result) {
-      ret = http_op->wait(result, err_result);
+      ret = http_op->wait(result, null_yield, err_result);
     } else {
       bufferlist bl;
-      ret = http_op->wait(&bl);
+      ret = http_op->wait(&bl, null_yield);
     }
     auto op = std::move(http_op); // release ref on return
     if (ret < 0) {
@@ -363,7 +363,7 @@ public:
   int request_complete() override {
     int ret;
     bufferlist bl;
-    ret = http_op->wait(&bl);
+    ret = http_op->wait(&bl, null_yield);
     auto op = std::move(http_op); // release ref on return
     if (ret < 0) {
       error_stream << "http operation failed: " << op->to_str()
index c50a8c0d66bcc60a04c6a95b40e608c5edad7fc8..5456e6b8cb0aa9f70198015e8bf00f6e691fe48a 100644 (file)
@@ -747,7 +747,7 @@ static int request_key_from_barbican(CephContext *cct,
   secret_req.append_header("Accept", "application/octet-stream");
   secret_req.append_header("X-Auth-Token", barbican_token);
 
-  res = secret_req.process();
+  res = secret_req.process(null_yield);
   if (res < 0) {
     return res;
   }
index cad6a0ea3499385cbbac7dfbc82ccc3c89fb2e54..8137b8e67cf2a019e245032bf68a14de176b1f77 100644 (file)
@@ -222,7 +222,7 @@ public:
         return io_block(0);
       }
       yield {
-        int ret = http_op->wait(shard_info);
+        int ret = http_op->wait(shard_info, null_yield);
         if (ret < 0) {
           return set_cr_error(ret);
         }
@@ -310,7 +310,7 @@ public:
       }
       yield {
         timer.reset();
-        int ret = http_op->wait(&response);
+        int ret = http_op->wait(&response, null_yield);
         if (ret < 0) {
           if (sync_env->counters && ret != -ENOENT) {
             sync_env->counters->inc(sync_counters::l_poll_err);
@@ -406,7 +406,7 @@ public:
   }
 
   int request_complete() override {
-    int ret = http_op->wait(result);
+    int ret = http_op->wait(result, null_yield);
     http_op->put();
     if (ret < 0 && ret != -ENOENT) {
       ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
index 50fe7915099faf64d85e16bd526050a942e0c095..c3ea3e38a69b5fabd5f222eb7e39bcf752b89dc7 100644 (file)
@@ -13,6 +13,7 @@
 #include "rgw_common.h"
 #include "rgw_http_client.h"
 #include "rgw_http_errors.h"
+#include "common/async/completion.h"
 #include "common/RefCountedObj.h"
 
 #include "rgw_coroutine.h"
@@ -46,15 +47,36 @@ struct rgw_http_req_data : public RefCountedObject {
   Mutex lock;
   Cond cond;
 
+  using Signature = void(boost::system::error_code);
+  using Completion = ceph::async::Completion<Signature>;
+  std::unique_ptr<Completion> completion;
+
   rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
     memset(error_buf, 0, sizeof(error_buf));
   }
 
-  int wait() {
+  template <typename ExecutionContext, typename CompletionToken>
+  auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, Signature> init(token);
+    auto& handler = init.completion_handler;
+    completion = Completion::create(ctx.get_executor(), std::move(handler));
+    return init.result.get();
+  }
+
+  int wait(optional_yield y) {
     Mutex::Locker l(lock);
     if (done) {
       return ret;
     }
+#ifdef HAVE_BOOST_CONTEXT
+    if (y) {
+      auto& context = y.get_io_context();
+      auto& yield = y.get_yield_context();
+      boost::system::error_code ec;
+      async_wait(context, yield[ec]);
+      return -ec.value();
+    }
+#endif
     cond.Wait(lock);
     return ret;
   }
@@ -73,7 +95,12 @@ struct rgw_http_req_data : public RefCountedObject {
     curl_handle = NULL;
     h = NULL;
     done = true;
-    cond.Signal();
+    if (completion) {
+      boost::system::error_code ec(-ret, boost::system::system_category());
+      Completion::post(std::move(completion), ec);
+    } else {
+      cond.Signal();
+    }
   }
 
   bool _is_done() {
@@ -448,9 +475,9 @@ static bool is_upload_request(const string& method)
 /*
  * process a single simple one off request
  */
-int RGWHTTPClient::process()
+int RGWHTTPClient::process(optional_yield y)
 {
-  return RGWHTTP::process(this);
+  return RGWHTTP::process(this, y);
 }
 
 string RGWHTTPClient::to_str()
@@ -528,9 +555,9 @@ bool RGWHTTPClient::is_done()
 /*
  * wait for async request to complete
  */
-int RGWHTTPClient::wait()
+int RGWHTTPClient::wait(optional_yield y)
 {
-  return req_data->wait();
+  return req_data->wait(y);
 }
 
 void RGWHTTPClient::cancel()
@@ -1210,7 +1237,7 @@ int RGWHTTP::send(RGWHTTPClient *req) {
   return 0;
 }
 
-int RGWHTTP::process(RGWHTTPClient *req) {
+int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
   if (!req) {
     return 0;
   }
@@ -1219,6 +1246,6 @@ int RGWHTTP::process(RGWHTTPClient *req) {
     return r;
   }
 
-  return req->wait();
+  return req->wait(y);
 }
 
index 04d8506e3903d7aa0432048a883550abfe7bb465..35c11e1fd4e5774d64f4e5682c8550627c698da0 100644 (file)
@@ -4,6 +4,7 @@
 #ifndef CEPH_RGW_HTTP_CLIENT_H
 #define CEPH_RGW_HTTP_CLIENT_H
 
+#include "common/async/yield_context.h"
 #include "common/RWLock.h"
 #include "common/Cond.h"
 #include "rgw_common.h"
@@ -178,9 +179,9 @@ public:
     verify_ssl = flag;
   }
 
-  int process();
+  int process(optional_yield y);
 
-  int wait();
+  int wait(optional_yield y);
   void cancel();
   bool is_done();
 
@@ -360,6 +361,6 @@ class RGWHTTP
 {
 public:
   static int send(RGWHTTPClient *req);
-  static int process(RGWHTTPClient *req);
+  static int process(RGWHTTPClient *req, optional_yield y);
 };
 #endif
index 758c3331878f37c1e7c88891afd783736f96ecee..dbe708fefada09cb562984cbb2e8c9012d244341 100644 (file)
@@ -282,7 +282,7 @@ int Service::issue_admin_token_request(CephContext* const cct,
 
   token_req.set_url(token_url);
 
-  const int ret = token_req.process();
+  const int ret = token_req.process(null_yield);
   if (ret < 0) {
     return ret;
   }
@@ -356,7 +356,7 @@ int Service::get_keystone_barbican_token(CephContext * const cct,
   token_req.set_url(token_url);
 
   ldout(cct, 20) << "Requesting secret from barbican url=" << token_url << dendl;
-  const int ret = token_req.process();
+  const int ret = token_req.process(null_yield);
   if (ret < 0) {
     ldout(cct, 20) << "Barbican process error:" << token_bl.c_str() << dendl;
     return ret;
@@ -583,7 +583,7 @@ int TokenCache::RevokeThread::check_revoked()
   req.set_url(url);
 
   req.set_send_length(0);
-  int ret = req.process();
+  int ret = req.process(null_yield);
   if (ret < 0) {
     return ret;
   }
index 08abf5a174fc493716058705e64f3f24d90e04d8..8aca404d7193c87db228816f80dda4f1b383210f 100644 (file)
@@ -55,7 +55,7 @@ int rgw_opa_authorize(RGWOp *& op,
   req.set_send_length(ss.str().length());
 
   /* send request */
-  ret = req.process();
+  ret = req.process(null_yield);
   if (ret < 0) {
     ldpp_dout(op, 2) << "OPA process error:" << bl.c_str() << dendl;
     return ret;
index daf6a24e53daa379017a9ec9e22d2abbfa18ff94..c2ddbd6550d219cd07c69ff263cef69775241e44 100644 (file)
@@ -157,7 +157,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *_method, const
   ldout(cct, 15) << "generated auth header: " << auth_hdr << dendl;
 
   headers.push_back(pair<string, string>("AUTHORIZATION", auth_hdr));
-  int r = process();
+  int r = process(null_yield);
   if (r < 0)
     return r;
 
@@ -324,7 +324,7 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
   method = new_info.method;
   url = new_url;
 
-  int r = process();
+  int r = process(null_yield);
   if (r < 0){
     if (r == -EINVAL){
       // curl_easy has errored, generally means the service is not available
@@ -802,7 +802,7 @@ int RGWRESTStreamRWRequest::complete_request(string *etag,
                                              map<string, string> *pattrs,
                                              map<string, string> *pheaders)
 {
-  int ret = wait();
+  int ret = wait(null_yield);
   if (ret < 0) {
     return ret;
   }
index fa273e91a7cd5f8d86cbf34b265948d188606f07..ccc05d007f3dd6886ace7a0384f6b75c08245acf 100644 (file)
@@ -302,8 +302,8 @@ public:
     return req.get_http_status();
   }
 
-  int wait(bufferlist *pbl) {
-    int ret = req.wait();
+  int wait(bufferlist *pbl, optional_yield y) {
+    int ret = req.wait(y);
     if (ret < 0) {
       return ret;
     }
@@ -316,7 +316,7 @@ public:
   }
 
   template <class T>
-  int wait(T *dest);
+  int wait(T *dest, optional_yield y);
 
   template <class T>
   int fetch(T *dest);
@@ -353,9 +353,9 @@ int RGWRESTReadResource::fetch(T *dest)
 }
 
 template <class T>
-int RGWRESTReadResource::wait(T *dest)
+int RGWRESTReadResource::wait(T *dest, optional_yield y)
 {
-  int ret = req.wait();
+  int ret = req.wait(y);
   if (ret < 0) {
     return ret;
   }
@@ -426,8 +426,8 @@ public:
     return req.get_http_status();
   }
 
-  int wait(bufferlist *pbl) {
-    int ret = req.wait();
+  int wait(bufferlist *pbl, optional_yield y) {
+    int ret = req.wait(y);
     *pbl = bl;
     if (ret < 0) {
       return ret;
@@ -440,7 +440,7 @@ public:
   }
 
   template <class T, class E = int>
-  int wait(T *dest, E *err_result = nullptr);
+  int wait(T *dest, optional_yield y, E *err_result = nullptr);
 };
 
 template <class T, class E>
@@ -466,9 +466,9 @@ int RGWRESTSendResource::decode_resource(T *dest, E *err_result)
 }
 
 template <class T, class E>
-int RGWRESTSendResource::wait(T *dest, E *err_result)
+int RGWRESTSendResource::wait(T *dest, optional_yield y, E *err_result)
 {
-  int ret = req.wait();
+  int ret = req.wait(y);
   if (ret < 0) {
     if (err_result) {
       parse_decode_json(cct, *err_result, bl);
index 072ee9c25c5937c5acf9f238b8b825e629fa6325..30d7e821f67c888fbf4372f54cd5922d12c81ece 100644 (file)
@@ -64,7 +64,7 @@ WebTokenEngine::get_from_idp(const DoutPrefixProvider* dpp, const std::string& t
     introspect_req.set_post_data(post_data);
     introspect_req.set_send_length(post_data.length());
 
-    int res = introspect_req.process();
+    int res = introspect_req.process(null_yield);
     if (res < 0) {
       ldpp_dout(dpp, 10) << "HTTP request res: " << res << dendl;
       throw -EINVAL;
index 0f5bdd0274707431029291bb30824e27f5504a8a..a8c911e9103e635097e0f34e6a75f24d7e42cfa9 100644 (file)
@@ -398,7 +398,7 @@ ExternalTokenEngine::authenticate(const DoutPrefixProvider* dpp,
 
   ldpp_dout(dpp, 10) << "rgw_swift_validate_token url=" << url_buf << dendl;
 
-  int ret = validator.process();
+  int ret = validator.process(null_yield);
   if (ret < 0) {
     throw ret;
   }
index 09e4c8034e75b314e0b4e5822198598dbf92d3e4..eb83a51192f334cc7ab1fbe289c6c193d7b04c40 100644 (file)
@@ -512,7 +512,7 @@ public:
         return io_block(0);
       }
       yield {
-        int ret = http_op->wait(shard_info);
+        int ret = http_op->wait(shard_info, null_yield);
         http_op->put();
         if (ret < 0) {
           return set_cr_error(ret);
@@ -576,7 +576,7 @@ public:
   }
 
   int request_complete() override {
-    int ret = http_op->wait(result);
+    int ret = http_op->wait(result, null_yield);
     http_op->put();
     if (ret < 0 && ret != -ENOENT) {
       ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
@@ -1046,7 +1046,7 @@ public:
         return io_block(0);
       }
       yield {
-        int ret = http_op->wait(pbl);
+        int ret = http_op->wait(pbl, null_yield);
         http_op->put();
         if (ret < 0) {
           return set_cr_error(ret);
@@ -2378,7 +2378,7 @@ int RGWCloneMetaLogCoroutine::state_send_rest_request()
 
 int RGWCloneMetaLogCoroutine::state_receive_rest_response()
 {
-  int ret = http_op->wait(&data);
+  int ret = http_op->wait(&data, null_yield);
   if (ret < 0) {
     error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
     ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl;