#include "rgw_gc.h"
#include "rgw_object_expirer_core.h"
#include "rgw_sync.h"
+#include "rgw_data_sync.h"
#define dout_subsys ceph_subsys_rgw
ldout(cct, 20) << __func__ << "(): notifying mdlog change, shard_id=" << *iter << dendl;
}
- for (map<string, RGWRESTConn *>::iterator iter = store->zone_conn_map.begin();
- iter != store->zone_conn_map.end(); ++iter) {
- RGWRESTConn *conn = iter->second;
- }
-
notify_mgr.notify_all(store->zone_conn_map, shards);
return 0;
}
class RGWSyncProcessorThread : public RGWRadosThread {
+public:
+ RGWSyncProcessorThread(RGWRados *_store) : RGWRadosThread(_store) {}
+ virtual ~RGWSyncProcessorThread() {}
+ virtual int init() = 0 ;
+ virtual int process() = 0;
+ virtual void wakeup_sync_shards(set<int>& shard_ids) = 0;
+};
+
+template <class T>
+class RGWSyncProcessorThreadImpl : public RGWSyncProcessorThread {
CephContext *cct;
- RGWMetaSyncStatusManager sync;
+ T sync;
uint64_t interval_msec() {
return 0; /* no interval associated, it'll run once until stopped */
sync.stop();
}
public:
- RGWSyncProcessorThread(RGWRados *_store) : RGWRadosThread(_store), cct(_store->ctx()), sync(_store) {}
+ RGWSyncProcessorThreadImpl<T>(RGWRados *_store, const string& source_entity) : RGWSyncProcessorThread(_store), cct(_store->ctx()), sync(_store, source_entity) {}
int init();
int process();
}
};
-int RGWSyncProcessorThread::init()
+template <class T>
+int RGWSyncProcessorThreadImpl<T>::init()
{
int ret = sync.init();
if (ret < 0) {
return 0;
}
-int RGWSyncProcessorThread::process()
+template <class T>
+int RGWSyncProcessorThreadImpl<T>::process()
{
sync.run();
return 0;
}
-void RGWRados::wakeup_sync_shards(set<int>& shard_ids)
+void RGWRados::wakeup_meta_sync_shards(set<int>& shard_ids)
+{
+ Mutex::Locker l(meta_sync_thread_lock);
+ if (meta_sync_processor_thread) {
+ meta_sync_processor_thread->wakeup_sync_shards(shard_ids);
+ }
+}
+
+void RGWRados::wakeup_data_sync_shards(const string& source_zone, set<int>& shard_ids)
{
- Mutex::Locker l(sync_thread_lock);
- if (sync_processor_thread) {
- sync_processor_thread->wakeup_sync_shards(shard_ids);
+ Mutex::Locker l(data_sync_thread_lock);
+ map<string, RGWSyncProcessorThread *>::iterator iter = data_sync_processor_threads.find(source_zone);
+ if (iter == data_sync_processor_threads.end()) {
+ return;
}
+
+ RGWSyncProcessorThread *thread = iter->second;
+ assert(thread);
+ thread->wakeup_sync_shards(shard_ids);
}
int RGWRados::get_required_alignment(rgw_bucket& bucket, uint64_t *alignment)
void RGWRados::finalize()
{
if (run_sync_thread) {
- Mutex::Locker l(sync_thread_lock);
- sync_processor_thread->stop();
- delete sync_processor_thread;
- sync_processor_thread = NULL;
+ Mutex::Locker l(meta_sync_thread_lock);
+ meta_sync_processor_thread->stop();
+ delete meta_sync_processor_thread;
+ meta_sync_processor_thread = NULL;
+
+ Mutex::Locker dl(data_sync_thread_lock);
+ map<string, RGWSyncProcessorThread *>::iterator iter = data_sync_processor_threads.begin();
+ for (; iter != data_sync_processor_threads.end(); ++iter) {
+ RGWSyncProcessorThread *thread = iter->second;
+ thread->stop();
+ delete thread;
+ }
+ data_sync_processor_threads.clear();
}
if (finisher) {
finisher->stop();
}
if (run_sync_thread) {
- Mutex::Locker l(sync_thread_lock);
- sync_processor_thread = new RGWSyncProcessorThread(this);
- ret = sync_processor_thread->init();
+ Mutex::Locker l(meta_sync_thread_lock);
+ meta_sync_processor_thread = new RGWSyncProcessorThreadImpl<RGWMetaSyncStatusManager>(this, zonegroup_map.master_zonegroup);
+ ret = meta_sync_processor_thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
return ret;
}
- sync_processor_thread->start();
+ meta_sync_processor_thread->start();
+
+ Mutex::Locker dl(data_sync_thread_lock);
+ for (map<string, RGWRESTConn *>::iterator iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) {
+ RGWSyncProcessorThread *thread = new RGWSyncProcessorThreadImpl<RGWDataSyncStatusManager>(this, iter->first);
+ ret = thread->init();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
+ return ret;
+ }
+ thread->start();
+ data_sync_processor_threads[iter->first] = thread;
+ }
}
quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
bool run_sync_thread;
RGWMetaNotifier *meta_notifier;
- RGWSyncProcessorThread *sync_processor_thread;
+ RGWSyncProcessorThread *meta_sync_processor_thread;
+ map<string, RGWSyncProcessorThread *> data_sync_processor_threads;
- Mutex sync_thread_lock;
+ Mutex meta_sync_thread_lock;
+ Mutex data_sync_thread_lock;
int num_watchers;
RGWWatcher **watchers;
RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
run_sync_thread(false), meta_notifier(NULL),
- sync_processor_thread(NULL), sync_thread_lock("sync_thread_lock"),
+ meta_sync_processor_thread(NULL),
+ meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
num_watchers(0), watchers(NULL),
watch_initialized(false),
bucket_id_lock("rados_bucket_id"),
* Check to see if the bucket metadata is synced
*/
bool is_syncing_bucket_meta(rgw_bucket& bucket);
- void wakeup_sync_shards(set<int>& shard_ids);
+ void wakeup_meta_sync_shards(set<int>& shard_ids);
+ void wakeup_data_sync_shards(const string& source_zone, set<int>& shard_ids);
int set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner);
int set_buckets_enabled(std::vector<rgw_bucket>& buckets, bool enabled);
http_ret = store->data_log->unlock(shard_id, zone_id, locker_id);
}
+void RGWOp_DATALog_Notify::execute() {
+ char *data;
+ int len = 0;
+#define LARGE_ENOUGH_BUF (128 * 1024)
+ int r = rgw_rest_read_all_input(s, &data, &len, LARGE_ENOUGH_BUF);
+ if (r < 0) {
+ http_ret = r;
+ return;
+ }
+
+ ldout(s->cct, 20) << __func__ << "(): read data: " << string(data, len) << dendl;
+
+ JSONParser p;
+ r = p.parse(data, len);
+ free(data);
+ if (r < 0) {
+ ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
+ http_ret = r;
+ return;
+ }
+
+ set<int> updated_shards;
+ decode_json_obj(updated_shards, &p);
+
+ if (store->ctx()->_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+ for (set<int>::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+ ldout(s->cct, 20) << __func__ << "(): updated shard=" << *iter << dendl;
+ }
+ }
+
+ store->wakeup_data_sync_shards(updated_shards);
+
+ http_ret = 0;
+}
+
void RGWOp_DATALog_Delete::execute() {
string st = s->info.args.get("start-time"),
et = s->info.args.get("end-time"),
return new RGWOp_DATALog_Lock;
else if (s->info.args.exists("unlock"))
return new RGWOp_DATALog_Unlock;
+ else if (s->info.args.exists("notify"))
+ return new RGWOp_DATALog_Notify;
}
return NULL;
}