/*
* init request, will be used later with RGWHTTPManager
*/
-int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data)
+int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
{
assert(!req_data);
_req_data->get();
}
curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
- if (is_upload_request(method)) {
+ if (send_data_hint || is_upload_request(method)) {
curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
}
if (has_send_len) {
}
}
-int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url)
+int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
{
rgw_http_req_data *req_data = new rgw_http_req_data;
- int ret = client->init_request(method, url, req_data);
+ int ret = client->init_request(method, url, req_data, send_data_hint);
if (ret < 0) {
req_data->put();
req_data = NULL;
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+struct es_index_obj_response {
+ string bucket;
+ rgw_obj_key key;
+ ACLOwner owner;
+ set<string> read_permissions;
+
+ struct {
+ uint64_t size;
+ ceph::real_time mtime;
+ string etag;
+ map<string, string> custom_str;
+
+ template <class T>
+ struct _custom_entry {
+ string name;
+ T value;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("name", name, obj);
+ JSONDecoder::decode_json("value", value, obj);
+ }
+ };
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("size", size, obj);
+ string mtime_str;
+ JSONDecoder::decode_json("mtime", mtime_str, obj);
+ parse_time(mtime_str.c_str(), &mtime);
+ JSONDecoder::decode_json("etag", etag, obj);
+ list<_custom_entry<string> > str_entries;
+ JSONDecoder::decode_json("custom-string", str_entries, obj);
+ for (auto& e : str_entries) {
+ custom_str[e.name] = e.value;
+ }
+ }
+ } meta;
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("bucket", bucket, obj);
+ JSONDecoder::decode_json("name", key.name, obj);
+ JSONDecoder::decode_json("instance", key.instance, obj);
+ JSONDecoder::decode_json("permissions", read_permissions, obj);
+ JSONDecoder::decode_json("meta", meta, obj);
+ }
+};
+
+struct es_search_response {
+ uint32_t took;
+ bool timed_out;
+ struct {
+ uint32_t total;
+ uint32_t successful;
+ uint32_t failed;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("total", total, obj);
+ JSONDecoder::decode_json("successful", successful, obj);
+ JSONDecoder::decode_json("failed", failed, obj);
+ }
+ } shards;
+ struct obj_hit {
+ string index;
+ string type;
+ string id;
+ // double score
+ es_index_obj_response source;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("_index", index, obj);
+ JSONDecoder::decode_json("_type", type, obj);
+ JSONDecoder::decode_json("_id", id, obj);
+ JSONDecoder::decode_json("_source", source, obj);
+ }
+ };
+ struct {
+ uint32_t total;
+ // double max_score;
+ list<obj_hit> hits;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("total", total, obj);
+ // JSONDecoder::decode_json("max_score", max_score, obj);
+ JSONDecoder::decode_json("hits", hits, obj);
+ }
+ } hits;
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("took", took, obj);
+ JSONDecoder::decode_json("timed_out", timed_out, obj);
+ JSONDecoder::decode_json("_shards", shards, obj);
+ JSONDecoder::decode_json("hits", hits, obj);
+ }
+};
+
class RGWMetadataSearchOp : public RGWOp {
RGWElasticSyncModuleInstance *es_module;
protected:
string expression;
+ es_search_response response;
+
public:
RGWMetadataSearchOp(RGWElasticSyncModuleInstance *_es_module) : es_module(_es_module) {}
rgw_bucket_object_pre_exec(s);
}
-
void RGWMetadataSearchOp::execute()
{
op_ret = get_params();
return;
}
+ JSONFormatter f;
+ encode_json("root", es_query, &f);
+
RGWRESTConn *conn = es_module->get_rest_conn();
- // conn->
+
+ bufferlist in;
+ bufferlist out;
+
+ stringstream ss;
+
+ f.flush(ss);
+ in.append(ss.str());
+
+ string resource = es_module->get_index_path(store->get_realm()) + "/_search";
+ param_vec_t params;
+// params.push_back(param_pair_t("size", size));
+ ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl;
+ op_ret = conn->get_resource(resource, ¶ms, nullptr, out, &in);
+ if (op_ret < 0) {
+ ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl;
+ return;
+ }
+
+ ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl;
+
+ JSONParser jparser;
+ if (!jparser.parse(out.c_str(), out.length())) {
+ ldout(s->cct, 0) << "ERROR: failed to parser elasticsearch response" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ try {
+ decode_json_obj(response, &jparser);
+ } catch (JSONDecoder::err& e) {
+ ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
}
class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp {
return;
}
- // TODO
-
+ s->formatter->open_object_section("SearchMetadataResponse");
+ for (auto& i : response.hits.hits) {
+ es_index_obj_response& e = i.source;
+ s->formatter->open_object_section("Contents");
+ s->formatter->dump_string("Key", e.key.name);
+ string instance = (!e.key.instance.empty() ? e.key.instance : "null");
+ s->formatter->dump_string("Instance", instance.c_str());
+ dump_time(s, "LastModified", &e.meta.mtime);
+ s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str());
+ dump_owner(s, e.owner.get_id(), e.owner.get_display_name());
+ s->formatter->close_section();
+ rgw_flush_formatter(s, s->formatter);
+ };
+ s->formatter->close_section();
+ rgw_flush_formatter_and_reset(s, s->formatter);
}
};