rgw_data_sync.cc
rgw_sync_module.cc
rgw_sync_module_es.cc
+ rgw_sync_module_es_rest.cc
rgw_sync_module_log.cc
rgw_period_history.cc
rgw_period_puller.cc
/* rgw specific */
RGW_OP_ADMIN_SET_METADATA,
RGW_OP_GET_OBJ_LAYOUT,
-
- RGW_OP_BULK_UPLOAD
+ RGW_OP_BULK_UPLOAD,
+ RGW_OP_METADATA_SEARCH,
};
class RGWAccessControlPolicy;
#include "common/ceph_json.h"
#include "rgw_common.h"
+#include "rgw_es_query.h"
using namespace std;
return true;
}
-class ESQueryStack {
- list<string> l;
- list<string>::iterator iter;
-
-public:
- ESQueryStack(list<string>& src) {
- assign(src);
- }
-
- ESQueryStack() {}
-
- void assign(list<string>& src) {
- l.swap(src);
- iter = l.begin();
- }
-
- bool peek(string *dest) {
- if (done()) {
- return false;
- }
- *dest = *iter;
- return true;
- }
-
- bool pop(string *dest) {
- bool valid = peek(dest);
- if (!valid) {
- return false;
- }
- ++iter;
- return true;
- }
-
- bool done() {
- return (iter == l.end());
- }
-};
-
class ESQueryNode {
public:
ESQueryNode() {}
return (c != ')');
}
-class ESInfixQueryParser {
- string query;
- int size;
- const char *str;
- int pos{0};
- list<string> args;
-
- void skip_whitespace(const char *str, int size, int& pos) {
- while (pos < size && isspace(str[pos])) {
- ++pos;
- }
+void ESInfixQueryParser::skip_whitespace(const char *str, int size, int& pos) {
+ while (pos < size && isspace(str[pos])) {
+ ++pos;
}
+}
- bool get_next_token(bool (*filter)(char)) {
- skip_whitespace(str, size, pos);
- int token_start = pos;
- while (pos < size && filter(str[pos])) {
- ++pos;
- }
- if (pos == token_start) {
- return false;
- }
- string token = string(str + token_start, pos - token_start);
- args.push_back(token);
- return true;
+bool ESInfixQueryParser::get_next_token(bool (*filter)(char)) {
+ skip_whitespace(str, size, pos);
+ int token_start = pos;
+ while (pos < size && filter(str[pos])) {
+ ++pos;
}
-
- bool parse_condition() {
- /*
- * condition: <key> <operator> <val>
- *
- * whereas key: needs to conform to http header field restrictions
- * operator: one of the following: < <= == >= >
- * val: ascii, terminated by either space or ')' (or end of string)
- */
-
- /* parse key */
- bool valid = get_next_token(is_key_char) &&
- get_next_token(is_op_char) &&
- get_next_token(is_val_char);
-
- if (!valid) {
- return false;
- }
-
- return true;
+ if (pos == token_start) {
+ return false;
}
+ string token = string(str + token_start, pos - token_start);
+ args.push_back(token);
+ return true;
+}
- bool parse_and_or() {
- skip_whitespace(str, size, pos);
- if (pos + 3 <= size && strncmp(str + pos, "and", 3) == 0) {
- pos += 3;
- args.push_back("and");
- return true;
- }
+bool ESInfixQueryParser::parse_condition() {
+ /*
+ * condition: <key> <operator> <val>
+ *
+ * whereas key: needs to conform to http header field restrictions
+ * operator: one of the following: < <= == >= >
+ * val: ascii, terminated by either space or ')' (or end of string)
+ */
- if (pos + 2 <= size && strncmp(str + pos, "or", 2) == 0) {
- pos += 2;
- args.push_back("or");
- return true;
- }
+ /* parse key */
+ bool valid = get_next_token(is_key_char) &&
+ get_next_token(is_op_char) &&
+ get_next_token(is_val_char);
+ if (!valid) {
return false;
}
- bool parse_specific_char(const char *pchar) {
- skip_whitespace(str, size, pos);
- if (pos >= size) {
- return false;
- }
- if (str[pos] != *pchar) {
- return false;
- }
+ return true;
+}
- args.push_back(pchar);
- ++pos;
+bool ESInfixQueryParser::parse_and_or() {
+ skip_whitespace(str, size, pos);
+ if (pos + 3 <= size && strncmp(str + pos, "and", 3) == 0) {
+ pos += 3;
+ args.push_back("and");
return true;
}
- bool parse_open_bracket() {
- return parse_specific_char("(");
+ if (pos + 2 <= size && strncmp(str + pos, "or", 2) == 0) {
+ pos += 2;
+ args.push_back("or");
+ return true;
}
- bool parse_close_bracket() {
- return parse_specific_char(")");
+ return false;
+}
+
+bool ESInfixQueryParser::parse_specific_char(const char *pchar) {
+ skip_whitespace(str, size, pos);
+ if (pos >= size) {
+ return false;
+ }
+ if (str[pos] != *pchar) {
+ return false;
}
-public:
- ESInfixQueryParser(const string& _query) : query(_query), size(query.size()), str(query.c_str()) {}
- bool parse(list<string> *result) {
- /*
- * expression: [(]<condition>[[and/or]<condition>][)][and/or]...
- */
-
- while (pos < size) {
- parse_open_bracket();
- if (!parse_condition()) {
- return false;
- }
- parse_close_bracket();
- parse_and_or();
- }
+ args.push_back(pchar);
+ ++pos;
+ return true;
+}
- result->swap(args);
+bool ESInfixQueryParser::parse_open_bracket() {
+ return parse_specific_char("(");
+}
- return true;
- }
-};
+bool ESInfixQueryParser::parse_close_bracket() {
+ return parse_specific_char(")");
+}
-class ESQueryCompiler {
- ESInfixQueryParser parser;
- ESQueryStack stack;
- ESQueryNode *query_root{nullptr};
+bool ESInfixQueryParser::parse(list<string> *result) {
+ /*
+ * expression: [(]<condition>[[and/or]<condition>][)][and/or]...
+ */
- bool convert(list<string>& infix) {
- list<string> prefix;
- if (!infix_to_prefix(infix, &prefix)) {
- return false;
- }
- stack.assign(prefix);
- if (!alloc_node(&stack, &query_root)) {
+ while (pos < size) {
+ parse_open_bracket();
+ if (!parse_condition()) {
return false;
}
- if (!stack.done()) {
- return false;
- }
- return true;
+ parse_close_bracket();
+ parse_and_or();
}
-public:
- ESQueryCompiler(const string& query) : parser(query) {}
- ~ESQueryCompiler() {
- delete query_root;
- }
+ result->swap(args);
- bool compile() {
- list<string> infix;
- if (!parser.parse(&infix)) {
- return false;
- }
+ return true;
+}
- if (!convert(infix)) {
- return false;
- }
+bool ESQueryCompiler::convert(list<string>& infix) {
+ list<string> prefix;
+ if (!infix_to_prefix(infix, &prefix)) {
+ return false;
+ }
+ stack.assign(prefix);
+ if (!alloc_node(&stack, &query_root)) {
+ return false;
+ }
+ if (!stack.done()) {
+ return false;
+ }
+ return true;
+}
- return true;
+ESQueryCompiler::~ESQueryCompiler() {
+ delete query_root;
+}
+
+bool ESQueryCompiler::compile() {
+ list<string> infix;
+ if (!parser.parse(&infix)) {
+ return false;
}
- void dump(Formatter *f) const {
- encode_json("query", *query_root, f);
+ if (!convert(infix)) {
+ return false;
}
-};
+
+ return true;
+}
+
+void ESQueryCompiler::dump(Formatter *f) const {
+ encode_json("query", *query_root, f);
+}
int main(int argc, char *argv[])
--- /dev/null
+#ifndef CEPH_RGW_ES_QUERY_H
+#define CEPH_RGW_ES_QUERY_H
+
+class ESQueryStack {
+ list<string> l;
+ list<string>::iterator iter;
+
+public:
+ ESQueryStack(list<string>& src) {
+ assign(src);
+ }
+
+ ESQueryStack() {}
+
+ void assign(list<string>& src) {
+ l.swap(src);
+ iter = l.begin();
+ }
+
+ bool peek(string *dest) {
+ if (done()) {
+ return false;
+ }
+ *dest = *iter;
+ return true;
+ }
+
+ bool pop(string *dest) {
+ bool valid = peek(dest);
+ if (!valid) {
+ return false;
+ }
+ ++iter;
+ return true;
+ }
+
+ bool done() {
+ return (iter == l.end());
+ }
+};
+
+class ESInfixQueryParser {
+ string query;
+ int size;
+ const char *str;
+ int pos{0};
+ list<string> args;
+
+ void skip_whitespace(const char *str, int size, int& pos);
+ bool get_next_token(bool (*filter)(char));
+
+ bool parse_condition();
+ bool parse_and_or();
+ bool parse_specific_char(const char *pchar);
+ bool parse_open_bracket();
+ bool parse_close_bracket();
+
+public:
+ ESInfixQueryParser(const string& _query) : query(_query), size(query.size()), str(query.c_str()) {}
+ bool parse(list<string> *result);
+};
+
+class ESQueryNode;
+
+class ESQueryCompiler {
+ ESInfixQueryParser parser;
+ ESQueryStack stack;
+ ESQueryNode *query_root{nullptr};
+
+ bool convert(list<string>& infix);
+
+public:
+ ESQueryCompiler(const string& query) : parser(query) {}
+ ~ESQueryCompiler();
+
+ bool compile();
+ void dump(Formatter *f) const;
+};
+
+
+#endif
return mgr;
}
+static RGWRESTMgr *rest_filter(RGWRados *store, int dialect, RGWRESTMgr *orig)
+{
+ RGWSyncModuleInstanceRef sync_module = store->get_sync_module();
+ return sync_module->get_rest_filter(dialect, orig);
+}
+
RGWRealmReloader *preloader = NULL;
static void reloader_handler(int signum)
const bool swift_at_root = g_conf->rgw_swift_url_prefix == "/";
if (apis_map.count("s3") > 0 || s3website_enabled) {
if (! swift_at_root) {
- rest.register_default_mgr(set_logging(new RGWRESTMgr_S3(s3website_enabled)));
+ rest.register_default_mgr(set_logging(rest_filter(store, RGW_REST_S3,
+ new RGWRESTMgr_S3(s3website_enabled))));
} else {
derr << "Cannot have the S3 or S3 Website enabled together with "
<< "Swift API placed in the root of hierarchy" << dendl;
if (! swift_at_root) {
rest.register_resource(g_conf->rgw_swift_url_prefix,
- set_logging(swift_resource));
+ set_logging(rest_filter(store, RGW_REST_SWIFT,
+ swift_resource)));
} else {
if (store->get_zonegroup().zones.size() > 1) {
derr << "Placing Swift API in the root of URL hierarchy while running"
return e;
}
-static void rgw_bucket_object_pre_exec(struct req_state *s)
+void rgw_bucket_object_pre_exec(struct req_state *s)
{
if (s->expect_cont)
dump_continue(s);
};
+
+void rgw_bucket_object_pre_exec(struct req_state *s);
+
/**
* Provide the base class for all ops.
*/
RGWOp *RGWHandler_REST_Bucket_S3::get_obj_op(bool get_data)
{
// Non-website mode
- if (get_data)
+ if (get_data) {
return new RGWListBucket_ObjStore_S3;
- else
+ } else {
return new RGWStatBucket_ObjStore_S3;
+ }
}
RGWOp *RGWHandler_REST_Bucket_S3::op_get()
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
};
+class RGWRESTMgr;
+
class RGWSyncModuleInstance {
public:
RGWSyncModuleInstance() {}
virtual ~RGWSyncModuleInstance() {}
virtual RGWDataSyncModule *get_data_handler() = 0;
+ virtual RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) {
+ return orig;
+ }
};
typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
#include "rgw_data_sync.h"
#include "rgw_boost_asio_yield.h"
#include "rgw_sync_module_es.h"
+#include "rgw_sync_module_es_rest.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rest.h"
+#include "rgw_op.h"
#define dout_subsys ceph_subsys_rgw
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
+ RGWRESTConn *get_rest_conn() {
+ return conf.conn;
+ }
};
-class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance {
- RGWElasticDataSyncModule data_handler;
-public:
- RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) : data_handler(cct, endpoint) {}
- RGWDataSyncModule *get_data_handler() override {
- return &data_handler;
+RGWElasticSyncModuleInstance::RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint)
+{
+ data_handler = std::unique_ptr<RGWElasticDataSyncModule>(new RGWElasticDataSyncModule(cct, endpoint));
+}
+
+RGWDataSyncModule *RGWElasticSyncModuleInstance::get_data_handler()
+{
+ return data_handler.get();
+}
+
+RGWRESTConn *RGWElasticSyncModuleInstance::get_rest_conn()
+{
+ return data_handler->get_rest_conn();
+}
+
+RGWRESTMgr *RGWElasticSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
+ if (dialect != RGW_REST_S3) {
+ return orig;
}
-};
+ return new RGWRESTMgr_MDSearch_S3(this);
+}
int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
string endpoint;
int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
};
+class RGWElasticDataSyncModule;
+class RGWRESTConn;
+
+class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance {
+ std::unique_ptr<RGWElasticDataSyncModule> data_handler;
+public:
+ RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint);
+ RGWDataSyncModule *get_data_handler() override;
+ RGWRESTMgr *get_rest_filter(int dialect, RGWRESTMgr *orig) override;
+ RGWRESTConn *get_rest_conn();
+};
+
#endif
--- /dev/null
+#include "rgw_sync_module_es.h"
+#include "rgw_sync_module_es_rest.h"
+#include "rgw_es_query.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+#include "rgw_rest_s3.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+class RGWMetadataSearch : public RGWOp {
+protected:
+ string expression;
+
+public:
+ RGWMetadataSearch() {}
+
+ int verify_permission() {
+ return 0;
+ }
+ virtual int get_params() = 0;
+ void pre_exec();
+ void execute();
+
+ virtual void send_response() = 0;
+ virtual const string name() { return "metadata_search"; }
+ virtual RGWOpType get_type() { return RGW_OP_METADATA_SEARCH; }
+ virtual uint32_t op_mask() { return RGW_OP_TYPE_READ; }
+};
+
+void RGWMetadataSearch::pre_exec()
+{
+ rgw_bucket_object_pre_exec(s);
+}
+
+
+void RGWMetadataSearch::execute()
+{
+ op_ret = get_params();
+ if (op_ret < 0)
+ return;
+
+ ESQueryCompiler es_query(expression);
+
+ bool valid = es_query.compile();
+ if (!valid) {
+ ldout(s->cct, 10) << "invalid query, failed generating request json" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+}
+
+class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearch {
+public:
+ RGWMetadataSearch_ObjStore_S3() {}
+
+ int get_params() override {
+ expression = s->info.args.get("query");
+ return 0;
+ }
+ void send_response() override {
+ if (op_ret)
+ set_req_state_err(s, op_ret);
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ // TODO
+
+ }
+};
+
+class RGWHandler_REST_MDSearch_S3 : public RGWHandler_REST_S3 {
+protected:
+ RGWOp *op_get() {
+ if (!s->info.args.exists("query")) {
+ return nullptr;
+ }
+ return new RGWMetadataSearch_ObjStore_S3;
+ }
+ RGWOp *op_head() {
+ return nullptr;
+ }
+ RGWOp *op_post() {
+ return nullptr;
+ }
+public:
+ RGWHandler_REST_MDSearch_S3() {}
+ virtual ~RGWHandler_REST_MDSearch_S3() {}
+};
+
+
+RGWHandler_REST* RGWRESTMgr_MDSearch_S3::get_handler(struct req_state* const s,
+ const rgw::auth::StrategyRegistry& auth_registry,
+ const std::string& frontend_prefix)
+{
+ int ret =
+ RGWHandler_REST_S3::init_from_header(s,
+ RGW_FORMAT_XML, true);
+ if (ret < 0) {
+ return nullptr;
+ }
+
+ if (!s->object.empty()) {
+ return nullptr;
+ }
+
+ RGWHandler_REST *handler = new RGWHandler_REST_MDSearch_S3;
+
+ ldout(s->cct, 20) << __func__ << " handler=" << typeid(*handler).name()
+ << dendl;
+ return handler;
+}
+
--- /dev/null
+#ifndef CEPH_RGW_SYNC_MODULE_ES_REST_H
+#define CEPH_RGW_SYNC_MODULE_ES_REST_H
+
+#include "rgw_rest.h"
+
+class RGWElasticSyncModuleInstance;
+
+class RGWRESTMgr_MDSearch_S3 : public RGWRESTMgr {
+ RGWElasticSyncModuleInstance *es_module;
+public:
+ explicit RGWRESTMgr_MDSearch_S3(RGWElasticSyncModuleInstance *_es_module) : es_module(_es_module) {}
+
+ RGWHandler_REST *get_handler(struct req_state* s,
+ const rgw::auth::StrategyRegistry& auth_registry,
+ const std::string& frontend_prefix) override;
+};
+
+#endif