From: Yehuda Sadeh Date: Wed, 22 Mar 2017 00:12:48 +0000 (-0700) Subject: rgw: send request to elasticsearch and parse response X-Git-Tag: ses5-milestone6~9^2~3^2~55 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=86fe1223c939b0d0772fe156e25eb317f21bda16;p=ceph.git rgw: send request to elasticsearch and parse response and send back response to user Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 81c4c6ef6bc..f9f8d2ac346 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -314,7 +314,7 @@ int RGWHTTPClient::get_req_retcode() /* * init request, will be used later with RGWHTTPManager */ -int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data) +int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint) { assert(!req_data); _req_data->get(); @@ -349,7 +349,7 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re } curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); - if (is_upload_request(method)) { + if (send_data_hint || is_upload_request(method)) { curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); } if (has_send_len) { @@ -746,11 +746,11 @@ void RGWHTTPManager::manage_pending_requests() } } -int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url) +int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint) { rgw_http_req_data *req_data = new rgw_http_req_data; - int ret = client->init_request(method, url, req_data); + int ret = client->init_request(method, url, req_data, send_data_hint); if (ret < 0) { req_data->put(); req_data = NULL; diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index cbe4f3d0312..63ea9000194 100644 --- 
a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -42,7 +42,8 @@ protected: int init_request(const char *method, const char *url, - rgw_http_req_data *req_data); + rgw_http_req_data *req_data, + bool send_data_hint = false); virtual int receive_header(void *ptr, size_t len) { return 0; @@ -264,7 +265,8 @@ public: int set_threaded(); void stop(); - int add_request(RGWHTTPClient *client, const char *method, const char *url); + int add_request(RGWHTTPClient *client, const char *method, const char *url, + bool send_data_hint = false); int remove_request(RGWHTTPClient *client); /* only for non threaded case */ diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 3572cea26dd..1cccbab7d02 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -686,8 +686,10 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map& headers.push_back(pair(iter->first, iter->second)); } + bool send_data_hint = false; if (send_data) { - in_data.claim(*send_data); + outbl.claim(*send_data); + send_data_hint = true; } RGWHTTPManager *pmanager = &http_manager; @@ -695,7 +697,7 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey *key, map& pmanager = mgr; } - int r = pmanager->add_request(this, new_info.method, new_url.c_str()); + int r = pmanager->add_request(this, new_info.method, new_url.c_str(), send_data_hint); if (r < 0) return r; diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 7c2ff99731e..fd0df360cb5 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -169,7 +169,6 @@ struct es_obj_metadata { } f->close_section(); } - }; class RGWElasticInitConfigCBCR : public RGWCoroutine { @@ -333,6 +332,10 @@ RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn() return data_handler->get_rest_conn(); } +string RGWElasticSyncModuleInstance::get_index_path(const RGWRealm& realm) { + return es_get_index_path(realm); +} + RGWRESTMgr 
*RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { if (dialect != RGW_REST_S3) { return orig; diff --git a/src/rgw/rgw_sync_module_es.h b/src/rgw/rgw_sync_module_es.h index 28b11716fd0..539302ef6f4 100644 --- a/src/rgw/rgw_sync_module_es.h +++ b/src/rgw/rgw_sync_module_es.h @@ -3,6 +3,8 @@ #include "rgw_sync_module.h" +class RGWRealm; + class RGWElasticSyncModule : public RGWSyncModule { public: RGWElasticSyncModule() {} @@ -22,6 +24,7 @@ public: RGWDataSyncModule *get_data_handler() override; RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override; RGWRESTConn *get_rest_conn(); + std::string get_index_path(const RGWRealm& realm); }; #endif diff --git a/src/rgw/rgw_sync_module_es_rest.cc b/src/rgw/rgw_sync_module_es_rest.cc index 8bc774fa9d3..f0f2ec65c71 100644 --- a/src/rgw/rgw_sync_module_es_rest.cc +++ b/src/rgw/rgw_sync_module_es_rest.cc @@ -8,11 +8,102 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw +struct es_index_obj_response { + string bucket; + rgw_obj_key key; + ACLOwner owner; + set<string> read_permissions; + + struct { + uint64_t size; + ceph::real_time mtime; + string etag; + map<string, string> custom_str; + + template <class T> + struct _custom_entry { + string name; + T value; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("name", name, obj); + JSONDecoder::decode_json("value", value, obj); + } + }; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("size", size, obj); + string mtime_str; + JSONDecoder::decode_json("mtime", mtime_str, obj); + parse_time(mtime_str.c_str(), &mtime); + JSONDecoder::decode_json("etag", etag, obj); + list<_custom_entry<string> > str_entries; + JSONDecoder::decode_json("custom-string", str_entries, obj); + for (auto& e : str_entries) { + custom_str[e.name] = e.value; + } + } + } meta; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("bucket", bucket, obj); + JSONDecoder::decode_json("name", key.name, obj); + JSONDecoder::decode_json("instance", 
key.instance, obj); + JSONDecoder::decode_json("permissions", read_permissions, obj); + JSONDecoder::decode_json("meta", meta, obj); + } +}; + +struct es_search_response { + uint32_t took; + bool timed_out; + struct { + uint32_t total; + uint32_t successful; + uint32_t failed; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + JSONDecoder::decode_json("successful", successful, obj); + JSONDecoder::decode_json("failed", failed, obj); + } + } shards; + struct obj_hit { + string index; + string type; + string id; + // double score + es_index_obj_response source; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("_index", index, obj); + JSONDecoder::decode_json("_type", type, obj); + JSONDecoder::decode_json("_id", id, obj); + JSONDecoder::decode_json("_source", source, obj); + } + }; + struct { + uint32_t total; + // double max_score; + list<obj_hit> hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + // JSONDecoder::decode_json("max_score", max_score, obj); + JSONDecoder::decode_json("hits", hits, obj); + } + } hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("took", took, obj); + JSONDecoder::decode_json("timed_out", timed_out, obj); + JSONDecoder::decode_json("_shards", shards, obj); + JSONDecoder::decode_json("hits", hits, obj); + } +}; + class RGWMetadataSearchOp : public RGWOp { RGWElasticSyncModuleInstance *es_module; protected: string expression; + es_search_response response; + public: RGWMetadataSearchOp(RGWElasticSyncModuleInstance *_es_module) : es_module(_es_module) {} @@ -34,7 +125,6 @@ void RGWMetadataSearchOp::pre_exec() rgw_bucket_object_pre_exec(s); } - void RGWMetadataSearchOp::execute() { op_ret = get_params(); @@ -50,8 +140,46 @@ void RGWMetadataSearchOp::execute() return; } + JSONFormatter f; + encode_json("root", es_query, &f); + RGWRESTConn *conn = es_module->get_rest_conn(); - // conn-> + + bufferlist in; + bufferlist out; + + stringstream ss; + + 
f.flush(ss); + in.append(ss.str()); + + string resource = es_module->get_index_path(store->get_realm()) + "/_search"; + param_vec_t params; +// params.push_back(param_pair_t("size", size)); + ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl; + op_ret = conn->get_resource(resource, &params, nullptr, out, &in); + if (op_ret < 0) { + ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl; + return; + } + + ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl; + + JSONParser jparser; + if (!jparser.parse(out.c_str(), out.length())) { + ldout(s->cct, 0) << "ERROR: failed to parser elasticsearch response" << dendl; + op_ret = -EINVAL; + return; + } + + try { + decode_json_obj(response, &jparser); + } catch (JSONDecoder::err& e) { + ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl; + op_ret = -EINVAL; + return; + } + } class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp { @@ -72,8 +200,21 @@ public: return; } - // TODO - + s->formatter->open_object_section("SearchMetadataResponse"); + for (auto& i : response.hits.hits) { + es_index_obj_response& e = i.source; + s->formatter->open_object_section("Contents"); + s->formatter->dump_string("Key", e.key.name); + string instance = (!e.key.instance.empty() ? e.key.instance : "null"); + s->formatter->dump_string("Instance", instance.c_str()); + dump_time(s, "LastModified", &e.meta.mtime); + s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str()); + dump_owner(s, e.owner.get_id(), e.owner.get_display_name()); + s->formatter->close_section(); + rgw_flush_formatter(s, s->formatter); + }; + s->formatter->close_section(); + rgw_flush_formatter_and_reset(s, s->formatter); } };