From: Babu Shanmugam Date: Wed, 26 Jun 2013 13:26:33 +0000 (+0530) Subject: RESTful API implementation for replica_log X-Git-Tag: v0.67-rc1~128^2~18^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=641bd2fa385ec35b7d5505cc19eb6be0fe4fe9ee;p=ceph.git RESTful API implementation for replica_log Signed-off-by: Babu Shanmugam --- diff --git a/src/Makefile.am b/src/Makefile.am index 28ebb8331727..ffb416c35821 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -425,6 +425,7 @@ radosgw_SOURCES = \ rgw/rgw_replica_log.cc \ rgw/rgw_rest_log.cc \ rgw/rgw_rest_opstate.cc \ + rgw/rgw_rest_replica_log.cc \ rgw/rgw_http_client.cc \ rgw/rgw_swift.cc \ rgw/rgw_swift_auth.cc \ @@ -2154,8 +2155,9 @@ noinst_HEADERS = \ rgw/rgw_rest_conn.h\ rgw/rgw_tools.h\ rgw/rgw_rest_metadata.h\ - rgw/rgw_rest_log.h\ - rgw/rgw_rest_opstate.h\ + rgw/rgw_rest_log.h\ + rgw/rgw_rest_opstate.h\ + rgw/rgw_rest_replica_log.h\ rgw/rgw_usage.h\ rgw/rgw_user.h\ rgw/rgw_bucket.h\ diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index f2170929d6a9..289a269e5a4d 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -43,6 +43,8 @@ #include "rgw_rest_metadata.h" #include "rgw_rest_log.h" #include "rgw_rest_opstate.h" +#include "rgw_replica_log.h" +#include "rgw_rest_replica_log.h" #include "rgw_swift_auth.h" #include "rgw_swift.h" #include "rgw_log.h" @@ -508,6 +510,7 @@ int main(int argc, const char **argv) admin_resource->register_resource("metadata", new RGWRESTMgr_Metadata); admin_resource->register_resource("log", new RGWRESTMgr_Log); admin_resource->register_resource("opstate", new RGWRESTMgr_Opstate); + admin_resource->register_resource("replica_log", new RGWRESTMgr_ReplicaLog); rest.register_resource(g_conf->rgw_admin_entry, admin_resource); } diff --git a/src/rgw/rgw_rest_replica_log.cc b/src/rgw/rgw_rest_replica_log.cc new file mode 100644 index 000000000000..fc1ef9c52cb4 --- /dev/null +++ b/src/rgw/rgw_rest_replica_log.cc @@ -0,0 +1,407 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 eNovance SAS + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#include "common/ceph_json.h" +#include "common/strtol.h" +#include "rgw_rest.h" +#include "rgw_op.h" +#include "rgw_rest_s3.h" +#include "rgw_replica_log.h" +#include "rgw_metadata.h" +#include "rgw_bucket.h" +#include "rgw_rest_replica_log.h" +#include "rgw_client_io.h" +#include "common/errno.h" + +#define dout_subsys ceph_subsys_rgw +#define REPLICA_INPUT_MAX_LEN (512*1024) + +static int parse_to_utime(string& in, utime_t& out) { + struct tm tm; + + if (!parse_iso8601(in.c_str(), &tm)) + return -EINVAL; + + time_t tt = mktime(&tm); + out = utime_t(tt, 0); + return 0; +} + +static int parse_input_list(const char *data, int data_len, + const char *el_name, list >& out) { + JSONParser parser; + + if (!parser.parse(data, data_len)) { + return -EINVAL; + } + if (!parser.is_array()) { + dout(5) << "Should have been an array" << dendl; + return -EINVAL; + } + + vector l; + + l = parser.get_array_elements(); + for (vector::iterator it = l.begin(); + it != l.end(); it++) { + JSONParser el_parser; + + if (!el_parser.parse((*it).c_str(), (*it).length())) { + dout(5) << "Error parsing an array element" << dendl; + return -EINVAL; + } + + string name, time; + + JSONDecoder::decode_json(el_name, name, (JSONObj *)&el_parser); + JSONDecoder::decode_json("time", time, (JSONObj *)&el_parser); + + utime_t ut; + if (parse_to_utime(time, ut) < 0) { + return -EINVAL; + } + out.push_back(make_pair(name, ut)); + } + + return 0; +} + +static int get_input_list(req_state *s, const char *element_name, list >& out) { + int rv, data_len; + char *data; + + if ((rv = rgw_rest_read_all_input(s, &data, &data_len, REPLICA_INPUT_MAX_LEN)) < 0) { + dout(5) << "Error - reading input data - " << rv << dendl; + return rv; + } + + if ((rv = parse_input_list(data, data_len, element_name, out)) < 0) { + dout(5) << "Error parsing input list - " << rv << dendl; + return rv; + } + + free(data); + return 0; +} + +static void item_encode_json(const char *name, + const char *el_name, + cls_replica_log_item_marker& val, + Formatter *f) { + f->open_object_section(name); + f->dump_string(el_name, val.item_name); + encode_json("time", val.item_timestamp, f); + f->close_section(); +} + +static void progress_encode_json(const char *name, + const char *sub_array_name, + const char *sub_array_el_name, + cls_replica_log_progress_marker &val, + Formatter *f) { + f->open_object_section(name); + f->dump_string("daemon_id", val.entity_id); + f->dump_string("marker", val.position_marker); + encode_json("time", val.position_time, f); + + f->open_array_section(sub_array_name); + for (list::iterator it = val.items.begin(); + it != val.items.end(); it++) { + cls_replica_log_item_marker& entry = (*it); + + item_encode_json(sub_array_name, sub_array_el_name, entry, f); + } + f->close_section(); + f->close_section(); +} + +void RGWOp_OBJLog_SetBounds::execute() { + string id_str = s->info.args.get("id"), + marker = s->info.args.get("marker"), + time = s->info.args.get("time"), + daemon_id = s->info.args.get("daemon_id"); + + if (id_str.empty() || + marker.empty() || + time.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + int shard; + string err; + utime_t ut; + + shard = (int)strict_strtol(id_str.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing id parameter - " << id_str << ", err " << err << dendl; + http_ret = -EINVAL; + return; + } + + if (parse_to_utime(time, ut) < 0) { + http_ret = -EINVAL; + return; + } + + string pool; + RGWReplicaObjectLogger rl(store, pool, prefix); + bufferlist bl; + list > entries; + + if ((http_ret = get_input_list(s, "bucket", entries)) < 0) { + return; + } + + http_ret = rl.update_bound(shard, daemon_id, marker, ut, &entries); +} + +void RGWOp_OBJLog_GetBounds::execute() { + string id = s->info.args.get("id"); + + if (id.empty()) { + dout(5) << " Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + int shard; + string err; + + shard = (int)strict_strtol(id.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl; + http_ret = -EINVAL; + return; + } + + string pool; + RGWReplicaObjectLogger rl(store, pool, prefix); + http_ret = rl.get_bounds(shard, lowest_bound, oldest_time, entries); +} + +void RGWOp_OBJLog_GetBounds::send_response() { + set_req_state_err(s, http_ret); + dump_errno(s); + end_header(s); + + if (http_ret < 0) + return; + + s->formatter->open_object_section("container"); + s->formatter->open_array_section("items"); + for (list::iterator it = entries.begin(); + it != entries.end(); it++) { + cls_replica_log_progress_marker entry = (*it); + progress_encode_json("entry", "buckets", "bucket", entry, s->formatter); + flusher.flush(); + } + s->formatter->close_section(); + s->formatter->dump_string("lowest_bound", lowest_bound); + encode_json("oldest_time", oldest_time, s->formatter); + s->formatter->close_section(); + flusher.flush(); +} + +void RGWOp_OBJLog_DeleteBounds::execute() { + string id = s->info.args.get("id"), + daemon_id = s->info.args.get("daemon_id"); + + if (id.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + int shard; + string err; + + shard = (int)strict_strtol(id.c_str(), 10, &err); + if (!err.empty()) { + dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl; + http_ret = -EINVAL; + } + + string pool; + RGWReplicaObjectLogger rl(store, pool, prefix); + http_ret = rl.delete_bound(shard, daemon_id); +} + +static int bucket_name_to_bucket(RGWRados *store, string& bucket_str, rgw_bucket& bucket) { + RGWBucketInfo bucket_info; + RGWObjVersionTracker objv_tracker; + time_t mtime; + + int r = store->get_bucket_info(NULL, bucket_str, bucket_info, &objv_tracker, &mtime); + if (r < 0) { + dout(5) << "could not get bucket info for bucket=" << bucket_str << dendl; + return -EINVAL; + } + + bucket = bucket_info.bucket; + return 0; +} + +void RGWOp_BILog_SetBounds::execute() { + string bucket_str = s->info.args.get("bucket"), + marker = s->info.args.get("marker"), + time = s->info.args.get("time"), + daemon_id = s->info.args.get("daemon_id"); + + if (bucket_str.empty() || + marker.empty() || + time.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + utime_t ut; + + if (parse_to_utime(time, ut) < 0) { + http_ret = -EINVAL; + return; + } + + rgw_bucket bucket; + if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) + return; + + RGWReplicaBucketLogger rl(store); + bufferlist bl; + list > entries; + + if ((http_ret = get_input_list(s, "object", entries)) < 0) { + return; + } + + http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &entries); +} + +void RGWOp_BILog_GetBounds::execute() { + string bucket_str = s->info.args.get("bucket"); + + if (bucket_str.empty()) { + dout(5) << " Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + rgw_bucket bucket; + if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) + return; + + RGWReplicaBucketLogger rl(store); + http_ret = rl.get_bounds(bucket, lowest_bound, oldest_time, entries); +} + +void RGWOp_BILog_GetBounds::send_response() { + set_req_state_err(s, http_ret); + dump_errno(s); + end_header(s); + + if (http_ret < 0) + return; + + s->formatter->open_object_section("container"); + s->formatter->open_array_section("entries"); + for (list::iterator it = entries.begin(); + it != entries.end(); it++) { + cls_replica_log_progress_marker entry = (*it); + progress_encode_json("entry", "objects", "object", entry, s->formatter); + flusher.flush(); + } + s->formatter->close_section(); + s->formatter->dump_string("lowest_bound", lowest_bound); + encode_json("oldest_time", oldest_time, s->formatter); + s->formatter->close_section(); + flusher.flush(); +} + +void RGWOp_BILog_DeleteBounds::execute() { + string bucket_str = s->info.args.get("bucket"), + daemon_id = s->info.args.get("daemon_id"); + + if (bucket_str.empty() || + daemon_id.empty()) { + dout(5) << "Error - invalid parameter list" << dendl; + http_ret = -EINVAL; + return; + } + + rgw_bucket bucket; + if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) + return; + + RGWReplicaBucketLogger rl(store); + http_ret = rl.delete_bound(bucket, daemon_id); +} + +RGWOp *RGWHandler_ReplicaLog::op_get() { + bool exists; + string type = s->info.args.get("type", &exists); + + if (!exists) { + return NULL; + } + + if (type.compare("metadata") == 0) { + return new RGWOp_OBJLog_GetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog"); + } else if (type.compare("bucket-index") == 0) { + return new RGWOp_BILog_GetBounds; + } else if (type.compare("data") == 0) { + return new RGWOp_OBJLog_GetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog"); + } + return NULL; +} + +RGWOp *RGWHandler_ReplicaLog::op_delete() { + bool exists; + string type = s->info.args.get("type", &exists); + + if (!exists) { + return NULL; + } + + if (type.compare("metadata") == 0) + return new RGWOp_OBJLog_DeleteBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog"); + else if (type.compare("bucket-index") == 0) + return new RGWOp_BILog_DeleteBounds; + else if (type.compare("data") == 0) + return new RGWOp_OBJLog_DeleteBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog"); + + return NULL; +} + +RGWOp *RGWHandler_ReplicaLog::op_post() { + bool exists; + string type = s->info.args.get("type", &exists); + + if (!exists) { + return NULL; + } + + if (type.compare("metadata") == 0) { + return new RGWOp_OBJLog_SetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog"); + } else if (type.compare("bucket-index") == 0) { + return new RGWOp_BILog_SetBounds; + } else if (type.compare("data") == 0) { + return new RGWOp_OBJLog_SetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog"); + } + return NULL; +} + diff --git a/src/rgw/rgw_rest_replica_log.h b/src/rgw/rgw_rest_replica_log.h new file mode 100644 index 000000000000..dacabbe4f7ce --- /dev/null +++ b/src/rgw/rgw_rest_replica_log.h @@ -0,0 +1,158 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 eNovance SAS + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_RGW_REST_REPLICA_LOG_H +#define CEPH_RGW_REST_REPLICA_LOG_H + +class RGWOp_OBJLog_GetBounds : public RGWRESTOp { + string prefix; + string obj_type; + utime_t oldest_time; + string lowest_bound; + list entries; + +public: + RGWOp_OBJLog_GetBounds(const char *_prefix, const char *type) + : prefix(_prefix), obj_type(type){} + ~RGWOp_OBJLog_GetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap(obj_type.c_str(), RGW_CAP_READ); + } + int verify_permission() { + return check_caps(s->user.caps); + } + void execute(); + virtual void send_response(); + virtual const char *name() { + string s = "replica"; + s.append(obj_type); + s.append("_getbounds"); + return s.c_str(); + } +}; + +class RGWOp_OBJLog_SetBounds : public RGWRESTOp { + string prefix; + string obj_type; +public: + RGWOp_OBJLog_SetBounds(const char *_prefix, const char *type) + : prefix(_prefix), obj_type(type){} + ~RGWOp_OBJLog_SetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + string s = "replica"; + s.append(obj_type); + s.append("_updatebounds"); + return s.c_str(); + } +}; + +class RGWOp_OBJLog_DeleteBounds : public RGWRESTOp { + string prefix; + string obj_type; +public: + RGWOp_OBJLog_DeleteBounds(const char *_prefix, const char *type) + : prefix(_prefix), obj_type(type){} + ~RGWOp_OBJLog_DeleteBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + string s = "replica"; + s.append(obj_type); + s.append("_deletebound"); + return s.c_str(); + } +}; + +class RGWOp_BILog_GetBounds : public RGWRESTOp { + utime_t oldest_time; + string lowest_bound; + list entries; +public: + RGWOp_BILog_GetBounds() {} + ~RGWOp_BILog_GetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap("bilog", RGW_CAP_READ); + } + int verify_permission() { + return check_caps(s->user.caps); + } + void execute(); + virtual void send_response(); + virtual const char *name() { + return "replicabilog_getbounds"; + } +}; + +class RGWOp_BILog_SetBounds : public RGWRESTOp { +public: + RGWOp_BILog_SetBounds() {} + ~RGWOp_BILog_SetBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap("bilog", RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + return "replicabilog_updatebounds"; + } +}; + +class RGWOp_BILog_DeleteBounds : public RGWRESTOp { +public: + RGWOp_BILog_DeleteBounds() {} + ~RGWOp_BILog_DeleteBounds() {} + + int check_caps(RGWUserCaps& caps) { + return caps.check_cap("bilog", RGW_CAP_WRITE); + } + void execute(); + virtual const char *name() { + return "replicabilog_deletebound"; + } +}; + +class RGWHandler_ReplicaLog : public RGWHandler_Auth_S3 { +protected: + RGWOp *op_get(); + RGWOp *op_delete(); + RGWOp *op_post(); + + int read_permissions(RGWOp*) { + return 0; + } +public: + RGWHandler_ReplicaLog() : RGWHandler_Auth_S3() {} + virtual ~RGWHandler_ReplicaLog() {} +}; + +class RGWRESTMgr_ReplicaLog : public RGWRESTMgr { +public: + RGWRESTMgr_ReplicaLog() {} + virtual ~RGWRESTMgr_ReplicaLog() {} + + virtual RGWHandler *get_handler(struct req_state *s){ + return new RGWHandler_ReplicaLog; + } +}; + +#endif /*!CEPH_RGW_REST_REPLICA_LOG_H*/