if (mc.get_monmap_privately() < 0)
return -1;
- int err = OSD::mkfs(g_conf->osd_data, g_conf->osd_journal, mc.monmap.fsid, whoami);
+ int err = OSD::mkfs(g_ceph_context, g_conf->osd_data, g_conf->osd_journal, mc.monmap.fsid, whoami);
if (err < 0) {
derr << TEXT_RED << " ** ERROR: error creating empty object store in "
<< g_conf->osd_data << ": " << cpp_strerror(-err) << TEXT_NORMAL << dendl;
exit(0);
if (mkjournal) {
common_init_finish(g_ceph_context);
- int err = OSD::mkjournal(g_conf->osd_data, g_conf->osd_journal);
+ int err = OSD::mkjournal(g_ceph_context, g_conf->osd_data, g_conf->osd_journal);
if (err < 0) {
derr << TEXT_RED << " ** ERROR: error creating fresh journal " << g_conf->osd_journal
<< " for object store " << g_conf->osd_data
}
if (flushjournal) {
common_init_finish(g_ceph_context);
- int err = OSD::flushjournal(g_conf->osd_data, g_conf->osd_journal);
+ int err = OSD::flushjournal(g_ceph_context, g_conf->osd_data, g_conf->osd_journal);
if (err < 0) {
derr << TEXT_RED << " ** ERROR: error flushing journal " << g_conf->osd_journal
<< " for object store " << g_conf->osd_data
}
if (dump_journal) {
common_init_finish(g_ceph_context);
- int err = OSD::dump_journal(g_conf->osd_data, g_conf->osd_journal, cout);
+ int err = OSD::dump_journal(g_ceph_context, g_conf->osd_data, g_conf->osd_journal, cout);
if (err < 0) {
derr << TEXT_RED << " ** ERROR: error dumping journal " << g_conf->osd_journal
<< " for object store " << g_conf->osd_data
return -1;
global_init_chdir(g_ceph_context);
- osd = new OSD(whoami,
+ osd = new OSD(g_ceph_context, whoami,
ms_cluster,
ms_public,
ms_hbclient,
#include "common/Clock.h"
#include "common/debug.h"
-#include "global/global_context.h"
// ick
#include <sys/types.h>
bl.push_back(bp);
uint64_t wrote = 0;
while (1) {
- if (ceph_clock_now(g_ceph_context) > until) break;
+ if (ceph_clock_now(cct) > until) break;
struct statfs st;
store->statfs(&st);
srand(0);
- utime_t start = ceph_clock_now(g_ceph_context);
+ utime_t start = ceph_clock_now(cct);
utime_t until = start;
until.sec_ref() += time;
uint64_t wrote = 0;
for (int c=1; c<=count; c++) {
- if (ceph_clock_now(g_ceph_context) > until) break;
+ if (ceph_clock_now(cct) > until) break;
//if (c == 7) start_debug = true;
// dump freelist?
/*
- if (ceph_clock_now(g_ceph_context) > nextfl) {
+ if (ceph_clock_now(cct) > nextfl) {
elapsed += freelist_inc;
save_freelist(elapsed);
nextfl.sec_ref() += freelist_inc;
#include "include/Distribution.h"
#include "os/ObjectStore.h"
#include "common/Clock.h"
+#include "common/ceph_context.h"
#include <list>
#include <vector>
using namespace std;
class Ager {
+ CephContext *cct;
ObjectStore *store;
private:
file_object_t age_get_oid();
public:
- Ager(ObjectStore *s) : store(s), did_distn(false) {}
+ Ager(CephContext *cct_, ObjectStore *s) : cct(cct_), store(s), did_distn(false) {}
void age(int time,
float high_water, // fill to this %
int ClassHandler::open_all_classes()
{
dout(10) << __func__ << dendl;
- DIR *dir = ::opendir(g_conf->osd_class_dir.c_str());
+ DIR *dir = ::opendir(cct->_conf->osd_class_dir.c_str());
if (!dir)
return -errno;
cls->status == ClassData::CLASS_MISSING) {
char fname[PATH_MAX];
snprintf(fname, sizeof(fname), "%s/" CLS_PREFIX "%s" CLS_SUFFIX,
- g_conf->osd_class_dir.c_str(),
+ cct->_conf->osd_class_dir.c_str(),
cls->name.c_str());
dout(10) << "_load_class " << cls->name << " from " << fname << dendl;
#include "common/Cond.h"
#include "common/Mutex.h"
+#include "common/ceph_context.h"
class ClassHandler
{
public:
+ CephContext *cct;
+
struct ClassData;
struct ClassMethod {
int _load_class(ClassData *cls);
public:
- ClassHandler() : mutex("ClassHandler") {}
+ ClassHandler(CephContext *cct_) : cct(cct_), mutex("ClassHandler") {}
int open_all_classes();
watch_timer(osd->client_messenger->cct, watch_lock),
next_notif_id(0),
backfill_request_lock("OSD::backfill_request_lock"),
- backfill_request_timer(g_ceph_context, backfill_request_lock, false),
+ backfill_request_timer(osd->cct, backfill_request_lock, false),
last_tid(0),
tid_lock("OSDService::tid_lock"),
- reserver_finisher(g_ceph_context),
- local_reserver(&reserver_finisher, g_conf->osd_max_backfills),
- remote_reserver(&reserver_finisher, g_conf->osd_max_backfills),
+ reserver_finisher(osd->cct),
+ local_reserver(&reserver_finisher, osd->cct->_conf->osd_max_backfills),
+ remote_reserver(&reserver_finisher, osd->cct->_conf->osd_max_backfills),
pg_temp_lock("OSDService::pg_temp_lock"),
map_cache_lock("OSDService::map_lock"),
- map_cache(g_conf->osd_map_cache_size),
- map_bl_cache(g_conf->osd_map_cache_size),
- map_bl_inc_cache(g_conf->osd_map_cache_size),
+ map_cache(osd->cct->_conf->osd_map_cache_size),
+ map_bl_cache(osd->cct->_conf->osd_map_cache_size),
+ map_bl_inc_cache(osd->cct->_conf->osd_map_cache_size),
in_progress_split_lock("OSDService::in_progress_split_lock"),
full_status_lock("OSDService::full_status_lock"),
cur_state(NONE),
watch_timer.init();
}
-ObjectStore *OSD::create_object_store(const std::string &dev, const std::string &jdev)
+ObjectStore *OSD::create_object_store(CephContext *cct, const std::string &dev, const std::string &jdev)
{
struct stat st;
if (::stat(dev.c_str(), &st) != 0)
return 0;
- if (g_conf->filestore)
+ if (cct->_conf->filestore)
return new FileStore(dev, jdev);
if (S_ISDIR(st.st_mode))
return r;
}
-int OSD::mkfs(const std::string &dev, const std::string &jdev, uuid_d fsid, int whoami)
+int OSD::mkfs(CephContext *cct, const std::string &dev, const std::string &jdev, uuid_d fsid, int whoami)
{
int ret;
ObjectStore *store = NULL;
try {
- store = create_object_store(dev, jdev);
+ store = create_object_store(cct, dev, jdev);
if (!store) {
ret = -ENOENT;
goto out;
}
// if we are fed a uuid for this osd, use it.
- store->set_fsid(g_conf->osd_uuid);
+ store->set_fsid(cct->_conf->osd_uuid);
ret = store->mkfs();
if (ret) {
}
// age?
- if (g_conf->osd_age_time != 0) {
- if (g_conf->osd_age_time >= 0) {
- dout(0) << "aging..." << dendl;
- Ager ager(store);
- ager.age(g_conf->osd_age_time,
- g_conf->osd_age,
- g_conf->osd_age - .05,
- 50000,
- g_conf->osd_age - .05);
+ if (cct->_conf->osd_age_time != 0) {
+ if (cct->_conf->osd_age_time >= 0) {
+ dout(0) << "aging..." << dendl;
+ Ager ager(cct, store);
+ ager.age(cct->_conf->osd_age_time,
+ cct->_conf->osd_age,
+ cct->_conf->osd_age - .05,
+ 50000,
+ cct->_conf->osd_age - .05);
}
}
sb.compat_features = get_osd_compat_set();
// benchmark?
- if (g_conf->osd_auto_weight) {
+ if (cct->_conf->osd_auto_weight) {
bufferlist bl;
bufferptr bp(1048576);
bp.zero();
bl.push_back(bp);
dout(0) << "testing disk bandwidth..." << dendl;
- utime_t start = ceph_clock_now(g_ceph_context);
+ utime_t start = ceph_clock_now(cct);
object_t oid("disk_bw_test");
for (int i=0; i<1000; i++) {
ObjectStore::Transaction *t = new ObjectStore::Transaction;
store->queue_transaction(NULL, t);
}
store->sync();
- utime_t end = ceph_clock_now(g_ceph_context);
+ utime_t end = ceph_clock_now(cct);
end -= start;
dout(0) << "measured " << (1000.0 / (double)end) << " mb/sec" << dendl;
ObjectStore::Transaction tr;
return ret;
}
-int OSD::mkjournal(const std::string &dev, const std::string &jdev)
+int OSD::mkjournal(CephContext *cct, const std::string &dev, const std::string &jdev)
{
- ObjectStore *store = create_object_store(dev, jdev);
+ ObjectStore *store = create_object_store(cct, dev, jdev);
if (!store)
return -ENOENT;
return store->mkjournal();
}
-int OSD::flushjournal(const std::string &dev, const std::string &jdev)
+int OSD::flushjournal(CephContext *cct, const std::string &dev, const std::string &jdev)
{
- ObjectStore *store = create_object_store(dev, jdev);
+ ObjectStore *store = create_object_store(cct, dev, jdev);
if (!store)
return -ENOENT;
int err = store->mount();
return err;
}
-int OSD::dump_journal(const std::string &dev, const std::string &jdev, ostream& out)
+int OSD::dump_journal(CephContext *cct, const std::string &dev, const std::string &jdev, ostream& out)
{
- ObjectStore *store = create_object_store(dev, jdev);
+ ObjectStore *store = create_object_store(cct, dev, jdev);
if (!store)
return -ENOENT;
int err = store->dump_journal(out);
// cons/des
-OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
+OSD::OSD(CephContext *cct_, int id, Messenger *internal_messenger, Messenger *external_messenger,
Messenger *hb_clientm,
Messenger *hb_front_serverm,
Messenger *hb_back_serverm,
Messenger *osdc_messenger,
MonClient *mc,
const std::string &dev, const std::string &jdev) :
- Dispatcher(external_messenger->cct),
+ Dispatcher(cct_),
osd_lock("OSD::osd_lock"),
- tick_timer(external_messenger->cct, osd_lock),
- authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(external_messenger->cct,
+ tick_timer(cct, osd_lock),
+ authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(cct,
cct->_conf->auth_supported.length() ?
cct->_conf->auth_supported :
cct->_conf->auth_cluster_required)),
- authorize_handler_service_registry(new AuthAuthorizeHandlerRegistry(external_messenger->cct,
+ authorize_handler_service_registry(new AuthAuthorizeHandlerRegistry(cct,
cct->_conf->auth_supported.length() ?
cct->_conf->auth_supported :
cct->_conf->auth_service_required)),
logger(NULL),
recoverystate_perf(NULL),
store(NULL),
- clog(external_messenger->cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
+ clog(cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
whoami(id),
dev_path(dev), journal_path(jdev),
dispatch_running(false),
asok_hook(NULL),
osd_compat(get_osd_compat_set()),
state(STATE_INITIALIZING), boot_epoch(0), up_epoch(0), bind_epoch(0),
- op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads, "osd_op_threads"),
- recovery_tp(external_messenger->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads, "osd_recovery_threads"),
- disk_tp(external_messenger->cct, "OSD::disk_tp", g_conf->osd_disk_threads, "osd_disk_threads"),
- command_tp(external_messenger->cct, "OSD::command_tp", 1),
+ op_tp(cct, "OSD::op_tp", cct->_conf->osd_op_threads, "osd_op_threads"),
+ recovery_tp(cct, "OSD::recovery_tp", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
+ disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
+ command_tp(cct, "OSD::command_tp", 1),
paused_recovery(false),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_need_update(true), heartbeat_epoch(0),
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
+ op_tracker(cct),
test_ops_hook(NULL),
- op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
- peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
+ op_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
+ peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
- debug_drop_pg_create_probability(g_conf->osd_debug_drop_pg_create_probability),
- debug_drop_pg_create_duration(g_conf->osd_debug_drop_pg_create_duration),
+ debug_drop_pg_create_probability(cct->_conf->osd_debug_drop_pg_create_probability),
+ debug_drop_pg_create_duration(cct->_conf->osd_debug_drop_pg_create_duration),
debug_drop_pg_create_left(-1),
outstanding_pg_stats(false),
up_thru_wanted(0), up_thru_pending(0),
pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
osd_stat_updated(false),
pg_stat_tid(0), pg_stat_tid_flushed(0),
- command_wq(this, g_conf->osd_command_thread_timeout, &command_tp),
+ command_wq(this, cct->_conf->osd_command_thread_timeout, &command_tp),
recovery_ops_active(0),
- recovery_wq(this, g_conf->osd_recovery_thread_timeout, &recovery_tp),
+ recovery_wq(this, cct->_conf->osd_recovery_thread_timeout, &recovery_tp),
replay_queue_lock("OSD::replay_queue_lock"),
- snap_trim_wq(this, g_conf->osd_snap_trim_thread_timeout, &disk_tp),
- scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
- scrub_finalize_wq(this, g_conf->osd_scrub_finalize_thread_timeout, &op_tp),
- rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
- remove_wq(store, g_conf->osd_remove_thread_timeout, &disk_tp),
+ snap_trim_wq(this, cct->_conf->osd_snap_trim_thread_timeout, &disk_tp),
+ scrub_wq(this, cct->_conf->osd_scrub_thread_timeout, &disk_tp),
+ scrub_finalize_wq(this, cct->_conf->osd_scrub_finalize_thread_timeout, &op_tp),
+ rep_scrub_wq(this, cct->_conf->osd_scrub_thread_timeout, &disk_tp),
+ remove_wq(store, cct->_conf->osd_remove_thread_timeout, &disk_tp),
next_removal_seq(0),
service(this)
{
delete authorize_handler_cluster_registry;
delete authorize_handler_service_registry;
delete class_handler;
- g_ceph_context->get_perfcounters_collection()->remove(recoverystate_perf);
- g_ceph_context->get_perfcounters_collection()->remove(logger);
+ cct->get_perfcounters_collection()->remove(recoverystate_perf);
+ cct->get_perfcounters_collection()->remove(logger);
delete recoverystate_perf;
delete logger;
delete store;
return 0;
assert(!store);
- store = create_object_store(dev_path, journal_path);
+ store = create_object_store(cct, dev_path, journal_path);
if (!store) {
derr << "OSD::pre_init: unable to create object store" << dendl;
return -ENODEV;
return -EBUSY;
}
- g_conf->add_observer(this);
+ cct->_conf->add_observer(this);
return 0;
}
return r;
}
- class_handler = new ClassHandler();
+ class_handler = new ClassHandler(cct);
cls_initialize(class_handler);
- if (g_conf->osd_open_classes_on_start) {
+ if (cct->_conf->osd_open_classes_on_start) {
int r = class_handler->open_all_classes();
if (r)
dout(1) << "warning: got an error loading one or more classes: " << cpp_strerror(r) << dendl;
heartbeat_thread.create();
// tick
- tick_timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this));
+ tick_timer.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick(this));
service.init();
service.publish_map(osdmap);
{
dout(10) << "create_logger" << dendl;
- PerfCountersBuilder osd_plb(g_ceph_context, "osd", l_osd_first, l_osd_last);
+ PerfCountersBuilder osd_plb(cct, "osd", l_osd_first, l_osd_last);
osd_plb.add_u64(l_osd_opq, "opq"); // op queue length (waiting to be processed yet)
osd_plb.add_u64(l_osd_op_wip, "op_wip"); // rep ops currently being processed (primary)
"messages_delayed_for_map"); // dup osdmap epochs
logger = osd_plb.create_perf_counters();
- g_ceph_context->get_perfcounters_collection()->add(logger);
+ cct->get_perfcounters_collection()->add(logger);
}
void OSD::create_recoverystate_perf()
{
dout(10) << "create_recoverystate_perf" << dendl;
- PerfCountersBuilder rs_perf(g_ceph_context, "recoverystate_perf", rs_first, rs_last);
+ PerfCountersBuilder rs_perf(cct, "recoverystate_perf", rs_first, rs_last);
rs_perf.add_time_avg(rs_initial_latency, "initial_latency");
rs_perf.add_time_avg(rs_started_latency, "started_latency");
rs_perf.add_time_avg(rs_waitupthru_latency, "waitupthru_latency");
recoverystate_perf = rs_perf.create_perf_counters();
- g_ceph_context->get_perfcounters_collection()->add(recoverystate_perf);
+ cct->get_perfcounters_collection()->add(recoverystate_perf);
}
void OSD::suicide(int exitcode)
{
- if (g_conf->filestore_blackhole) {
+ if (cct->_conf->filestore_blackhole) {
derr << " filestore_blackhole=true, doing abbreviated shutdown" << dendl;
_exit(exitcode);
}
heartbeat_lock.Unlock();
// Debugging
- g_ceph_context->_conf->set_val("debug_osd", "100");
- g_ceph_context->_conf->set_val("debug_journal", "100");
- g_ceph_context->_conf->set_val("debug_filestore", "100");
- g_ceph_context->_conf->set_val("debug_ms", "100");
- g_ceph_context->_conf->apply_changes(NULL);
+ cct->_conf->set_val("debug_osd", "100");
+ cct->_conf->set_val("debug_journal", "100");
+ cct->_conf->set_val("debug_filestore", "100");
+ cct->_conf->set_val("debug_ms", "100");
+ cct->_conf->apply_changes(NULL);
// Shutdown PGs
for (hash_map<pg_t, PG*>::iterator p = pg_map.begin();
#ifdef PG_DEBUG_REFS
service.dump_live_pgids();
#endif
- g_conf->remove_observer(this);
+ cct->_conf->remove_observer(this);
monc->shutdown();
osd_lock.Unlock();
pg->unlock();
// don't let the transaction get too big
- if (++num >= g_conf->osd_target_transaction_size) {
+ if (++num >= cct->_conf->osd_target_transaction_size) {
store->apply_transaction(t);
t = ObjectStore::Transaction();
num = 0;
float OSDService::get_full_ratio()
{
- float full_ratio = g_conf->osd_failsafe_full_ratio;
+ float full_ratio = osd->cct->_conf->osd_failsafe_full_ratio;
if (full_ratio > 1.0) full_ratio /= 100.0;
return full_ratio;
}
float OSDService::get_nearfull_ratio()
{
- float nearfull_ratio = g_conf->osd_failsafe_nearfull_ratio;
+ float nearfull_ratio = osd->cct->_conf->osd_failsafe_nearfull_ratio;
if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
return nearfull_ratio;
}
if (cur_state != new_state) {
cur_state = new_state;
- } else if (now - last_msg < g_conf->osd_op_complaint_time) {
+ } else if (now - last_msg < osd->cct->_conf->osd_op_complaint_time) {
return;
}
last_msg = now;
{
Mutex::Locker l(full_status_lock);
double max_ratio;
- max_ratio = g_conf->osd_backfill_full_ratio;
+ max_ratio = osd->cct->_conf->osd_backfill_full_ratio;
if (_ratio)
*_ratio = cur_ratio;
if (_max_ratio)
assert(osd_lock.is_locked());
if (is_waiting_for_healthy()) {
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
if (last_heartbeat_resample == utime_t()) {
last_heartbeat_resample = now;
heartbeat_need_update = true;
} else if (!heartbeat_need_update) {
utime_t dur = now - last_heartbeat_resample;
- if (dur > g_conf->osd_heartbeat_grace) {
+ if (dur > cct->_conf->osd_heartbeat_grace) {
dout(10) << "maybe_update_heartbeat_peers forcing update after " << dur << " seconds" << dendl;
heartbeat_need_update = true;
last_heartbeat_resample = now;
// too few?
int start = osdmap->get_next_up_osd_after(whoami);
for (int n = start; n >= 0; ) {
- if ((int)heartbeat_peers.size() >= g_conf->osd_heartbeat_min_peers)
+ if ((int)heartbeat_peers.size() >= cct->_conf->osd_heartbeat_min_peers)
break;
if (!extras.count(n) && !want.count(n) && n != whoami) {
dout(10) << " adding random peer osd." << n << dendl;
// too many?
for (set<int>::iterator p = extras.begin();
- (int)heartbeat_peers.size() > g_conf->osd_heartbeat_min_peers && p != extras.end();
+ (int)heartbeat_peers.size() > cct->_conf->osd_heartbeat_min_peers && p != extras.end();
++p) {
if (want.count(*p))
continue;
case MOSDPing::PING:
{
- if (g_conf->osd_debug_drop_ping_probability > 0) {
+ if (cct->_conf->osd_debug_drop_ping_probability > 0) {
if (debug_heartbeat_drops_remaining.count(from)) {
if (debug_heartbeat_drops_remaining[from] == 0) {
debug_heartbeat_drops_remaining.erase(from);
<< " remaining to drop" << dendl;
break;
}
- } else if (g_conf->osd_debug_drop_ping_probability >
+ } else if (cct->_conf->osd_debug_drop_ping_probability >
((((double)(rand()%100))/100.0))) {
debug_heartbeat_drops_remaining[from] =
- g_conf->osd_debug_drop_ping_duration;
+ cct->_conf->osd_debug_drop_ping_duration;
dout(5) << "Dropping heartbeat from " << from
<< ", " << debug_heartbeat_drops_remaining[from]
<< " remaining to drop" << dendl;
}
}
- if (!g_ceph_context->get_heartbeat_map()->is_healthy()) {
+ if (!cct->get_heartbeat_map()->is_healthy()) {
dout(10) << "internal heartbeat not healthy, dropping ping request" << dendl;
break;
}
}
}
- utime_t cutoff = ceph_clock_now(g_ceph_context);
- cutoff -= g_conf->osd_heartbeat_grace;
+ utime_t cutoff = ceph_clock_now(cct);
+ cutoff -= cct->_conf->osd_heartbeat_grace;
if (i->second.is_healthy(cutoff)) {
// Cancel false reports
if (failure_queue.count(from)) {
while (!heartbeat_stop) {
heartbeat();
- double wait = .5 + ((float)(rand() % 10)/10.0) * (float)g_conf->osd_heartbeat_interval;
+ double wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval;
utime_t w;
w.set_from_double(wait);
dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
- heartbeat_cond.WaitInterval(g_ceph_context, heartbeat_lock, w);
+ heartbeat_cond.WaitInterval(cct, heartbeat_lock, w);
if (is_stopping())
return;
dout(30) << "heartbeat_entry woke up" << dendl;
void OSD::heartbeat_check()
{
assert(heartbeat_lock.is_locked());
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
double age = hbclient_messenger->get_dispatch_queue_max_age(now);
- if (age > (g_conf->osd_heartbeat_grace / 2)) {
+ if (age > (cct->_conf->osd_heartbeat_grace / 2)) {
derr << "skipping heartbeat_check, hbqueue max age: " << age << dendl;
return; // hb dispatch is too backed up for our hb status to be meaningful
}
// check for incoming heartbeats (move me elsewhere?)
utime_t cutoff = now;
- cutoff -= g_conf->osd_heartbeat_grace;
+ cutoff -= cct->_conf->osd_heartbeat_grace;
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
p != heartbeat_peers.end();
++p) {
dout(5) << "heartbeat: " << osd_stat << dendl;
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
// send heartbeats
for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
// hmm.. am i all alone?
dout(30) << "heartbeat lonely?" << dendl;
if (heartbeat_peers.empty()) {
- if (now - last_mon_heartbeat > g_conf->osd_mon_heartbeat_interval && is_active()) {
+ if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval && is_active()) {
last_mon_heartbeat = now;
dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
heartbeat_lock.Unlock();
// mon report?
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
if (outstanding_pg_stats &&
- (now - g_conf->osd_mon_ack_timeout) > last_pg_stats_ack) {
+ (now - cct->_conf->osd_mon_ack_timeout) > last_pg_stats_ack) {
dout(1) << "mon hasn't acked PGStats in " << now - last_pg_stats_ack
<< " seconds, reconnecting elsewhere" << dendl;
monc->reopen_session();
- last_pg_stats_ack = ceph_clock_now(g_ceph_context); // reset clock
+ last_pg_stats_ack = ceph_clock_now(cct); // reset clock
last_pg_stats_sent = utime_t();
}
- if (now - last_pg_stats_sent > g_conf->osd_mon_report_interval_max) {
+ if (now - last_pg_stats_sent > cct->_conf->osd_mon_report_interval_max) {
osd_stat_updated = true;
do_mon_report();
- } else if (now - last_mon_report > g_conf->osd_mon_report_interval_min) {
+ } else if (now - last_mon_report > cct->_conf->osd_mon_report_interval_min) {
do_mon_report();
}
string poolstr;
- cmd_getval(g_ceph_context, cmdmap, "pool", poolstr);
+ cmd_getval(service->osd->cct, cmdmap, "pool", poolstr);
pool = curmap->const_lookup_pg_pool_name(poolstr.c_str());
//If we can't find it by name then maybe id specified
if (pool < 0 && isdigit(poolstr[0]))
}
r = -1;
string objname, nspace;
- cmd_getval(g_ceph_context, cmdmap, "objname", objname);
+ cmd_getval(service->osd->cct, cmdmap, "objname", objname);
std::size_t found = objname.find_first_of('/');
if (found != string::npos) {
nspace = objname.substr(0, found);
map<string, bufferlist> newattrs;
bufferlist val;
string key, valstr;
- cmd_getval(g_ceph_context, cmdmap, "key", key);
- cmd_getval(g_ceph_context, cmdmap, "val", valstr);
+ cmd_getval(service->osd->cct, cmdmap, "key", key);
+ cmd_getval(service->osd->cct, cmdmap, "val", valstr);
val.append(valstr);
newattrs[key] = val;
} else if (command == "rmomapkey") {
string key;
set<string> keys;
- cmd_getval(g_ceph_context, cmdmap, "key", key);
+ cmd_getval(service->osd->cct, cmdmap, "key", key);
keys.insert(key);
t.omap_rmkeys(coll_t(pgid), obj, keys);
bufferlist newheader;
string headerstr;
- cmd_getval(g_ceph_context, cmdmap, "header", headerstr);
+ cmd_getval(service->osd->cct, cmdmap, "header", headerstr);
newheader.append(headerstr);
t.omap_setheader(coll_t(pgid), obj, newheader);
r = store->apply_transaction(t);
}
} else if (command == "truncobj") {
int64_t trunclen;
- cmd_getval(g_ceph_context, cmdmap, "len", trunclen);
+ cmd_getval(service->osd->cct, cmdmap, "len", trunclen);
t.truncate(coll_t(pgid), obj, trunclen);
r = store->apply_transaction(t);
if (r < 0)
// =========================================
bool remove_dir(
+ CephContext *cct,
ObjectStore *store, SnapMapper *mapper,
OSDriver *osdriver,
ObjectStore::Sequencer *osr,
assert(0);
}
t->remove(coll, *i);
- if (num >= g_conf->osd_target_transaction_size) {
+ if (num >= cct->_conf->osd_target_transaction_size) {
C_SaferCond waiter;
store->queue_transaction(osr, t, &waiter);
bool cont = dstate->pause_clearing();
if (pg->have_temp_coll()) {
bool cont = remove_dir(
- store, &mapper, &driver, pg->osr.get(), pg->get_temp_coll(), item.second);
+ pg->osd->osd->cct, store, &mapper, &driver, pg->osr.get(), pg->get_temp_coll(), item.second);
if (!cont)
return;
}
bool cont = remove_dir(
- store, &mapper, &driver, pg->osr.get(), coll, item.second);
+ pg->osd->osd->cct, store, &mapper, &driver, pg->osr.get(), coll, item.second);
if (!cont)
return;
{
dout(7) << "do_mon_report" << dendl;
- utime_t now(ceph_clock_now(g_ceph_context));
+ utime_t now(ceph_clock_now(cct));
last_mon_report = now;
// do any pending reports
send_alive();
service.send_pg_temp();
send_failures();
- send_pg_stats(ceph_clock_now(g_ceph_context));
+ send_pg_stats(ceph_clock_now(cct));
monc->sub_want("osd_pg_creates", 0, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
// send pings sooner rather than later
heartbeat_kick();
} else if (osdmap->get_epoch() >= oldest - 1 &&
- osdmap->get_epoch() + g_conf->osd_map_message_max > newest) {
+ osdmap->get_epoch() + cct->_conf->osd_map_message_max > newest) {
_send_boot();
return;
}
bool OSD::_is_healthy()
{
- if (!g_ceph_context->get_heartbeat_map()->is_healthy()) {
+ if (!cct->get_heartbeat_map()->is_healthy()) {
dout(1) << "is_healthy false -- internal heartbeat failed" << dendl;
return false;
}
if (is_waiting_for_healthy()) {
Mutex::Locker l(heartbeat_lock);
- utime_t cutoff = ceph_clock_now(g_ceph_context);
- cutoff -= g_conf->osd_heartbeat_grace;
+ utime_t cutoff = ceph_clock_now(cct);
+ cutoff -= cct->_conf->osd_heartbeat_grace;
int num = 0, up = 0;
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
p != heartbeat_peers.end();
++up;
++num;
}
- if ((float)up < (float)num * g_conf->osd_heartbeat_min_healthy_ratio) {
+ if ((float)up < (float)num * cct->_conf->osd_heartbeat_min_healthy_ratio) {
dout(1) << "is_healthy false -- only " << up << "/" << num << " up peers (less than 1/3)" << dendl;
return false;
}
up_thru_wanted = want;
// expedite, a bit. WARNING this will somewhat delay other mon queries.
- last_mon_report = ceph_clock_now(g_ceph_context);
+ last_mon_report = ceph_clock_now(cct);
send_alive();
} else {
dout(10) << "queue_want_up_thru want " << want << " <= queued " << up_thru_wanted
heartbeat_lock.Lock();
locked = true;
}
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
while (!failure_queue.empty()) {
int osd = failure_queue.begin()->first;
int failed_for = (int)(double)(now - failure_queue.begin()->second);
if (!outstanding_pg_stats) {
outstanding_pg_stats = true;
- last_pg_stats_ack = ceph_clock_now(g_ceph_context);
+ last_pg_stats_ack = ceph_clock_now(cct);
}
monc->send_mon_message(m);
}
return;
}
- last_pg_stats_ack = ceph_clock_now(g_ceph_context);
+ last_pg_stats_ack = ceph_clock_now(cct);
pg_stat_queue_lock.Lock();
goto out;
}
- cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+ cmd_getval(cct, cmdmap, "prefix", prefix);
if (prefix == "get_command_descriptions") {
int cmdnum = 0;
goto out;
}
- cmd_getval(g_ceph_context, cmdmap, "format", format);
+ cmd_getval(cct, cmdmap, "format", format);
f.reset(new_formatter(format));
if (prefix == "version") {
}
else if (prefix == "injectargs") {
vector<string> argsvec;
- cmd_getval(g_ceph_context, cmdmap, "injected_args", argsvec);
+ cmd_getval(cct, cmdmap, "injected_args", argsvec);
if (argsvec.empty()) {
r = -EINVAL;
for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a)
args += " " + *a;
osd_lock.Unlock();
- g_conf->injectargs(args, &ss);
+ cct->_conf->injectargs(args, &ss);
osd_lock.Lock();
}
// 'tell <pgid>' (which comes in without any of that prefix)?
else if (prefix == "pg" ||
- (cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr) &&
+ (cmd_getval(cct, cmdmap, "pgid", pgidstr) &&
(prefix == "query" ||
prefix == "mark_unfound_lost" ||
prefix == "list_missing")
)) {
pg_t pgid;
- if (!cmd_getval(g_ceph_context, cmdmap, "pgid", pgidstr)) {
+ if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
ss << "no pgid specified";
r = -EINVAL;
} else if (!pgid.parse(pgidstr.c_str())) {
} else {
// simulate pg <pgid> cmd= for pg->do-command
if (prefix != "pg")
- cmd_putval(g_ceph_context, cmdmap, "cmd", prefix);
+ cmd_putval(cct, cmdmap, "cmd", prefix);
r = pg->do_command(cmdmap, ss, data, odata);
pg->unlock();
}
int64_t count;
int64_t bsize;
// default count 1G, size 4MB
- cmd_getval(g_ceph_context, cmdmap, "count", count, (int64_t)1 << 30);
- cmd_getval(g_ceph_context, cmdmap, "bsize", bsize, (int64_t)4 << 20);
+ cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30);
+ cmd_getval(cct, cmdmap, "bsize", bsize, (int64_t)4 << 20);
bufferlist bl;
bufferptr bp(bsize);
ObjectStore::Transaction *cleanupt = new ObjectStore::Transaction;
store->sync_and_flush();
- utime_t start = ceph_clock_now(g_ceph_context);
+ utime_t start = ceph_clock_now(cct);
for (int64_t pos = 0; pos < count; pos += bsize) {
char nm[30];
snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
cleanupt->remove(coll_t::META_COLL, soid);
}
store->sync_and_flush();
- utime_t end = ceph_clock_now(g_ceph_context);
+ utime_t end = ceph_clock_now(cct);
// clean up
store->queue_transaction(NULL, cleanupt);
ss << "could not issue heap profiler command -- not using tcmalloc!";
} else {
string heapcmd;
- cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
+ cmd_getval(cct, cmdmap, "heapcmd", heapcmd);
// XXX 1-element vector, change at callee or make vector here?
vector<string> heapcmd_vec;
get_str_vec(heapcmd, heapcmd_vec);
else if (prefix == "debug dump_missing") {
string file_name;
- cmd_getval(g_ceph_context, cmdmap, "filename", file_name);
+ cmd_getval(cct, cmdmap, "filename", file_name);
std::ofstream fout(file_name.c_str());
if (!fout.is_open()) {
ss << "failed to open file '" << file_name << "'";
}
else if (prefix == "debug kick_recovery_wq") {
int64_t delay;
- cmd_getval(g_ceph_context, cmdmap, "delay", delay);
+ cmd_getval(cct, cmdmap, "delay", delay);
ostringstream oss;
oss << delay;
- r = g_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
+ r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
if (r != 0) {
ss << "kick_recovery_wq: error setting "
<< "osd_recovery_delay_start to '" << delay << "': error "
<< r;
goto out;
}
- g_conf->apply_changes(NULL);
+ cct->_conf->apply_changes(NULL);
ss << "kicking recovery queue. set osd_recovery_delay_start "
- << "to " << g_conf->osd_recovery_delay_start;
- defer_recovery_until = ceph_clock_now(g_ceph_context);
- defer_recovery_until += g_conf->osd_recovery_delay_start;
+ << "to " << cct->_conf->osd_recovery_delay_start;
+ defer_recovery_until = ceph_clock_now(cct);
+ defer_recovery_until += cct->_conf->osd_recovery_delay_start;
recovery_wq.wake();
}
else if (prefix == "cpu_profiler") {
string arg;
- cmd_getval(g_ceph_context, cmdmap, "arg", arg);
+ cmd_getval(cct, cmdmap, "arg", arg);
vector<string> argvec;
get_str_vec(arg, argvec);
cpu_profiler_handle_command(argvec, ds);
uint64_t global_id;
uint64_t auid = CEPH_AUTH_UID_DEFAULT;
- isvalid = authorize_handler->verify_authorizer(g_ceph_context, monc->rotating_secrets,
+ isvalid = authorize_handler->verify_authorizer(cct, monc->rotating_secrets,
authorizer_data, authorizer_reply, name, global_id, caps_info, session_key, &auid);
if (isvalid) {
return false;
}
- if (loadavgs[0] >= g_conf->osd_scrub_load_threshold) {
+ if (loadavgs[0] >= cct->_conf->osd_scrub_load_threshold) {
dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
- << " >= max " << g_conf->osd_scrub_load_threshold
+ << " >= max " << cct->_conf->osd_scrub_load_threshold
<< " = no, load too high" << dendl;
return false;
}
dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
- << " < max " << g_conf->osd_scrub_load_threshold
+ << " < max " << cct->_conf->osd_scrub_load_threshold
<< " = yes" << dendl;
- return loadavgs[0] < g_conf->osd_scrub_load_threshold;
+ return loadavgs[0] < cct->_conf->osd_scrub_load_threshold;
}
void OSD::sched_scrub()
dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
//dout(20) << " " << last_scrub_pg << dendl;
dout(30) << "sched_scrub examine " << pgid << " at " << t << dendl;
utime_t diff = now - t;
- if ((double)diff < g_conf->osd_scrub_min_interval) {
+ if ((double)diff < cct->_conf->osd_scrub_min_interval) {
dout(10) << "sched_scrub " << pgid << " at " << t
- << ": " << (double)diff << " < min (" << g_conf->osd_scrub_min_interval << " seconds)" << dendl;
+ << ": " << (double)diff << " < min (" << cct->_conf->osd_scrub_min_interval << " seconds)" << dendl;
break;
}
- if ((double)diff < g_conf->osd_scrub_max_interval && !load_is_low) {
+ if ((double)diff < cct->_conf->osd_scrub_max_interval && !load_is_low) {
// save ourselves some effort
dout(10) << "sched_scrub " << pgid << " high load at " << t
- << ": " << (double)diff << " < max (" << g_conf->osd_scrub_max_interval << " seconds)" << dendl;
+ << ": " << (double)diff << " < max (" << cct->_conf->osd_scrub_max_interval << " seconds)" << dendl;
break;
}
if (pg) {
if (pg->is_active() &&
(load_is_low ||
- (double)diff >= g_conf->osd_scrub_max_interval ||
+ (double)diff >= cct->_conf->osd_scrub_max_interval ||
pg->scrubber.must_scrub)) {
dout(10) << "sched_scrub scrubbing " << pgid << " at " << t
<< (pg->scrubber.must_scrub ? ", explicitly requested" :
- ( (double)diff >= g_conf->osd_scrub_max_interval ? ", diff >= max" : ""))
+ ( (double)diff >= cct->_conf->osd_scrub_max_interval ? ", diff >= max" : ""))
<< dendl;
if (pg->sched_scrub()) {
pg->unlock();
bool result = false;
sched_scrub_lock.Lock();
- if (scrubs_pending + scrubs_active < g_conf->osd_max_scrubs) {
+ if (scrubs_pending + scrubs_active < osd->cct->_conf->osd_max_scrubs) {
dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
- << " (max " << g_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ << " (max " << osd->cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
result = true;
++scrubs_pending;
} else {
- dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << g_conf->osd_max_scrubs << dendl;
+ dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << osd->cct->_conf->osd_max_scrubs << dendl;
}
sched_scrub_lock.Unlock();
{
sched_scrub_lock.Lock();
dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
- << " (max " << g_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ << " (max " << osd->cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
--scrubs_pending;
assert(scrubs_pending >= 0);
sched_scrub_lock.Unlock();
if (reserved) {
--(scrubs_pending);
dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
- << " (max " << g_conf->osd_max_scrubs
+ << " (max " << osd->cct->_conf->osd_max_scrubs
<< ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
assert(scrubs_pending >= 0);
} else {
dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
- << " (max " << g_conf->osd_max_scrubs
+ << " (max " << osd->cct->_conf->osd_max_scrubs
<< ", pending " << scrubs_pending << ")" << dendl;
}
sched_scrub_lock.Unlock();
{
sched_scrub_lock.Lock();
dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
- << " (max " << g_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
+ << " (max " << osd->cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
--scrubs_active;
sched_scrub_lock.Unlock();
}
osdmap->get_epoch(),
false
));
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(osd->cct);
utime_t timeout;
- timeout.set_from_double(now + g_conf->osd_mon_shutdown_timeout);
- while ((ceph_clock_now(g_ceph_context) < timeout) &&
+ timeout.set_from_double(now + osd->cct->_conf->osd_mon_shutdown_timeout);
+ while ((ceph_clock_now(osd->cct) < timeout) &&
(state != STOPPING)) {
is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
}
t.remove(coll_t::META_COLL, get_inc_osdmap_pobject_name(e));
superblock.oldest_map = e+1;
num++;
- if (num >= g_conf->osd_target_transaction_size &&
+ if (num >= cct->_conf->osd_target_transaction_size &&
(uint64_t)num > (last - first)) // make sure we at least keep pace with incoming maps
break;
}
map_lock.get_write();
- C_Contexts *fin = new C_Contexts(g_ceph_context);
+ C_Contexts *fin = new C_Contexts(cct);
// advance through the new maps
for (epoch_t cur = start; cur <= superblock.newest_map; cur++) {
superblock.current_epoch = cur;
advance_map(t, fin);
- had_map_since = ceph_clock_now(g_ceph_context);
+ had_map_since = ceph_clock_now(cct);
}
if (osdmap->is_up(whoami) &&
return;
}
- if (to > since && (int64_t)(to - since) > g_conf->osd_map_share_max_epochs) {
- dout(10) << " " << (to - since) << " > max " << g_conf->osd_map_share_max_epochs
+ if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
+ dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
<< ", only sending most recent" << dendl;
- since = to - g_conf->osd_map_share_max_epochs;
+ since = to - cct->_conf->osd_map_share_max_epochs;
}
while (since < to) {
- if (to - since > (epoch_t)g_conf->osd_map_message_max)
- to = since + g_conf->osd_map_message_max;
+ if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
+ to = since + cct->_conf->osd_map_message_max;
MOSDMap *m = build_incremental_map_msg(since, to);
send_map(m, con);
since = to;
{
epoch_t e = o->get_epoch();
- if (g_conf->osd_map_dedup) {
+ if (osd->cct->_conf->osd_map_dedup) {
// Dedup against an existing map at a nearby epoch
OSDMapRef for_dedup = map_cache.lower_bound(e);
if (for_dedup) {
// drop the next N pg_creates in a row?
if (debug_drop_pg_create_left < 0 &&
- g_conf->osd_debug_drop_pg_create_probability >
+ cct->_conf->osd_debug_drop_pg_create_probability >
((((double)(rand()%100))/100.0))) {
debug_drop_pg_create_left = debug_drop_pg_create_duration;
}
PG::RecoveryCtx OSD::create_context()
{
ObjectStore::Transaction *t = new ObjectStore::Transaction;
- C_Contexts *on_applied = new C_Contexts(g_ceph_context);
- C_Contexts *on_safe = new C_Contexts(g_ceph_context);
+ C_Contexts *on_applied = new C_Contexts(cct);
+ C_Contexts *on_safe = new C_Contexts(cct);
map< int, map<pg_t,pg_query_t> > *query_map =
new map<int, map<pg_t, pg_query_t> >;
map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list =
ctx.transaction, ctx.on_applied, ctx.on_safe);
assert(tr == 0);
ctx.transaction = new ObjectStore::Transaction;
- ctx.on_applied = new C_Contexts(g_ceph_context);
- ctx.on_safe = new C_Contexts(g_ceph_context);
+ ctx.on_applied = new C_Contexts(cct);
+ ctx.on_safe = new C_Contexts(cct);
}
}
{
assert(osd_lock.is_locked());
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
list< pair<pg_t,utime_t> > pgids;
replay_queue_lock.Lock();
while (!replay_queue.empty() &&
bool OSD::_recover_now()
{
- if (recovery_ops_active >= g_conf->osd_recovery_max_active) {
+ if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) {
dout(15) << "_recover_now active " << recovery_ops_active
- << " >= max " << g_conf->osd_recovery_max_active << dendl;
+ << " >= max " << cct->_conf->osd_recovery_max_active << dendl;
return false;
}
- if (ceph_clock_now(g_ceph_context) < defer_recovery_until) {
+ if (ceph_clock_now(cct) < defer_recovery_until) {
dout(15) << "_recover_now defer until " << defer_recovery_until << dendl;
return false;
}
{
// see how many we should try to start. note that this is a bit racy.
recovery_wq.lock();
- int max = MAX(g_conf->osd_recovery_max_active - recovery_ops_active,
- g_conf->osd_recovery_max_single_start);
+ int max = MAX(cct->_conf->osd_recovery_max_active - recovery_ops_active,
+ cct->_conf->osd_recovery_max_single_start);
if (max > 0) {
- dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active
+ dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
<< " rops)" << dendl;
recovery_ops_active += max; // take them now, return them if we don't use them.
} else {
- dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active
+ dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
<< " rops)" << dendl;
}
recovery_wq.unlock();
{
recovery_wq.lock();
dout(10) << "start_recovery_op " << *pg << " " << soid
- << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops)"
+ << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
<< dendl;
assert(recovery_ops_active >= 0);
recovery_ops_active++;
recovery_wq.lock();
dout(10) << "finish_recovery_op " << *pg << " " << soid
<< " dequeue=" << dequeue
- << " (" << recovery_ops_active << "/" << g_conf->osd_recovery_max_active << " rops)"
+ << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
<< dendl;
// adjust count
}
}
- if (g_conf->osd_debug_drop_op_probability > 0 &&
+ if (cct->_conf->osd_debug_drop_op_probability > 0 &&
!m->get_source().is_mds()) {
- if ((double)rand() / (double)RAND_MAX < g_conf->osd_debug_drop_op_probability) {
+ if ((double)rand() / (double)RAND_MAX < cct->_conf->osd_debug_drop_op_probability) {
dout(0) << "handle_op DEBUG artificially dropping op " << *m << dendl;
return;
}
}
// too big?
- if (g_conf->osd_max_write_size &&
- m->get_data_len() > g_conf->osd_max_write_size << 20) {
+ if (cct->_conf->osd_max_write_size &&
+ m->get_data_len() > cct->_conf->osd_max_write_size << 20) {
// journal can't hold commit!
derr << "handle_op msg data len " << m->get_data_len()
- << " > osd_max_write_size " << (g_conf->osd_max_write_size << 20)
+ << " > osd_max_write_size " << (cct->_conf->osd_max_write_size << 20)
<< " on " << *m << dendl;
service.reply_op_error(op, -OSD_WRITETOOBIG);
return;
*/
void OSD::enqueue_op(PG *pg, OpRequestRef op)
{
- utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp();
+ utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp();
dout(15) << "enqueue_op " << op << " prio " << op->request->get_priority()
<< " cost " << op->request->get_cost()
<< " latency " << latency
PGRef pg, OpRequestRef op,
ThreadPool::TPHandle &handle)
{
- utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp();
+ utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp();
dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority()
<< " cost " << op->request->get_cost()
<< " latency " << latency
const std::set <std::string> &changed)
{
if (changed.count("osd_max_backfills")) {
- service.local_reserver.set_max(g_conf->osd_max_backfills);
- service.remote_reserver.set_max(g_conf->osd_max_backfills);
+ service.local_reserver.set_max(cct->_conf->osd_max_backfills);
+ service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
}
}
return 0;
}
+
+bool OSD::RecoveryWQ::_enqueue(PG *pg) {
+ if (!pg->recovery_item.is_on_list()) {
+ pg->get("RecoveryWQ");
+ osd->recovery_queue.push_back(&pg->recovery_item);
+
+ if (osd->cct->_conf->osd_recovery_delay_start > 0) {
+ osd->defer_recovery_until = ceph_clock_now(osd->cct);
+ osd->defer_recovery_until += osd->cct->_conf->osd_recovery_delay_start;
+ }
+ return true;
+ }
+ return false;
+}
+
+void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
+ set<PG*> got;
+ for (list<PG*>::iterator i = peering_queue.begin();
+ i != peering_queue.end() &&
+ out->size() < osd->cct->_conf->osd_peering_wq_batch_size;
+ ) {
+ if (in_use.count(*i)) {
+ ++i;
+ } else {
+ out->push_back(*i);
+ got.insert(*i);
+ peering_queue.erase(i++);
+ }
+ }
+ in_use.insert(got.begin(), got.end());
+}
struct HeartbeatDispatcher : public Dispatcher {
OSD *osd;
- HeartbeatDispatcher(OSD *o) : Dispatcher(g_ceph_context), osd(o) {}
+ HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
bool ms_dispatch(Message *m) {
return osd->heartbeat_dispatch(m);
};
bool _empty() {
return peering_queue.empty();
}
- void _dequeue(list<PG*> *out) {
- set<PG*> got;
- for (list<PG*>::iterator i = peering_queue.begin();
- i != peering_queue.end() &&
- out->size() < g_conf->osd_peering_wq_batch_size;
- ) {
- if (in_use.count(*i)) {
- ++i;
- } else {
- out->push_back(*i);
- got.insert(*i);
- peering_queue.erase(i++);
- }
- }
- in_use.insert(got.begin(), got.end());
- }
+ void _dequeue(list<PG*> *out);
void _process(
const list<PG *> &pgs,
ThreadPool::TPHandle &handle) {
bool _empty() {
return osd->recovery_queue.empty();
}
- bool _enqueue(PG *pg) {
- if (!pg->recovery_item.is_on_list()) {
- pg->get("RecoveryWQ");
- osd->recovery_queue.push_back(&pg->recovery_item);
-
- if (g_conf->osd_recovery_delay_start > 0) {
- osd->defer_recovery_until = ceph_clock_now(g_ceph_context);
- osd->defer_recovery_until += g_conf->osd_recovery_delay_start;
- }
- return true;
- }
- return false;
- }
+ bool _enqueue(PG *pg);
void _dequeue(PG *pg) {
if (pg->recovery_item.remove_myself())
pg->put("RecoveryWQ");
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
- OSD(int id,
+ OSD(CephContext *cct_,
+ int id,
Messenger *internal,
Messenger *external,
Messenger *hb_client,
// static bits
static int find_osd_dev(char *result, int whoami);
- static ObjectStore *create_object_store(const std::string &dev, const std::string &jdev);
+ static ObjectStore *create_object_store(CephContext *cct, const std::string &dev, const std::string &jdev);
static int convertfs(const std::string &dev, const std::string &jdev);
static int do_convertfs(ObjectStore *store);
static int convert_collection(ObjectStore *store, coll_t cid);
- static int mkfs(const std::string &dev, const std::string &jdev,
+ static int mkfs(CephContext *cct, const std::string &dev, const std::string &jdev,
uuid_d fsid, int whoami);
- static int mkjournal(const std::string &dev, const std::string &jdev);
- static int flushjournal(const std::string &dev, const std::string &jdev);
- static int dump_journal(const std::string &dev, const std::string &jdev, ostream& out);
+ static int mkjournal(CephContext *cct, const std::string &dev, const std::string &jdev);
+ static int flushjournal(CephContext *cct, const std::string &dev, const std::string &jdev);
+ static int dump_journal(CephContext *cct, const std::string &dev, const std::string &jdev, ostream& out);
/* remove any non-user xattrs from a map of them */
void filter_xattrs(map<string, bufferptr>& attrs) {
for (map<string, bufferptr>::iterator iter = attrs.begin();
seq(0) {
received_time = request->get_recv_stamp();
tracker->register_inflight_op(&xitem);
- if (req->get_priority() < g_conf->osd_client_op_priority) {
+ if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
// don't warn as quickly for low priority ops
- warn_interval_multiplier = g_conf->osd_recovery_op_warn_multiple;
+ warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
}
}
{
while (arrived.size() &&
(now - arrived.begin()->first >
- (double)(g_conf->osd_op_history_duration))) {
+ (double)(tracker->cct->_conf->osd_op_history_duration))) {
duration.erase(make_pair(
arrived.begin()->second->get_duration(),
arrived.begin()->second));
arrived.erase(arrived.begin());
}
- while (duration.size() > g_conf->osd_op_history_size) {
+ while (duration.size() > tracker->cct->_conf->osd_op_history_size) {
arrived.erase(make_pair(
duration.begin()->second->get_arrived(),
duration.begin()->second));
{
cleanup(now);
f->open_object_section("OpHistory");
- f->dump_int("num to keep", g_conf->osd_op_history_size);
- f->dump_int("duration to keep", g_conf->osd_op_history_duration);
+ f->dump_int("num to keep", tracker->cct->_conf->osd_op_history_size);
+ f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration);
{
f->open_array_section("Ops");
for (set<pair<utime_t, OpRequestRef> >::const_iterator i =
void OpTracker::dump_historic_ops(Formatter *f)
{
Mutex::Locker locker(ops_in_flight_lock);
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
history.dump_ops(now, f);
}
f->open_object_section("ops_in_flight"); // overall dump
f->dump_int("num_ops", ops_in_flight.size());
f->open_array_section("ops"); // list of OpRequests
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
f->open_object_section("op");
(*p)->dump(now, f);
{
Mutex::Locker locker(ops_in_flight_lock);
assert(i->xitem.get_list() == &ops_in_flight);
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
i->xitem.remove_myself();
i->request->clear_data();
history.insert(now, OpRequestRef(i));
if (!ops_in_flight.size())
return false;
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
utime_t too_old = now;
- too_old -= g_conf->osd_op_complaint_time;
+ too_old -= cct->_conf->osd_op_complaint_time;
utime_t oldest_secs = now - ops_in_flight.front()->received_time;
<< "; oldest is " << oldest_secs
<< " seconds old" << dendl;
- if (oldest_secs < g_conf->osd_op_complaint_time)
+ if (oldest_secs < cct->_conf->osd_op_complaint_time)
return false;
xlist<OpRequest*>::iterator i = ops_in_flight.begin();
- warning_vector.reserve(g_conf->osd_op_log_threshold + 1);
+ warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1);
int slow = 0; // total slow
int warned = 0; // total logged
// exponential backoff of warning intervals
if (((*i)->received_time +
- (g_conf->osd_op_complaint_time *
+ (cct->_conf->osd_op_complaint_time *
(*i)->warn_interval_multiplier)) < now) {
// will warn
if (warning_vector.empty())
warning_vector.push_back("");
warned++;
- if (warned > g_conf->osd_op_log_threshold)
+ if (warned > cct->_conf->osd_op_log_threshold)
break;
utime_t age = now - (*i)->received_time;
void OpTracker::mark_event(OpRequest *op, const string &dest)
{
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(cct);
return _mark_event(op, dest, now);
}
void OpRequest::mark_event(const string &event)
{
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(tracker->cct);
{
Mutex::Locker l(lock);
events.push_back(make_pair(now, event));
#include "osd/osd_types.h"
struct OpRequest;
+class OpTracker;
typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
class OpHistory {
set<pair<utime_t, OpRequestRef> > arrived;
set<pair<double, OpRequestRef> > duration;
void cleanup(utime_t now);
bool shutdown;
+ OpTracker *tracker;
public:
- OpHistory() : shutdown(false) {}
+ OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_) {}
~OpHistory() {
assert(arrived.empty());
assert(duration.empty());
void operator()(OpRequest *op);
};
friend class RemoveOnDelete;
+ friend class OpRequest;
+ friend class OpHistory;
uint64_t seq;
Mutex ops_in_flight_lock;
xlist<OpRequest *> ops_in_flight;
OpHistory history;
+protected:
+ CephContext *cct;
+
public:
- OpTracker() : seq(0), ops_in_flight_lock("OpTracker mutex") {}
+ OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {}
void dump_ops_in_flight(Formatter *f);
void dump_historic_ops(Formatter *f);
void register_inflight_op(xlist<OpRequest*>::item *i);
+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
deleting(false), dirty_info(false), dirty_big_info(false),
info(p),
info_struct_v(0),
- coll(p), pg_log(g_ceph_context), log_oid(loid), biginfo_oid(ioid),
+ coll(p), pg_log(o->osd->cct), log_oid(loid), biginfo_oid(ioid),
recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this),
recovery_ops_active(0),
waiting_on_backfill(0),
if (is_primary() &&
pool.info.crash_replay_interval > 0 &&
may_need_replay(get_osdmap())) {
- replay_until = ceph_clock_now(g_ceph_context);
+ replay_until = ceph_clock_now(osd->osd->cct);
replay_until += pool.info.crash_replay_interval;
dout(10) << "activate starting replay interval for " << pool.info.crash_replay_interval
<< " until " << replay_until << dendl;
m = new MOSDPGLog(get_osdmap()->get_epoch(), pi);
// send some recent log, so that op dup detection works well.
- m->log.copy_up_to(pg_log.get_log(), g_conf->osd_min_pg_log_entries);
+ m->log.copy_up_to(pg_log.get_log(), osd->osd->cct->_conf->osd_min_pg_log_entries);
m->info.log_tail = m->log.tail;
pi.log_tail = m->log.tail; // sigh...
else
state_clear(PG_STATE_INCONSISTENT);
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(osd->osd->cct);
info.stats.last_fresh = now;
if (info.stats.state != state) {
info.stats.state = state;
return false;
}
- bool time_for_deep = (ceph_clock_now(g_ceph_context) >
- info.history.last_deep_scrub_stamp + g_conf->osd_deep_scrub_interval);
+ bool time_for_deep = (ceph_clock_now(osd->osd->cct) >
+ info.history.last_deep_scrub_stamp + osd->osd->cct->_conf->osd_deep_scrub_interval);
//NODEEP_SCRUB so ignore time initiated deep-scrub
if (osd->osd->get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB))
int r;
__u64 pos = 0;
while ( (r = osd->store->read(coll, poid, pos,
- g_conf->osd_deep_scrub_stride, bl,
+ osd->osd->cct->_conf->osd_deep_scrub_stride, bl,
true)) > 0) {
handle.reset_tp_timeout();
h << bl;
assert(iter);
uint64_t keys_scanned = 0;
for (iter->seek_to_first(); iter->valid() ; iter->next()) {
- if (g_conf->osd_scan_list_ping_tp_interval &&
- (keys_scanned % g_conf->osd_scan_list_ping_tp_interval == 0)) {
+ if (osd->osd->cct->_conf->osd_scan_list_ping_tp_interval &&
+ (keys_scanned % osd->osd->cct->_conf->osd_scan_list_ping_tp_interval == 0)) {
handle.reset_tp_timeout();
}
++keys_scanned;
{
Mutex::Locker lock(osd->backfill_request_lock);
osd->backfill_request_timer.add_event_after(
- g_conf->osd_backfill_retry_interval,
+ osd->osd->cct->_conf->osd_backfill_retry_interval,
new QueuePeeringEvt<RequestBackfill>(
this, get_osdmap()->get_epoch(),
RequestBackfill()));
while (!boundary_found) {
vector<hobject_t> objects;
ret = osd->store->collection_list_partial(coll, start,
- g_conf->osd_scrub_chunk_min,
- g_conf->osd_scrub_chunk_max,
+ osd->osd->cct->_conf->osd_scrub_chunk_min,
+ osd->osd->cct->_conf->osd_scrub_chunk_max,
0,
&objects, &scrubber.end);
assert(ret >= 0);
// finish up
unreg_next_scrub();
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(osd->osd->cct);
info.history.last_scrub = info.last_update;
info.history.last_scrub_stamp = now;
if (scrubber.deep) {
if (last_complete_ondisk.epoch >= info.history.last_epoch_started) {
// DEBUG: verify that the snaps are empty in snap_mapper
- if (g_conf->osd_debug_verify_snaps_on_info) {
+ if (osd->osd->cct->_conf->osd_debug_verify_snaps_on_info) {
interval_set<snapid_t> p;
p.union_of(oinfo.purged_snaps, info.purged_snaps);
p.subtract(info.purged_snaps);
ActMap evt;
recovery_state.handle_event(evt, rctx);
if (osdmap_ref->get_epoch() - last_persisted_osdmap_ref->get_epoch() >
- g_conf->osd_pg_epoch_persisted_max_stale) {
+ osd->osd->cct->_conf->osd_pg_epoch_persisted_max_stale) {
dout(20) << __func__ << ": Dirtying info: last_persisted is "
<< last_persisted_osdmap_ref->get_epoch()
<< " while current is " << osdmap_ref->get_epoch() << dendl;
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_initial_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_started_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_reset_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_start_latency, dur);
}
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
pg->want_acting.clear();
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_primary_latency, dur);
}
pg->state_clear(PG_STATE_PEERING);
pg->clear_probe_targets();
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_peering_latency, dur);
}
pg->backfill_reserved = false;
pg->backfill_reserving = false;
pg->state_clear(PG_STATE_BACKFILL);
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_backfilling_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_waitremotebackfillreserved_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_waitlocalbackfillreserved_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_notbackfilling_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_repnotrecovering_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_repwaitrecoveryreserved_latency, dur);
}
double ratio, max_ratio;
if (pg->osd->too_full_for_backfill(&ratio, &max_ratio) &&
- !g_conf->osd_debug_skip_full_check_in_backfill_reservation) {
+ !pg->osd->osd->cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
dout(10) << "backfill reservation rejected: full ratio is "
<< ratio << ", which is greater than max allowed ratio "
<< max_ratio << dendl;
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_repwaitbackfillreserved_latency, dur);
}
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_RepRecovering_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_activating_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_waitlocalrecoveryreserved_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_waitremoterecoveryreserved_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_recovering_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_recovered_latency, dur);
}
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
pg->state_clear(PG_STATE_CLEAN);
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_clean_latency, dur);
}
}
// if we haven't reported our PG stats in a long time, do so now.
- if (pg->info.stats.reported_epoch + g_conf->osd_pg_stat_report_interval_max < advmap.osdmap->get_epoch()) {
+ if (pg->info.stats.reported_epoch + pg->osd->osd->cct->_conf->osd_pg_stat_report_interval_max < advmap.osdmap->get_epoch()) {
dout(20) << "reporting stats to osd after " << (advmap.osdmap->get_epoch() - pg->info.stats.reported_epoch)
<< " epochs" << dendl;
pg->publish_stats_to_osd();
pg->discover_all_missing(*context< RecoveryMachine >().get_query_map());
}
- if (g_conf->osd_check_for_log_corruption)
+ if (pg->osd->osd->cct->_conf->osd_check_for_log_corruption)
pg->check_log_for_corruption(pg->osd->store);
int unfound = pg->pg_log.get_missing().num_missing() - pg->missing_loc.size();
if (unfound > 0 &&
pg->all_unfound_are_queried_or_lost(pg->get_osdmap())) {
- if (g_conf->osd_auto_mark_unfound_lost) {
+ if (pg->osd->osd->cct->_conf->osd_auto_mark_unfound_lost) {
pg->osd->clog.error() << pg->info.pgid << " has " << unfound
<< " objects unfound and apparently lost, would automatically marking lost but NOT IMPLEMENTED\n";
//pg->mark_all_unfound_lost(*context< RecoveryMachine >().get_cur_transaction());
pg->state_clear(PG_STATE_BACKFILL_WAIT);
pg->state_clear(PG_STATE_RECOVERY_WAIT);
pg->state_clear(PG_STATE_REPLAY);
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_active_latency, dur);
}
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
pg->osd->remote_reserver.cancel_reservation(pg->info.pgid);
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_replicaactive_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_stray_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_getinfo_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_getlog_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_waitactingchange_latency, dur);
}
PG *pg = context< RecoveryMachine >().pg;
pg->state_clear(PG_STATE_INCOMPLETE);
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_incomplete_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_getmissing_latency, dur);
}
{
context< RecoveryMachine >().log_exit(state_name, enter_time);
PG *pg = context< RecoveryMachine >().pg;
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
pg->osd->recoverystate_perf->tinc(rs_waitupthru_latency, dur);
}
void PG::RecoveryState::RecoveryMachine::log_exit(const char *state_name, utime_t enter_time)
{
- utime_t dur = ceph_clock_now(g_ceph_context) - enter_time;
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - enter_time;
dout(5) << "exit " << state_name << " " << dur << " " << event_count << " " << event_time << dendl;
- pg->osd->pg_recovery_stats.log_exit(state_name, ceph_clock_now(g_ceph_context) - enter_time,
+ pg->osd->pg_recovery_stats.log_exit(state_name, ceph_clock_now(pg->osd->osd->cct) - enter_time,
event_count, event_time);
event_count = 0;
event_time = utime_t();
return false;
}
+// Begin dispatching a peering event: stash the caller's RecoveryCtx so the
+// state-machine reactions can reach it, and timestamp the start so that
+// end_handle() can account the elapsed time to this event.
+void PG::RecoveryState::start_handle(RecoveryCtx *new_ctx) {
+ assert(!rctx);  // handles must not nest: previous end_handle() must have run
+ rctx = new_ctx;
+ if (rctx)
+ rctx->start_time = ceph_clock_now(pg->osd->osd->cct);
+}
+
+// Finish dispatching: fold the elapsed time into the machine's event_time
+// accumulator, bump the event count, and drop the borrowed context pointer
+// (rctx is non-owning; the caller retains ownership of the RecoveryCtx).
+void PG::RecoveryState::end_handle() {
+ if (rctx) {
+ utime_t dur = ceph_clock_now(pg->osd->osd->cct) - rctx->start_time;
+ machine.event_time += dur;
+ }
+ machine.event_count++;
+ rctx = 0;
+}
+
void intrusive_ptr_add_ref(PG *pg) { pg->get("intptr"); }
void intrusive_ptr_release(PG *pg) { pg->put("intptr"); }
const char *state_name;
utime_t enter_time;
const char *get_state_name() { return state_name; }
- NamedState() : state_name(0), enter_time(ceph_clock_now(g_ceph_context)) {}
+ NamedState() : state_name(0), enter_time(ceph_clock_now(g_ceph_context)) {};
virtual ~NamedState() {}
};
/* Encapsulates PG recovery process */
class RecoveryState {
- void start_handle(RecoveryCtx *new_ctx) {
- assert(!rctx);
- rctx = new_ctx;
- if (rctx)
- rctx->start_time = ceph_clock_now(g_ceph_context);
- }
-
- void end_handle() {
- if (rctx) {
- utime_t dur = ceph_clock_now(g_ceph_context) - rctx->start_time;
- machine.event_time += dur;
- }
- machine.event_count++;
- rctx = 0;
- }
+ void start_handle(RecoveryCtx *new_ctx);
+ void end_handle();
/* States */
struct Initial;
else {
dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
map<int, vector<PullOp> > pulls;
- prepare_pull(soid, v, g_conf->osd_client_op_priority, &pulls);
- send_pulls(g_conf->osd_client_op_priority, pulls);
+ prepare_pull(soid, v, osd->osd->cct->_conf->osd_client_op_priority, &pulls);
+ send_pulls(osd->osd->cct->_conf->osd_client_op_priority, pulls);
}
waiting_for_missing_object[soid].push_back(op);
op->mark_delayed("waiting for missing object");
}
}
map<int, vector<PushOp> > pushes;
- prep_object_replica_pushes(soid, v, g_conf->osd_client_op_priority, &pushes);
- send_pushes(g_conf->osd_client_op_priority, pushes);
+ prep_object_replica_pushes(soid, v, osd->osd->cct->_conf->osd_client_op_priority, &pushes);
+ send_pushes(osd->osd->cct->_conf->osd_client_op_priority, pushes);
}
waiting_for_degraded_object[soid].push_back(op);
op->mark_delayed("waiting for degraded object");
string prefix;
string format;
- cmd_getval(g_ceph_context, cmdmap, "format", format);
+ cmd_getval(osd->osd->cct, cmdmap, "format", format);
boost::scoped_ptr<Formatter> f(new_formatter(format));
// demand that we have a formatter
if (!f)
f.reset(new_formatter("json"));
string command;
- cmd_getval(g_ceph_context, cmdmap, "cmd", command);
+ cmd_getval(osd->osd->cct, cmdmap, "cmd", command);
if (command == "query") {
f->open_object_section("pg");
f->dump_string("state", pg_state_string(get_state()));
}
else if (command == "mark_unfound_lost") {
string mulcmd;
- cmd_getval(g_ceph_context, cmdmap, "mulcmd", mulcmd);
+ cmd_getval(osd->osd->cct, cmdmap, "mulcmd", mulcmd);
if (mulcmd != "revert") {
ss << "mode must be 'revert'; mark and delete not yet implemented";
return -EINVAL;
else if (command == "list_missing") {
hobject_t offset;
string offset_json;
- if (cmd_getval(g_ceph_context, cmdmap, "offset", offset_json)) {
+ if (cmd_getval(osd->osd->cct, cmdmap, "offset", offset_json)) {
json_spirit::Value v;
try {
if (!json_spirit::read(offset_json, v))
f->open_array_section("objects");
int32_t num = 0;
bufferlist bl;
- while (p != missing.missing.end() && num < g_conf->osd_command_max_records) {
+ while (p != missing.missing.end() && num < osd->osd->cct->_conf->osd_command_max_records) {
f->open_object_section("object");
{
f->open_object_section("oid");
dout(10) << " pgls pg=" << m->get_pg() << " != " << info.pgid << dendl;
result = 0; // hmm?
} else {
- unsigned list_size = MIN(g_conf->osd_max_pgls, p->op.pgls.count);
+ unsigned list_size = MIN(osd->osd->cct->_conf->osd_max_pgls, p->op.pgls.count);
dout(10) << " pgls pg=" << m->get_pg() << " count " << list_size << dendl;
// read into a buffer
return;
}
- size_t target = g_conf->osd_min_pg_log_entries;
+ size_t target = osd->osd->cct->_conf->osd_min_pg_log_entries;
if (is_degraded() ||
state_test(PG_STATE_RECOVERING |
PG_STATE_RECOVERY_WAIT |
PG_STATE_BACKFILL |
PG_STATE_BACKFILL_WAIT |
PG_STATE_BACKFILL_TOOFULL)) {
- target = g_conf->osd_max_pg_log_entries;
+ target = osd->osd->cct->_conf->osd_max_pg_log_entries;
}
if (min_last_complete_ondisk != eversion_t() &&
ctx->user_at_version = obc->obs.oi.user_version;
// note my stats
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(osd->osd->cct);
// note some basic context for op replication that prepare_transaction may clobber
eversion_t old_last_update = pg_log.get_head();
append_log(ctx->log, pg_trim_to, ctx->local_t);
// verify that we are doing this in order?
- if (g_conf->osd_debug_op_order && m->get_source().is_client()) {
+ if (osd->osd->cct->_conf->osd_debug_op_order && m->get_source().is_client()) {
map<client_t,tid_t>& cm = debug_op_order[obc->obs.oi.soid];
tid_t t = m->get_tid();
client_t n = m->get_source().num();
OpRequestRef op = ctx->op;
MOSDOp *m = static_cast<MOSDOp*>(op->request);
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(osd->osd->cct);
utime_t latency = now;
latency -= ctx->op->request->get_recv_stamp();
void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat)
{
- utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t now = ceph_clock_now(osd->osd->cct);
utime_t latency = now;
latency -= op->request->get_recv_stamp();
BackfillInterval bi;
osr->flush();
scan_range(
- m->begin, g_conf->osd_backfill_scan_min,
- g_conf->osd_backfill_scan_max, &bi, handle);
+ m->begin, osd->osd->cct->_conf->osd_backfill_scan_min,
+ osd->osd->cct->_conf->osd_backfill_scan_max, &bi, handle);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid, bi.begin, bi.end);
reply->pgid = info.pgid;
reply->map_epoch = m->map_epoch;
reply->replies.swap(replies);
- reply->compute_cost(g_ceph_context);
+ reply->compute_cost(osd->osd->cct);
t->register_on_complete(new C_OSD_SendMessageOnConn(
osd, reply, m->get_connection()));
reply->pgid = info.pgid;
reply->map_epoch = m->map_epoch;
reply->pulls.swap(replies);
- reply->compute_cost(g_ceph_context);
+ reply->compute_cost(osd->osd->cct);
t->register_on_complete(new C_OSD_SendMessageOnConn(
osd, reply, m->get_connection()));
case MOSDPGBackfill::OP_BACKFILL_FINISH:
{
assert(is_replica());
- assert(g_conf->osd_kill_backfill_at != 1);
+ assert(osd->osd->cct->_conf->osd_kill_backfill_at != 1);
MOSDPGBackfill *reply = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid);
- reply->set_priority(g_conf->osd_recovery_op_priority);
+ reply->set_priority(osd->osd->cct->_conf->osd_recovery_op_priority);
osd->send_message_osd_cluster(reply, m->get_connection());
queue_peering_event(
CephPeeringEvtRef(
case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
{
assert(is_replica());
- assert(g_conf->osd_kill_backfill_at != 2);
+ assert(osd->osd->cct->_conf->osd_kill_backfill_at != 2);
info.last_backfill = m->last_backfill;
if (m->compat_stat_sum) {
case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
{
assert(is_primary());
- assert(g_conf->osd_kill_backfill_at != 3);
+ assert(osd->osd->cct->_conf->osd_kill_backfill_at != 3);
finish_recovery_op(hobject_t::get_max());
}
break;
&obc->obs,
obc->ssc,
this);
- ctx->mtime = ceph_clock_now(g_ceph_context);
+ ctx->mtime = ceph_clock_now(osd->osd->cct);
ctx->at_version.epoch = get_osdmap()->get_epoch();
ctx->at_version.version = pg_log.get_head().version + 1;
return result;
}
-static int check_offset_and_length(uint64_t offset, uint64_t length)
+static int check_offset_and_length(uint64_t offset, uint64_t length, uint64_t max)
{
- if (offset >= g_conf->osd_max_object_size ||
- length > g_conf->osd_max_object_size ||
- offset + length > g_conf->osd_max_object_size)
+ if (offset >= max ||
+ length > max ||
+ offset + length > max)
return -EFBIG;
return 0;
// munge ZERO -> TRUNCATE? (don't munge to DELETE or we risk hosing attributes)
if (op.op == CEPH_OSD_OP_ZERO &&
obs.exists &&
- op.extent.offset < g_conf->osd_max_object_size &&
+ op.extent.offset < osd->osd->cct->_conf->osd_max_object_size &&
op.extent.length >= 1 &&
- op.extent.length <= g_conf->osd_max_object_size &&
+ op.extent.length <= osd->osd->cct->_conf->osd_max_object_size &&
op.extent.offset + op.extent.length >= oi.size) {
if (op.extent.offset >= oi.size) {
// no-op
uint64_t last = op.extent.offset;
for (miter = m.begin(); miter != m.end(); ++miter) {
// verify hole?
- if (g_conf->osd_verify_sparse_read_holes &&
+ if (osd->osd->cct->_conf->osd_verify_sparse_read_holes &&
last < miter->first) {
bufferlist t;
uint64_t len = miter->first - last;
}
// verify trailing hole?
- if (g_conf->osd_verify_sparse_read_holes) {
+ if (osd->osd->cct->_conf->osd_verify_sparse_read_holes) {
uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
if (last < end) {
bufferlist t;
timeout = 0;
}
if (!timeout)
- timeout = g_conf->osd_default_notify_timeout;
+ timeout = osd->osd->cct->_conf->osd_default_notify_timeout;
notify_info_t n;
n.timeout = timeout;
oi.truncate_size = op.extent.truncate_size;
}
}
- result = check_offset_and_length(op.extent.offset, op.extent.length);
+ result = check_offset_and_length(op.extent.offset, op.extent.length, osd->osd->cct->_conf->osd_max_object_size);
if (result < 0)
break;
t.write(coll, soid, op.extent.offset, op.extent.length, osd_op.indata);
result = -EINVAL;
break;
}
- result = check_offset_and_length(op.extent.offset, op.extent.length);
+ result = check_offset_and_length(op.extent.offset, op.extent.length, osd->osd->cct->_conf->osd_max_object_size);
if (result < 0)
break;
if (obs.exists) {
case CEPH_OSD_OP_ZERO:
++ctx->num_write;
{ // zero
- result = check_offset_and_length(op.extent.offset, op.extent.length);
+ result = check_offset_and_length(op.extent.offset, op.extent.length, osd->osd->cct->_conf->osd_max_object_size);
if (result < 0)
break;
assert(op.extent.length);
break;
}
- if (op.extent.offset > g_conf->osd_max_object_size) {
+ if (op.extent.offset > osd->osd->cct->_conf->osd_max_object_size) {
result = -EFBIG;
break;
}
dout(10) << "watch: peer_addr="
<< ctx->op->request->get_connection()->get_peer_addr() << dendl;
- watch_info_t w(cookie, g_conf->osd_client_watch_timeout,
+ watch_info_t w(cookie, osd->osd->cct->_conf->osd_client_watch_timeout,
ctx->op->request->get_connection()->get_peer_addr());
if (do_watch) {
if (oi.watchers.count(make_pair(cookie, entity))) {
case CEPH_OSD_OP_SETXATTR:
++ctx->num_write;
{
- if (op.xattr.value_len > g_conf->osd_max_attr_size) {
+ if (op.xattr.value_len > osd->osd->cct->_conf->osd_max_attr_size) {
result = -EFBIG;
break;
}
}
}
- if (g_conf->osd_tmapput_sets_uses_tmap) {
- assert(g_conf->osd_auto_upgrade_tmap);
+ if (osd->osd->cct->_conf->osd_tmapput_sets_uses_tmap) {
+ assert(osd->osd->cct->_conf->osd_auto_upgrade_tmap);
oi.uses_tmap = true;
}
}
set<string> out_set;
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGETKEYS: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
}
map<string, bufferlist> out_set;
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGETVALS: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
case CEPH_OSD_OP_OMAPGETHEADER:
++ctx->num_read;
{
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGETHEADER: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
goto fail;
}
map<string, bufferlist> out;
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
dout(20) << "CEPH_OSD_OP_OMAPGET: "
<< " Reading " << oi.soid << " omap from tmap" << dendl;
map<string, bufferlist> vals;
case CEPH_OSD_OP_OMAPSETVALS:
++ctx->num_write;
{
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
if (!obs.exists) {
case CEPH_OSD_OP_OMAPSETHEADER:
++ctx->num_write;
{
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
if (!obs.exists) {
result = -ENOENT;
break;
}
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
t.touch(coll, soid);
result = -ENOENT;
break;
}
- if (oi.uses_tmap && g_conf->osd_auto_upgrade_tmap) {
+ if (oi.uses_tmap && osd->osd->cct->_conf->osd_auto_upgrade_tmap) {
_copy_up_tmap(ctx);
}
t.touch(coll, soid);
// _prior_ to being committed; it will not get set with
// writeahead journaling, for instance.
if (repop->ctx->readable_stamp == utime_t())
- repop->ctx->readable_stamp = ceph_clock_now(g_ceph_context);
+ repop->ctx->readable_stamp = ceph_clock_now(osd->osd->cct);
}
}
RepGather *repop = new RepGather(ctx, obc, rep_tid, info.last_complete);
- repop->start = ceph_clock_now(g_ceph_context);
+ repop->start = ceph_clock_now(osd->osd->cct);
repop_queue.push_back(&repop->queue_item);
repop_map[repop->rep_tid] = repop;
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
&obc->obs, obc->ssc, this);
- ctx->mtime = ceph_clock_now(g_ceph_context);
+ ctx->mtime = ceph_clock_now(osd->osd->cct);
ctx->at_version.epoch = get_osdmap()->get_epoch();
ctx->at_version.version = pg_log.get_head().version + 1;
if (size)
data_subset.insert(0, size);
- if (!g_conf->osd_recover_clone_overlap) {
+ if (!osd->osd->cct->_conf->osd_recover_clone_overlap) {
dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
return;
}
}
- if (cloning.num_intervals() > g_conf->osd_recover_clone_overlap_limit) {
+ if (cloning.num_intervals() > osd->osd->cct->_conf->osd_recover_clone_overlap_limit) {
dout(10) << "skipping clone, too many holes" << dendl;
clone_subsets.clear();
cloning.clear();
if (size)
data_subset.insert(0, size);
- if (!g_conf->osd_recover_clone_overlap) {
+ if (!osd->osd->cct->_conf->osd_recover_clone_overlap) {
dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
return;
}
<< " overlap " << next << dendl;
}
- if (cloning.num_intervals() > g_conf->osd_recover_clone_overlap_limit) {
+ if (cloning.num_intervals() > osd->osd->cct->_conf->osd_recover_clone_overlap_limit) {
dout(10) << "skipping clone, too many holes" << dendl;
clone_subsets.clear();
cloning.clear();
subop->set_priority(prio);
subop->ops = vector<OSDOp>(1);
subop->ops[0].op.op = CEPH_OSD_OP_PULL;
- subop->ops[0].op.extent.length = g_conf->osd_recovery_max_chunk;
+ subop->ops[0].op.extent.length = osd->osd->cct->_conf->osd_recovery_max_chunk;
subop->recovery_info = recovery_info;
subop->recovery_progress = progress;
msg->set_priority(prio);
for (;
(j != i->second.end() &&
- cost < g_conf->osd_max_push_cost &&
- pushes < g_conf->osd_max_push_objects) ;
+ cost < osd->osd->cct->_conf->osd_max_push_cost &&
+ pushes < osd->osd->cct->_conf->osd_max_push_objects) ;
++j) {
dout(20) << __func__ << ": sending push " << *j
<< " to osd." << i->first << dendl;
- cost += j->cost(g_ceph_context);
+ cost += j->cost(osd->osd->cct);
pushes += 1;
msg->pushes.push_back(*j);
}
- msg->compute_cost(g_ceph_context);
+ msg->compute_cost(osd->osd->cct);
osd->send_message_osd_cluster(msg, con);
}
}
msg->pgid = info.pgid;
msg->map_epoch = get_osdmap()->get_epoch();
msg->pulls.swap(i->second);
- msg->compute_cost(g_ceph_context);
+ msg->compute_cost(osd->osd->cct);
osd->send_message_osd_cluster(msg, con);
}
}
new_progress.first = false;
}
- uint64_t available = g_conf->osd_recovery_max_chunk;
+ uint64_t available = osd->osd->cct->_conf->osd_recovery_max_chunk;
if (!progress.omap_complete) {
ObjectMap::ObjectMapIterator iter =
osd->store->get_omap_iterator(coll,
ObjectStore::Transaction *t = new ObjectStore::Transaction;
C_PG_MarkUnfoundLost *c = new C_PG_MarkUnfoundLost(this);
- utime_t mtime = ceph_clock_now(g_ceph_context);
+ utime_t mtime = ceph_clock_now(osd->osd->cct);
info.last_update.epoch = get_osdmap()->get_epoch();
const pg_missing_t &missing = pg_log.get_missing();
map<hobject_t, pg_missing_t::item>::const_iterator m = missing.missing.begin();
++skipped;
} else {
int r = prepare_pull(
- soid, need, g_conf->osd_recovery_op_priority, &pulls);
+ soid, need, osd->osd->cct->_conf->osd_recovery_op_priority, &pulls);
switch (r) {
case PULL_YES:
++started;
pg_log.set_last_requested(v);
}
- send_pulls(g_conf->osd_recovery_op_priority, pulls);
+ send_pulls(osd->osd->cct->_conf->osd_recovery_op_priority, pulls);
return started;
}
dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl;
map<hobject_t,pg_missing_t::item>::const_iterator r = m.missing.find(soid);
started += prep_object_replica_pushes(soid, r->second.need,
- g_conf->osd_recovery_op_priority,
+ osd->osd->cct->_conf->osd_recovery_op_priority,
&pushes);
}
}
- send_pushes(g_conf->osd_recovery_op_priority, pushes);
+ send_pushes(osd->osd->cct->_conf->osd_recovery_op_priority, pushes);
return started;
}
prep_backfill_object_push(
i->first, i->second.first, i->second.second, backfill_target, &pushes);
}
- send_pushes(g_conf->osd_recovery_op_priority, pushes);
+ send_pushes(osd->osd->cct->_conf->osd_recovery_op_priority, pushes);
release_waiting_for_backfill_pos();
dout(5) << "backfill_pos is " << backfill_pos << " and pinfo.last_backfill is "
ObjectContextRef obc = get_object_context(oid, false);
obc->ondisk_read_lock();
(*pushes)[peer].push_back(PushOp());
- prep_push_to_replica(obc, oid, peer, g_conf->osd_recovery_op_priority,
+ prep_push_to_replica(obc, oid, peer, osd->osd->cct->_conf->osd_recovery_op_priority,
&((*pushes)[peer].back()));
obc->ondisk_read_unlock();
}
assert(info.last_update >= pg_log.get_tail()); // otherwise we need some help!
- if (!g_conf->osd_debug_verify_stray_on_activate)
+ if (!osd->osd->cct->_conf->osd_debug_verify_stray_on_activate)
return;
// just scan the log.