From 8987f94416f453829eae6dda08837ef5a42531c6 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 9 Sep 2019 22:07:03 -0500 Subject: [PATCH] osd: route tell commands to asok; migrate commands - move items from _do_command to asok_command in OSD.cc - update PG::do_command to take a std::function on_finish - sprinkle in some osd_lock locking (_do_command implicitly locks osd_lock, asok_command() does not; most commands don't need it) Signed-off-by: Sage Weil --- src/osd/OSD.cc | 1138 +++++++++++++++++++-------------------- src/osd/PG.h | 12 +- src/osd/PrimaryLogPG.cc | 92 ++-- src/osd/PrimaryLogPG.h | 15 +- 4 files changed, 625 insertions(+), 632 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index d6170ce7d868e..b733173fd276c 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -2367,13 +2367,65 @@ void OSD::asok_command( const bufferlist& inbl, std::function on_finish) { - int ret = 0; - stringstream ss; // stderr error message stream std::unique_ptr f(Formatter::create( format, "json-pretty", "json-pretty")); + int ret = 0; + stringstream ss; // stderr error message stream bufferlist outbl; // if empty at end, we'll dump formatter as output - if (prefix == "status") { + // --- PG commands are routed here to PG::do_command --- + if (prefix == "pg" || + prefix == "query" || + prefix == "mark_unfound_lost" || + prefix == "list_unfound" + ) { + string pgidstr; + pg_t pgid; + if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) { + ss << "no pgid specified"; + ret = -EINVAL; + goto out; + } + if (!pgid.parse(pgidstr.c_str())) { + ss << "couldn't parse pgid '" << pgidstr << "'"; + ret = -EINVAL; + goto out; + } + spg_t pcand; + PGRef pg; + if (osdmap->get_primary_shard(pgid, &pcand) && + (pg = _lookup_lock_pg(pcand))) { + if (pg->is_primary()) { + cmdmap_t new_cmdmap = cmdmap; + try { + pg->do_command(prefix, new_cmdmap, inbl, on_finish); + pg->unlock(); + return; // the pg handler calls on_finish directly + } catch (const bad_cmd_get& e) { + pg->unlock(); + ss << e.what(); + ret = -EINVAL; + goto out; + } + } else { + ss << "not primary for pgid " << pgid; + // do not reply; they will get newer maps and realize they + // need to resend. +#warning fixme on client side + pg->unlock(); + ret = -EAGAIN; + goto out; + } + } else { + ss << "i don't have pgid " << pgid; + ret = -ENOENT; + } + } + + // --- OSD commands follow --- + + else if (prefix == "status") { + lock_guard l(osd_lock); f->open_object_section("status"); f->dump_stream("cluster_fsid") << superblock.cluster_fsid; f->dump_stream("osd_fsid") << superblock.osd_fsid; @@ -2509,14 +2561,6 @@ will start to track new ops received afterwards."; f->close_section(); } else if (prefix == "get_latest_osdmap") { get_latest_osdmap(); - } else if (prefix == "heap") { - auto result = ceph::osd_cmds::heap(*cct, cmdmap, *f, ss); - - // Note: Failed heap profile commands won't necessarily trigger an error: - f->open_object_section("result"); - f->dump_string("error", cpp_strerror(result)); - f->dump_bool("success", result >= 0); - f->close_section(); } else if (prefix == "set_heap_property") { string property; int64_t value = 0; @@ -2621,114 +2665,400 @@ will start to track new ops received afterwards."; } f->close_section(); } else if (prefix == "send_beacon") { + lock_guard l(osd_lock); if (is_active()) { send_beacon(ceph::coarse_mono_clock::now()); } - } else if (prefix == "dump_osd_network") { - int64_t value = 0; - if (!(cmd_getval(cct, cmdmap, "value", value))) { - // Convert milliseconds to microseconds - value = static_cast(g_conf().get_val("mon_warn_on_slow_ping_time")) * 1000; - if (value == 0) { - double ratio = g_conf().get_val("mon_warn_on_slow_ping_ratio"); - value = g_conf().get_val("osd_heartbeat_grace"); - value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio + } + + else if (prefix == "cluster_log") { + vector msg; + cmd_getval(cct, cmdmap, "message", msg); + if (msg.empty()) { + ret = -EINVAL; + ss << "ignoring empty log message"; + goto out; + } + string message = msg.front(); + for (vector::iterator a = ++msg.begin(); a != msg.end(); ++a) + message += " " + *a; + string lvl; + cmd_getval(cct, cmdmap, "level", lvl); + clog_type level = string_to_clog_type(lvl); + if (level < 0) { + ret = -EINVAL; + ss << "unknown level '" << lvl << "'"; + goto out; + } + clog->do_log(level, message); + } + + else if (prefix == "bench") { + lock_guard l(osd_lock); + int64_t count; + int64_t bsize; + int64_t osize, onum; + // default count 1G, size 4MB + cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30); + cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20); + cmd_getval(cct, cmdmap, "object_size", osize, (int64_t)0); + cmd_getval(cct, cmdmap, "object_num", onum, (int64_t)0); + + uint32_t duration = cct->_conf->osd_bench_duration; + + if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) { + // let us limit the block size because the next checks rely on it + // having a sane value. If we allow any block size to be set things + // can still go sideways. + ss << "block 'size' values are capped at " + << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use" + << " a higher value, please adjust 'osd_bench_max_block_size'"; + ret = -EINVAL; + goto out; + } else if (bsize < (int64_t) (1 << 20)) { + // entering the realm of small block sizes. + // limit the count to a sane value, assuming a configurable amount of + // IOPS and duration, so that the OSD doesn't get hung up on this, + // preventing timeouts from going off + int64_t max_count = + bsize * duration * cct->_conf->osd_bench_small_size_max_iops; + if (count > max_count) { + ss << "'count' values greater than " << max_count + << " for a block size of " << byte_u_t(bsize) << ", assuming " + << cct->_conf->osd_bench_small_size_max_iops << " IOPS," + << " for " << duration << " seconds," + << " can cause ill effects on osd. " + << " Please adjust 'osd_bench_small_size_max_iops' with a higher" + << " value if you wish to use a higher 'count'."; + ret = -EINVAL; + goto out; } } else { - // Convert user input to microseconds - value *= 1000; + // 1MB block sizes are big enough so that we get more stuff done. + // However, to avoid the osd from getting hung on this and having + // timers being triggered, we are going to limit the count assuming + // a configurable throughput and duration. + // NOTE: max_count is the total amount of bytes that we believe we + // will be able to write during 'duration' for the given + // throughput. The block size hardly impacts this unless it's + // way too big. Given we already check how big the block size + // is, it's safe to assume everything will check out. + int64_t max_count = + cct->_conf->osd_bench_large_size_max_throughput * duration; + if (count > max_count) { + ss << "'count' values greater than " << max_count + << " for a block size of " << byte_u_t(bsize) << ", assuming " + << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s," + << " for " << duration << " seconds," + << " can cause ill effects on osd. " + << " Please adjust 'osd_bench_large_size_max_throughput'" + << " with a higher value if you wish to use a higher 'count'."; + ret = -EINVAL; + goto out; + } } - if (value < 0) value = 0; - struct osd_ping_time_t { - uint32_t pingtime; - int to; - bool back; - std::array times; - std::array min; - std::array max; - uint32_t last; - uint32_t last_update; + if (osize && bsize > osize) + bsize = osize; - bool operator<(const osd_ping_time_t& rhs) const { - if (pingtime < rhs.pingtime) - return true; - if (pingtime > rhs.pingtime) - return false; - if (to < rhs.to) - return true; - if (to > rhs.to) - return false; - return back; + dout(1) << " bench count " << count + << " bsize " << byte_u_t(bsize) << dendl; + + ObjectStore::Transaction cleanupt; + + if (osize && onum) { + bufferlist bl; + bufferptr bp(osize); + bp.zero(); + bl.push_back(std::move(bp)); + bl.rebuild_page_aligned(); + for (int i=0; iqueue_transaction(service.meta_ch, std::move(t), NULL); + cleanupt.remove(coll_t(), ghobject_t(soid)); } - }; + } - set sorted; - // Get pingtimes under lock and not on the stack - map *pingtimes = new map; - service.get_hb_pingtime(pingtimes); - for (auto j : *pingtimes) { - if (j.second.last_update == 0) - continue; - osd_ping_time_t item; - item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]); - item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]); - if (item.pingtime >= value) { - item.to = j.first; - item.times[0] = j.second.back_pingtime[0]; - item.times[1] = j.second.back_pingtime[1]; - item.times[2] = j.second.back_pingtime[2]; - item.min[0] = j.second.back_min[0]; - item.min[1] = j.second.back_min[1]; - item.min[2] = j.second.back_min[2]; - item.max[0] = j.second.back_max[0]; - item.max[1] = j.second.back_max[1]; - item.max[2] = j.second.back_max[2]; - item.last = j.second.back_last; - item.back = true; - item.last_update = j.second.last_update; - sorted.emplace(item); + bufferlist bl; + bufferptr bp(bsize); + bp.zero(); + bl.push_back(std::move(bp)); + bl.rebuild_page_aligned(); + + { + C_SaferCond waiter; + if (!service.meta_ch->flush_commit(&waiter)) { + waiter.wait(); } - if (j.second.front_last == 0) - continue; - item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]); - item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]); - if (item.pingtime >= value) { - item.to = j.first; - item.times[0] = j.second.front_pingtime[0]; - item.times[1] = j.second.front_pingtime[1]; - item.times[2] = j.second.front_pingtime[2]; - item.min[0] = j.second.front_min[0]; - item.min[1] = j.second.front_min[1]; - item.min[2] = j.second.front_min[2]; - item.max[0] = j.second.front_max[0]; - item.max[1] = j.second.front_max[1]; - item.max[2] = j.second.front_max[2]; - item.last = j.second.front_last; - item.last_update = j.second.last_update; - item.back = false; - sorted.emplace(item); + } + + utime_t start = ceph_clock_now(); + for (int64_t pos = 0; pos < count; pos += bsize) { + char nm[30]; + unsigned offset = 0; + if (onum && osize) { + snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum)); + offset = rand() % (osize / bsize) * bsize; + } else { + snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos); } + object_t oid(nm); + hobject_t soid(sobject_t(oid, 0)); + ObjectStore::Transaction t; + t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl); + store->queue_transaction(service.meta_ch, std::move(t), NULL); + if (!onum || !osize) + cleanupt.remove(coll_t::meta(), ghobject_t(soid)); } - delete pingtimes; - // - // Network ping times (1min 5min 15min) - f->open_object_section("network_ping_times"); - f->dump_int("threshold", value / 1000); - f->open_array_section("entries"); - for (auto &sitem : boost::adaptors::reverse(sorted)) { - ceph_assert(sitem.pingtime >= value); - f->open_object_section("entry"); - const time_t lu(sitem.last_update); - char buffer[26]; - string lustr(ctime_r(&lu, buffer)); - lustr.pop_back(); // Remove trailing \n - auto stale = cct->_conf.get_val("osd_heartbeat_stale"); - f->dump_string("last update", lustr); - f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale); - f->dump_int("from osd", whoami); - f->dump_int("to osd", sitem.to); + { + C_SaferCond waiter; + if (!service.meta_ch->flush_commit(&waiter)) { + waiter.wait(); + } + } + utime_t end = ceph_clock_now(); + + // clean up + store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL); + { + C_SaferCond waiter; + if (!service.meta_ch->flush_commit(&waiter)) { + waiter.wait(); + } + } + + double elapsed = end - start; + double rate = count / elapsed; + double iops = rate / bsize; + f->open_object_section("osd_bench_results"); + f->dump_int("bytes_written", count); + f->dump_int("blocksize", bsize); + f->dump_float("elapsed_sec", elapsed); + f->dump_float("bytes_per_sec", rate); + f->dump_float("iops", iops); + f->close_section(); + } + + else if (prefix == "flush_pg_stats") { + mgrc.send_pgstats(); + f->dump_unsigned("stat_seq", service.get_osd_stat_seq()); + } + + else if (prefix == "heap") { + ret = ceph::osd_cmds::heap(*cct, cmdmap, *f, ss); + } + + else if (prefix == "debug dump_missing") { + f->open_array_section("pgs"); + vector pgs; + _get_pgs(&pgs); + for (auto& pg : pgs) { + string s = stringify(pg->pg_id); + f->open_array_section(s.c_str()); + pg->lock(); + pg->dump_missing(f.get()); + pg->unlock(); + f->close_section(); + } + f->close_section(); + } + + else if (prefix == "debug kick_recovery_wq") { + int64_t delay; + cmd_getval(cct, cmdmap, "delay", delay); + ostringstream oss; + oss << delay; + ret = cct->_conf.set_val("osd_recovery_delay_start", oss.str().c_str()); + if (ret != 0) { + ss << "kick_recovery_wq: error setting " + << "osd_recovery_delay_start to '" << delay << "': error " + << ret; + goto out; + } + cct->_conf.apply_changes(nullptr); + ss << "kicking recovery queue. set osd_recovery_delay_start " + << "to " << cct->_conf->osd_recovery_delay_start; + } + + else if (prefix == "cpu_profiler") { + ostringstream ds; + string arg; + cmd_getval(cct, cmdmap, "arg", arg); + vector argvec; + get_str_vec(arg, argvec); + cpu_profiler_handle_command(argvec, ds); + outbl.append(ds.str()); + } + + else if (prefix == "dump_pg_recovery_stats") { + lock_guard l(osd_lock); + pg_recovery_stats.dump_formatted(f.get()); + } + + else if (prefix == "reset_pg_recovery_stats") { + lock_guard l(osd_lock); + pg_recovery_stats.reset(); + } + + else if (prefix == "perf histogram dump") { + std::string logger; + std::string counter; + cmd_getval(cct, cmdmap, "logger", logger); + cmd_getval(cct, cmdmap, "counter", counter); + cct->get_perfcounters_collection()->dump_formatted_histograms( + f.get(), false, logger, counter); + } + + else if (prefix == "cache drop") { + lock_guard l(osd_lock); + dout(20) << "clearing all caches" << dendl; + // Clear the objectstore's cache - onode and buffer for Bluestore, + // system's pagecache for Filestore + ret = store->flush_cache(&ss); + if (ret < 0) { + ss << "Error flushing objectstore cache: " << cpp_strerror(ret); + goto out; + } + // Clear the objectcontext cache (per PG) + vector pgs; + _get_pgs(&pgs); + for (auto& pg: pgs) { + pg->clear_cache(); + } + } + + else if (prefix == "cache status") { + lock_guard l(osd_lock); + int obj_ctx_count = 0; + vector pgs; + _get_pgs(&pgs); + for (auto& pg: pgs) { + obj_ctx_count += pg->get_cache_obj_count(); + } + f->open_object_section("cache_status"); + f->dump_int("object_ctx", obj_ctx_count); + store->dump_cache_stats(f.get()); + f->close_section(); + } + + else if (prefix == "scrub_purged_snaps") { + lock_guard l(osd_lock); + scrub_purged_snaps(); + } + + else if (prefix == "dump_osd_network") { + lock_guard l(osd_lock); + int64_t value = 0; + if (!(cmd_getval(cct, cmdmap, "value", value))) { + // Convert milliseconds to microseconds + value = static_cast(g_conf().get_val( + "mon_warn_on_slow_ping_time")) * 1000; + if (value == 0) { + double ratio = g_conf().get_val("mon_warn_on_slow_ping_ratio"); + value = g_conf().get_val("osd_heartbeat_grace"); + value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio + } + } else { + // Convert user input to microseconds + value *= 1000; + } + if (value < 0) value = 0; + + struct osd_ping_time_t { + uint32_t pingtime; + int to; + bool back; + std::array times; + std::array min; + std::array max; + uint32_t last; + uint32_t last_update; + + bool operator<(const osd_ping_time_t& rhs) const { + if (pingtime < rhs.pingtime) + return true; + if (pingtime > rhs.pingtime) + return false; + if (to < rhs.to) + return true; + if (to > rhs.to) + return false; + return back; + } + }; + + set sorted; + // Get pingtimes under lock and not on the stack + map *pingtimes = new map; + service.get_hb_pingtime(pingtimes); + for (auto j : *pingtimes) { + if (j.second.last_update == 0) + continue; + osd_ping_time_t item; + item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]); + item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]); + if (item.pingtime >= value) { + item.to = j.first; + item.times[0] = j.second.back_pingtime[0]; + item.times[1] = j.second.back_pingtime[1]; + item.times[2] = j.second.back_pingtime[2]; + item.min[0] = j.second.back_min[0]; + item.min[1] = j.second.back_min[1]; + item.min[2] = j.second.back_min[2]; + item.max[0] = j.second.back_max[0]; + item.max[1] = j.second.back_max[1]; + item.max[2] = j.second.back_max[2]; + item.last = j.second.back_last; + item.back = true; + item.last_update = j.second.last_update; + sorted.emplace(item); + } + if (j.second.front_last == 0) + continue; + item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]); + item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]); + if (item.pingtime >= value) { + item.to = j.first; + item.times[0] = j.second.front_pingtime[0]; + item.times[1] = j.second.front_pingtime[1]; + item.times[2] = j.second.front_pingtime[2]; + item.min[0] = j.second.front_min[0]; + item.min[1] = j.second.front_min[1]; + item.min[2] = j.second.front_min[2]; + item.max[0] = j.second.front_max[0]; + item.max[1] = j.second.front_max[1]; + item.max[2] = j.second.front_max[2]; + item.last = j.second.front_last; + item.last_update = j.second.last_update; + item.back = false; + sorted.emplace(item); + } + } + delete pingtimes; + // + // Network ping times (1min 5min 15min) + f->open_object_section("network_ping_times"); + f->dump_int("threshold", value / 1000); + f->open_array_section("entries"); + for (auto &sitem : boost::adaptors::reverse(sorted)) { + ceph_assert(sitem.pingtime >= value); + f->open_object_section("entry"); + + const time_t lu(sitem.last_update); + char buffer[26]; + string lustr(ctime_r(&lu, buffer)); + lustr.pop_back(); // Remove trailing \n + auto stale = cct->_conf.get_val("osd_heartbeat_stale"); + f->dump_string("last update", lustr); + f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale); + f->dump_int("from osd", whoami); + f->dump_int("to osd", sitem.to); f->dump_string("interface", (sitem.back ? "back" : "front")); f->open_object_section("average"); f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.times[0],3).c_str()); @@ -3381,14 +3711,6 @@ void OSD::final_init() "the mon"); ceph_assert(r == 0); - r = admin_socket->register_command( "heap " \ - "name=heapcmd,type=CephString " \ - "name=value,type=CephString,req=false", - asok_hook, - "show heap usage info (available only if " - "compiled with tcmalloc)"); - ceph_assert(r == 0); - r = admin_socket->register_command("set_heap_property " \ "name=property,type=CephString " \ "name=value,type=CephInt", @@ -3438,7 +3760,7 @@ void OSD::final_init() ceph_assert(r == 0); - r = admin_socket->register_command("smart name=devid,type=CephString,req=False", + r = admin_socket->register_command("smart name=devid,type=CephString,req=false", asok_hook, "probe OSD devices for SMART data."); @@ -3546,6 +3868,125 @@ void OSD::final_init() test_ops_hook, "Inject a full disk (optional count times)"); ceph_assert(r == 0); + r = admin_socket->register_command( + "bench " \ + "name=count,type=CephInt,req=false " \ + "name=size,type=CephInt,req=false " \ + "name=object_size,type=CephInt,req=false " \ + "name=object_num,type=CephInt,req=false ", + asok_hook, + "OSD benchmark: write -byte objects(with ), " \ + "(default count=1G default size=4MB). Results in log."); + ceph_assert(r == 0); + r = admin_socket->register_command( + "cluster_log " \ + "name=level,type=CephChoices,strings=error,warning,info,debug " \ + "name=message,type=CephString,n=N", + asok_hook, + "log a message to the cluster log"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "flush_pg_stats", + asok_hook, + "flush pg stats"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "heap " \ + "name=heapcmd,type=CephChoices,strings=" \ + "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \ + "name=value,type=CephString,req=false", + asok_hook, + "show heap usage info (available only if compiled with tcmalloc)"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "debug dump_missing " \ + "name=filename,type=CephFilepath", + asok_hook, + "dump missing objects to a named file"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "debug kick_recovery_wq " \ + "name=delay,type=CephInt,range=0", + asok_hook, + "set osd_recovery_delay_start to "); + ceph_assert(r == 0); + r = admin_socket->register_command( + "cpu_profiler " \ + "name=arg,type=CephChoices,strings=status|flush", + asok_hook, + "run cpu profiling on daemon"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "dump_pg_recovery_stats", + asok_hook, + "dump pg recovery statistics"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "reset_pg_recovery_stats", + asok_hook, + "reset pg recovery statistics"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "cache drop", + asok_hook, + "Drop all OSD caches"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "cache status", + asok_hook, + "Get OSD caches statistics"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "scrub_purged_snaps", + asok_hook, + "Scrub purged_snaps vs snapmapper index"); + ceph_assert(r == 0); + + // -- pg commands -- + // old form: ceph pg command ... + r = admin_socket->register_command( + "pg " \ + "name=pgid,type=CephPgid " \ + "name=cmd,type=CephChoices,strings=query", + asok_hook, + ""); + ceph_assert(r == 0); + r = admin_socket->register_command( + "pg " \ + "name=pgid,type=CephPgid " \ + "name=cmd,type=CephChoices,strings=mark_unfound_lost " \ + "name=mulcmd,type=CephChoices,strings=revert|delete", + asok_hook, + ""); + ceph_assert(r == 0); + r = admin_socket->register_command( + "pg " \ + "name=pgid,type=CephPgid " \ + "name=cmd,type=CephChoices,strings=list_unfound " \ + "name=offset,type=CephString,req=false", + asok_hook, + ""); + ceph_assert(r == 0); + // new form: tell for both cli and rest + r = admin_socket->register_command( + "query", + asok_hook, + "show details of a specific pg"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "mark_unfound_lost " \ + "name=pgid,type=CephPgid,req=false " \ + "name=mulcmd,type=CephChoices,strings=revert|delete", + asok_hook, + "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available"); + ceph_assert(r == 0); + r = admin_socket->register_command( + "list_unfound " \ + "name=pgid,type=CephPgid,req=false " \ + "name=offset,type=CephString,req=false", + asok_hook, + "list unfound objects on this pg, perhaps starting at an offset given in JSON"); + ceph_assert(r == 0); } void OSD::create_logger() @@ -6346,18 +6787,12 @@ void OSD::handle_command(MCommand *m) m->put(); return; } - - OSDCap& caps = session->caps; - - if (!caps.allow_all() || m->get_source().is_mon()) { + if (!session->caps.allow_all()) { con->send_message(new MCommandReply(m, -EPERM)); m->put(); return; } - - Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get()); - command_wq.queue(c); - + cct->get_admin_socket()->queue_tell_command(m); m->put(); } @@ -6371,40 +6806,6 @@ struct OSDCommand { #define COMMAND(parsesig, helptext, module, perm) \ {parsesig, helptext, module, perm}, -// yes, these are really pg commands, but there's a limit to how -// much work it's worth. The OSD returns all of them. Make this -// form (pg ) valid only for the cli. -// Rest uses "tell " - -COMMAND("pg " \ - "name=pgid,type=CephPgid " \ - "name=cmd,type=CephChoices,strings=query", \ - "show details of a specific pg", "osd", "r") -COMMAND("pg " \ - "name=pgid,type=CephPgid " \ - "name=cmd,type=CephChoices,strings=mark_unfound_lost " \ - "name=mulcmd,type=CephChoices,strings=revert|delete", \ - "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available", - "osd", "rw") -COMMAND("pg " \ - "name=pgid,type=CephPgid " \ - "name=cmd,type=CephChoices,strings=list_unfound " \ - "name=offset,type=CephString,req=false", - "list unfound objects on this pg, perhaps starting at an offset given in JSON", - "osd", "r") - -// new form: tell for both cli and rest - -COMMAND("query", - "show details of a specific pg", "osd", "r") -COMMAND("mark_unfound_lost " \ - "name=mulcmd,type=CephChoices,strings=revert|delete", \ - "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available", - "osd", "rw") -COMMAND("list_unfound " \ - "name=offset,type=CephString,req=false", - "list unfound objects on this pg, perhaps starting at an offset given in JSON", - "osd", "r") COMMAND("perf histogram dump " "name=logger,type=CephString,req=false " "name=counter,type=CephString,req=false", @@ -6430,58 +6831,9 @@ COMMAND("config unset " \ "name=key,type=CephString", "Unset a configuration option at runtime (not persistent)", "osd", "rw") -COMMAND("cluster_log " \ - "name=level,type=CephChoices,strings=error,warning,info,debug " \ - "name=message,type=CephString,n=N", - "log a message to the cluster log", - "osd", "rw") -COMMAND("bench " \ - "name=count,type=CephInt,req=false " \ - "name=size,type=CephInt,req=false " \ - "name=object_size,type=CephInt,req=false " \ - "name=object_num,type=CephInt,req=false ", \ - "OSD benchmark: write -byte objects(with ), " \ - "(default count=1G default size=4MB). Results in log.", - "osd", "rw") -COMMAND("flush_pg_stats", "flush pg stats", "osd", "rw") -COMMAND("heap " \ - "name=heapcmd,type=CephChoices,strings="\ - "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \ - "name=value,type=CephString,req=false", - "show heap usage info (available only if compiled with tcmalloc)", - "osd", "rw") -COMMAND("debug dump_missing " \ - "name=filename,type=CephFilepath", - "dump missing objects to a named file", "osd", "r") -COMMAND("debug kick_recovery_wq " \ - "name=delay,type=CephInt,range=0", - "set osd_recovery_delay_start to ", "osd", "rw") -COMMAND("cpu_profiler " \ - "name=arg,type=CephChoices,strings=status|flush", - "run cpu profiling on daemon", "osd", "rw") -COMMAND("dump_pg_recovery_stats", "dump pg recovery statistics", - "osd", "r") -COMMAND("reset_pg_recovery_stats", "reset pg recovery statistics", - "osd", "rw") -COMMAND("compact", - "compact object store's omap. " - "WARNING: Compaction probably slows your requests", - "osd", "rw") -COMMAND("smart name=devid,type=CephString,req=False", +COMMAND("smart name=devid,type=CephString,req=false", "runs smartctl on this osd devices. ", "osd", "rw") -COMMAND("cache drop", - "Drop all OSD caches", - "osd", "rwx") -COMMAND("cache status", - "Get OSD caches statistics", - "osd", "r") -COMMAND("send_beacon", - "Send OSD beacon to mon immediately", - "osd", "r") -COMMAND("scrub_purged_snaps", - "Scrub purged_snaps vs snapmapper index", - "osd", "r") }; void OSD::do_command( @@ -6635,336 +6987,6 @@ int OSD::_do_command( r = 0; // make command idempotent } } - else if (prefix == "cluster_log") { - vector msg; - cmd_getval(cct, cmdmap, "message", msg); - if (msg.empty()) { - r = -EINVAL; - ss << "ignoring empty log message"; - goto out; - } - string message = msg.front(); - for (vector::iterator a = ++msg.begin(); a != msg.end(); ++a) - message += " " + *a; - string lvl; - cmd_getval(cct, cmdmap, "level", lvl); - clog_type level = string_to_clog_type(lvl); - if (level < 0) { - r = -EINVAL; - ss << "unknown level '" << lvl << "'"; - goto out; - } - clog->do_log(level, message); - } - - // either 'pg ' or - // 'tell ' (which comes in without any of that prefix)? - - else if (prefix == "pg" || - prefix == "query" || - prefix == "mark_unfound_lost" || - prefix == "list_unfound" - ) { - pg_t pgid; - - if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) { - ss << "no pgid specified"; - r = -EINVAL; - } else if (!pgid.parse(pgidstr.c_str())) { - ss << "couldn't parse pgid '" << pgidstr << "'"; - r = -EINVAL; - } else { - spg_t pcand; - PGRef pg; - if (osdmap->get_primary_shard(pgid, &pcand) && - (pg = _lookup_lock_pg(pcand))) { - if (pg->is_primary()) { - // simulate pg cmd= for pg->do-command - if (prefix != "pg") - cmd_putval(cct, cmdmap, "cmd", prefix); - try { - r = pg->do_command(cmdmap, ss, data, odata, con, tid); - } catch (const bad_cmd_get& e) { - pg->unlock(); - ss << e.what(); - return -EINVAL; - } - if (r == -EAGAIN) { - pg->unlock(); - // don't reply, pg will do so async - return -EAGAIN; - } - } else { - ss << "not primary for pgid " << pgid; - - // send them the latest diff to ensure they realize the mapping - // has changed. - service.send_incremental_map(osdmap->get_epoch() - 1, con, osdmap); - - // do not reply; they will get newer maps and realize they - // need to resend. - pg->unlock(); - return -EAGAIN; - } - pg->unlock(); - } else { - ss << "i don't have pgid " << pgid; - r = -ENOENT; - } - } - } - - else if (prefix == "bench") { - int64_t count; - int64_t bsize; - int64_t osize, onum; - // default count 1G, size 4MB - cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30); - cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20); - cmd_getval(cct, cmdmap, "object_size", osize, (int64_t)0); - cmd_getval(cct, cmdmap, "object_num", onum, (int64_t)0); - - uint32_t duration = cct->_conf->osd_bench_duration; - - if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) { - // let us limit the block size because the next checks rely on it - // having a sane value. If we allow any block size to be set things - // can still go sideways. - ss << "block 'size' values are capped at " - << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use" - << " a higher value, please adjust 'osd_bench_max_block_size'"; - r = -EINVAL; - goto out; - } else if (bsize < (int64_t) (1 << 20)) { - // entering the realm of small block sizes. - // limit the count to a sane value, assuming a configurable amount of - // IOPS and duration, so that the OSD doesn't get hung up on this, - // preventing timeouts from going off - int64_t max_count = - bsize * duration * cct->_conf->osd_bench_small_size_max_iops; - if (count > max_count) { - ss << "'count' values greater than " << max_count - << " for a block size of " << byte_u_t(bsize) << ", assuming " - << cct->_conf->osd_bench_small_size_max_iops << " IOPS," - << " for " << duration << " seconds," - << " can cause ill effects on osd. " - << " Please adjust 'osd_bench_small_size_max_iops' with a higher" - << " value if you wish to use a higher 'count'."; - r = -EINVAL; - goto out; - } - } else { - // 1MB block sizes are big enough so that we get more stuff done. - // However, to avoid the osd from getting hung on this and having - // timers being triggered, we are going to limit the count assuming - // a configurable throughput and duration. - // NOTE: max_count is the total amount of bytes that we believe we - // will be able to write during 'duration' for the given - // throughput. The block size hardly impacts this unless it's - // way too big. Given we already check how big the block size - // is, it's safe to assume everything will check out. - int64_t max_count = - cct->_conf->osd_bench_large_size_max_throughput * duration; - if (count > max_count) { - ss << "'count' values greater than " << max_count - << " for a block size of " << byte_u_t(bsize) << ", assuming " - << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s," - << " for " << duration << " seconds," - << " can cause ill effects on osd. " - << " Please adjust 'osd_bench_large_size_max_throughput'" - << " with a higher value if you wish to use a higher 'count'."; - r = -EINVAL; - goto out; - } - } - - if (osize && bsize > osize) - bsize = osize; - - dout(1) << " bench count " << count - << " bsize " << byte_u_t(bsize) << dendl; - - ObjectStore::Transaction cleanupt; - - if (osize && onum) { - bufferlist bl; - bufferptr bp(osize); - bp.zero(); - bl.push_back(std::move(bp)); - bl.rebuild_page_aligned(); - for (int i=0; iqueue_transaction(service.meta_ch, std::move(t), NULL); - cleanupt.remove(coll_t(), ghobject_t(soid)); - } - } - - bufferlist bl; - bufferptr bp(bsize); - bp.zero(); - bl.push_back(std::move(bp)); - bl.rebuild_page_aligned(); - - { - C_SaferCond waiter; - if (!service.meta_ch->flush_commit(&waiter)) { - waiter.wait(); - } - } - - utime_t start = ceph_clock_now(); - for (int64_t pos = 0; pos < count; pos += bsize) { - char nm[30]; - unsigned offset = 0; - if (onum && osize) { - snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum)); - offset = rand() % (osize / bsize) * bsize; - } else { - snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos); - } - object_t oid(nm); - hobject_t soid(sobject_t(oid, 0)); - ObjectStore::Transaction t; - t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl); - store->queue_transaction(service.meta_ch, std::move(t), NULL); - if (!onum || !osize) - cleanupt.remove(coll_t::meta(), ghobject_t(soid)); - } - - { - C_SaferCond waiter; - if (!service.meta_ch->flush_commit(&waiter)) { - waiter.wait(); - } - } - utime_t end = ceph_clock_now(); - - // clean up - store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL); - { - C_SaferCond waiter; - if (!service.meta_ch->flush_commit(&waiter)) { - waiter.wait(); - } - } - - double elapsed = end - start; - double rate = count / elapsed; - double iops = rate / bsize; - if (f) { - f->open_object_section("osd_bench_results"); - f->dump_int("bytes_written", count); - f->dump_int("blocksize", bsize); - f->dump_float("elapsed_sec", elapsed); - f->dump_float("bytes_per_sec", rate); - f->dump_float("iops", iops); - f->close_section(); - f->flush(ds); - } else { - ds << "bench: wrote " << byte_u_t(count) - << " in blocks of " << byte_u_t(bsize) << " in " - << elapsed << " sec at " << byte_u_t(rate) << "/sec " - << si_u_t(iops) << " IOPS"; - } - } - - else if (prefix == "flush_pg_stats") { - mgrc.send_pgstats(); - ds << service.get_osd_stat_seq() << "\n"; - } - - else if (prefix == "heap") { - r = ceph::osd_cmds::heap(*cct, cmdmap, *f, ds); - } - - else if (prefix == "debug dump_missing") { - if (!f) { - f.reset(new JSONFormatter(true)); - } - f->open_array_section("pgs"); - vector pgs; - _get_pgs(&pgs); - for (auto& pg : pgs) { - string s = stringify(pg->pg_id); - f->open_array_section(s.c_str()); - pg->lock(); - pg->dump_missing(f.get()); - pg->unlock(); - f->close_section(); - } - f->close_section(); - f->flush(ds); - } - else if (prefix == "debug kick_recovery_wq") { - int64_t delay; - cmd_getval(cct, cmdmap, "delay", delay); - ostringstream oss; - oss << delay; - unlock_guard unlock{osd_lock}; - r = cct->_conf.set_val("osd_recovery_delay_start", oss.str().c_str()); - if (r != 0) { - ss << "kick_recovery_wq: error setting " - << "osd_recovery_delay_start to '" << delay << "': error " - << r; - goto out; - } - cct->_conf.apply_changes(nullptr); - ss << "kicking recovery queue. set osd_recovery_delay_start " - << "to " << cct->_conf->osd_recovery_delay_start; - } - - else if (prefix == "cpu_profiler") { - string arg; - cmd_getval(cct, cmdmap, "arg", arg); - vector argvec; - get_str_vec(arg, argvec); - cpu_profiler_handle_command(argvec, ds); - } - - else if (prefix == "dump_pg_recovery_stats") { - stringstream s; - if (f) { - pg_recovery_stats.dump_formatted(f.get()); - f->flush(ds); - } else { - pg_recovery_stats.dump(s); - ds << "dump pg recovery stats: " << s.str(); - } - } - - else if (prefix == "reset_pg_recovery_stats") { - ss << "reset pg recovery stats"; - pg_recovery_stats.reset(); - } - - else if (prefix == "perf histogram dump") { - std::string logger; - std::string counter; - cmd_getval(cct, cmdmap, "logger", logger); - cmd_getval(cct, cmdmap, "counter", counter); - if (f) { - cct->get_perfcounters_collection()->dump_formatted_histograms( - f.get(), false, logger, counter); - f->flush(ds); - } - } - - else if (prefix == "compact") { - dout(1) << "triggering manual compaction" << dendl; - auto start = ceph::coarse_mono_clock::now(); - store->compact(); - auto end = ceph::coarse_mono_clock::now(); - double duration = std::chrono::duration(end-start).count(); - dout(1) << "finished manual compaction in " - << duration - << " seconds" << dendl; - ss << "compacted omap in " << duration << " seconds"; - } else if (prefix == "smart") { string devid; @@ -6972,50 +6994,10 @@ int OSD::_do_command( probe_smart(devid, ds); } - else if (prefix == "cache drop") { - dout(20) << "clearing all caches" << dendl; - // Clear the objectstore's cache - onode and buffer for Bluestore, - // system's pagecache for Filestore - r = store->flush_cache(&ss); - if (r < 0) { - ds << "Error flushing objectstore cache: " << cpp_strerror(r); - goto out; - } - // Clear the objectcontext cache (per PG) - vector pgs; - _get_pgs(&pgs); - for (auto& pg: pgs) { - pg->clear_cache(); - } - } - else if (prefix == "cache status") { - int obj_ctx_count = 0; - vector pgs; - _get_pgs(&pgs); - for (auto& pg: pgs) { - obj_ctx_count += pg->get_cache_obj_count(); - } - if (f) { - f->open_object_section("cache_status"); - f->dump_int("object_ctx", obj_ctx_count); - store->dump_cache_stats(f.get()); - f->close_section(); - f->flush(ds); - } else { - ds << "object_ctx: " << obj_ctx_count; - store->dump_cache_stats(ds); - } - } - else if (prefix == "send_beacon") { - if (is_active()) { - send_beacon(ceph::coarse_mono_clock::now()); - } - } else if (prefix == "scrub_purged_snaps") { - scrub_purged_snaps(); - } else { + else { ss << "unrecognized command '" << prefix << "'"; - r = -EINVAL; + r = -ENOSYS; } out: diff --git a/src/osd/PG.h b/src/osd/PG.h index 93b9d5d590fd4..fb86cf126d8f7 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -534,13 +534,11 @@ public: virtual int get_cache_obj_count() = 0; virtual void snap_trimmer(epoch_t epoch_queued) = 0; - virtual int do_command( - cmdmap_t cmdmap, - ostream& ss, - bufferlist& idata, - bufferlist& odata, - ConnectionRef conn, - ceph_tid_t tid) = 0; + virtual void do_command( + const string_view& prefix, + const cmdmap_t& cmdmap, + const bufferlist& idata, + std::function on_finish) = 0; virtual bool agent_work(int max) = 0; virtual bool agent_work(int max, int agent_flush_quota) = 0; diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 52386a344ba58..3fffb8d227554 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -927,23 +927,31 @@ PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter) // ========================================================== -int PrimaryLogPG::do_command( - cmdmap_t cmdmap, - ostream& ss, - bufferlist& idata, - bufferlist& odata, - ConnectionRef con, - ceph_tid_t tid) -{ - string prefix; +void PrimaryLogPG::do_command( + const string_view& orig_prefix, + const cmdmap_t& cmdmap, + const bufferlist& idata, + std::function on_finish) +{ string format; - cmd_getval(cct, cmdmap, "format", format); - boost::scoped_ptr f(Formatter::create(format, "json-pretty", "json")); - + std::unique_ptr f(Formatter::create( + format, "json-pretty", "json-pretty")); + int ret = 0; + stringstream ss; // stderr error message stream + bufferlist outbl; // if empty at end, we'll dump formatter as output + + // get final prefix: + // - ceph pg foo -> prefix=pg, cmd=foo + // - ceph tell foo -> prefix=foo + string prefix(orig_prefix); string command; cmd_getval(cct, cmdmap, "cmd", command); - if (command == "query") { + if (command.size()) { + prefix = command; + } + + if (prefix == "query") { f->open_object_section("pg"); f->dump_stream("snap_trimq") << snap_trimq; f->dump_unsigned("snap_trimq_len", snap_trimq.size()); @@ -960,49 +968,53 @@ int PrimaryLogPG::do_command( f->close_section(); f->close_section(); - f->flush(odata); - return 0; } - else if (command == "mark_unfound_lost") { + + else if (prefix == "mark_unfound_lost") { string mulcmd; cmd_getval(cct, cmdmap, "mulcmd", mulcmd); int mode = -1; if (mulcmd == "revert") { if (pool.info.is_erasure()) { ss << "mode must be 'delete' for ec pool"; - return -EINVAL; + ret = -EINVAL; + goto out; } mode = pg_log_entry_t::LOST_REVERT; } else if (mulcmd == "delete") { mode = pg_log_entry_t::LOST_DELETE; } else { ss << "mode must be 'revert' or 'delete'; mark not yet implemented"; - return -EINVAL; + ret = -EINVAL; + goto out; } ceph_assert(mode == pg_log_entry_t::LOST_REVERT || - mode == pg_log_entry_t::LOST_DELETE); + mode == pg_log_entry_t::LOST_DELETE); if (!is_primary()) { ss << "not primary"; - return -EROFS; + ret = -EROFS; + goto out; } uint64_t unfound = recovery_state.get_missing_loc().num_unfound(); if (!unfound) { ss << "pg has no unfound objects"; - return 0; // make command idempotent + goto out; // make command idempotent } if (!recovery_state.all_unfound_are_queried_or_lost(get_osdmap())) { ss << "pg has " << unfound << " unfound objects but we haven't probed all sources, not marking lost"; - return -EINVAL; + ret = -EINVAL; + goto out; } - mark_all_unfound_lost(mode, con, tid); - return -EAGAIN; + mark_all_unfound_lost(mode, on_finish); + return; } - else if (command == "list_unfound") { + + else if (prefix == "list_unfound") { hobject_t offset; string offset_json; bool show_offset = false; @@ -1014,7 +1026,8 @@ int PrimaryLogPG::do_command( offset.decode(v); } catch (std::runtime_error& e) { ss << "error parsing offset: " << e.what(); - return -EINVAL; + ret = -EINVAL; + goto out; } show_offset = true; } @@ -1060,14 +1073,21 @@ int PrimaryLogPG::do_command( } f->dump_bool("more", p != needs_recovery_map.end()); f->close_section(); - f->flush(odata); - return 0; } - ss << "unknown pg command " << prefix; - return -EINVAL; + else { + ret = -ENOSYS; + ss << "prefix '" << prefix << "' not implemented"; + } + + out: + if (ret >= 0 && outbl.length() == 0) { + f->flush(outbl); + } + on_finish(ret, ss.str(), outbl); } + // ========================================================== void PrimaryLogPG::do_pg_op(OpRequestRef op) @@ -11716,8 +11736,7 @@ void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op) */ void PrimaryLogPG::mark_all_unfound_lost( int what, - ConnectionRef con, - ceph_tid_t tid) + std::function on_finish) { dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl; list oids; @@ -11814,7 +11833,7 @@ void PrimaryLogPG::mark_all_unfound_lost( log_entries, std::move(manager), std::optional >( - [this, oids, con, num_unfound, tid]() { + [this, oids, num_unfound, on_finish]() { if (recovery_state.perform_deletes_during_peering()) { for (auto oid : oids) { // clear old locations - merge_new_log_entries will have @@ -11848,11 +11867,8 @@ void PrimaryLogPG::mark_all_unfound_lost( string rs = ss.str(); dout(0) << "do_command r=" << 0 << " " << rs << dendl; osd->clog->info() << rs; - if (con) { - MCommandReply *reply = new MCommandReply(0, rs); - reply->set_tid(tid); - con->send_message(reply); - } + bufferlist empty; + on_finish(0, rs, empty); }), OpRequestRef()); } diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index d2af206095c89..afc70ea015850 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -1494,13 +1494,11 @@ public: spg_t p); ~PrimaryLogPG() override {} - int do_command( - cmdmap_t cmdmap, - ostream& ss, - bufferlist& idata, - bufferlist& odata, - ConnectionRef conn, - ceph_tid_t tid) override; + void do_command( + const string_view& prefix, + const cmdmap_t& cmdmap, + const bufferlist& idata, + std::function on_finish) override; void clear_cache(); int get_cache_obj_count() { @@ -1910,8 +1908,7 @@ public: void mark_all_unfound_lost( int what, - ConnectionRef con, - ceph_tid_t tid); + std::function on_finish); eversion_t pick_newest_available(const hobject_t& oid); void do_update_log_missing( -- 2.39.5