const bufferlist& inbl,
std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
- int ret = 0;
- stringstream ss; // stderr error message stream
std::unique_ptr<Formatter> f(Formatter::create(
format, "json-pretty", "json-pretty"));
+ int ret = 0;
+ stringstream ss; // stderr error message stream
bufferlist outbl; // if empty at end, we'll dump formatter as output
- if (prefix == "status") {
+ // --- PG commands are routed here to PG::do_command ---
+ if (prefix == "pg" ||
+ prefix == "query" ||
+ prefix == "mark_unfound_lost" ||
+ prefix == "list_unfound"
+ ) {
+ string pgidstr;
+ pg_t pgid;
+ if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
+ ss << "no pgid specified";
+ ret = -EINVAL;
+ goto out;
+ }
+ if (!pgid.parse(pgidstr.c_str())) {
+ ss << "couldn't parse pgid '" << pgidstr << "'";
+ ret = -EINVAL;
+ goto out;
+ }
+ spg_t pcand;
+ PGRef pg;
+ if (osdmap->get_primary_shard(pgid, &pcand) &&
+ (pg = _lookup_lock_pg(pcand))) {
+ if (pg->is_primary()) {
+ cmdmap_t new_cmdmap = cmdmap;
+ try {
+ pg->do_command(prefix, new_cmdmap, inbl, on_finish);
+ pg->unlock();
+ return; // the pg handler calls on_finish directly
+ } catch (const bad_cmd_get& e) {
+ pg->unlock();
+ ss << e.what();
+ ret = -EINVAL;
+ goto out;
+ }
+ } else {
+ ss << "not primary for pgid " << pgid;
+ // do not reply; they will get newer maps and realize they
+ // need to resend.
+#warning fixme on client side
+ pg->unlock();
+ ret = -EAGAIN;
+ goto out;
+ }
+ } else {
+ ss << "i don't have pgid " << pgid;
+ ret = -ENOENT;
+ }
+ }
+
+ // --- OSD commands follow ---
+
+ else if (prefix == "status") {
+ lock_guard l(osd_lock);
f->open_object_section("status");
f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
f->dump_stream("osd_fsid") << superblock.osd_fsid;
f->close_section();
} else if (prefix == "get_latest_osdmap") {
get_latest_osdmap();
- } else if (prefix == "heap") {
- auto result = ceph::osd_cmds::heap(*cct, cmdmap, *f, ss);
-
- // Note: Failed heap profile commands won't necessarily trigger an error:
- f->open_object_section("result");
- f->dump_string("error", cpp_strerror(result));
- f->dump_bool("success", result >= 0);
- f->close_section();
} else if (prefix == "set_heap_property") {
string property;
int64_t value = 0;
}
f->close_section();
} else if (prefix == "send_beacon") {
+ lock_guard l(osd_lock);
if (is_active()) {
send_beacon(ceph::coarse_mono_clock::now());
}
- } else if (prefix == "dump_osd_network") {
- int64_t value = 0;
- if (!(cmd_getval(cct, cmdmap, "value", value))) {
- // Convert milliseconds to microseconds
- value = static_cast<int64_t>(g_conf().get_val<double>("mon_warn_on_slow_ping_time")) * 1000;
- if (value == 0) {
- double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
- value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
- value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
+ }
+
+ else if (prefix == "cluster_log") {
+ vector<string> msg;
+ cmd_getval(cct, cmdmap, "message", msg);
+ if (msg.empty()) {
+ ret = -EINVAL;
+ ss << "ignoring empty log message";
+ goto out;
+ }
+ string message = msg.front();
+ for (vector<string>::iterator a = ++msg.begin(); a != msg.end(); ++a)
+ message += " " + *a;
+ string lvl;
+ cmd_getval(cct, cmdmap, "level", lvl);
+ clog_type level = string_to_clog_type(lvl);
+ if (level < 0) {
+ ret = -EINVAL;
+ ss << "unknown level '" << lvl << "'";
+ goto out;
+ }
+ clog->do_log(level, message);
+ }
+
+ else if (prefix == "bench") {
+ lock_guard l(osd_lock);
+ int64_t count;
+ int64_t bsize;
+ int64_t osize, onum;
+ // default count 1G, size 4MB
+ cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30);
+ cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20);
+ cmd_getval(cct, cmdmap, "object_size", osize, (int64_t)0);
+ cmd_getval(cct, cmdmap, "object_num", onum, (int64_t)0);
+
+ uint32_t duration = cct->_conf->osd_bench_duration;
+
+ if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
+ // let us limit the block size because the next checks rely on it
+ // having a sane value. If we allow any block size to be set things
+ // can still go sideways.
+ ss << "block 'size' values are capped at "
+ << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
+ << " a higher value, please adjust 'osd_bench_max_block_size'";
+ ret = -EINVAL;
+ goto out;
+ } else if (bsize < (int64_t) (1 << 20)) {
+ // entering the realm of small block sizes.
+ // limit the count to a sane value, assuming a configurable amount of
+ // IOPS and duration, so that the OSD doesn't get hung up on this,
+ // preventing timeouts from going off
+ int64_t max_count =
+ bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
+ if (count > max_count) {
+ ss << "'count' values greater than " << max_count
+ << " for a block size of " << byte_u_t(bsize) << ", assuming "
+ << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
+ << " for " << duration << " seconds,"
+ << " can cause ill effects on osd. "
+ << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
+ << " value if you wish to use a higher 'count'.";
+ ret = -EINVAL;
+ goto out;
}
} else {
- // Convert user input to microseconds
- value *= 1000;
+ // 1MB block sizes are big enough so that we get more stuff done.
+ // However, to avoid the osd from getting hung on this and having
+ // timers being triggered, we are going to limit the count assuming
+ // a configurable throughput and duration.
+ // NOTE: max_count is the total amount of bytes that we believe we
+ // will be able to write during 'duration' for the given
+ // throughput. The block size hardly impacts this unless it's
+ // way too big. Given we already check how big the block size
+ // is, it's safe to assume everything will check out.
+ int64_t max_count =
+ cct->_conf->osd_bench_large_size_max_throughput * duration;
+ if (count > max_count) {
+ ss << "'count' values greater than " << max_count
+ << " for a block size of " << byte_u_t(bsize) << ", assuming "
+ << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
+ << " for " << duration << " seconds,"
+ << " can cause ill effects on osd. "
+ << " Please adjust 'osd_bench_large_size_max_throughput'"
+ << " with a higher value if you wish to use a higher 'count'.";
+ ret = -EINVAL;
+ goto out;
+ }
}
- if (value < 0) value = 0;
- struct osd_ping_time_t {
- uint32_t pingtime;
- int to;
- bool back;
- std::array<uint32_t,3> times;
- std::array<uint32_t,3> min;
- std::array<uint32_t,3> max;
- uint32_t last;
- uint32_t last_update;
+ if (osize && bsize > osize)
+ bsize = osize;
- bool operator<(const osd_ping_time_t& rhs) const {
- if (pingtime < rhs.pingtime)
- return true;
- if (pingtime > rhs.pingtime)
- return false;
- if (to < rhs.to)
- return true;
- if (to > rhs.to)
- return false;
- return back;
+ dout(1) << " bench count " << count
+ << " bsize " << byte_u_t(bsize) << dendl;
+
+ ObjectStore::Transaction cleanupt;
+
+ if (osize && onum) {
+ bufferlist bl;
+ bufferptr bp(osize);
+ bp.zero();
+ bl.push_back(std::move(bp));
+ bl.rebuild_page_aligned();
+ for (int i=0; i<onum; ++i) {
+ char nm[30];
+ snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
+ object_t oid(nm);
+ hobject_t soid(sobject_t(oid, 0));
+ ObjectStore::Transaction t;
+ t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
+ store->queue_transaction(service.meta_ch, std::move(t), NULL);
+ cleanupt.remove(coll_t(), ghobject_t(soid));
}
- };
+ }
- set<osd_ping_time_t> sorted;
- // Get pingtimes under lock and not on the stack
- map<int, osd_stat_t::Interfaces> *pingtimes = new map<int, osd_stat_t::Interfaces>;
- service.get_hb_pingtime(pingtimes);
- for (auto j : *pingtimes) {
- if (j.second.last_update == 0)
- continue;
- osd_ping_time_t item;
- item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
- item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
- if (item.pingtime >= value) {
- item.to = j.first;
- item.times[0] = j.second.back_pingtime[0];
- item.times[1] = j.second.back_pingtime[1];
- item.times[2] = j.second.back_pingtime[2];
- item.min[0] = j.second.back_min[0];
- item.min[1] = j.second.back_min[1];
- item.min[2] = j.second.back_min[2];
- item.max[0] = j.second.back_max[0];
- item.max[1] = j.second.back_max[1];
- item.max[2] = j.second.back_max[2];
- item.last = j.second.back_last;
- item.back = true;
- item.last_update = j.second.last_update;
- sorted.emplace(item);
+ bufferlist bl;
+ bufferptr bp(bsize);
+ bp.zero();
+ bl.push_back(std::move(bp));
+ bl.rebuild_page_aligned();
+
+ {
+ C_SaferCond waiter;
+ if (!service.meta_ch->flush_commit(&waiter)) {
+ waiter.wait();
}
- if (j.second.front_last == 0)
- continue;
- item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
- item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
- if (item.pingtime >= value) {
- item.to = j.first;
- item.times[0] = j.second.front_pingtime[0];
- item.times[1] = j.second.front_pingtime[1];
- item.times[2] = j.second.front_pingtime[2];
- item.min[0] = j.second.front_min[0];
- item.min[1] = j.second.front_min[1];
- item.min[2] = j.second.front_min[2];
- item.max[0] = j.second.front_max[0];
- item.max[1] = j.second.front_max[1];
- item.max[2] = j.second.front_max[2];
- item.last = j.second.front_last;
- item.last_update = j.second.last_update;
- item.back = false;
- sorted.emplace(item);
+ }
+
+ utime_t start = ceph_clock_now();
+ for (int64_t pos = 0; pos < count; pos += bsize) {
+ char nm[30];
+ unsigned offset = 0;
+ if (onum && osize) {
+ snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
+ offset = rand() % (osize / bsize) * bsize;
+ } else {
+ snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
}
+ object_t oid(nm);
+ hobject_t soid(sobject_t(oid, 0));
+ ObjectStore::Transaction t;
+ t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
+ store->queue_transaction(service.meta_ch, std::move(t), NULL);
+ if (!onum || !osize)
+ cleanupt.remove(coll_t::meta(), ghobject_t(soid));
}
- delete pingtimes;
- //
- // Network ping times (1min 5min 15min)
- f->open_object_section("network_ping_times");
- f->dump_int("threshold", value / 1000);
- f->open_array_section("entries");
- for (auto &sitem : boost::adaptors::reverse(sorted)) {
- ceph_assert(sitem.pingtime >= value);
- f->open_object_section("entry");
- const time_t lu(sitem.last_update);
- char buffer[26];
- string lustr(ctime_r(&lu, buffer));
- lustr.pop_back(); // Remove trailing \n
- auto stale = cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
- f->dump_string("last update", lustr);
- f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
- f->dump_int("from osd", whoami);
- f->dump_int("to osd", sitem.to);
+ {
+ C_SaferCond waiter;
+ if (!service.meta_ch->flush_commit(&waiter)) {
+ waiter.wait();
+ }
+ }
+ utime_t end = ceph_clock_now();
+
+ // clean up
+ store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL);
+ {
+ C_SaferCond waiter;
+ if (!service.meta_ch->flush_commit(&waiter)) {
+ waiter.wait();
+ }
+ }
+
+ double elapsed = end - start;
+ double rate = count / elapsed;
+ double iops = rate / bsize;
+ f->open_object_section("osd_bench_results");
+ f->dump_int("bytes_written", count);
+ f->dump_int("blocksize", bsize);
+ f->dump_float("elapsed_sec", elapsed);
+ f->dump_float("bytes_per_sec", rate);
+ f->dump_float("iops", iops);
+ f->close_section();
+ }
+
+ else if (prefix == "flush_pg_stats") {
+ mgrc.send_pgstats();
+ f->dump_unsigned("stat_seq", service.get_osd_stat_seq());
+ }
+
+ else if (prefix == "heap") {
+ ret = ceph::osd_cmds::heap(*cct, cmdmap, *f, ss);
+ }
+
+ else if (prefix == "debug dump_missing") {
+ f->open_array_section("pgs");
+ vector<PGRef> pgs;
+ _get_pgs(&pgs);
+ for (auto& pg : pgs) {
+ string s = stringify(pg->pg_id);
+ f->open_array_section(s.c_str());
+ pg->lock();
+ pg->dump_missing(f.get());
+ pg->unlock();
+ f->close_section();
+ }
+ f->close_section();
+ }
+
+ else if (prefix == "debug kick_recovery_wq") {
+ int64_t delay;
+ cmd_getval(cct, cmdmap, "delay", delay);
+ ostringstream oss;
+ oss << delay;
+ ret = cct->_conf.set_val("osd_recovery_delay_start", oss.str().c_str());
+ if (ret != 0) {
+ ss << "kick_recovery_wq: error setting "
+ << "osd_recovery_delay_start to '" << delay << "': error "
+ << ret;
+ goto out;
+ }
+ cct->_conf.apply_changes(nullptr);
+ ss << "kicking recovery queue. set osd_recovery_delay_start "
+ << "to " << cct->_conf->osd_recovery_delay_start;
+ }
+
+ else if (prefix == "cpu_profiler") {
+ ostringstream ds;
+ string arg;
+ cmd_getval(cct, cmdmap, "arg", arg);
+ vector<string> argvec;
+ get_str_vec(arg, argvec);
+ cpu_profiler_handle_command(argvec, ds);
+ outbl.append(ds.str());
+ }
+
+ else if (prefix == "dump_pg_recovery_stats") {
+ lock_guard l(osd_lock);
+ pg_recovery_stats.dump_formatted(f.get());
+ }
+
+ else if (prefix == "reset_pg_recovery_stats") {
+ lock_guard l(osd_lock);
+ pg_recovery_stats.reset();
+ }
+
+ else if (prefix == "perf histogram dump") {
+ std::string logger;
+ std::string counter;
+ cmd_getval(cct, cmdmap, "logger", logger);
+ cmd_getval(cct, cmdmap, "counter", counter);
+ cct->get_perfcounters_collection()->dump_formatted_histograms(
+ f.get(), false, logger, counter);
+ }
+
+ else if (prefix == "cache drop") {
+ lock_guard l(osd_lock);
+ dout(20) << "clearing all caches" << dendl;
+ // Clear the objectstore's cache - onode and buffer for Bluestore,
+ // system's pagecache for Filestore
+ ret = store->flush_cache(&ss);
+ if (ret < 0) {
+ ss << "Error flushing objectstore cache: " << cpp_strerror(ret);
+ goto out;
+ }
+ // Clear the objectcontext cache (per PG)
+ vector<PGRef> pgs;
+ _get_pgs(&pgs);
+ for (auto& pg: pgs) {
+ pg->clear_cache();
+ }
+ }
+
+ else if (prefix == "cache status") {
+ lock_guard l(osd_lock);
+ int obj_ctx_count = 0;
+ vector<PGRef> pgs;
+ _get_pgs(&pgs);
+ for (auto& pg: pgs) {
+ obj_ctx_count += pg->get_cache_obj_count();
+ }
+ f->open_object_section("cache_status");
+ f->dump_int("object_ctx", obj_ctx_count);
+ store->dump_cache_stats(f.get());
+ f->close_section();
+ }
+
+ else if (prefix == "scrub_purged_snaps") {
+ lock_guard l(osd_lock);
+ scrub_purged_snaps();
+ }
+
+ else if (prefix == "dump_osd_network") {
+ lock_guard l(osd_lock);
+ int64_t value = 0;
+ if (!(cmd_getval(cct, cmdmap, "value", value))) {
+ // Convert milliseconds to microseconds
+ value = static_cast<double>(g_conf().get_val<double>(
+ "mon_warn_on_slow_ping_time")) * 1000;
+ if (value == 0) {
+ double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
+ value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
+ value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
+ }
+ } else {
+ // Convert user input to microseconds
+ value *= 1000;
+ }
+ if (value < 0) value = 0;
+
+ struct osd_ping_time_t {
+ uint32_t pingtime;
+ int to;
+ bool back;
+ std::array<uint32_t,3> times;
+ std::array<uint32_t,3> min;
+ std::array<uint32_t,3> max;
+ uint32_t last;
+ uint32_t last_update;
+
+ bool operator<(const osd_ping_time_t& rhs) const {
+ if (pingtime < rhs.pingtime)
+ return true;
+ if (pingtime > rhs.pingtime)
+ return false;
+ if (to < rhs.to)
+ return true;
+ if (to > rhs.to)
+ return false;
+ return back;
+ }
+ };
+
+ set<osd_ping_time_t> sorted;
+ // Get pingtimes under lock and not on the stack
+ map<int, osd_stat_t::Interfaces> *pingtimes = new map<int, osd_stat_t::Interfaces>;
+ service.get_hb_pingtime(pingtimes);
+ for (auto j : *pingtimes) {
+ if (j.second.last_update == 0)
+ continue;
+ osd_ping_time_t item;
+ item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
+ item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
+ if (item.pingtime >= value) {
+ item.to = j.first;
+ item.times[0] = j.second.back_pingtime[0];
+ item.times[1] = j.second.back_pingtime[1];
+ item.times[2] = j.second.back_pingtime[2];
+ item.min[0] = j.second.back_min[0];
+ item.min[1] = j.second.back_min[1];
+ item.min[2] = j.second.back_min[2];
+ item.max[0] = j.second.back_max[0];
+ item.max[1] = j.second.back_max[1];
+ item.max[2] = j.second.back_max[2];
+ item.last = j.second.back_last;
+ item.back = true;
+ item.last_update = j.second.last_update;
+ sorted.emplace(item);
+ }
+ if (j.second.front_last == 0)
+ continue;
+ item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
+ item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
+ if (item.pingtime >= value) {
+ item.to = j.first;
+ item.times[0] = j.second.front_pingtime[0];
+ item.times[1] = j.second.front_pingtime[1];
+ item.times[2] = j.second.front_pingtime[2];
+ item.min[0] = j.second.front_min[0];
+ item.min[1] = j.second.front_min[1];
+ item.min[2] = j.second.front_min[2];
+ item.max[0] = j.second.front_max[0];
+ item.max[1] = j.second.front_max[1];
+ item.max[2] = j.second.front_max[2];
+ item.last = j.second.front_last;
+ item.last_update = j.second.last_update;
+ item.back = false;
+ sorted.emplace(item);
+ }
+ }
+ delete pingtimes;
+ //
+ // Network ping times (1min 5min 15min)
+ f->open_object_section("network_ping_times");
+ f->dump_int("threshold", value / 1000);
+ f->open_array_section("entries");
+ for (auto &sitem : boost::adaptors::reverse(sorted)) {
+ ceph_assert(sitem.pingtime >= value);
+ f->open_object_section("entry");
+
+ const time_t lu(sitem.last_update);
+ char buffer[26];
+ string lustr(ctime_r(&lu, buffer));
+ lustr.pop_back(); // Remove trailing \n
+ auto stale = cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
+ f->dump_string("last update", lustr);
+ f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
+ f->dump_int("from osd", whoami);
+ f->dump_int("to osd", sitem.to);
f->dump_string("interface", (sitem.back ? "back" : "front"));
f->open_object_section("average");
f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.times[0],3).c_str());
"the mon");
ceph_assert(r == 0);
- r = admin_socket->register_command( "heap " \
- "name=heapcmd,type=CephString " \
- "name=value,type=CephString,req=false",
- asok_hook,
- "show heap usage info (available only if "
- "compiled with tcmalloc)");
- ceph_assert(r == 0);
-
r = admin_socket->register_command("set_heap_property " \
"name=property,type=CephString " \
"name=value,type=CephInt",
ceph_assert(r == 0);
- r = admin_socket->register_command("smart name=devid,type=CephString,req=False",
+ r = admin_socket->register_command("smart name=devid,type=CephString,req=false",
asok_hook,
"probe OSD devices for SMART data.");
test_ops_hook,
"Inject a full disk (optional count times)");
ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "bench " \
+ "name=count,type=CephInt,req=false " \
+ "name=size,type=CephInt,req=false " \
+ "name=object_size,type=CephInt,req=false " \
+ "name=object_num,type=CephInt,req=false ",
+ asok_hook,
+ "OSD benchmark: write <count> <size>-byte objects(with <obj_size> <obj_num>), " \
+ "(default count=1G default size=4MB). Results in log.");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "cluster_log " \
+ "name=level,type=CephChoices,strings=error,warning,info,debug " \
+ "name=message,type=CephString,n=N",
+ asok_hook,
+ "log a message to the cluster log");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "flush_pg_stats",
+ asok_hook,
+ "flush pg stats");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "heap " \
+ "name=heapcmd,type=CephChoices,strings=" \
+ "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \
+ "name=value,type=CephString,req=false",
+ asok_hook,
+ "show heap usage info (available only if compiled with tcmalloc)");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "debug dump_missing " \
+ "name=filename,type=CephFilepath",
+ asok_hook,
+ "dump missing objects to a named file");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "debug kick_recovery_wq " \
+ "name=delay,type=CephInt,range=0",
+ asok_hook,
+ "set osd_recovery_delay_start to <val>");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "cpu_profiler " \
+ "name=arg,type=CephChoices,strings=status|flush",
+ asok_hook,
+ "run cpu profiling on daemon");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "dump_pg_recovery_stats",
+ asok_hook,
+ "dump pg recovery statistics");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "reset_pg_recovery_stats",
+ asok_hook,
+ "reset pg recovery statistics");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "cache drop",
+ asok_hook,
+ "Drop all OSD caches");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "cache status",
+ asok_hook,
+ "Get OSD caches statistics");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "scrub_purged_snaps",
+ asok_hook,
+ "Scrub purged_snaps vs snapmapper index");
+ ceph_assert(r == 0);
+
+ // -- pg commands --
+ // old form: ceph pg <pgid> command ...
+ r = admin_socket->register_command(
+ "pg " \
+ "name=pgid,type=CephPgid " \
+ "name=cmd,type=CephChoices,strings=query",
+ asok_hook,
+ "");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "pg " \
+ "name=pgid,type=CephPgid " \
+ "name=cmd,type=CephChoices,strings=mark_unfound_lost " \
+ "name=mulcmd,type=CephChoices,strings=revert|delete",
+ asok_hook,
+ "");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "pg " \
+ "name=pgid,type=CephPgid " \
+ "name=cmd,type=CephChoices,strings=list_unfound " \
+ "name=offset,type=CephString,req=false",
+ asok_hook,
+ "");
+ ceph_assert(r == 0);
+ // new form: tell <pgid> <cmd> for both cli and rest
+ r = admin_socket->register_command(
+ "query",
+ asok_hook,
+ "show details of a specific pg");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "mark_unfound_lost " \
+ "name=pgid,type=CephPgid,req=false " \
+ "name=mulcmd,type=CephChoices,strings=revert|delete",
+ asok_hook,
+ "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "list_unfound " \
+ "name=pgid,type=CephPgid,req=false " \
+ "name=offset,type=CephString,req=false",
+ asok_hook,
+ "list unfound objects on this pg, perhaps starting at an offset given in JSON");
+ ceph_assert(r == 0);
}
void OSD::create_logger()
m->put();
return;
}
-
- OSDCap& caps = session->caps;
-
- if (!caps.allow_all() || m->get_source().is_mon()) {
+ if (!session->caps.allow_all()) {
con->send_message(new MCommandReply(m, -EPERM));
m->put();
return;
}
-
- Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get());
- command_wq.queue(c);
-
+ cct->get_admin_socket()->queue_tell_command(m);
m->put();
}
#define COMMAND(parsesig, helptext, module, perm) \
{parsesig, helptext, module, perm},
-// yes, these are really pg commands, but there's a limit to how
-// much work it's worth. The OSD returns all of them. Make this
-// form (pg <pgid> <cmd>) valid only for the cli.
-// Rest uses "tell <pgid> <cmd>"
-
-COMMAND("pg " \
- "name=pgid,type=CephPgid " \
- "name=cmd,type=CephChoices,strings=query", \
- "show details of a specific pg", "osd", "r")
-COMMAND("pg " \
- "name=pgid,type=CephPgid " \
- "name=cmd,type=CephChoices,strings=mark_unfound_lost " \
- "name=mulcmd,type=CephChoices,strings=revert|delete", \
- "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available",
- "osd", "rw")
-COMMAND("pg " \
- "name=pgid,type=CephPgid " \
- "name=cmd,type=CephChoices,strings=list_unfound " \
- "name=offset,type=CephString,req=false",
- "list unfound objects on this pg, perhaps starting at an offset given in JSON",
- "osd", "r")
-
-// new form: tell <pgid> <cmd> for both cli and rest
-
-COMMAND("query",
- "show details of a specific pg", "osd", "r")
-COMMAND("mark_unfound_lost " \
- "name=mulcmd,type=CephChoices,strings=revert|delete", \
- "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available",
- "osd", "rw")
-COMMAND("list_unfound " \
- "name=offset,type=CephString,req=false",
- "list unfound objects on this pg, perhaps starting at an offset given in JSON",
- "osd", "r")
COMMAND("perf histogram dump "
"name=logger,type=CephString,req=false "
"name=counter,type=CephString,req=false",
"name=key,type=CephString",
"Unset a configuration option at runtime (not persistent)",
"osd", "rw")
-COMMAND("cluster_log " \
- "name=level,type=CephChoices,strings=error,warning,info,debug " \
- "name=message,type=CephString,n=N",
- "log a message to the cluster log",
- "osd", "rw")
-COMMAND("bench " \
- "name=count,type=CephInt,req=false " \
- "name=size,type=CephInt,req=false " \
- "name=object_size,type=CephInt,req=false " \
- "name=object_num,type=CephInt,req=false ", \
- "OSD benchmark: write <count> <size>-byte objects(with <obj_size> <obj_num>), " \
- "(default count=1G default size=4MB). Results in log.",
- "osd", "rw")
-COMMAND("flush_pg_stats", "flush pg stats", "osd", "rw")
-COMMAND("heap " \
- "name=heapcmd,type=CephChoices,strings="\
- "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \
- "name=value,type=CephString,req=false",
- "show heap usage info (available only if compiled with tcmalloc)",
- "osd", "rw")
-COMMAND("debug dump_missing " \
- "name=filename,type=CephFilepath",
- "dump missing objects to a named file", "osd", "r")
-COMMAND("debug kick_recovery_wq " \
- "name=delay,type=CephInt,range=0",
- "set osd_recovery_delay_start to <val>", "osd", "rw")
-COMMAND("cpu_profiler " \
- "name=arg,type=CephChoices,strings=status|flush",
- "run cpu profiling on daemon", "osd", "rw")
-COMMAND("dump_pg_recovery_stats", "dump pg recovery statistics",
- "osd", "r")
-COMMAND("reset_pg_recovery_stats", "reset pg recovery statistics",
- "osd", "rw")
-COMMAND("compact",
- "compact object store's omap. "
- "WARNING: Compaction probably slows your requests",
- "osd", "rw")
-COMMAND("smart name=devid,type=CephString,req=False",
+COMMAND("smart name=devid,type=CephString,req=false",
"runs smartctl on this osd devices. ",
"osd", "rw")
-COMMAND("cache drop",
- "Drop all OSD caches",
- "osd", "rwx")
-COMMAND("cache status",
- "Get OSD caches statistics",
- "osd", "r")
-COMMAND("send_beacon",
- "Send OSD beacon to mon immediately",
- "osd", "r")
-COMMAND("scrub_purged_snaps",
- "Scrub purged_snaps vs snapmapper index",
- "osd", "r")
};
void OSD::do_command(
r = 0; // make command idempotent
}
}
- else if (prefix == "cluster_log") {
- vector<string> msg;
- cmd_getval(cct, cmdmap, "message", msg);
- if (msg.empty()) {
- r = -EINVAL;
- ss << "ignoring empty log message";
- goto out;
- }
- string message = msg.front();
- for (vector<string>::iterator a = ++msg.begin(); a != msg.end(); ++a)
- message += " " + *a;
- string lvl;
- cmd_getval(cct, cmdmap, "level", lvl);
- clog_type level = string_to_clog_type(lvl);
- if (level < 0) {
- r = -EINVAL;
- ss << "unknown level '" << lvl << "'";
- goto out;
- }
- clog->do_log(level, message);
- }
-
- // either 'pg <pgid> <command>' or
- // 'tell <pgid>' (which comes in without any of that prefix)?
-
- else if (prefix == "pg" ||
- prefix == "query" ||
- prefix == "mark_unfound_lost" ||
- prefix == "list_unfound"
- ) {
- pg_t pgid;
-
- if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
- ss << "no pgid specified";
- r = -EINVAL;
- } else if (!pgid.parse(pgidstr.c_str())) {
- ss << "couldn't parse pgid '" << pgidstr << "'";
- r = -EINVAL;
- } else {
- spg_t pcand;
- PGRef pg;
- if (osdmap->get_primary_shard(pgid, &pcand) &&
- (pg = _lookup_lock_pg(pcand))) {
- if (pg->is_primary()) {
- // simulate pg <pgid> cmd= for pg->do-command
- if (prefix != "pg")
- cmd_putval(cct, cmdmap, "cmd", prefix);
- try {
- r = pg->do_command(cmdmap, ss, data, odata, con, tid);
- } catch (const bad_cmd_get& e) {
- pg->unlock();
- ss << e.what();
- return -EINVAL;
- }
- if (r == -EAGAIN) {
- pg->unlock();
- // don't reply, pg will do so async
- return -EAGAIN;
- }
- } else {
- ss << "not primary for pgid " << pgid;
-
- // send them the latest diff to ensure they realize the mapping
- // has changed.
- service.send_incremental_map(osdmap->get_epoch() - 1, con, osdmap);
-
- // do not reply; they will get newer maps and realize they
- // need to resend.
- pg->unlock();
- return -EAGAIN;
- }
- pg->unlock();
- } else {
- ss << "i don't have pgid " << pgid;
- r = -ENOENT;
- }
- }
- }
-
- else if (prefix == "bench") {
- int64_t count;
- int64_t bsize;
- int64_t osize, onum;
- // default count 1G, size 4MB
- cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30);
- cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20);
- cmd_getval(cct, cmdmap, "object_size", osize, (int64_t)0);
- cmd_getval(cct, cmdmap, "object_num", onum, (int64_t)0);
-
- uint32_t duration = cct->_conf->osd_bench_duration;
-
- if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
- // let us limit the block size because the next checks rely on it
- // having a sane value. If we allow any block size to be set things
- // can still go sideways.
- ss << "block 'size' values are capped at "
- << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
- << " a higher value, please adjust 'osd_bench_max_block_size'";
- r = -EINVAL;
- goto out;
- } else if (bsize < (int64_t) (1 << 20)) {
- // entering the realm of small block sizes.
- // limit the count to a sane value, assuming a configurable amount of
- // IOPS and duration, so that the OSD doesn't get hung up on this,
- // preventing timeouts from going off
- int64_t max_count =
- bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
- if (count > max_count) {
- ss << "'count' values greater than " << max_count
- << " for a block size of " << byte_u_t(bsize) << ", assuming "
- << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
- << " for " << duration << " seconds,"
- << " can cause ill effects on osd. "
- << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
- << " value if you wish to use a higher 'count'.";
- r = -EINVAL;
- goto out;
- }
- } else {
- // 1MB block sizes are big enough so that we get more stuff done.
- // However, to avoid the osd from getting hung on this and having
- // timers being triggered, we are going to limit the count assuming
- // a configurable throughput and duration.
- // NOTE: max_count is the total amount of bytes that we believe we
- // will be able to write during 'duration' for the given
- // throughput. The block size hardly impacts this unless it's
- // way too big. Given we already check how big the block size
- // is, it's safe to assume everything will check out.
- int64_t max_count =
- cct->_conf->osd_bench_large_size_max_throughput * duration;
- if (count > max_count) {
- ss << "'count' values greater than " << max_count
- << " for a block size of " << byte_u_t(bsize) << ", assuming "
- << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
- << " for " << duration << " seconds,"
- << " can cause ill effects on osd. "
- << " Please adjust 'osd_bench_large_size_max_throughput'"
- << " with a higher value if you wish to use a higher 'count'.";
- r = -EINVAL;
- goto out;
- }
- }
-
- if (osize && bsize > osize)
- bsize = osize;
-
- dout(1) << " bench count " << count
- << " bsize " << byte_u_t(bsize) << dendl;
-
- ObjectStore::Transaction cleanupt;
-
- if (osize && onum) {
- bufferlist bl;
- bufferptr bp(osize);
- bp.zero();
- bl.push_back(std::move(bp));
- bl.rebuild_page_aligned();
- for (int i=0; i<onum; ++i) {
- char nm[30];
- snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
- object_t oid(nm);
- hobject_t soid(sobject_t(oid, 0));
- ObjectStore::Transaction t;
- t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
- store->queue_transaction(service.meta_ch, std::move(t), NULL);
- cleanupt.remove(coll_t(), ghobject_t(soid));
- }
- }
-
- bufferlist bl;
- bufferptr bp(bsize);
- bp.zero();
- bl.push_back(std::move(bp));
- bl.rebuild_page_aligned();
-
- {
- C_SaferCond waiter;
- if (!service.meta_ch->flush_commit(&waiter)) {
- waiter.wait();
- }
- }
-
- utime_t start = ceph_clock_now();
- for (int64_t pos = 0; pos < count; pos += bsize) {
- char nm[30];
- unsigned offset = 0;
- if (onum && osize) {
- snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
- offset = rand() % (osize / bsize) * bsize;
- } else {
- snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
- }
- object_t oid(nm);
- hobject_t soid(sobject_t(oid, 0));
- ObjectStore::Transaction t;
- t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
- store->queue_transaction(service.meta_ch, std::move(t), NULL);
- if (!onum || !osize)
- cleanupt.remove(coll_t::meta(), ghobject_t(soid));
- }
-
- {
- C_SaferCond waiter;
- if (!service.meta_ch->flush_commit(&waiter)) {
- waiter.wait();
- }
- }
- utime_t end = ceph_clock_now();
-
- // clean up
- store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL);
- {
- C_SaferCond waiter;
- if (!service.meta_ch->flush_commit(&waiter)) {
- waiter.wait();
- }
- }
-
- double elapsed = end - start;
- double rate = count / elapsed;
- double iops = rate / bsize;
- if (f) {
- f->open_object_section("osd_bench_results");
- f->dump_int("bytes_written", count);
- f->dump_int("blocksize", bsize);
- f->dump_float("elapsed_sec", elapsed);
- f->dump_float("bytes_per_sec", rate);
- f->dump_float("iops", iops);
- f->close_section();
- f->flush(ds);
- } else {
- ds << "bench: wrote " << byte_u_t(count)
- << " in blocks of " << byte_u_t(bsize) << " in "
- << elapsed << " sec at " << byte_u_t(rate) << "/sec "
- << si_u_t(iops) << " IOPS";
- }
- }
-
- else if (prefix == "flush_pg_stats") {
- mgrc.send_pgstats();
- ds << service.get_osd_stat_seq() << "\n";
- }
-
- else if (prefix == "heap") {
- r = ceph::osd_cmds::heap(*cct, cmdmap, *f, ds);
- }
-
- else if (prefix == "debug dump_missing") {
- if (!f) {
- f.reset(new JSONFormatter(true));
- }
- f->open_array_section("pgs");
- vector<PGRef> pgs;
- _get_pgs(&pgs);
- for (auto& pg : pgs) {
- string s = stringify(pg->pg_id);
- f->open_array_section(s.c_str());
- pg->lock();
- pg->dump_missing(f.get());
- pg->unlock();
- f->close_section();
- }
- f->close_section();
- f->flush(ds);
- }
- else if (prefix == "debug kick_recovery_wq") {
- int64_t delay;
- cmd_getval(cct, cmdmap, "delay", delay);
- ostringstream oss;
- oss << delay;
- unlock_guard unlock{osd_lock};
- r = cct->_conf.set_val("osd_recovery_delay_start", oss.str().c_str());
- if (r != 0) {
- ss << "kick_recovery_wq: error setting "
- << "osd_recovery_delay_start to '" << delay << "': error "
- << r;
- goto out;
- }
- cct->_conf.apply_changes(nullptr);
- ss << "kicking recovery queue. set osd_recovery_delay_start "
- << "to " << cct->_conf->osd_recovery_delay_start;
- }
-
- else if (prefix == "cpu_profiler") {
- string arg;
- cmd_getval(cct, cmdmap, "arg", arg);
- vector<string> argvec;
- get_str_vec(arg, argvec);
- cpu_profiler_handle_command(argvec, ds);
- }
-
- else if (prefix == "dump_pg_recovery_stats") {
- stringstream s;
- if (f) {
- pg_recovery_stats.dump_formatted(f.get());
- f->flush(ds);
- } else {
- pg_recovery_stats.dump(s);
- ds << "dump pg recovery stats: " << s.str();
- }
- }
-
- else if (prefix == "reset_pg_recovery_stats") {
- ss << "reset pg recovery stats";
- pg_recovery_stats.reset();
- }
-
- else if (prefix == "perf histogram dump") {
- std::string logger;
- std::string counter;
- cmd_getval(cct, cmdmap, "logger", logger);
- cmd_getval(cct, cmdmap, "counter", counter);
- if (f) {
- cct->get_perfcounters_collection()->dump_formatted_histograms(
- f.get(), false, logger, counter);
- f->flush(ds);
- }
- }
-
- else if (prefix == "compact") {
- dout(1) << "triggering manual compaction" << dendl;
- auto start = ceph::coarse_mono_clock::now();
- store->compact();
- auto end = ceph::coarse_mono_clock::now();
- double duration = std::chrono::duration<double>(end-start).count();
- dout(1) << "finished manual compaction in "
- << duration
- << " seconds" << dendl;
- ss << "compacted omap in " << duration << " seconds";
- }
else if (prefix == "smart") {
string devid;
probe_smart(devid, ds);
}
- else if (prefix == "cache drop") {
- dout(20) << "clearing all caches" << dendl;
- // Clear the objectstore's cache - onode and buffer for Bluestore,
- // system's pagecache for Filestore
- r = store->flush_cache(&ss);
- if (r < 0) {
- ds << "Error flushing objectstore cache: " << cpp_strerror(r);
- goto out;
- }
- // Clear the objectcontext cache (per PG)
- vector<PGRef> pgs;
- _get_pgs(&pgs);
- for (auto& pg: pgs) {
- pg->clear_cache();
- }
- }
- else if (prefix == "cache status") {
- int obj_ctx_count = 0;
- vector<PGRef> pgs;
- _get_pgs(&pgs);
- for (auto& pg: pgs) {
- obj_ctx_count += pg->get_cache_obj_count();
- }
- if (f) {
- f->open_object_section("cache_status");
- f->dump_int("object_ctx", obj_ctx_count);
- store->dump_cache_stats(f.get());
- f->close_section();
- f->flush(ds);
- } else {
- ds << "object_ctx: " << obj_ctx_count;
- store->dump_cache_stats(ds);
- }
- }
- else if (prefix == "send_beacon") {
- if (is_active()) {
- send_beacon(ceph::coarse_mono_clock::now());
- }
- } else if (prefix == "scrub_purged_snaps") {
- scrub_purged_snaps();
- } else {
+ else {
ss << "unrecognized command '" << prefix << "'";
- r = -EINVAL;
+ r = -ENOSYS;
}
out: