]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data changes log, naive implementation
authorYehuda Sadeh <yehuda@inktank.com>
Fri, 26 Apr 2013 02:06:08 +0000 (19:06 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Wed, 8 May 2013 18:22:08 +0000 (11:22 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 6418e4c8a295ea26ea566e66710712304895e63c..bb6f69ed638216f116429b91a0d86c7cf2a25750 100644 (file)
@@ -90,6 +90,7 @@ void _usage()
   cerr << "  metadata list              list metadata info\n";
   cerr << "  mdlog list                 list metadata log\n";
   cerr << "  bilog list                 list bucket index log\n";
+  cerr << "  datalog list               list data log\n";
   cerr << "options:\n";
   cerr << "   --uid=<id>                user id\n";
   cerr << "   --subuser=<name>          subuser name\n";
@@ -198,6 +199,7 @@ enum {
   OPT_METADATA_LIST,
   OPT_MDLOG_LIST,
   OPT_BILOG_LIST,
+  OPT_DATALOG_LIST,
 };
 
 static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
@@ -225,7 +227,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
       strcmp(cmd, "temp") == 0 ||
       strcmp(cmd, "metadata") == 0 ||
       strcmp(cmd, "mdlog") == 0 ||
-      strcmp(cmd, "bilog") == 0) {
+      strcmp(cmd, "bilog") == 0 ||
+      strcmp(cmd, "datalog") == 0) {
     *need_more = true;
     return 0;
   }
@@ -366,6 +369,9 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
   } else if (strcmp(prev_cmd, "bilog") == 0) {
     if (strcmp(cmd, "list") == 0)
       return OPT_BILOG_LIST;
+  } else if (strcmp(prev_cmd, "datalog") == 0) {
+    if (strcmp(cmd, "list") == 0)
+      return OPT_DATALOG_LIST;
   }
 
   return -EINVAL;
@@ -1797,6 +1803,47 @@ next:
     formatter->close_section();
     formatter->flush(cout);
   }
+
+  if (opt_cmd == OPT_DATALOG_LIST) {
+    formatter->open_array_section("entries");
+    bool truncated;
+    int count = 0;
+    if (max_entries < 0)
+      max_entries = 1000;
+
+    utime_t start_time, end_time;
+
+    int ret = parse_date_str(start_date, start_time);
+    if (ret < 0)
+      return -ret;
+
+    ret = parse_date_str(end_date, end_time);
+    if (ret < 0)
+      return -ret;
+
+    RGWDataChangesLog *log = store->data_log;
+    RGWDataChangesLog::LogMarker marker;
+
+    do {
+      list<rgw_data_change> entries;
+      ret = log->list_entries(start_time, end_time, max_entries - count, entries, marker, &truncated);
+      if (ret < 0) {
+        cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
+        return -ret;
+      }
+
+      count += entries.size();
+
+      for (list<rgw_data_change>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
+        rgw_data_change& entry = *iter;
+        encode_json("entry", entry, formatter);
+      }
+      formatter->flush(cout);
+    } while (truncated && count < max_entries);
+
+    formatter->close_section();
+    formatter->flush(cout);
+  }
   
   return 0;
 }
index 223524f27fe3a0ad6e9c9b0c9be84b1ee4fcc0c9..7d8a3b36e8394a7692ce5f979ccc42d3e7f204a2 100644 (file)
@@ -4,6 +4,7 @@
 #include <map>
 
 #include "common/errno.h"
+#include "common/ceph_json.h"
 #include "rgw_rados.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h"
@@ -932,3 +933,190 @@ int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
   return 0;
 }
 
+
+#if 0
+
+class CompletionMap {
+  map<rgw_bucket, RefCountedCond *> entries;
+  Mutex lock;
+
+public:
+
+  void add(string& s) {
+    Mutex::Locker l(lock);
+
+    entries[s] = new RefCountedObject;
+  }
+
+
+  bool wait(string& s) {
+    map<string, RefCountedCond *>::iterator iter;
+    l.Lock();
+    iter = entries.find(s);
+    if (iter == entries.end()) {
+      l.Unlock();
+      return false;
+    }
+
+    RefCountedCond *rcc = iter->second;
+    rcc->get();
+    l.Unlock();
+
+    rcc->wait();
+    rcc->put();
+
+    return true;
+
+  }
+
+  void complete(string& s) {
+    lock.Lock();
+
+    map<string, RefCountedCond *>::iterator iter = entries.find(s);
+    if (iter == entries.end()) {
+      lock.Unlock();
+      return;
+    }
+
+    RefCountedCond *rcc = iter->second;
+
+    entries.erase(iter);
+
+    lock.Unlock();
+
+    rcc->complete();
+    rcc->put();
+  }
+
+};
+
+
+class RGWChangedBucketsTracker {
+  CephContext *cct;
+  RGWRados *store;
+
+  map<rgw_bucket, utime_t> last_reported;
+
+  struct PendingInfo : public RefCountedCond {
+    PendingInfo() {}
+  };
+
+  CompletionMap pending;
+
+  Mutex lock;
+public:
+  RGWChangedBucketsTracker(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWChanedBucketsTracker") {}
+
+  int report_bucket_changed(rgw_bucket& bucket) {
+    lock.Lock();
+
+    map<rgw_bucket, utime_t>::iteartor iter = last_reported.find(bucket);
+
+    bool exists = (iter != iter.end());
+    if (exists) {
+      utime_t& t = iter->second;
+      utime_t now = ceph_clock_now(cct);
+
+      if (now > t + get_resolution_sec())
+        exists = false;
+    }
+
+    lock.Unlock();
+
+    if (exists)
+      return true;
+  }
+
+  uint32_t get_resolution_sec();
+};
+
+
+#endif
+
+void rgw_data_change::dump(Formatter *f) const
+{
+  string type;
+  switch (entity_type) {
+    case ENTITY_TYPE_BUCKET:
+      type = "bucket";
+      break;
+    default:
+      type = "unknown";
+  }
+  encode_json("entity_type", type, f);
+  encode_json("key", key, f);
+}
+
+
+int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
+    string& name = bucket.name;
+    uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards;
+
+    return (int)r;
+}
+
+int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+  string& oid = oids[choose_oid(bucket)];
+
+  utime_t ut = ceph_clock_now(cct);
+  bufferlist bl;
+  rgw_data_change change;
+  change.entity_type = ENTITY_TYPE_BUCKET;
+  change.key = bucket.name;
+  ::encode(change, bl);
+  string section;
+  return store->time_log_add(oid, ut, section, change.key, bl);
+}
+
+int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
+             list<rgw_data_change>& entries, string& marker, bool *truncated) {
+
+  list<cls_log_entry> log_entries;
+
+  int ret = store->time_log_list(oids[shard], start_time, end_time,
+                                 max_entries, log_entries, marker, truncated);
+  if (ret < 0)
+    return ret;
+
+  list<cls_log_entry>::iterator iter;
+  for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
+    rgw_data_change entry;
+    bufferlist::iterator liter = iter->data.begin();
+    try {
+      ::decode(entry, liter);
+    } catch (buffer::error& err) {
+      lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
+      return -EIO;
+    }
+    entries.push_back(entry);
+  }
+
+  return 0;
+}
+
+int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int max_entries,
+             list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated) {
+  bool truncated;
+
+  entries.clear();
+
+  for (; marker.shard < num_shards && (int)entries.size() < max_entries;
+       marker.shard++, marker.marker.clear()) {
+    int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
+                       marker.marker, &truncated);
+    if (ret == -ENOENT) {
+      continue;
+    }
+    if (ret < 0) {
+      return ret;
+    }
+    if (truncated) {
+      *ptruncated = true;
+      return 0;
+    }
+  }
+
+  *ptruncated = (marker.shard < num_shards);
+
+  return 0;
+}
index 9eb16cf37070fbe86e054306dd1936dd26d31a5a..ddca6122efc1755185df869eba2c3cd6ed884e9f 100644 (file)
@@ -219,4 +219,76 @@ public:
   static int info(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher);
 };
 
+
+enum DataLogEntityType {
+  ENTITY_TYPE_BUCKET = 1,
+};
+
+struct rgw_data_change {
+  DataLogEntityType entity_type;
+  string key;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    uint8_t t = (uint8_t)entity_type;
+    ::encode(t, bl);
+    ::encode(key, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+     DECODE_START(1, bl);
+     uint8_t t;
+     ::decode(t, bl);
+     entity_type = (DataLogEntityType)t;
+     ::decode(key, bl);
+     DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_data_change)
+
+class RGWDataChangesLog {
+  CephContext *cct;
+  RGWRados *store;
+
+  int num_shards;
+  string *oids;
+
+public:
+
+  RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) {
+    num_shards = 128; /* FIXME */
+    oids = new string[num_shards];
+
+    const char *prefix = "bucket_log"; /* FIXME */
+
+    for (int i = 0; i < num_shards; i++) {
+      char buf[16];
+      snprintf(buf, sizeof(buf), "%s.%d", prefix, i);
+      oids[i] = buf;
+    }
+  }
+
+  ~RGWDataChangesLog() {
+    delete[] oids;
+  }
+
+  int choose_oid(rgw_bucket& bucket);
+  int add_entry(rgw_bucket& bucket);
+  int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
+               list<rgw_data_change>& entries, string& marker, bool *truncated);
+
+  struct LogMarker {
+    int shard;
+    string marker;
+
+    LogMarker() : shard(0) {}
+  };
+  int list_entries(utime_t& start_time, utime_t& end_time, int max_entries,
+               list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated);
+};
+
+
 #endif
index ccd993637658b2304e2854e11f373c2801f77682..419b1fbac648b0d208d8eb3d7f040d9b3fd505cf 100644 (file)
@@ -12,6 +12,7 @@
 #include "rgw_cache.h"
 #include "rgw_acl.h"
 #include "rgw_metadata.h"
+#include "rgw_bucket.h"
 
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/rgw/cls_rgw_client.h"
@@ -497,6 +498,7 @@ void RGWRadosCtx::set_prefetch_data(rgw_obj& obj) {
 void RGWRados::finalize()
 {
   delete meta_mgr;
+  delete data_log;
   if (use_gc_thread) {
     gc->stop_processor();
     delete gc;
@@ -525,6 +527,7 @@ int RGWRados::init_rados()
    return ret;
 
   meta_mgr = new RGWMetadataManager(cct, this);
+  data_log = new RGWDataChangesLog(cct, this);
 
   return ret;
 }
@@ -2886,6 +2889,12 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
   if (bucket_is_system(bucket))
     return 0;
 
+  int ret = data_log->add_entry(obj.bucket);
+  if (ret < 0) {
+    lderr(cct) << "ERROR: failed writing data log" << dendl;
+    return ret;
+  }
+
   if (state && state->obj_tag.length()) {
     int len = state->obj_tag.length();
     char buf[len + 1];
@@ -2897,7 +2906,7 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
       append_rand_alpha(cct, tag, tag, 32);
     }
   }
-  int ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag,
+  ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag,
                                obj.object, obj.key);
 
   return ret;
index 057bf75e7c09b011ceb53cb24f98c33e9396096e..f3d095f92d400f8f459f747325c040970f752b96 100644 (file)
@@ -456,7 +456,7 @@ struct RGWRegionMap {
 };
 WRITE_CLASS_ENCODER(RGWRegionMap);
 
-
+class RGWDataChangesLog;
   
 class RGWRados
 {
@@ -564,7 +564,7 @@ public:
                num_watchers(0), watchers(NULL), watch_handles(NULL),
                bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
                cct(NULL), rados(NULL),
-               pools_initialized(false), meta_mgr(NULL) {}
+               pools_initialized(false), meta_mgr(NULL), data_log(NULL) {}
 
   void set_context(CephContext *_cct) {
     cct = _cct;
@@ -583,6 +583,8 @@ public:
 
   RGWMetadataManager *meta_mgr;
 
+  RGWDataChangesLog *data_log;
+
   virtual ~RGWRados() {
     if (rados) {
       rados->shutdown();