]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: send request to elasticsearch and parse response
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 22 Mar 2017 00:12:48 +0000 (17:12 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 30 May 2017 20:24:41 +0000 (13:24 -0700)
and send back response to user

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_sync_module_es.cc
src/rgw/rgw_sync_module_es.h
src/rgw/rgw_sync_module_es_rest.cc

index 81c4c6ef6bca3480135f600bf65b4c43a612825f..f9f8d2ac3461d451753f61806365ae980e9a4a9d 100644 (file)
@@ -314,7 +314,7 @@ int RGWHTTPClient::get_req_retcode()
 /*
  * init request, will be used later with RGWHTTPManager
  */
-int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data)
+int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
 {
   assert(!req_data);
   _req_data->get();
@@ -349,7 +349,7 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
   }
   curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
   curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
-  if (is_upload_request(method)) {
+  if (send_data_hint || is_upload_request(method)) {
     curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
   }
   if (has_send_len) {
@@ -746,11 +746,11 @@ void RGWHTTPManager::manage_pending_requests()
   }
 }
 
-int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url)
+int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
 {
   rgw_http_req_data *req_data = new rgw_http_req_data;
 
-  int ret = client->init_request(method, url, req_data);
+  int ret = client->init_request(method, url, req_data, send_data_hint);
   if (ret < 0) {
     req_data->put();
     req_data = NULL;
index cbe4f3d031210ec86639e1d04c68f4daa8f52ca4..63ea90001946723cae8c93e996fdc0b43b8ceec4 100644 (file)
@@ -42,7 +42,8 @@ protected:
 
   int init_request(const char *method,
                    const char *url,
-                   rgw_http_req_data *req_data);
+                   rgw_http_req_data *req_data,
+                   bool send_data_hint = false);
 
   virtual int receive_header(void *ptr, size_t len) {
     return 0;
@@ -264,7 +265,8 @@ public:
   int set_threaded();
   void stop();
 
-  int add_request(RGWHTTPClient *client, const char *method, const char *url);
+  int add_request(RGWHTTPClient *client, const char *method, const char *url,
+                  bool send_data_hint = false);
   int remove_request(RGWHTTPClient *client);
 
   /* only for non threaded case */
index 3572cea26dd8fd0f482535089144d2e9dc7ca830..1cccbab7d02b52ac15e597a7450700a877be2b0a 100644 (file)
@@ -686,8 +686,10 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
     headers.push_back(pair<string, string>(iter->first, iter->second));
   }
 
+  bool send_data_hint = false;
   if (send_data) {
-    in_data.claim(*send_data);
+    outbl.claim(*send_data);
+    send_data_hint = true;
   }
 
   RGWHTTPManager *pmanager = &http_manager;
@@ -695,7 +697,7 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map<string, string>&
     pmanager = mgr;
   }
 
-  int r = pmanager->add_request(this, new_info.method, new_url.c_str());
+  int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint);
   if (r < 0)
     return r;
 
index 7c2ff99731ec1e0e411f84cc65d499d56105922e..fd0df360cb547f349acdfe40aa564dac9968143a 100644 (file)
@@ -169,7 +169,6 @@ struct es_obj_metadata {
     }
     f->close_section();
   }
-
 };
 
 class RGWElasticInitConfigCBCR : public RGWCoroutine {
@@ -333,6 +332,10 @@ RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
   return data_handler->get_rest_conn();
 }
 
+string RGWElasticSyncModuleInstance::get_index_path(const RGWRealm& realm) {
+  return es_get_index_path(realm);
+}
+
 RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
   if (dialect != RGW_REST_S3) {
     return orig;
index 28b11716fd0cdfaea4121232d777859a16da4b8d..539302ef6f46cea26813617f77a67c8f7d02dd06 100644 (file)
@@ -3,6 +3,8 @@
 
 #include "rgw_sync_module.h"
 
+class RGWRealm;
+
 class RGWElasticSyncModule : public RGWSyncModule {
 public:
   RGWElasticSyncModule() {}
@@ -22,6 +24,7 @@ public:
   RGWDataSyncModule *get_data_handler() override;
   RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
   RGWRESTConn *get_rest_conn();
+  std::string get_index_path(const RGWRealm& realm);
 };
 
 #endif
index 8bc774fa9d34e3fc636dd559bb928cd53f471109..f0f2ec65c71bdb61e0e23fba01595edc397df612 100644 (file)
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+struct es_index_obj_response {
+  string bucket;
+  rgw_obj_key key;
+  ACLOwner owner;
+  set<string> read_permissions;
+
+  struct {
+    uint64_t size;
+    ceph::real_time mtime;
+    string etag;
+    map<string, string> custom_str;
+
+    template <class T>
+    struct _custom_entry {
+      string name;
+      T value;
+      void decode_json(JSONObj *obj) {
+        JSONDecoder::decode_json("name", name, obj);
+        JSONDecoder::decode_json("value", value, obj);
+      }
+    };
+
+    void decode_json(JSONObj *obj) {
+      JSONDecoder::decode_json("size", size, obj);
+      string mtime_str;
+      JSONDecoder::decode_json("mtime", mtime_str, obj);
+      parse_time(mtime_str.c_str(), &mtime);
+      JSONDecoder::decode_json("etag", etag, obj);
+      list<_custom_entry<string> > str_entries;
+      JSONDecoder::decode_json("custom-string", str_entries, obj);
+      for (auto& e : str_entries) {
+        custom_str[e.name] = e.value;
+      }
+    }
+  } meta;
+
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("bucket", bucket, obj);
+    JSONDecoder::decode_json("name", key.name, obj);
+    JSONDecoder::decode_json("instance", key.instance, obj);
+    JSONDecoder::decode_json("permissions", read_permissions, obj);
+    JSONDecoder::decode_json("meta", meta, obj);
+  }
+};
+
+struct es_search_response {
+  uint32_t took;
+  bool timed_out;
+  struct {
+    uint32_t total;
+    uint32_t successful;
+    uint32_t failed;
+    void decode_json(JSONObj *obj) {
+      JSONDecoder::decode_json("total", total, obj);
+      JSONDecoder::decode_json("successful", successful, obj);
+      JSONDecoder::decode_json("failed", failed, obj);
+    }
+  } shards;
+  struct obj_hit {
+    string index;
+    string type;
+    string id;
+    // double score
+    es_index_obj_response source;
+    void decode_json(JSONObj *obj) {
+      JSONDecoder::decode_json("_index", index, obj);
+      JSONDecoder::decode_json("_type", type, obj);
+      JSONDecoder::decode_json("_id", id, obj);
+      JSONDecoder::decode_json("_source", source, obj);
+    }
+  };
+  struct {
+    uint32_t total;
+    // double max_score;
+    list<obj_hit> hits;
+    void decode_json(JSONObj *obj) {
+      JSONDecoder::decode_json("total", total, obj);
+      // JSONDecoder::decode_json("max_score", max_score, obj);
+      JSONDecoder::decode_json("hits", hits, obj);
+    }
+  } hits;
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("took", took, obj);
+    JSONDecoder::decode_json("timed_out", timed_out, obj);
+    JSONDecoder::decode_json("_shards", shards, obj);
+    JSONDecoder::decode_json("hits", hits, obj);
+  }
+};
+
 class RGWMetadataSearchOp : public RGWOp {
   RGWElasticSyncModuleInstance *es_module;
 protected:
   string expression;
 
+  es_search_response response;
+
 public:
   RGWMetadataSearchOp(RGWElasticSyncModuleInstance *_es_module) : es_module(_es_module) {}
 
@@ -34,7 +125,6 @@ void RGWMetadataSearchOp::pre_exec()
   rgw_bucket_object_pre_exec(s);
 }
 
-
 void RGWMetadataSearchOp::execute()
 {
   op_ret = get_params();
@@ -50,8 +140,46 @@ void RGWMetadataSearchOp::execute()
     return;
   }
 
+  JSONFormatter f;
+  encode_json("root", es_query, &f);
+
   RGWRESTConn *conn = es_module->get_rest_conn();
-  // conn->
+
+  bufferlist in;
+  bufferlist out;
+
+  stringstream ss;
+
+  f.flush(ss);
+  in.append(ss.str());
+
+  string resource = es_module->get_index_path(store->get_realm()) + "/_search";
+  param_vec_t params;
+//    params.push_back(param_pair_t("size", size));
+  ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl;
+  op_ret = conn->get_resource(resource, &params, nullptr, out, &in);
+  if (op_ret < 0) {
+    ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl;
+    return;
+  }
+
+  ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl;
+
+  JSONParser jparser;
+  if (!jparser.parse(out.c_str(), out.length())) {
+    ldout(s->cct, 0) << "ERROR: failed to parser elasticsearch response" << dendl;
+    op_ret = -EINVAL;
+    return;
+  }
+
+  try {
+    decode_json_obj(response, &jparser);
+  } catch (JSONDecoder::err& e) {
+    ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl;
+    op_ret = -EINVAL;
+    return;
+  }
+
 }
 
 class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp {
@@ -72,8 +200,21 @@ public:
       return;
     }
 
-    // TODO
-
+    s->formatter->open_object_section("SearchMetadataResponse");
+    for (auto& i : response.hits.hits) {
+      es_index_obj_response& e = i.source;
+      s->formatter->open_object_section("Contents");
+      s->formatter->dump_string("Key", e.key.name);
+      string instance = (!e.key.instance.empty() ? e.key.instance : "null");
+      s->formatter->dump_string("Instance", instance.c_str());
+      dump_time(s, "LastModified", &e.meta.mtime);
+      s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str());
+      dump_owner(s, e.owner.get_id(), e.owner.get_display_name());
+      s->formatter->close_section();
+      rgw_flush_formatter(s, s->formatter);
+    };
+    s->formatter->close_section();
+   rgw_flush_formatter_and_reset(s, s->formatter);
   }
 };