git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: work towards datalog sync notification
author: Yehuda Sadeh <yehuda@redhat.com>
Wed, 7 Oct 2015 00:14:15 +0000 (17:14 -0700)
committer: Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:57 +0000 (16:12 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_log.cc
src/rgw/rgw_rest_log.h
src/rgw/rgw_sync.h

index 3ea5896ef928611304cace4cedd5187d9841d7de..3219263c276979a13aa371f196579233005ef08e 100644 (file)
@@ -55,6 +55,7 @@ using namespace librados;
 #include "rgw_gc.h"
 #include "rgw_object_expirer_core.h"
 #include "rgw_sync.h"
+#include "rgw_data_sync.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -2192,19 +2193,24 @@ int RGWMetaNotifier::process()
     ldout(cct, 20) << __func__ << "(): notifying mdlog change, shard_id=" << *iter << dendl;
   }
 
-  for (map<string, RGWRESTConn *>::iterator iter = store->zone_conn_map.begin();
-       iter != store->zone_conn_map.end(); ++iter) {
-    RGWRESTConn *conn = iter->second;
-  }
-
   notify_mgr.notify_all(store->zone_conn_map, shards);
 
   return 0;
 }
 
 class RGWSyncProcessorThread : public RGWRadosThread {
+public:
+  RGWSyncProcessorThread(RGWRados *_store) : RGWRadosThread(_store) {}
+  virtual ~RGWSyncProcessorThread() {}
+  virtual int init() = 0 ;
+  virtual int process() = 0;
+  virtual void wakeup_sync_shards(set<int>& shard_ids) = 0;
+};
+
+template <class T>
+class RGWSyncProcessorThreadImpl : public RGWSyncProcessorThread {
   CephContext *cct;
-  RGWMetaSyncStatusManager sync;
+  T sync;
 
   uint64_t interval_msec() {
     return 0; /* no interval associated, it'll run once until stopped */
@@ -2213,7 +2219,7 @@ class RGWSyncProcessorThread : public RGWRadosThread {
     sync.stop();
   }
 public:
-  RGWSyncProcessorThread(RGWRados *_store) : RGWRadosThread(_store), cct(_store->ctx()), sync(_store) {}
+  RGWSyncProcessorThreadImpl<T>(RGWRados *_store, const string& source_entity) : RGWSyncProcessorThread(_store), cct(_store->ctx()), sync(_store, source_entity) {}
 
   int init();
   int process();
@@ -2225,7 +2231,8 @@ public:
   }
 };
 
-int RGWSyncProcessorThread::init()
+template <class T>
+int RGWSyncProcessorThreadImpl<T>::init()
 {
   int ret = sync.init();
   if (ret < 0) {
@@ -2236,18 +2243,32 @@ int RGWSyncProcessorThread::init()
   return 0;
 }
 
-int RGWSyncProcessorThread::process()
+template <class T>
+int RGWSyncProcessorThreadImpl<T>::process()
 {
   sync.run();
   return 0;
 }
 
-void RGWRados::wakeup_sync_shards(set<int>& shard_ids)
+void RGWRados::wakeup_meta_sync_shards(set<int>& shard_ids)
+{
+  Mutex::Locker l(meta_sync_thread_lock);
+  if (meta_sync_processor_thread) {
+    meta_sync_processor_thread->wakeup_sync_shards(shard_ids);
+  }
+}
+
+void RGWRados::wakeup_data_sync_shards(const string& source_zone, set<int>& shard_ids)
 {
-  Mutex::Locker l(sync_thread_lock);
-  if (sync_processor_thread) {
-    sync_processor_thread->wakeup_sync_shards(shard_ids);
+  Mutex::Locker l(data_sync_thread_lock);
+  map<string, RGWSyncProcessorThread *>::iterator iter = data_sync_processor_threads.find(source_zone);
+  if (iter == data_sync_processor_threads.end()) {
+    return;
   }
+
+  RGWSyncProcessorThread *thread = iter->second;
+  assert(thread);
+  thread->wakeup_sync_shards(shard_ids);
 }
 
 int RGWRados::get_required_alignment(rgw_bucket& bucket, uint64_t *alignment)
@@ -2311,10 +2332,19 @@ int RGWRados::get_max_chunk_size(rgw_bucket& bucket, uint64_t *max_chunk_size)
 void RGWRados::finalize()
 {
   if (run_sync_thread) {
-    Mutex::Locker l(sync_thread_lock);
-    sync_processor_thread->stop();
-    delete sync_processor_thread;
-    sync_processor_thread = NULL;
+    Mutex::Locker l(meta_sync_thread_lock);
+    meta_sync_processor_thread->stop();
+    delete meta_sync_processor_thread;
+    meta_sync_processor_thread = NULL;
+
+    Mutex::Locker dl(data_sync_thread_lock);
+    map<string, RGWSyncProcessorThread *>::iterator iter = data_sync_processor_threads.begin();
+    for (; iter != data_sync_processor_threads.end(); ++iter) {
+      RGWSyncProcessorThread *thread = iter->second;
+      thread->stop();
+      delete thread;
+    }
+    data_sync_processor_threads.clear();
   }
   if (finisher) {
     finisher->stop();
@@ -2638,14 +2668,26 @@ int RGWRados::init_complete()
   }
 
   if (run_sync_thread) {
-    Mutex::Locker l(sync_thread_lock);
-    sync_processor_thread = new RGWSyncProcessorThread(this);
-    ret = sync_processor_thread->init();
+    Mutex::Locker l(meta_sync_thread_lock);
+    meta_sync_processor_thread = new RGWSyncProcessorThreadImpl<RGWMetaSyncStatusManager>(this, zonegroup_map.master_zonegroup);
+    ret = meta_sync_processor_thread->init();
     if (ret < 0) {
       ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
       return ret;
     }
-    sync_processor_thread->start();
+    meta_sync_processor_thread->start();
+
+    Mutex::Locker dl(data_sync_thread_lock);
+    for (map<string, RGWRESTConn *>::iterator iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) {
+      RGWSyncProcessorThread *thread = new RGWSyncProcessorThreadImpl<RGWDataSyncStatusManager>(this, iter->first);
+      ret = thread->init();
+      if (ret < 0) {
+        ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
+        return ret;
+      }
+      thread->start();
+      data_sync_processor_threads[iter->first] = thread;
+    }
   }
 
   quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
index 217e2398cde4de262601c0014e21bb347c3bdd24..de0adc6b29193416d25b655a3dc57e3085a5db40 100644 (file)
@@ -1577,9 +1577,11 @@ class RGWRados
   bool run_sync_thread;
 
   RGWMetaNotifier *meta_notifier;
-  RGWSyncProcessorThread *sync_processor_thread;
+  RGWSyncProcessorThread *meta_sync_processor_thread;
+  map<string, RGWSyncProcessorThread *> data_sync_processor_threads;
 
-  Mutex sync_thread_lock;
+  Mutex meta_sync_thread_lock;
+  Mutex data_sync_thread_lock;
 
   int num_watchers;
   RGWWatcher **watchers;
@@ -1639,7 +1641,8 @@ public:
   RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
                gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
                run_sync_thread(false), meta_notifier(NULL),
-               sync_processor_thread(NULL), sync_thread_lock("sync_thread_lock"),
+               meta_sync_processor_thread(NULL),
+               meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
                num_watchers(0), watchers(NULL),
                watch_initialized(false),
                bucket_id_lock("rados_bucket_id"),
@@ -2263,7 +2266,8 @@ public:
    * Check to see if the bucket metadata is synced
    */
   bool is_syncing_bucket_meta(rgw_bucket& bucket);
-  void wakeup_sync_shards(set<int>& shard_ids);
+  void wakeup_meta_sync_shards(set<int>& shard_ids);
+  void wakeup_data_sync_shards(const string& source_zone, set<int>& shard_ids);
 
   int set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner);
   int set_buckets_enabled(std::vector<rgw_bucket>& buckets, bool enabled);
index dcfd92efeee231af82eb2370edaf678b2b24dabe..a26b8f0f81b09781de6fb058b5a0f88b7dc8f55d 100644 (file)
@@ -695,6 +695,41 @@ void RGWOp_DATALog_Unlock::execute() {
   http_ret = store->data_log->unlock(shard_id, zone_id, locker_id);
 }
 
+void RGWOp_DATALog_Notify::execute() {
+  char *data;
+  int len = 0;
+#define LARGE_ENOUGH_BUF (128 * 1024)
+  int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
+  if (r < 0) {
+    http_ret = r;
+    return;
+  }
+
+  ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
+
+  JSONParser p;
+  r = p.parse(data, len);
+  free(data);
+  if (r < 0) {
+    ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
+    http_ret = r;
+    return;
+  }
+
+  set<int> updated_shards;
+  decode_json_obj(updated_shards, &p);
+
+  if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+    for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+      ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl;
+    }
+  }
+
+  store->wakeup_data_sync_shards(updated_shards);
+
+  http_ret = 0;
+}
+
 void RGWOp_DATALog_Delete::execute() {
   string   st = s->info.args.get("start-time"),
            et = s->info.args.get("end-time"),
@@ -807,6 +842,8 @@ RGWOp *RGWHandler_Log::op_post() {
       return new RGWOp_DATALog_Lock;
     else if (s->info.args.exists("unlock"))
       return new RGWOp_DATALog_Unlock;
+    else if (s->info.args.exists("notify"))
+      return new RGWOp_DATALog_Notify;     
   }
   return NULL;
 }
index ead5d489a737ed616bed1138d4ad83ec0bf2fecf..363fb7f126ac3da473ab337938d9998499aa6d8a 100644 (file)
@@ -275,6 +275,20 @@ public:
   }
 };
 
+class RGWOp_DATALog_Notify : public RGWRESTOp {
+public:
+  RGWOp_DATALog_Notify() {}
+  ~RGWOp_DATALog_Notify() {}
+
+  int check_caps(RGWUserCaps& caps) {
+    return caps.check_cap("datalog", RGW_CAP_WRITE);
+  }
+  void execute();
+  virtual const string name() {
+    return "datalog_notify";
+  }
+};
+
 class RGWOp_DATALog_Delete : public RGWRESTOp {
 public:
   RGWOp_DATALog_Delete() {}
index 98c3719e79e30044dcb36e6df3488cad11958191..45c318c484f517477bc4e8d4cc1c1ec17cb89b9d 100644 (file)
@@ -196,7 +196,7 @@ class RGWMetaSyncStatusManager {
   vector<string> clone_markers;
 
 public:
-  RGWMetaSyncStatusManager(RGWRados *_store) : store(_store), master_log(store, this), num_shards(0),
+  RGWMetaSyncStatusManager(RGWRados *_store, const string& source) : store(_store), master_log(store, this), num_shards(0),
                                                ts_to_shard_lock("ts_to_shard_lock") {}
   int init();