services/svc_bucket_sync_sobj.cc
services/svc_cls.cc
services/svc_config_key_rados.cc
- services/svc_datalog_rados.cc
services/svc_mdlog.cc
services/svc_meta.cc
services/svc_meta_be.cc
#include "services/svc_sync_modules.h"
#include "services/svc_cls.h"
#include "services/svc_bilog_rados.h"
-#include "services/svc_datalog_rados.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta_be_otp.h"
#include "services/svc_zone.h"
#include "services/svc_user.h"
#include "services/svc_cls.h"
#include "services/svc_bilog_rados.h"
-#include "services/svc_datalog_rados.h"
#include "include/rados/librados.hpp"
// until everything is moved from rgw_common
}
-RGWDataChangesLog::RGWDataChangesLog(RGWSI_Zone *zone_svc, RGWSI_Cls *cls_svc)
- : cct(zone_svc->ctx()), changes(cct->_conf->rgw_data_log_changes_size)
+RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
+ : cct(cct),
+ num_shards(cct->_conf->rgw_data_log_num_shards),
+ changes(cct->_conf->rgw_data_log_changes_size) {}
+
+void RGWDataChangesLog::init(RGWSI_Cls *cls_svc)
{
- svc.zone = zone_svc;
svc.cls = cls_svc;
-
- num_shards = cct->_conf->rgw_data_log_num_shards;
+ assert(svc.cls);
oids = new string[num_shards];
-
- string prefix = cct->_conf->rgw_data_log_obj_prefix;
-
- if (prefix.empty()) {
- prefix = "data_log";
- }
-
for (int i = 0; i < num_shards; i++) {
oids[i] = get_oid(i);
}
+}
- renew_thread = new ChangesRenewThread(cct, this);
- renew_thread->create("rgw_dt_lg_renew");
+int RGWDataChangesLog::start(const RGWZone* _zone)
+{
+ zone = _zone;
+ assert(zone);
+ renew_thread = make_named_thread("rgw_dt_lg_renew",
+ &RGWDataChangesLog::renew_run, this);
+ return 0;
}
int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
int RGWDataChangesLog::renew_entries()
{
- if (!svc.zone->need_to_log_data())
+ if (!zone->log_data)
return 0;
/* we can't keep the bucket name as part of the cls_log_entry, and we need
RGWDataChangesLog::~RGWDataChangesLog() {
down_flag = true;
- renew_thread->stop();
- renew_thread->join();
- delete renew_thread;
+ if (renew_thread.joinable()) {
+ renew_stop();
+ renew_thread.join();
+ }
delete[] oids;
}
-void *RGWDataChangesLog::ChangesRenewThread::entry() {
+void RGWDataChangesLog::renew_run() {
for (;;) {
dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
- int r = log->renew_entries();
+ int r = renew_entries();
if (r < 0) {
dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
}
- if (log->going_down())
+ if (going_down())
break;
int interval = cct->_conf->rgw_data_log_window * 3 / 4;
- std::unique_lock locker{lock};
- cond.wait_for(locker, std::chrono::seconds(interval));
+ std::unique_lock locker{renew_lock};
+ renew_cond.wait_for(locker, std::chrono::seconds(interval));
}
-
- return NULL;
}
-void RGWDataChangesLog::ChangesRenewThread::stop()
+void RGWDataChangesLog::renew_stop()
{
- std::lock_guard l{lock};
- cond.notify_all();
+ std::lock_guard l{renew_lock};
+ renew_cond.notify_all();
}
void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
class RGWBucketInstanceMetadataHandler;
class RGWUserCtl;
class RGWBucketCtl;
+class RGWZone;
namespace rgw { namespace sal {
class RGWRadosStore;
class RGWDataChangesLog {
CephContext *cct;
rgw::BucketChangeObserver *observer = nullptr;
+ const RGWZone* zone;
struct Svc {
- RGWSI_Zone *zone{nullptr};
RGWSI_Cls *cls{nullptr};
} svc;
void register_renew(rgw_bucket_shard& bs);
void update_renewed(rgw_bucket_shard& bs, ceph::real_time& expiration);
- class ChangesRenewThread : public Thread {
- CephContext *cct;
- RGWDataChangesLog *log;
- ceph::mutex lock = ceph::make_mutex("ChangesRenewThread::lock");
- ceph::condition_variable cond;
-
- public:
- ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log) {}
- void *entry() override;
- void stop();
- };
-
- ChangesRenewThread* renew_thread;
+ ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
+ ceph::condition_variable renew_cond;
+ void renew_run();
+ void renew_stop();
+ std::thread renew_thread;
std::function<bool(const rgw_bucket& bucket, optional_yield y)> bucket_filter;
public:
- RGWDataChangesLog(RGWSI_Zone *zone_svc, RGWSI_Cls *cls_svc);
+ RGWDataChangesLog(CephContext* cct);
~RGWDataChangesLog();
+ void init(RGWSI_Cls *cls_svc);
+ int start(const RGWZone* _zone);
+
int choose_oid(const rgw_bucket_shard& bs);
std::string get_oid(int shard_id) const;
int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"
-#include "services/svc_datalog_rados.h"
+#include "rgw_bucket.h"
#include "include/common_fwd.h"
#include "include/random.h"
#include "services/svc_sys_obj_cache.h"
#include "services/svc_bucket.h"
#include "services/svc_mdlog.h"
-#include "services/svc_datalog_rados.h"
#include "compressor/Compressor.h"
int RGWDataNotifier::process()
{
- auto data_log = store->svc.datalog_rados->get_log();
+ auto data_log = store->svc.datalog_rados;
if (!data_log) {
return 0;
}
#include "services/svc_zone.h"
#include "services/svc_mdlog.h"
#include "services/svc_bilog_rados.h"
-#include "services/svc_datalog_rados.h"
#include "common/errno.h"
#include "include/ceph_assert.h"
#include "services/svc_bucket_sync_sobj.h"
#include "services/svc_cls.h"
#include "services/svc_config_key_rados.h"
-#include "services/svc_datalog_rados.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta.h"
#include "services/svc_meta_be.h"
bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
cls = std::make_unique<RGWSI_Cls>(cct);
config_key_rados = std::make_unique<RGWSI_ConfigKey_RADOS>(cct);
- datalog_rados = std::make_unique<RGWSI_DataLog_RADOS>(cct);
+ datalog_rados = std::make_unique<RGWDataChangesLog>(cct);
mdlog = std::make_unique<RGWSI_MDLog>(cct, run_sync);
meta = std::make_unique<RGWSI_Meta>(cct);
meta_be_sobj = std::make_unique<RGWSI_MetaBackend_SObj>(cct);
bucket_sobj.get());
cls->init(zone.get(), rados.get());
config_key_rados->init(rados.get());
- datalog_rados->init(zone.get(), cls.get());
+ datalog_rados->init(cls.get());
mdlog->init(rados.get(), zone.get(), sysobj.get(), cls.get());
meta->init(sysobj.get(), mdlog.get(), meta_bes);
meta_be_sobj->init(sysobj.get(), mdlog.get());
return r;
}
+ r = datalog_rados->start(&zone->get_zone());
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start datalog service (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
r = mdlog->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start mdlog service (" << cpp_strerror(-r) << dendl;
}
if (!raw) {
- r = datalog_rados->start();
- if (r < 0) {
- ldout(cct, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
- return r;
- }
-
r = meta_be_sobj->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start meta_be_sobj service (" << cpp_strerror(-r) << dendl;
return;
}
- datalog_rados->shutdown();
-
sysobj->shutdown();
sysobj_core->shutdown();
notify->shutdown();
bucket->init(user.get(),
(RGWBucketMetadataHandler *)bucket_meta_handler,
(RGWBucketInstanceMetadataHandler *)bi_meta_handler,
- svc.datalog_rados->get_log());
+ svc.datalog_rados);
otp->init((RGWOTPMetadataHandler *)meta.otp.get());
class RGWSI_Cls;
class RGWSI_ConfigKey;
class RGWSI_ConfigKey_RADOS;
-class RGWSI_DataLog_RADOS;
class RGWSI_MDLog;
class RGWSI_Meta;
class RGWSI_MetaBackend;
class RGWSI_SysObj_Cache;
class RGWSI_User;
class RGWSI_User_RADOS;
+class RGWDataChangesLog;
struct RGWServices_Def
{
std::unique_ptr<RGWSI_BILog_RADOS> bilog_rados;
std::unique_ptr<RGWSI_Cls> cls;
std::unique_ptr<RGWSI_ConfigKey_RADOS> config_key_rados;
- std::unique_ptr<RGWSI_DataLog_RADOS> datalog_rados;
std::unique_ptr<RGWSI_MDLog> mdlog;
std::unique_ptr<RGWSI_Meta> meta;
std::unique_ptr<RGWSI_MetaBackend_SObj> meta_be_sobj;
std::unique_ptr<RGWSI_SysObj_Core> sysobj_core;
std::unique_ptr<RGWSI_SysObj_Cache> sysobj_cache;
std::unique_ptr<RGWSI_User_RADOS> user_rados;
+ std::unique_ptr<RGWDataChangesLog> datalog_rados;
RGWServices_Def();
~RGWServices_Def();
RGWSI_Cls *cls{nullptr};
RGWSI_ConfigKey_RADOS *config_key_rados{nullptr};
RGWSI_ConfigKey *config_key{nullptr};
- RGWSI_DataLog_RADOS *datalog_rados{nullptr};
+ RGWDataChangesLog *datalog_rados{nullptr};
RGWSI_MDLog *mdlog{nullptr};
RGWSI_Meta *meta{nullptr};
RGWSI_MetaBackend *meta_be_sobj{nullptr};
#include "rgw_bucket.h"
#include "services/svc_zone.h"
-#include "services/svc_datalog_rados.h"
#include <boost/asio/yield.hpp>
#include "svc_bi_rados.h"
#include "svc_bilog_rados.h"
-#include "svc_datalog_rados.h"
#include "svc_zone.h"
+#include "rgw/rgw_bucket.h"
#include "rgw/rgw_zone.h"
#include "cls/rgw/cls_rgw_client.h"
void RGWSI_BucketIndex_RADOS::init(RGWSI_Zone *zone_svc,
RGWSI_RADOS *rados_svc,
RGWSI_BILog_RADOS *bilog_svc,
- RGWSI_DataLog_RADOS *datalog_rados_svc)
+ RGWDataChangesLog *datalog_rados_svc)
{
svc.zone = zone_svc;
svc.rados = rados_svc;
-
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
struct rgw_bucket_dir_header;
class RGWSI_BILog_RADOS;
-class RGWSI_DataLog_RADOS;
+class RGWDataChangesLog;
#define RGW_NO_SHARD -1
RGWSI_Zone *zone{nullptr};
RGWSI_RADOS *rados{nullptr};
RGWSI_BILog_RADOS *bilog{nullptr};
- RGWSI_DataLog_RADOS *datalog_rados{nullptr};
+ RGWDataChangesLog *datalog_rados{nullptr};
} svc;
RGWSI_BucketIndex_RADOS(CephContext *cct);
void init(RGWSI_Zone *zone_svc,
RGWSI_RADOS *rados_svc,
RGWSI_BILog_RADOS *bilog_svc,
- RGWSI_DataLog_RADOS *datalog_rados_svc);
+ RGWDataChangesLog *datalog_rados_svc);
static int shards_max() {
return RGW_SHARDS_PRIME_1;