]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: rework replica log + RESTful api
authorYehuda Sadeh <yehuda@inktank.com>
Sat, 29 Jun 2013 07:25:09 +0000 (00:25 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Tue, 2 Jul 2013 22:02:30 +0000 (15:02 -0700)
Rather than making the caller code handle encode/decode/dump etc,
we introduce an RGWReplicaBounds struct which the RGWReplicaLogger classes
traffic in (instead of the string marker, utime_t time, and list of pairs of
items). Right now this is just referring to cls_replica_log internal structs,
but we use typedefs so that we can cleanly change them over later and avoid
crossing the namespaces too much.

This greatly reduces the amount of code in rgw_rest_replica_log -- hurrah!

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
Signed-off-by: Greg Farnum <greg@inktank.com>
src/common/ceph_json.cc
src/include/utime.h
src/rgw/rgw_admin.cc
src/rgw/rgw_replica_log.cc
src/rgw/rgw_replica_log.h
src/rgw/rgw_rest_replica_log.cc
src/rgw/rgw_rest_replica_log.h

index 516d263b6e1d8d065ae0a57ce9048d818f3a8a43..84355575c6c99e95e3f9c81f48502e62e3a5c459 100644 (file)
@@ -444,9 +444,10 @@ void decode_json_obj(utime_t& val, JSONObj *obj)
 {
   string s = obj->get_data();
   uint64_t epoch;
-  int r = utime_t::parse_date(s, &epoch);
+  uint64_t nsec;
+  int r = utime_t::parse_date(s, &epoch, &nsec);
   if (r == 0) {
-    val.set_from_double(epoch);
+    val = utime_t(epoch, nsec);
   } else {
     throw JSONDecoder::err("failed to decode utime_t");
   }
index fab542736836d7fb14842e4c3ab94aaad43cb2ef..e13460593062b8d1a756712139968bbcf061bfb0 100644 (file)
@@ -242,6 +242,9 @@ public:
     struct tm tm;
     memset(&tm, 0, sizeof(tm));
 
+    if (nsec)
+      *nsec = 0;
+
     const char *p = strptime(date.c_str(), "%Y-%m-%d", &tm);
     if (p) {
       if (*p == ' ') {
index 23d6e72cfb5566b0bc529dce67fd5534a9ea7e06..0dbeee06e4c33d7aa902066d5ccc9b25479543c4 100644 (file)
@@ -2105,9 +2105,7 @@ next:
   }
 
   if (opt_cmd == OPT_REPLICALOG_GET) {
-    string pos_marker;
-    utime_t time_marker;
-    list<cls_replica_log_progress_marker> markers;
+    RGWReplicaBounds bounds;
     if (replica_log_type == ReplicaLog_Metadata) {
       if (!specified_shard_id) {
         cerr << "ERROR: shard-id must be specified for get operation" << std::endl;
@@ -2115,7 +2113,7 @@ next:
       }
 
       RGWReplicaObjectLogger logger(store, pool_name, META_REPLICA_LOG_OBJ_PREFIX);
-      int ret = logger.get_bounds(shard_id, pos_marker, time_marker, markers);
+      int ret = logger.get_bounds(shard_id, bounds);
       if (ret < 0)
         return -ret;
     } else if (replica_log_type == ReplicaLog_Data) {
@@ -2124,7 +2122,7 @@ next:
         return EINVAL;
       }
       RGWReplicaObjectLogger logger(store, pool_name, DATA_REPLICA_LOG_OBJ_PREFIX);
-      int ret = logger.get_bounds(shard_id, pos_marker, time_marker, markers);
+      int ret = logger.get_bounds(shard_id, bounds);
       if (ret < 0)
         return -ret;
     } else if (replica_log_type == ReplicaLog_Bucket) {
@@ -2139,13 +2137,13 @@ next:
       }
 
       RGWReplicaBucketLogger logger(store);
-      ret = logger.get_bounds(bucket, pos_marker, time_marker, markers);
+      ret = logger.get_bounds(bucket, bounds);
       if (ret < 0)
         return -ret;
     } else { // shouldn't get here
       assert(0);
     }
-    encode_json("markers", markers, formatter);
+    encode_json("bounds", bounds, formatter);
   }
 
   if (opt_cmd == OPT_REPLICALOG_DELETE) {
index 6ea9beb39b7b255625936e9bd364c0ad5dab267e..483d256377bcf9125d417520e7604e4366cc31d2 100644 (file)
@@ -8,10 +8,26 @@
  * Copyright 2013 Inktank
  */
 
+#include "common/ceph_json.h"
+
 #include "rgw_replica_log.h"
 #include "cls/replica_log/cls_replica_log_client.h"
 #include "rgw_rados.h"
 
+
+void RGWReplicaBounds::dump(Formatter *f) const
+{
+  encode_json("marker", marker, f);
+  encode_json("oldest_time", oldest_time, f);
+  encode_json("markers", markers, f);
+}
+
+void RGWReplicaBounds::decode_json(JSONObj *obj) {
+  JSONDecoder::decode_json("marker", marker, obj);
+  JSONDecoder::decode_json("oldest_time", oldest_time, obj);
+  JSONDecoder::decode_json("markers", markers, obj);
+}
+
 RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
     cct(_store->cct), store(_store) {}
 
@@ -19,8 +35,7 @@ int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool)
 {
   int r = store->rados->ioctx_create(pool.c_str(), ctx);
   if (r < 0) {
-    lderr(cct) << "ERROR: could not open rados pool "
-              << pool << dendl;
+    lderr(cct) << "ERROR: could not open rados pool " << pool << dendl;
   }
   return r;
 }
@@ -28,11 +43,13 @@ int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool)
 int RGWReplicaLogger::update_bound(const string& oid, const string& pool,
                                    const string& daemon_id,
                                    const string& marker, const utime_t& time,
-                                   const list<pair<string, utime_t> > *entries)
+                                   const list<RGWReplicaItemMarker> *entries)
 {
   cls_replica_log_progress_marker progress;
-  cls_replica_log_prepare_marker(progress, daemon_id, marker, time,
-                                 entries);
+  progress.entity_id = daemon_id;
+  progress.position_marker = marker;
+  progress.position_time = time;
+  progress.items = *entries;
 
   librados::IoCtx ioctx;
   int r = open_ioctx(ioctx, pool);
@@ -60,8 +77,7 @@ int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
 }
 
 int RGWReplicaLogger::get_bounds(const string& oid, const string& pool,
-                                 string& marker, utime_t& oldest_time,
-                                 list<cls_replica_log_progress_marker>& markers)
+                                 RGWReplicaBounds& bounds)
 {
   librados::IoCtx ioctx;
   int r = open_ioctx(ioctx, pool);
@@ -69,7 +85,7 @@ int RGWReplicaLogger::get_bounds(const string& oid, const string& pool,
     return r;
   }
 
-  return cls_replica_log_get_bounds(ioctx, oid, marker, oldest_time, markers);
+  return cls_replica_log_get_bounds(ioctx, oid, bounds.marker, bounds.oldest_time, bounds.markers);
 }
 
 RGWReplicaObjectLogger::
index fcdf9f2c2e085c817cedb1adad0780a432576ed3..f02fa423a615a96f1cc96594be4fc277b7e21b20 100644 (file)
@@ -25,6 +25,18 @@ using namespace std;
 #define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog."
 #define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog."
 
+typedef cls_replica_log_item_marker RGWReplicaItemMarker;
+typedef cls_replica_log_progress_marker RGWReplicaProgressMarker;
+
+struct RGWReplicaBounds {
+  string marker;
+  utime_t oldest_time;
+  list<RGWReplicaProgressMarker> markers;
+
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
+};
+
 class RGWReplicaLogger {
 protected:
   CephContext *cct;
@@ -36,12 +48,11 @@ protected:
   int update_bound(const string& oid, const string& pool,
                    const string& daemon_id, const string& marker,
                    const utime_t& time,
-                   const list<pair<string, utime_t> > *entries);
+                   const list<RGWReplicaItemMarker> *entries);
   int delete_bound(const string& oid, const string& pool,
                    const string& daemon_id);
   int get_bounds(const string& oid, const string& pool,
-                 string& marker, utime_t& oldest_time,
-                 list<cls_replica_log_progress_marker>& markers);
+                 RGWReplicaBounds& bounds);
 };
 
 class RGWReplicaObjectLogger : private RGWReplicaLogger {
@@ -62,7 +73,7 @@ public:
   int create_log_objects(int shards);
   int update_bound(int shard, const string& daemon_id, const string& marker,
                    const utime_t& time,
-                   const list<pair<string, utime_t> > *entries) {
+                   const list<RGWReplicaItemMarker> *entries) {
     string oid;
     get_shard_oid(shard, oid);
     return RGWReplicaLogger::update_bound(oid, pool,
@@ -74,12 +85,10 @@ public:
     return RGWReplicaLogger::delete_bound(oid, pool,
                                           daemon_id);
   }
-  int get_bounds(int shard, string& marker, utime_t& oldest_time,
-                 list<cls_replica_log_progress_marker>& markers) {
+  int get_bounds(int shard, RGWReplicaBounds& bounds) {
     string oid;
     get_shard_oid(shard, oid);
-    return RGWReplicaLogger::get_bounds(oid, pool,
-                                        marker, oldest_time, markers);
+    return RGWReplicaLogger::get_bounds(oid, pool, bounds);
   }
 };
 
@@ -90,7 +99,7 @@ public:
   RGWReplicaBucketLogger(RGWRados *_store);
   int update_bound(const rgw_bucket& bucket, const string& daemon_id,
                    const string& marker, const utime_t& time,
-                   const list<pair<string, utime_t> > *entries) {
+                   const list<RGWReplicaItemMarker> *entries) {
     return RGWReplicaLogger::update_bound(prefix+bucket.name, pool,
                                           daemon_id, marker, time, entries);
   }
@@ -98,10 +107,9 @@ public:
     return RGWReplicaLogger::delete_bound(prefix+bucket.name, pool,
                                           daemon_id);
   }
-  int get_bounds(const rgw_bucket& bucket, string& marker, utime_t& oldest_time,
-                 list<cls_replica_log_progress_marker>& markers) {
+  int get_bounds(const rgw_bucket& bucket, RGWReplicaBounds& bounds) {
     return RGWReplicaLogger::get_bounds(prefix+bucket.name, pool,
-                                        marker, oldest_time, markers);
+                                        bounds);
   }
 };
 
index fc1ef9c52cb445317ce90f91c473c233e464263e..dfd935024fa0b55818b0f8cbbeaba95bb6833842 100644 (file)
@@ -37,46 +37,9 @@ static int parse_to_utime(string& in, utime_t& out) {
   return 0;
 }
 
-static int parse_input_list(const char *data, int data_len, 
-                            const char *el_name, list<pair<string, utime_t> >& out) {
-  JSONParser parser;
-
-  if (!parser.parse(data, data_len)) {
-    return -EINVAL;
-  }
-  if (!parser.is_array()) {
-    dout(5) << "Should have been an array" << dendl;
-    return -EINVAL;
-  }
-
-  vector<string> l;
-
-  l = parser.get_array_elements();
-  for (vector<string>::iterator it = l.begin();
-       it != l.end(); it++) {
-    JSONParser el_parser;
-
-    if (!el_parser.parse((*it).c_str(), (*it).length())) {
-      dout(5) << "Error parsing an array element" << dendl;
-      return -EINVAL;
-    }
-
-    string name, time;
-
-    JSONDecoder::decode_json(el_name, name, (JSONObj *)&el_parser);
-    JSONDecoder::decode_json("time", time, (JSONObj *)&el_parser);
-
-    utime_t ut;
-    if (parse_to_utime(time, ut) < 0) {
-      return -EINVAL;
-    }
-    out.push_back(make_pair(name, ut));
-  }
-
-  return 0;
-}
 
-static int get_input_list(req_state *s, const char *element_name, list<pair<string, utime_t> >& out) {
+template <class T>
+static int get_input(req_state *s, T& out) {
   int rv, data_len;
   char *data;
 
@@ -84,47 +47,20 @@ static int get_input_list(req_state *s, const char *element_name, list<pair<stri
     dout(5) << "Error - reading input data - " << rv << dendl;
     return rv;
   }
-  
-  if ((rv = parse_input_list(data, data_len, element_name, out)) < 0) {
-    dout(5) << "Error parsing input list - " << rv << dendl;
-    return rv;
+
+  JSONParser parser;
+
+  if (!parser.parse(data, data_len)) {
+    free(data);
+    return -EINVAL;
   }
 
+  decode_json_obj(out, &parser);
+  
   free(data);
   return 0;
 }
 
-static void item_encode_json(const char *name, 
-                 const char *el_name,
-                 cls_replica_log_item_marker& val,
-                 Formatter *f) {
-  f->open_object_section(name);
-  f->dump_string(el_name, val.item_name);
-  encode_json("time", val.item_timestamp, f);
-  f->close_section();
-}
-
-static void progress_encode_json(const char *name, 
-                 const char *sub_array_name,
-                 const char *sub_array_el_name,
-                 cls_replica_log_progress_marker &val, 
-                 Formatter *f) {
-  f->open_object_section(name);
-  f->dump_string("daemon_id", val.entity_id);
-  f->dump_string("marker", val.position_marker);
-  encode_json("time", val.position_time, f);
-
-  f->open_array_section(sub_array_name);
-  for (list<cls_replica_log_item_marker>::iterator it = val.items.begin();
-       it != val.items.end(); it++) {
-    cls_replica_log_item_marker& entry = (*it);
-
-    item_encode_json(sub_array_name, sub_array_el_name, entry, f);
-  }
-  f->close_section();
-  f->close_section();
-}
-
 void RGWOp_OBJLog_SetBounds::execute() {
   string id_str = s->info.args.get("id"),
          marker = s->info.args.get("marker"),
@@ -159,13 +95,13 @@ void RGWOp_OBJLog_SetBounds::execute() {
   string pool;
   RGWReplicaObjectLogger rl(store, pool, prefix);
   bufferlist bl;
-  list<pair<string, utime_t> > entries;
+  list<RGWReplicaItemMarker> markers;
 
-  if ((http_ret = get_input_list(s, "bucket", entries)) < 0) {
+  if ((http_ret = get_input(s, markers)) < 0) {
     return;
   }
 
-  http_ret = rl.update_bound(shard, daemon_id, marker, ut, &entries);
+  http_ret = rl.update_bound(shard, daemon_id, marker, ut, &markers);
 }
 
 void RGWOp_OBJLog_GetBounds::execute() {
@@ -189,7 +125,7 @@ void RGWOp_OBJLog_GetBounds::execute() {
  
   string pool;
   RGWReplicaObjectLogger rl(store, pool, prefix);
-  http_ret = rl.get_bounds(shard, lowest_bound, oldest_time, entries);
+  http_ret = rl.get_bounds(shard, bounds);
 }
 
 void RGWOp_OBJLog_GetBounds::send_response() {
@@ -200,18 +136,7 @@ void RGWOp_OBJLog_GetBounds::send_response() {
   if (http_ret < 0)
     return;
 
-  s->formatter->open_object_section("container");
-  s->formatter->open_array_section("items");
-  for (list<cls_replica_log_progress_marker>::iterator it = entries.begin();
-       it != entries.end(); it++) {
-    cls_replica_log_progress_marker entry = (*it);
-    progress_encode_json("entry", "buckets", "bucket", entry, s->formatter);
-    flusher.flush();
-  }
-  s->formatter->close_section();
-  s->formatter->dump_string("lowest_bound", lowest_bound);
-  encode_json("oldest_time", oldest_time, s->formatter);
-  s->formatter->close_section();
+  encode_json("bounds", bounds, s->formatter);
   flusher.flush();
 }
 
@@ -283,13 +208,13 @@ void RGWOp_BILog_SetBounds::execute() {
 
   RGWReplicaBucketLogger rl(store);
   bufferlist bl;
-  list<pair<string, utime_t> > entries;
+  list<RGWReplicaItemMarker> markers;
 
-  if ((http_ret = get_input_list(s, "object", entries)) < 0) {
+  if ((http_ret = get_input(s, markers)) < 0) {
     return;
   }
 
-  http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &entries);
+  http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &markers);
 }
 
 void RGWOp_BILog_GetBounds::execute() {
@@ -306,7 +231,7 @@ void RGWOp_BILog_GetBounds::execute() {
     return;
 
   RGWReplicaBucketLogger rl(store);
-  http_ret = rl.get_bounds(bucket, lowest_bound, oldest_time, entries);
+  http_ret = rl.get_bounds(bucket, bounds);
 }
 
 void RGWOp_BILog_GetBounds::send_response() {
@@ -317,18 +242,7 @@ void RGWOp_BILog_GetBounds::send_response() {
   if (http_ret < 0)
     return;
 
-  s->formatter->open_object_section("container");
-  s->formatter->open_array_section("entries");
-  for (list<cls_replica_log_progress_marker>::iterator it = entries.begin();
-       it != entries.end(); it++) {
-    cls_replica_log_progress_marker entry = (*it);
-    progress_encode_json("entry", "objects", "object", entry, s->formatter);
-    flusher.flush();
-  }
-  s->formatter->close_section();
-  s->formatter->dump_string("lowest_bound", lowest_bound);
-  encode_json("oldest_time", oldest_time, s->formatter);
-  s->formatter->close_section();
+  encode_json("bounds", bounds, s->formatter);
   flusher.flush();
 }
 
index dacabbe4f7cedf8fb4c6715e70a60ce0b3270fa6..91e3d6140629ec8e7ec84aecb56e2f1f7e98922a 100644 (file)
@@ -17,9 +17,7 @@
 class RGWOp_OBJLog_GetBounds : public RGWRESTOp {
   string prefix;
   string obj_type;
-  utime_t oldest_time;
-  string lowest_bound;
-  list<cls_replica_log_progress_marker> entries;
+  RGWReplicaBounds bounds;
 
 public:
   RGWOp_OBJLog_GetBounds(const char *_prefix, const char *type) 
@@ -83,9 +81,7 @@ public:
 };
 
 class RGWOp_BILog_GetBounds : public RGWRESTOp {
-  utime_t oldest_time;
-  string lowest_bound;
-  list<cls_replica_log_progress_marker> entries;
+  RGWReplicaBounds bounds;
 public:
   RGWOp_BILog_GetBounds() {}
   ~RGWOp_BILog_GetBounds() {}