]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
RESTful API implementation for replica_log
authorBabu Shanmugam <anbu@enovance.com>
Wed, 26 Jun 2013 13:26:33 +0000 (18:56 +0530)
committerYehuda Sadeh <yehuda@inktank.com>
Tue, 2 Jul 2013 19:38:28 +0000 (12:38 -0700)
Signed-off-by: Babu Shanmugam <anbu@enovance.com>
src/Makefile.am
src/rgw/rgw_main.cc
src/rgw/rgw_rest_replica_log.cc [new file with mode: 0644]
src/rgw/rgw_rest_replica_log.h [new file with mode: 0644]

index 28ebb8331727dbcf0d7072e5e0afc2e0df6f4be7..ffb416c35821215e820d85aa2b73a4dfc330c4d2 100644 (file)
@@ -425,6 +425,7 @@ radosgw_SOURCES = \
         rgw/rgw_replica_log.cc \
         rgw/rgw_rest_log.cc \
         rgw/rgw_rest_opstate.cc \
+        rgw/rgw_rest_replica_log.cc \
         rgw/rgw_http_client.cc \
         rgw/rgw_swift.cc \
        rgw/rgw_swift_auth.cc \
@@ -2154,8 +2155,9 @@ noinst_HEADERS = \
        rgw/rgw_rest_conn.h\
        rgw/rgw_tools.h\
        rgw/rgw_rest_metadata.h\
-  rgw/rgw_rest_log.h\
-  rgw/rgw_rest_opstate.h\
+       rgw/rgw_rest_log.h\
+       rgw/rgw_rest_opstate.h\
+       rgw/rgw_rest_replica_log.h\
        rgw/rgw_usage.h\
        rgw/rgw_user.h\
        rgw/rgw_bucket.h\
index f2170929d6a9bbbf2932707d9cb81095c9ade730..289a269e5a4de09383d0fa8923be0fd71d52fa9e 100644 (file)
@@ -43,6 +43,8 @@
 #include "rgw_rest_metadata.h"
 #include "rgw_rest_log.h"
 #include "rgw_rest_opstate.h"
+#include "rgw_replica_log.h"
+#include "rgw_rest_replica_log.h"
 #include "rgw_swift_auth.h"
 #include "rgw_swift.h"
 #include "rgw_log.h"
@@ -508,6 +510,7 @@ int main(int argc, const char **argv)
     admin_resource->register_resource("metadata", new RGWRESTMgr_Metadata);
     admin_resource->register_resource("log", new RGWRESTMgr_Log);
     admin_resource->register_resource("opstate", new RGWRESTMgr_Opstate);
+    admin_resource->register_resource("replica_log", new RGWRESTMgr_ReplicaLog);
     rest.register_resource(g_conf->rgw_admin_entry, admin_resource);
   }
 
diff --git a/src/rgw/rgw_rest_replica_log.cc b/src/rgw/rgw_rest_replica_log.cc
new file mode 100644 (file)
index 0000000..fc1ef9c
--- /dev/null
@@ -0,0 +1,407 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "common/ceph_json.h"
+#include "common/strtol.h"
+#include "rgw_rest.h"
+#include "rgw_op.h"
+#include "rgw_rest_s3.h"
+#include "rgw_replica_log.h"
+#include "rgw_metadata.h"
+#include "rgw_bucket.h"
+#include "rgw_rest_replica_log.h"
+#include "rgw_client_io.h"
+#include "common/errno.h"
+
+#define dout_subsys ceph_subsys_rgw
+#define REPLICA_INPUT_MAX_LEN (512*1024)
+
+static int parse_to_utime(string& in, utime_t& out) {
+  struct tm tm;
+  
+  if (!parse_iso8601(in.c_str(), &tm)) 
+    return -EINVAL;
+
+  time_t tt = mktime(&tm);
+  out = utime_t(tt, 0);
+  return 0;
+}
+
+static int parse_input_list(const char *data, int data_len, 
+                            const char *el_name, list<pair<string, utime_t> >& out) {
+  JSONParser parser;
+
+  if (!parser.parse(data, data_len)) {
+    return -EINVAL;
+  }
+  if (!parser.is_array()) {
+    dout(5) << "Should have been an array" << dendl;
+    return -EINVAL;
+  }
+
+  vector<string> l;
+
+  l = parser.get_array_elements();
+  for (vector<string>::iterator it = l.begin();
+       it != l.end(); it++) {
+    JSONParser el_parser;
+
+    if (!el_parser.parse((*it).c_str(), (*it).length())) {
+      dout(5) << "Error parsing an array element" << dendl;
+      return -EINVAL;
+    }
+
+    string name, time;
+
+    JSONDecoder::decode_json(el_name, name, (JSONObj *)&el_parser);
+    JSONDecoder::decode_json("time", time, (JSONObj *)&el_parser);
+
+    utime_t ut;
+    if (parse_to_utime(time, ut) < 0) {
+      return -EINVAL;
+    }
+    out.push_back(make_pair(name, ut));
+  }
+
+  return 0;
+}
+
+static int get_input_list(req_state *s, const char *element_name, list<pair<string, utime_t> >& out) {
+  int rv, data_len;
+  char *data;
+
+  if ((rv = rgw_rest_read_all_input(s, &data, &data_len, REPLICA_INPUT_MAX_LEN)) < 0) {
+    dout(5) << "Error - reading input data - " << rv << dendl;
+    return rv;
+  }
+  
+  if ((rv = parse_input_list(data, data_len, element_name, out)) < 0) {
+    dout(5) << "Error parsing input list - " << rv << dendl;
+    return rv;
+  }
+
+  free(data);
+  return 0;
+}
+
+static void item_encode_json(const char *name, 
+                 const char *el_name,
+                 cls_replica_log_item_marker& val,
+                 Formatter *f) {
+  f->open_object_section(name);
+  f->dump_string(el_name, val.item_name);
+  encode_json("time", val.item_timestamp, f);
+  f->close_section();
+}
+
+static void progress_encode_json(const char *name, 
+                 const char *sub_array_name,
+                 const char *sub_array_el_name,
+                 cls_replica_log_progress_marker &val, 
+                 Formatter *f) {
+  f->open_object_section(name);
+  f->dump_string("daemon_id", val.entity_id);
+  f->dump_string("marker", val.position_marker);
+  encode_json("time", val.position_time, f);
+
+  f->open_array_section(sub_array_name);
+  for (list<cls_replica_log_item_marker>::iterator it = val.items.begin();
+       it != val.items.end(); it++) {
+    cls_replica_log_item_marker& entry = (*it);
+
+    item_encode_json(sub_array_name, sub_array_el_name, entry, f);
+  }
+  f->close_section();
+  f->close_section();
+}
+
+void RGWOp_OBJLog_SetBounds::execute() {
+  string id_str = s->info.args.get("id"),
+         marker = s->info.args.get("marker"),
+         time = s->info.args.get("time"),
+         daemon_id = s->info.args.get("daemon_id");
+
+  if (id_str.empty() ||
+      marker.empty() ||
+      time.empty() ||
+      daemon_id.empty()) {
+    dout(5) << "Error - invalid parameter list" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+  
+  int shard;
+  string err;
+  utime_t ut;
+
+  shard = (int)strict_strtol(id_str.c_str(), 10, &err);
+  if (!err.empty()) {
+    dout(5) << "Error parsing id parameter - " << id_str << ", err " << err << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+  
+  if (parse_to_utime(time, ut) < 0) {
+    http_ret = -EINVAL;
+    return;
+  }
+
+  string pool;
+  RGWReplicaObjectLogger rl(store, pool, prefix);
+  bufferlist bl;
+  list<pair<string, utime_t> > entries;
+
+  if ((http_ret = get_input_list(s, "bucket", entries)) < 0) {
+    return;
+  }
+
+  http_ret = rl.update_bound(shard, daemon_id, marker, ut, &entries);
+}
+
+void RGWOp_OBJLog_GetBounds::execute() {
+  string id = s->info.args.get("id");
+
+  if (id.empty()) {
+    dout(5) << " Error - invalid parameter list" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+
+  int shard;
+  string err;
+
+  shard = (int)strict_strtol(id.c_str(), 10, &err);
+  if (!err.empty()) {
+    dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+  string pool;
+  RGWReplicaObjectLogger rl(store, pool, prefix);
+  http_ret = rl.get_bounds(shard, lowest_bound, oldest_time, entries);
+}
+
+void RGWOp_OBJLog_GetBounds::send_response() {
+  set_req_state_err(s, http_ret);
+  dump_errno(s);
+  end_header(s);
+
+  if (http_ret < 0)
+    return;
+
+  s->formatter->open_object_section("container");
+  s->formatter->open_array_section("items");
+  for (list<cls_replica_log_progress_marker>::iterator it = entries.begin();
+       it != entries.end(); it++) {
+    cls_replica_log_progress_marker entry = (*it);
+    progress_encode_json("entry", "buckets", "bucket", entry, s->formatter);
+    flusher.flush();
+  }
+  s->formatter->close_section();
+  s->formatter->dump_string("lowest_bound", lowest_bound);
+  encode_json("oldest_time", oldest_time, s->formatter);
+  s->formatter->close_section();
+  flusher.flush();
+}
+
+void RGWOp_OBJLog_DeleteBounds::execute() {
+  string id = s->info.args.get("id"),
+         daemon_id = s->info.args.get("daemon_id");
+
+  if (id.empty() ||
+      daemon_id.empty()) {
+    dout(5) << "Error - invalid parameter list" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+  
+  int shard;
+  string err;
+
+  shard = (int)strict_strtol(id.c_str(), 10, &err);
+  if (!err.empty()) {
+    dout(5) << "Error parsing id parameter - " << id << ", err " << err << dendl;
+    http_ret = -EINVAL;
+  }
+  
+  string pool;
+  RGWReplicaObjectLogger rl(store, pool, prefix);
+  http_ret = rl.delete_bound(shard, daemon_id);
+}
+
+static int bucket_name_to_bucket(RGWRados *store, string& bucket_str, rgw_bucket& bucket) {
+  RGWBucketInfo bucket_info;
+  RGWObjVersionTracker objv_tracker;
+  time_t mtime;
+  
+  int r = store->get_bucket_info(NULL, bucket_str, bucket_info, &objv_tracker, &mtime);
+  if (r < 0) {
+    dout(5) << "could not get bucket info for bucket=" << bucket_str << dendl;
+    return -EINVAL;
+  }
+
+  bucket = bucket_info.bucket;
+  return 0;
+}
+
+void RGWOp_BILog_SetBounds::execute() {
+  string bucket_str = s->info.args.get("bucket"),
+         marker = s->info.args.get("marker"),
+         time = s->info.args.get("time"),
+         daemon_id = s->info.args.get("daemon_id");
+
+  if (bucket_str.empty() ||
+      marker.empty() ||
+      time.empty() ||
+      daemon_id.empty()) {
+    dout(5) << "Error - invalid parameter list" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+  
+  utime_t ut;
+  
+  if (parse_to_utime(time, ut) < 0) {
+    http_ret = -EINVAL;
+    return;
+  }
+
+  rgw_bucket bucket;
+  if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) 
+    return;
+
+  RGWReplicaBucketLogger rl(store);
+  bufferlist bl;
+  list<pair<string, utime_t> > entries;
+
+  if ((http_ret = get_input_list(s, "object", entries)) < 0) {
+    return;
+  }
+
+  http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &entries);
+}
+
+void RGWOp_BILog_GetBounds::execute() {
+  string bucket_str = s->info.args.get("bucket");
+
+  if (bucket_str.empty()) {
+    dout(5) << " Error - invalid parameter list" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+
+  rgw_bucket bucket;
+  if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) 
+    return;
+
+  RGWReplicaBucketLogger rl(store);
+  http_ret = rl.get_bounds(bucket, lowest_bound, oldest_time, entries);
+}
+
+void RGWOp_BILog_GetBounds::send_response() {
+  set_req_state_err(s, http_ret);
+  dump_errno(s);
+  end_header(s);
+
+  if (http_ret < 0)
+    return;
+
+  s->formatter->open_object_section("container");
+  s->formatter->open_array_section("entries");
+  for (list<cls_replica_log_progress_marker>::iterator it = entries.begin();
+       it != entries.end(); it++) {
+    cls_replica_log_progress_marker entry = (*it);
+    progress_encode_json("entry", "objects", "object", entry, s->formatter);
+    flusher.flush();
+  }
+  s->formatter->close_section();
+  s->formatter->dump_string("lowest_bound", lowest_bound);
+  encode_json("oldest_time", oldest_time, s->formatter);
+  s->formatter->close_section();
+  flusher.flush();
+}
+
+void RGWOp_BILog_DeleteBounds::execute() {
+  string bucket_str = s->info.args.get("bucket"),
+         daemon_id = s->info.args.get("daemon_id");
+
+  if (bucket_str.empty() ||
+      daemon_id.empty()) {
+    dout(5) << "Error - invalid parameter list" << dendl;
+    http_ret = -EINVAL;
+    return;
+  }
+  
+  rgw_bucket bucket;
+  if ((http_ret = bucket_name_to_bucket(store, bucket_str, bucket)) < 0) 
+    return;
+  
+  RGWReplicaBucketLogger rl(store);
+  http_ret = rl.delete_bound(bucket, daemon_id);
+}
+
+RGWOp *RGWHandler_ReplicaLog::op_get() {
+  bool exists;
+  string type = s->info.args.get("type", &exists);
+
+  if (!exists) {
+    return NULL;
+  }
+
+  if (type.compare("metadata") == 0) {
+    return new RGWOp_OBJLog_GetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog");
+  } else if (type.compare("bucket-index") == 0) {
+    return new RGWOp_BILog_GetBounds;
+  } else if (type.compare("data") == 0) {
+    return new RGWOp_OBJLog_GetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog");
+  }
+  return NULL;
+}
+
+RGWOp *RGWHandler_ReplicaLog::op_delete() {
+  bool exists;
+  string type = s->info.args.get("type", &exists);
+
+  if (!exists) {
+    return NULL;
+  }
+
+  if (type.compare("metadata") == 0)
+    return new RGWOp_OBJLog_DeleteBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog");
+  else if (type.compare("bucket-index") == 0) 
+    return new RGWOp_BILog_DeleteBounds;
+  else if (type.compare("data") == 0)
+    return new RGWOp_OBJLog_DeleteBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog");
+  
+  return NULL;
+}
+
+RGWOp *RGWHandler_ReplicaLog::op_post() {
+  bool exists;
+  string type = s->info.args.get("type", &exists);
+
+  if (!exists) {
+    return NULL;
+  }
+
+  if (type.compare("metadata") == 0) {
+    return new RGWOp_OBJLog_SetBounds(META_REPLICA_LOG_OBJ_PREFIX, "mdlog");
+  } else if (type.compare("bucket-index") == 0) {
+    return new RGWOp_BILog_SetBounds;
+  } else if (type.compare("data") == 0) {
+    return new RGWOp_OBJLog_SetBounds(DATA_REPLICA_LOG_OBJ_PREFIX, "datalog");
+  }
+  return NULL;
+}
+
diff --git a/src/rgw/rgw_rest_replica_log.h b/src/rgw/rgw_rest_replica_log.h
new file mode 100644 (file)
index 0000000..dacabbe
--- /dev/null
@@ -0,0 +1,158 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_RGW_REST_REPLICA_LOG_H
+#define CEPH_RGW_REST_REPLICA_LOG_H
+
+class RGWOp_OBJLog_GetBounds : public RGWRESTOp {
+  string prefix;
+  string obj_type;
+  utime_t oldest_time;
+  string lowest_bound;
+  list<cls_replica_log_progress_marker> entries;
+
+public:
+  RGWOp_OBJLog_GetBounds(const char *_prefix, const char *type) 
+    : prefix(_prefix), obj_type(type){}
+  ~RGWOp_OBJLog_GetBounds() {}
+
+  int check_caps(RGWUserCaps& caps) {
+    return caps.check_cap(obj_type.c_str(), RGW_CAP_READ);
+  }
+  int verify_permission() {
+    return check_caps(s->user.caps);
+  }
+  void execute();
+  virtual void send_response();
+  virtual const char *name() {
+    string s = "replica";
+    s.append(obj_type);
+    s.append("_getbounds");
+    return s.c_str();
+  }
+};
+
+class RGWOp_OBJLog_SetBounds : public RGWRESTOp {
+  string prefix;
+  string obj_type;
+public:
+  RGWOp_OBJLog_SetBounds(const char *_prefix, const char *type) 
+    : prefix(_prefix), obj_type(type){}
+  ~RGWOp_OBJLog_SetBounds() {}
+
+  int check_caps(RGWUserCaps& caps) {
+    return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE);
+  }
+  void execute();
+  virtual const char *name() {
+    string s = "replica";
+    s.append(obj_type);
+    s.append("_updatebounds");
+    return s.c_str();
+  }
+};
+
+class RGWOp_OBJLog_DeleteBounds : public RGWRESTOp {
+  string prefix;
+  string obj_type;
+public:
+  RGWOp_OBJLog_DeleteBounds(const char *_prefix, const char *type) 
+    : prefix(_prefix), obj_type(type){}
+  ~RGWOp_OBJLog_DeleteBounds() {}
+
+  int check_caps(RGWUserCaps& caps) {
+    return caps.check_cap(obj_type.c_str(), RGW_CAP_WRITE);
+  }
+  void execute();
+  virtual const char *name() {
+    string s = "replica";
+    s.append(obj_type);
+    s.append("_deletebound");
+    return s.c_str();
+  }
+};
+
+class RGWOp_BILog_GetBounds : public RGWRESTOp {
+  utime_t oldest_time;
+  string lowest_bound;
+  list<cls_replica_log_progress_marker> entries;
+public:
+  RGWOp_BILog_GetBounds() {}
+  ~RGWOp_BILog_GetBounds() {}
+
+  int check_caps(RGWUserCaps& caps) {
+    return caps.check_cap("bilog", RGW_CAP_READ);
+  }
+  int verify_permission() {
+    return check_caps(s->user.caps);
+  }
+  void execute();
+  virtual void send_response();
+  virtual const char *name() {
+    return "replicabilog_getbounds";
+  }
+};
+
+class RGWOp_BILog_SetBounds : public RGWRESTOp {
+public:
+  RGWOp_BILog_SetBounds() {}
+  ~RGWOp_BILog_SetBounds() {}
+
+  int check_caps(RGWUserCaps& caps) {
+    return caps.check_cap("bilog", RGW_CAP_WRITE);
+  }
+  void execute();
+  virtual const char *name() {
+    return "replicabilog_updatebounds";
+  }
+};
+
+class RGWOp_BILog_DeleteBounds : public RGWRESTOp {
+public:
+  RGWOp_BILog_DeleteBounds() {}
+  ~RGWOp_BILog_DeleteBounds() {}
+
+  int check_caps(RGWUserCaps& caps) {
+    return caps.check_cap("bilog", RGW_CAP_WRITE);
+  }
+  void execute();
+  virtual const char *name() {
+    return "replicabilog_deletebound";
+  }
+};
+
+class RGWHandler_ReplicaLog : public RGWHandler_Auth_S3 {
+protected:
+  RGWOp *op_get();
+  RGWOp *op_delete();
+  RGWOp *op_post();
+
+  int read_permissions(RGWOp*) {
+    return 0;
+  }
+public:
+  RGWHandler_ReplicaLog() : RGWHandler_Auth_S3() {}
+  virtual ~RGWHandler_ReplicaLog() {}
+};
+
+class RGWRESTMgr_ReplicaLog : public RGWRESTMgr {
+public:
+  RGWRESTMgr_ReplicaLog() {}
+  virtual ~RGWRESTMgr_ReplicaLog() {}
+
+  virtual RGWHandler *get_handler(struct req_state *s){
+    return new RGWHandler_ReplicaLog;
+  }
+};
+
+#endif /*!CEPH_RGW_REST_REPLICA_LOG_H*/