]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: route tell commands to asok; migrate commands
authorSage Weil <sage@redhat.com>
Tue, 10 Sep 2019 03:07:03 +0000 (22:07 -0500)
committerSage Weil <sage@redhat.com>
Fri, 4 Oct 2019 14:07:02 +0000 (09:07 -0500)
- move items from _do_command to asok_command in OSD.cc
- update PG::do_command to take a std::function on_finish
- sprinkle in some osd_lock locking (_do_command implicitly locks osd_lock,
  asok_command() does not; most commands don't need it)

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/PG.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index d6170ce7d868ec358e426a8f2f047eb6e78bf7cc..b733173fd276cb6c0002b50ac982678112c8c685 100644 (file)
@@ -2367,13 +2367,65 @@ void OSD::asok_command(
   const bufferlist& inbl,
   std::function<void(int,const std::string&,bufferlist&)> on_finish)
 {
-  int ret = 0;
-  stringstream ss;   // stderr error message stream
   std::unique_ptr<Formatter> f(Formatter::create(
                                 format, "json-pretty", "json-pretty"));
+  int ret = 0;
+  stringstream ss;   // stderr error message stream
   bufferlist outbl;  // if empty at end, we'll dump formatter as output
 
-  if (prefix == "status") {
+  // --- PG commands are routed here to PG::do_command ---
+  if (prefix == "pg" ||
+      prefix == "query" ||
+      prefix == "mark_unfound_lost" ||
+      prefix == "list_unfound"
+    ) {
+    string pgidstr;
+    pg_t pgid;
+    if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
+      ss << "no pgid specified";
+      ret = -EINVAL;
+      goto out;
+    }
+    if (!pgid.parse(pgidstr.c_str())) {
+      ss << "couldn't parse pgid '" << pgidstr << "'";
+      ret = -EINVAL;
+      goto out;
+    }
+    spg_t pcand;
+    PGRef pg;
+    if (osdmap->get_primary_shard(pgid, &pcand) &&
+       (pg = _lookup_lock_pg(pcand))) {
+      if (pg->is_primary()) {
+       cmdmap_t new_cmdmap = cmdmap;
+       try {
+         pg->do_command(prefix, new_cmdmap, inbl, on_finish);
+         pg->unlock();
+         return; // the pg handler calls on_finish directly
+       } catch (const bad_cmd_get& e) {
+         pg->unlock();
+         ss << e.what();
+         ret = -EINVAL;
+         goto out;
+       }
+      } else {
+       ss << "not primary for pgid " << pgid;
+       // do not reply; they will get newer maps and realize they
+       // need to resend.
+#warning fixme on client side
+       pg->unlock();
+       ret = -EAGAIN;
+       goto out;
+      }
+    } else {
+      ss << "i don't have pgid " << pgid;
+      ret = -ENOENT;
+    }
+  }
+
+  // --- OSD commands follow ---
+
+  else if (prefix == "status") {
+    lock_guard l(osd_lock);
     f->open_object_section("status");
     f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
     f->dump_stream("osd_fsid") << superblock.osd_fsid;
@@ -2509,14 +2561,6 @@ will start to track new ops received afterwards.";
     f->close_section();
   } else if (prefix == "get_latest_osdmap") {
     get_latest_osdmap();
-  } else if (prefix == "heap") {
-    auto result = ceph::osd_cmds::heap(*cct, cmdmap, *f, ss);
-
-    // Note: Failed heap profile commands won't necessarily trigger an error:
-    f->open_object_section("result");
-    f->dump_string("error", cpp_strerror(result));
-    f->dump_bool("success", result >= 0);
-    f->close_section();
   } else if (prefix == "set_heap_property") {
     string property;
     int64_t value = 0;
@@ -2621,114 +2665,400 @@ will start to track new ops received afterwards.";
     }
     f->close_section();
   } else if (prefix == "send_beacon") {
+    lock_guard l(osd_lock);
     if (is_active()) {
       send_beacon(ceph::coarse_mono_clock::now());
     }
-  } else if (prefix == "dump_osd_network") {
-    int64_t value = 0;
-    if (!(cmd_getval(cct, cmdmap, "value", value))) {
-      // Convert milliseconds to microseconds
-      value = static_cast<int64_t>(g_conf().get_val<double>("mon_warn_on_slow_ping_time")) * 1000;
-      if (value == 0) {
-        double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
-        value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
-        value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
+  }
+
+  else if (prefix == "cluster_log") {
+    vector<string> msg;
+    cmd_getval(cct, cmdmap, "message", msg);
+    if (msg.empty()) {
+      ret = -EINVAL;
+      ss << "ignoring empty log message";
+      goto out;
+    }
+    string message = msg.front();
+    for (vector<string>::iterator a = ++msg.begin(); a != msg.end(); ++a)
+      message += " " + *a;
+    string lvl;
+    cmd_getval(cct, cmdmap, "level", lvl);
+    clog_type level = string_to_clog_type(lvl);
+    if (level < 0) {
+      ret = -EINVAL;
+      ss << "unknown level '" << lvl << "'";
+      goto out;
+    }
+    clog->do_log(level, message);
+  }
+
+  else if (prefix == "bench") {
+    lock_guard l(osd_lock);
+    int64_t count;
+    int64_t bsize;
+    int64_t osize, onum;
+    // default count 1G, size 4MB
+    cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30);
+    cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20);
+    cmd_getval(cct, cmdmap, "object_size", osize, (int64_t)0);
+    cmd_getval(cct, cmdmap, "object_num", onum, (int64_t)0);
+
+    uint32_t duration = cct->_conf->osd_bench_duration;
+
+    if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
+      // let us limit the block size because the next checks rely on it
+      // having a sane value.  If we allow any block size to be set things
+      // can still go sideways.
+      ss << "block 'size' values are capped at "
+         << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
+         << " a higher value, please adjust 'osd_bench_max_block_size'";
+      ret = -EINVAL;
+      goto out;
+    } else if (bsize < (int64_t) (1 << 20)) {
+      // entering the realm of small block sizes.
+      // limit the count to a sane value, assuming a configurable amount of
+      // IOPS and duration, so that the OSD doesn't get hung up on this,
+      // preventing timeouts from going off
+      int64_t max_count =
+        bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
+      if (count > max_count) {
+        ss << "'count' values greater than " << max_count
+           << " for a block size of " << byte_u_t(bsize) << ", assuming "
+           << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
+           << " for " << duration << " seconds,"
+           << " can cause ill effects on osd. "
+           << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
+           << " value if you wish to use a higher 'count'.";
+        ret = -EINVAL;
+        goto out;
       }
     } else {
-      // Convert user input to microseconds
-      value *= 1000;
+      // 1MB block sizes are big enough so that we get more stuff done.
+      // However, to avoid the osd from getting hung on this and having
+      // timers being triggered, we are going to limit the count assuming
+      // a configurable throughput and duration.
+      // NOTE: max_count is the total amount of bytes that we believe we
+      //       will be able to write during 'duration' for the given
+      //       throughput.  The block size hardly impacts this unless it's
+      //       way too big.  Given we already check how big the block size
+      //       is, it's safe to assume everything will check out.
+      int64_t max_count =
+        cct->_conf->osd_bench_large_size_max_throughput * duration;
+      if (count > max_count) {
+        ss << "'count' values greater than " << max_count
+           << " for a block size of " << byte_u_t(bsize) << ", assuming "
+           << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
+           << " for " << duration << " seconds,"
+           << " can cause ill effects on osd. "
+           << " Please adjust 'osd_bench_large_size_max_throughput'"
+           << " with a higher value if you wish to use a higher 'count'.";
+        ret = -EINVAL;
+        goto out;
+      }
     }
-    if (value < 0) value = 0;
 
-    struct osd_ping_time_t {
-      uint32_t pingtime;
-      int to;
-      bool back;
-      std::array<uint32_t,3> times;
-      std::array<uint32_t,3> min;
-      std::array<uint32_t,3> max;
-      uint32_t last;
-      uint32_t last_update;
+    if (osize && bsize > osize)
+      bsize = osize;
 
-      bool operator<(const osd_ping_time_t& rhs) const {
-       if (pingtime < rhs.pingtime)
-          return true;
-       if (pingtime > rhs.pingtime)
-         return false;
-        if (to < rhs.to)
-         return true;
-        if (to > rhs.to)
-         return false;
-       return back;
+    dout(1) << " bench count " << count
+            << " bsize " << byte_u_t(bsize) << dendl;
+
+    ObjectStore::Transaction cleanupt;
+
+    if (osize && onum) {
+      bufferlist bl;
+      bufferptr bp(osize);
+      bp.zero();
+      bl.push_back(std::move(bp));
+      bl.rebuild_page_aligned();
+      for (int i=0; i<onum; ++i) {
+       char nm[30];
+       snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
+       object_t oid(nm);
+       hobject_t soid(sobject_t(oid, 0));
+       ObjectStore::Transaction t;
+       t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
+       store->queue_transaction(service.meta_ch, std::move(t), NULL);
+       cleanupt.remove(coll_t(), ghobject_t(soid));
       }
-    };
+    }
 
-    set<osd_ping_time_t> sorted;
-    // Get pingtimes under lock and not on the stack
-    map<int, osd_stat_t::Interfaces> *pingtimes = new map<int, osd_stat_t::Interfaces>;
-    service.get_hb_pingtime(pingtimes);
-    for (auto j : *pingtimes) {
-      if (j.second.last_update == 0)
-       continue;
-      osd_ping_time_t item;
-      item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
-      item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
-      if (item.pingtime >= value) {
-       item.to = j.first;
-       item.times[0] = j.second.back_pingtime[0];
-       item.times[1] = j.second.back_pingtime[1];
-       item.times[2] = j.second.back_pingtime[2];
-       item.min[0] = j.second.back_min[0];
-       item.min[1] = j.second.back_min[1];
-       item.min[2] = j.second.back_min[2];
-       item.max[0] = j.second.back_max[0];
-       item.max[1] = j.second.back_max[1];
-       item.max[2] = j.second.back_max[2];
-       item.last = j.second.back_last;
-       item.back = true;
-       item.last_update = j.second.last_update;
-       sorted.emplace(item);
+    bufferlist bl;
+    bufferptr bp(bsize);
+    bp.zero();
+    bl.push_back(std::move(bp));
+    bl.rebuild_page_aligned();
+
+    {
+      C_SaferCond waiter;
+      if (!service.meta_ch->flush_commit(&waiter)) {
+       waiter.wait();
       }
-      if (j.second.front_last == 0)
-       continue;
-      item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
-      item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
-      if (item.pingtime >= value) {
-       item.to = j.first;
-       item.times[0] = j.second.front_pingtime[0];
-       item.times[1] = j.second.front_pingtime[1];
-       item.times[2] = j.second.front_pingtime[2];
-       item.min[0] = j.second.front_min[0];
-       item.min[1] = j.second.front_min[1];
-       item.min[2] = j.second.front_min[2];
-       item.max[0] = j.second.front_max[0];
-       item.max[1] = j.second.front_max[1];
-       item.max[2] = j.second.front_max[2];
-       item.last = j.second.front_last;
-       item.last_update = j.second.last_update;
-       item.back = false;
-       sorted.emplace(item);
+    }
+
+    utime_t start = ceph_clock_now();
+    for (int64_t pos = 0; pos < count; pos += bsize) {
+      char nm[30];
+      unsigned offset = 0;
+      if (onum && osize) {
+       snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
+       offset = rand() % (osize / bsize) * bsize;
+      } else {
+       snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
       }
+      object_t oid(nm);
+      hobject_t soid(sobject_t(oid, 0));
+      ObjectStore::Transaction t;
+      t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
+      store->queue_transaction(service.meta_ch, std::move(t), NULL);
+      if (!onum || !osize)
+       cleanupt.remove(coll_t::meta(), ghobject_t(soid));
     }
-    delete pingtimes;
-    //
-    // Network ping times (1min 5min 15min)
-    f->open_object_section("network_ping_times");
-    f->dump_int("threshold", value / 1000);
-    f->open_array_section("entries");
-    for (auto &sitem : boost::adaptors::reverse(sorted)) {
-      ceph_assert(sitem.pingtime >= value);
-      f->open_object_section("entry");
 
-      const time_t lu(sitem.last_update);
-      char buffer[26];
-      string lustr(ctime_r(&lu, buffer));
-      lustr.pop_back();   // Remove trailing \n
-      auto stale = cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
-      f->dump_string("last update", lustr);
-      f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
-      f->dump_int("from osd", whoami);
-      f->dump_int("to osd", sitem.to);
+    {
+      C_SaferCond waiter;
+      if (!service.meta_ch->flush_commit(&waiter)) {
+       waiter.wait();
+      }
+    }
+    utime_t end = ceph_clock_now();
+
+    // clean up
+    store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL);
+    {
+      C_SaferCond waiter;
+      if (!service.meta_ch->flush_commit(&waiter)) {
+       waiter.wait();
+      }
+    }
+
+    double elapsed = end - start;
+    double rate = count / elapsed;
+    double iops = rate / bsize;
+    f->open_object_section("osd_bench_results");
+    f->dump_int("bytes_written", count);
+    f->dump_int("blocksize", bsize);
+    f->dump_float("elapsed_sec", elapsed);
+    f->dump_float("bytes_per_sec", rate);
+    f->dump_float("iops", iops);
+    f->close_section();
+  }
+
+  else if (prefix == "flush_pg_stats") {
+    mgrc.send_pgstats();
+    f->dump_unsigned("stat_seq", service.get_osd_stat_seq());
+  }
+
+  else if (prefix == "heap") {
+    ret = ceph::osd_cmds::heap(*cct, cmdmap, *f, ss);
+  }
+
+  else if (prefix == "debug dump_missing") {
+    f->open_array_section("pgs");
+    vector<PGRef> pgs;
+    _get_pgs(&pgs);
+    for (auto& pg : pgs) {
+      string s = stringify(pg->pg_id);
+      f->open_array_section(s.c_str());
+      pg->lock();
+      pg->dump_missing(f.get());
+      pg->unlock();
+      f->close_section();
+    }
+    f->close_section();
+  }
+
+  else if (prefix == "debug kick_recovery_wq") {
+    int64_t delay;
+    cmd_getval(cct, cmdmap, "delay", delay);
+    ostringstream oss;
+    oss << delay;
+    ret = cct->_conf.set_val("osd_recovery_delay_start", oss.str().c_str());
+    if (ret != 0) {
+      ss << "kick_recovery_wq: error setting "
+        << "osd_recovery_delay_start to '" << delay << "': error "
+        << ret;
+      goto out;
+    }
+    cct->_conf.apply_changes(nullptr);
+    ss << "kicking recovery queue. set osd_recovery_delay_start "
+       << "to " << cct->_conf->osd_recovery_delay_start;
+  }
+
+  else if (prefix == "cpu_profiler") {
+    ostringstream ds;
+    string arg;
+    cmd_getval(cct, cmdmap, "arg", arg);
+    vector<string> argvec;
+    get_str_vec(arg, argvec);
+    cpu_profiler_handle_command(argvec, ds);
+    outbl.append(ds.str());
+  }
+
+  else if (prefix == "dump_pg_recovery_stats") {
+    lock_guard l(osd_lock);
+    pg_recovery_stats.dump_formatted(f.get());
+  }
+
+  else if (prefix == "reset_pg_recovery_stats") {
+    lock_guard l(osd_lock);
+    pg_recovery_stats.reset();
+  }
+
+  else if (prefix == "perf histogram dump") {
+    std::string logger;
+    std::string counter;
+    cmd_getval(cct, cmdmap, "logger", logger);
+    cmd_getval(cct, cmdmap, "counter", counter);
+    cct->get_perfcounters_collection()->dump_formatted_histograms(
+      f.get(), false, logger, counter);
+  }
+
+  else if (prefix == "cache drop") {
+    lock_guard l(osd_lock);
+    dout(20) << "clearing all caches" << dendl;
+    // Clear the objectstore's cache - onode and buffer for Bluestore,
+    // system's pagecache for Filestore
+    ret = store->flush_cache(&ss);
+    if (ret < 0) {
+      ss << "Error flushing objectstore cache: " << cpp_strerror(ret);
+      goto out;
+    }
+    // Clear the objectcontext cache (per PG)
+    vector<PGRef> pgs;
+    _get_pgs(&pgs);
+    for (auto& pg: pgs) {
+      pg->clear_cache();
+    }
+  }
+
+  else if (prefix == "cache status") {
+    lock_guard l(osd_lock);
+    int obj_ctx_count = 0;
+    vector<PGRef> pgs;
+    _get_pgs(&pgs);
+    for (auto& pg: pgs) {
+      obj_ctx_count += pg->get_cache_obj_count();
+    }
+    f->open_object_section("cache_status");
+    f->dump_int("object_ctx", obj_ctx_count);
+    store->dump_cache_stats(f.get());
+    f->close_section();
+  }
+
+  else if (prefix == "scrub_purged_snaps") {
+    lock_guard l(osd_lock);
+    scrub_purged_snaps();
+  }
+
+  else if (prefix == "dump_osd_network") {
+    lock_guard l(osd_lock);
+    int64_t value = 0;
+    if (!(cmd_getval(cct, cmdmap, "value", value))) {
+      // Convert milliseconds to microseconds
+      value = static_cast<double>(g_conf().get_val<double>(
+                                   "mon_warn_on_slow_ping_time")) * 1000;
+      if (value == 0) {
+       double ratio = g_conf().get_val<double>("mon_warn_on_slow_ping_ratio");
+       value = g_conf().get_val<int64_t>("osd_heartbeat_grace");
+       value *= 1000000 * ratio; // Seconds of grace to microseconds at ratio
+      }
+    } else {
+      // Convert user input to microseconds
+      value *= 1000;
+    }
+    if (value < 0) value = 0;
+
+    struct osd_ping_time_t {
+      uint32_t pingtime;
+      int to;
+      bool back;
+      std::array<uint32_t,3> times;
+      std::array<uint32_t,3> min;
+      std::array<uint32_t,3> max;
+      uint32_t last;
+      uint32_t last_update;
+
+      bool operator<(const osd_ping_time_t& rhs) const {
+       if (pingtime < rhs.pingtime)
+          return true;
+       if (pingtime > rhs.pingtime)
+         return false;
+        if (to < rhs.to)
+         return true;
+        if (to > rhs.to)
+         return false;
+       return back;
+      }
+    };
+
+    set<osd_ping_time_t> sorted;
+    // Get pingtimes under lock and not on the stack
+    map<int, osd_stat_t::Interfaces> *pingtimes = new map<int, osd_stat_t::Interfaces>;
+    service.get_hb_pingtime(pingtimes);
+    for (auto j : *pingtimes) {
+      if (j.second.last_update == 0)
+       continue;
+      osd_ping_time_t item;
+      item.pingtime = std::max(j.second.back_pingtime[0], j.second.back_pingtime[1]);
+      item.pingtime = std::max(item.pingtime, j.second.back_pingtime[2]);
+      if (item.pingtime >= value) {
+       item.to = j.first;
+       item.times[0] = j.second.back_pingtime[0];
+       item.times[1] = j.second.back_pingtime[1];
+       item.times[2] = j.second.back_pingtime[2];
+       item.min[0] = j.second.back_min[0];
+       item.min[1] = j.second.back_min[1];
+       item.min[2] = j.second.back_min[2];
+       item.max[0] = j.second.back_max[0];
+       item.max[1] = j.second.back_max[1];
+       item.max[2] = j.second.back_max[2];
+       item.last = j.second.back_last;
+       item.back = true;
+       item.last_update = j.second.last_update;
+       sorted.emplace(item);
+      }
+      if (j.second.front_last == 0)
+       continue;
+      item.pingtime = std::max(j.second.front_pingtime[0], j.second.front_pingtime[1]);
+      item.pingtime = std::max(item.pingtime, j.second.front_pingtime[2]);
+      if (item.pingtime >= value) {
+       item.to = j.first;
+       item.times[0] = j.second.front_pingtime[0];
+       item.times[1] = j.second.front_pingtime[1];
+       item.times[2] = j.second.front_pingtime[2];
+       item.min[0] = j.second.front_min[0];
+       item.min[1] = j.second.front_min[1];
+       item.min[2] = j.second.front_min[2];
+       item.max[0] = j.second.front_max[0];
+       item.max[1] = j.second.front_max[1];
+       item.max[2] = j.second.front_max[2];
+       item.last = j.second.front_last;
+       item.last_update = j.second.last_update;
+       item.back = false;
+       sorted.emplace(item);
+      }
+    }
+    delete pingtimes;
+    //
+    // Network ping times (1min 5min 15min)
+    f->open_object_section("network_ping_times");
+    f->dump_int("threshold", value / 1000);
+    f->open_array_section("entries");
+    for (auto &sitem : boost::adaptors::reverse(sorted)) {
+      ceph_assert(sitem.pingtime >= value);
+      f->open_object_section("entry");
+
+      const time_t lu(sitem.last_update);
+      char buffer[26];
+      string lustr(ctime_r(&lu, buffer));
+      lustr.pop_back();   // Remove trailing \n
+      auto stale = cct->_conf.get_val<int64_t>("osd_heartbeat_stale");
+      f->dump_string("last update", lustr);
+      f->dump_bool("stale", ceph_clock_now().sec() - sitem.last_update > stale);
+      f->dump_int("from osd", whoami);
+      f->dump_int("to osd", sitem.to);
       f->dump_string("interface", (sitem.back ? "back" : "front"));
       f->open_object_section("average");
       f->dump_format_unquoted("1min", "%s", fixed_u_to_string(sitem.times[0],3).c_str());
@@ -3381,14 +3711,6 @@ void OSD::final_init()
                                     "the mon");
   ceph_assert(r == 0);
 
-  r = admin_socket->register_command( "heap " \
-                                      "name=heapcmd,type=CephString " \
-                                      "name=value,type=CephString,req=false",
-                                      asok_hook,
-                                      "show heap usage info (available only if "
-                                      "compiled with tcmalloc)");
-  ceph_assert(r == 0);
-
   r = admin_socket->register_command("set_heap_property " \
                                     "name=property,type=CephString " \
                                     "name=value,type=CephInt",
@@ -3438,7 +3760,7 @@ void OSD::final_init()
 
   ceph_assert(r == 0);
 
-  r = admin_socket->register_command("smart name=devid,type=CephString,req=False",
+  r = admin_socket->register_command("smart name=devid,type=CephString,req=false",
                                      asok_hook,
                                      "probe OSD devices for SMART data.");
 
@@ -3546,6 +3868,125 @@ void OSD::final_init()
    test_ops_hook,
    "Inject a full disk (optional count times)");
   ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "bench " \
+    "name=count,type=CephInt,req=false "    \
+    "name=size,type=CephInt,req=false "                   \
+    "name=object_size,type=CephInt,req=false "    \
+    "name=object_num,type=CephInt,req=false ",
+    asok_hook,
+    "OSD benchmark: write <count> <size>-byte objects(with <obj_size> <obj_num>), " \
+    "(default count=1G default size=4MB). Results in log.");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "cluster_log " \
+    "name=level,type=CephChoices,strings=error,warning,info,debug "    \
+    "name=message,type=CephString,n=N",
+    asok_hook,
+    "log a message to the cluster log");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "flush_pg_stats",
+    asok_hook,
+    "flush pg stats");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "heap " \
+    "name=heapcmd,type=CephChoices,strings="                           \
+    "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \
+    "name=value,type=CephString,req=false",
+    asok_hook,
+    "show heap usage info (available only if compiled with tcmalloc)");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "debug dump_missing "                      \
+    "name=filename,type=CephFilepath",
+    asok_hook,
+    "dump missing objects to a named file");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "debug kick_recovery_wq "                                          \
+    "name=delay,type=CephInt,range=0",
+    asok_hook,
+    "set osd_recovery_delay_start to <val>");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "cpu_profiler "                                            \
+    "name=arg,type=CephChoices,strings=status|flush",
+    asok_hook,
+    "run cpu profiling on daemon");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "dump_pg_recovery_stats",
+    asok_hook,
+    "dump pg recovery statistics");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "reset_pg_recovery_stats",
+    asok_hook,
+    "reset pg recovery statistics");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "cache drop",
+    asok_hook,
+    "Drop all OSD caches");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "cache status",
+    asok_hook,
+    "Get OSD caches statistics");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "scrub_purged_snaps",
+    asok_hook,
+    "Scrub purged_snaps vs snapmapper index");
+  ceph_assert(r == 0);
+
+  // -- pg commands --
+  // old form: ceph pg <pgid> command ...
+  r = admin_socket->register_command(
+    "pg "                         \
+    "name=pgid,type=CephPgid "    \
+    "name=cmd,type=CephChoices,strings=query",
+    asok_hook,
+    "");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "pg "                         \
+    "name=pgid,type=CephPgid "    \
+    "name=cmd,type=CephChoices,strings=mark_unfound_lost " \
+    "name=mulcmd,type=CephChoices,strings=revert|delete",
+    asok_hook,
+    "");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "pg "                         \
+    "name=pgid,type=CephPgid "    \
+    "name=cmd,type=CephChoices,strings=list_unfound " \
+    "name=offset,type=CephString,req=false",
+    asok_hook,
+    "");
+  ceph_assert(r == 0);
+  // new form: tell <pgid> <cmd> for both cli and rest
+  r = admin_socket->register_command(
+    "query",
+    asok_hook,
+    "show details of a specific pg");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "mark_unfound_lost "                                       \
+    "name=pgid,type=CephPgid,req=false "                       \
+    "name=mulcmd,type=CephChoices,strings=revert|delete",
+    asok_hook,
+    "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command(
+    "list_unfound "                                    \
+    "name=pgid,type=CephPgid,req=false "               \
+    "name=offset,type=CephString,req=false",
+    asok_hook,
+    "list unfound objects on this pg, perhaps starting at an offset given in JSON");
+  ceph_assert(r == 0);
 }
 
 void OSD::create_logger()
@@ -6346,18 +6787,12 @@ void OSD::handle_command(MCommand *m)
     m->put();
     return;
   }
-
-  OSDCap& caps = session->caps;
-
-  if (!caps.allow_all() || m->get_source().is_mon()) {
+  if (!session->caps.allow_all()) {
     con->send_message(new MCommandReply(m, -EPERM));
     m->put();
     return;
   }
-
-  Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get());
-  command_wq.queue(c);
-
+  cct->get_admin_socket()->queue_tell_command(m);
   m->put();
 }
 
@@ -6371,40 +6806,6 @@ struct OSDCommand {
 #define COMMAND(parsesig, helptext, module, perm) \
   {parsesig, helptext, module, perm},
 
-// yes, these are really pg commands, but there's a limit to how
-// much work it's worth.  The OSD returns all of them.  Make this
-// form (pg <pgid> <cmd>) valid only for the cli.
-// Rest uses "tell <pgid> <cmd>"
-
-COMMAND("pg " \
-       "name=pgid,type=CephPgid " \
-       "name=cmd,type=CephChoices,strings=query", \
-       "show details of a specific pg", "osd", "r")
-COMMAND("pg " \
-       "name=pgid,type=CephPgid " \
-       "name=cmd,type=CephChoices,strings=mark_unfound_lost " \
-       "name=mulcmd,type=CephChoices,strings=revert|delete", \
-       "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available",
-       "osd", "rw")
-COMMAND("pg " \
-       "name=pgid,type=CephPgid " \
-       "name=cmd,type=CephChoices,strings=list_unfound " \
-       "name=offset,type=CephString,req=false",
-       "list unfound objects on this pg, perhaps starting at an offset given in JSON",
-       "osd", "r")
-
-// new form: tell <pgid> <cmd> for both cli and rest
-
-COMMAND("query",
-       "show details of a specific pg", "osd", "r")
-COMMAND("mark_unfound_lost " \
-       "name=mulcmd,type=CephChoices,strings=revert|delete", \
-       "mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available",
-       "osd", "rw")
-COMMAND("list_unfound " \
-       "name=offset,type=CephString,req=false",
-       "list unfound objects on this pg, perhaps starting at an offset given in JSON",
-       "osd", "r")
 COMMAND("perf histogram dump "
         "name=logger,type=CephString,req=false "
         "name=counter,type=CephString,req=false",
@@ -6430,58 +6831,9 @@ COMMAND("config unset " \
        "name=key,type=CephString",
        "Unset a configuration option at runtime (not persistent)",
        "osd", "rw")
-COMMAND("cluster_log " \
-       "name=level,type=CephChoices,strings=error,warning,info,debug " \
-       "name=message,type=CephString,n=N",
-       "log a message to the cluster log",
-       "osd", "rw")
-COMMAND("bench " \
-       "name=count,type=CephInt,req=false " \
-       "name=size,type=CephInt,req=false " \
-       "name=object_size,type=CephInt,req=false " \
-       "name=object_num,type=CephInt,req=false ", \
-       "OSD benchmark: write <count> <size>-byte objects(with <obj_size> <obj_num>), " \
-       "(default count=1G default size=4MB). Results in log.",
-       "osd", "rw")
-COMMAND("flush_pg_stats", "flush pg stats", "osd", "rw")
-COMMAND("heap " \
-       "name=heapcmd,type=CephChoices,strings="\
-           "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \
-       "name=value,type=CephString,req=false",
-       "show heap usage info (available only if compiled with tcmalloc)",
-       "osd", "rw")
-COMMAND("debug dump_missing " \
-       "name=filename,type=CephFilepath",
-       "dump missing objects to a named file", "osd", "r")
-COMMAND("debug kick_recovery_wq " \
-       "name=delay,type=CephInt,range=0",
-       "set osd_recovery_delay_start to <val>", "osd", "rw")
-COMMAND("cpu_profiler " \
-       "name=arg,type=CephChoices,strings=status|flush",
-       "run cpu profiling on daemon", "osd", "rw")
-COMMAND("dump_pg_recovery_stats", "dump pg recovery statistics",
-       "osd", "r")
-COMMAND("reset_pg_recovery_stats", "reset pg recovery statistics",
-       "osd", "rw")
-COMMAND("compact",
-        "compact object store's omap. "
-        "WARNING: Compaction probably slows your requests",
-        "osd", "rw")
-COMMAND("smart name=devid,type=CephString,req=False",
+COMMAND("smart name=devid,type=CephString,req=false",
         "runs smartctl on this osd devices.  ",
         "osd", "rw")
-COMMAND("cache drop",
-        "Drop all OSD caches",
-        "osd", "rwx")
-COMMAND("cache status",
-        "Get OSD caches statistics",
-        "osd", "r")
-COMMAND("send_beacon",
-        "Send OSD beacon to mon immediately",
-        "osd", "r")
-COMMAND("scrub_purged_snaps",
-       "Scrub purged_snaps vs snapmapper index",
-       "osd", "r")
 };
 
 void OSD::do_command(
@@ -6635,336 +6987,6 @@ int OSD::_do_command(
       r = 0;  // make command idempotent
     }
   }
-  else if (prefix == "cluster_log") {
-    vector<string> msg;
-    cmd_getval(cct, cmdmap, "message", msg);
-    if (msg.empty()) {
-      r = -EINVAL;
-      ss << "ignoring empty log message";
-      goto out;
-    }
-    string message = msg.front();
-    for (vector<string>::iterator a = ++msg.begin(); a != msg.end(); ++a)
-      message += " " + *a;
-    string lvl;
-    cmd_getval(cct, cmdmap, "level", lvl);
-    clog_type level = string_to_clog_type(lvl);
-    if (level < 0) {
-      r = -EINVAL;
-      ss << "unknown level '" << lvl << "'";
-      goto out;
-    }
-    clog->do_log(level, message);
-  }
-
-  // either 'pg <pgid> <command>' or
-  // 'tell <pgid>' (which comes in without any of that prefix)?
-
-  else if (prefix == "pg" ||
-           prefix == "query" ||
-           prefix == "mark_unfound_lost" ||
-           prefix == "list_unfound"
-          ) {
-    pg_t pgid;
-
-    if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
-      ss << "no pgid specified";
-      r = -EINVAL;
-    } else if (!pgid.parse(pgidstr.c_str())) {
-      ss << "couldn't parse pgid '" << pgidstr << "'";
-      r = -EINVAL;
-    } else {
-      spg_t pcand;
-      PGRef pg;
-      if (osdmap->get_primary_shard(pgid, &pcand) &&
-         (pg = _lookup_lock_pg(pcand))) {
-       if (pg->is_primary()) {
-         // simulate pg <pgid> cmd= for pg->do-command
-         if (prefix != "pg")
-           cmd_putval(cct, cmdmap, "cmd", prefix);
-         try {
-           r = pg->do_command(cmdmap, ss, data, odata, con, tid);
-         } catch (const bad_cmd_get& e) {
-           pg->unlock();
-           ss << e.what();
-           return -EINVAL;
-         }
-         if (r == -EAGAIN) {
-           pg->unlock();
-           // don't reply, pg will do so async
-           return -EAGAIN;
-         }
-       } else {
-         ss << "not primary for pgid " << pgid;
-
-         // send them the latest diff to ensure they realize the mapping
-         // has changed.
-         service.send_incremental_map(osdmap->get_epoch() - 1, con, osdmap);
-
-         // do not reply; they will get newer maps and realize they
-         // need to resend.
-         pg->unlock();
-         return -EAGAIN;
-       }
-       pg->unlock();
-      } else {
-       ss << "i don't have pgid " << pgid;
-       r = -ENOENT;
-      }
-    }
-  }
-
-  else if (prefix == "bench") {
-    int64_t count;
-    int64_t bsize;
-    int64_t osize, onum;
-    // default count 1G, size 4MB
-    cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30);
-    cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20);
-    cmd_getval(cct, cmdmap, "object_size", osize, (int64_t)0);
-    cmd_getval(cct, cmdmap, "object_num", onum, (int64_t)0);
-
-    uint32_t duration = cct->_conf->osd_bench_duration;
-
-    if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
-      // let us limit the block size because the next checks rely on it
-      // having a sane value.  If we allow any block size to be set things
-      // can still go sideways.
-      ss << "block 'size' values are capped at "
-         << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
-         << " a higher value, please adjust 'osd_bench_max_block_size'";
-      r = -EINVAL;
-      goto out;
-    } else if (bsize < (int64_t) (1 << 20)) {
-      // entering the realm of small block sizes.
-      // limit the count to a sane value, assuming a configurable amount of
-      // IOPS and duration, so that the OSD doesn't get hung up on this,
-      // preventing timeouts from going off
-      int64_t max_count =
-        bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
-      if (count > max_count) {
-        ss << "'count' values greater than " << max_count
-           << " for a block size of " << byte_u_t(bsize) << ", assuming "
-           << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
-           << " for " << duration << " seconds,"
-           << " can cause ill effects on osd. "
-           << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
-           << " value if you wish to use a higher 'count'.";
-        r = -EINVAL;
-        goto out;
-      }
-    } else {
-      // 1MB block sizes are big enough so that we get more stuff done.
-      // However, to avoid the osd from getting hung on this and having
-      // timers being triggered, we are going to limit the count assuming
-      // a configurable throughput and duration.
-      // NOTE: max_count is the total amount of bytes that we believe we
-      //       will be able to write during 'duration' for the given
-      //       throughput.  The block size hardly impacts this unless it's
-      //       way too big.  Given we already check how big the block size
-      //       is, it's safe to assume everything will check out.
-      int64_t max_count =
-        cct->_conf->osd_bench_large_size_max_throughput * duration;
-      if (count > max_count) {
-        ss << "'count' values greater than " << max_count
-           << " for a block size of " << byte_u_t(bsize) << ", assuming "
-           << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
-           << " for " << duration << " seconds,"
-           << " can cause ill effects on osd. "
-           << " Please adjust 'osd_bench_large_size_max_throughput'"
-           << " with a higher value if you wish to use a higher 'count'.";
-        r = -EINVAL;
-        goto out;
-      }
-    }
-
-    if (osize && bsize > osize)
-      bsize = osize;
-
-    dout(1) << " bench count " << count
-            << " bsize " << byte_u_t(bsize) << dendl;
-
-    ObjectStore::Transaction cleanupt;
-
-    if (osize && onum) {
-      bufferlist bl;
-      bufferptr bp(osize);
-      bp.zero();
-      bl.push_back(std::move(bp));
-      bl.rebuild_page_aligned();
-      for (int i=0; i<onum; ++i) {
-       char nm[30];
-       snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
-       object_t oid(nm);
-       hobject_t soid(sobject_t(oid, 0));
-       ObjectStore::Transaction t;
-       t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
-       store->queue_transaction(service.meta_ch, std::move(t), NULL);
-       cleanupt.remove(coll_t(), ghobject_t(soid));
-      }
-    }
-
-    bufferlist bl;
-    bufferptr bp(bsize);
-    bp.zero();
-    bl.push_back(std::move(bp));
-    bl.rebuild_page_aligned();
-
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
-    }
-
-    utime_t start = ceph_clock_now();
-    for (int64_t pos = 0; pos < count; pos += bsize) {
-      char nm[30];
-      unsigned offset = 0;
-      if (onum && osize) {
-       snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
-       offset = rand() % (osize / bsize) * bsize;
-      } else {
-       snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
-      }
-      object_t oid(nm);
-      hobject_t soid(sobject_t(oid, 0));
-      ObjectStore::Transaction t;
-      t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
-      store->queue_transaction(service.meta_ch, std::move(t), NULL);
-      if (!onum || !osize)
-       cleanupt.remove(coll_t::meta(), ghobject_t(soid));
-    }
-
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
-    }
-    utime_t end = ceph_clock_now();
-
-    // clean up
-    store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL);
-    {
-      C_SaferCond waiter;
-      if (!service.meta_ch->flush_commit(&waiter)) {
-       waiter.wait();
-      }
-    }
-
-    double elapsed = end - start;
-    double rate = count / elapsed;
-    double iops = rate / bsize;
-    if (f) {
-      f->open_object_section("osd_bench_results");
-      f->dump_int("bytes_written", count);
-      f->dump_int("blocksize", bsize);
-      f->dump_float("elapsed_sec", elapsed);
-      f->dump_float("bytes_per_sec", rate);
-      f->dump_float("iops", iops);
-      f->close_section();
-      f->flush(ds);
-    } else {
-      ds << "bench: wrote " << byte_u_t(count)
-        << " in blocks of " << byte_u_t(bsize) << " in "
-        << elapsed << " sec at " << byte_u_t(rate) << "/sec "
-        << si_u_t(iops) << " IOPS";
-    }
-  }
-
-  else if (prefix == "flush_pg_stats") {
-    mgrc.send_pgstats();
-    ds << service.get_osd_stat_seq() << "\n";
-  }
-
-  else if (prefix == "heap") {
-    r = ceph::osd_cmds::heap(*cct, cmdmap, *f, ds);
-  }
-
-  else if (prefix == "debug dump_missing") {
-    if (!f) {
-      f.reset(new JSONFormatter(true));
-    }
-    f->open_array_section("pgs");
-    vector<PGRef> pgs;
-    _get_pgs(&pgs);
-    for (auto& pg : pgs) {
-      string s = stringify(pg->pg_id);
-      f->open_array_section(s.c_str());
-      pg->lock();
-      pg->dump_missing(f.get());
-      pg->unlock();
-      f->close_section();
-    }
-    f->close_section();
-    f->flush(ds);
-  }
-  else if (prefix == "debug kick_recovery_wq") {
-    int64_t delay;
-    cmd_getval(cct, cmdmap, "delay", delay);
-    ostringstream oss;
-    oss << delay;
-    unlock_guard unlock{osd_lock};
-    r = cct->_conf.set_val("osd_recovery_delay_start", oss.str().c_str());
-    if (r != 0) {
-      ss << "kick_recovery_wq: error setting "
-        << "osd_recovery_delay_start to '" << delay << "': error "
-        << r;
-      goto out;
-    }
-    cct->_conf.apply_changes(nullptr);
-    ss << "kicking recovery queue. set osd_recovery_delay_start "
-       << "to " << cct->_conf->osd_recovery_delay_start;
-  }
-
-  else if (prefix == "cpu_profiler") {
-    string arg;
-    cmd_getval(cct, cmdmap, "arg", arg);
-    vector<string> argvec;
-    get_str_vec(arg, argvec);
-    cpu_profiler_handle_command(argvec, ds);
-  }
-
-  else if (prefix == "dump_pg_recovery_stats") {
-    stringstream s;
-    if (f) {
-      pg_recovery_stats.dump_formatted(f.get());
-      f->flush(ds);
-    } else {
-      pg_recovery_stats.dump(s);
-      ds << "dump pg recovery stats: " << s.str();
-    }
-  }
-
-  else if (prefix == "reset_pg_recovery_stats") {
-    ss << "reset pg recovery stats";
-    pg_recovery_stats.reset();
-  }
-
-  else if (prefix == "perf histogram dump") {
-    std::string logger;
-    std::string counter;
-    cmd_getval(cct, cmdmap, "logger", logger);
-    cmd_getval(cct, cmdmap, "counter", counter);
-    if (f) {
-      cct->get_perfcounters_collection()->dump_formatted_histograms(
-          f.get(), false, logger, counter);
-      f->flush(ds);
-    }
-  }
-
-  else if (prefix == "compact") {
-    dout(1) << "triggering manual compaction" << dendl;
-    auto start = ceph::coarse_mono_clock::now();
-    store->compact();
-    auto end = ceph::coarse_mono_clock::now();
-    double duration = std::chrono::duration<double>(end-start).count();
-    dout(1) << "finished manual compaction in "
-            << duration
-            << " seconds" << dendl;
-    ss << "compacted omap in " << duration << " seconds";
-  }
 
   else if (prefix == "smart") {
     string devid;
@@ -6972,50 +6994,10 @@ int OSD::_do_command(
     probe_smart(devid, ds);
   }
 
-  else if (prefix == "cache drop") {
-    dout(20) << "clearing all caches" << dendl;
-    // Clear the objectstore's cache - onode and buffer for Bluestore,
-    // system's pagecache for Filestore
-    r = store->flush_cache(&ss);
-    if (r < 0) {
-      ds << "Error flushing objectstore cache: " << cpp_strerror(r);
-      goto out;
-    }
-    // Clear the objectcontext cache (per PG)
-    vector<PGRef> pgs;
-    _get_pgs(&pgs);
-    for (auto& pg: pgs) {
-      pg->clear_cache();
-    }
-  }
 
-  else if (prefix == "cache status") {
-    int obj_ctx_count = 0;
-    vector<PGRef> pgs;
-    _get_pgs(&pgs);
-    for (auto& pg: pgs) {
-      obj_ctx_count += pg->get_cache_obj_count();
-    }
-    if (f) {
-      f->open_object_section("cache_status");
-      f->dump_int("object_ctx", obj_ctx_count);
-      store->dump_cache_stats(f.get());
-      f->close_section();
-      f->flush(ds);
-    } else {
-      ds << "object_ctx: " << obj_ctx_count;
-      store->dump_cache_stats(ds);
-    }
-  }
-  else if (prefix == "send_beacon") {
-    if (is_active()) {
-      send_beacon(ceph::coarse_mono_clock::now());
-    }
-  } else if (prefix == "scrub_purged_snaps") {
-    scrub_purged_snaps();
-  } else {
+  else {
     ss << "unrecognized command '" << prefix << "'";
-    r = -EINVAL;
+    r = -ENOSYS;
   }
 
  out:
index 93b9d5d590fd46a59cdbe6c77b74fe352496d37f..fb86cf126d8f71efa8dc8b6a4127ab3a099f94fe 100644 (file)
@@ -534,13 +534,11 @@ public:
   virtual int get_cache_obj_count() = 0;
 
   virtual void snap_trimmer(epoch_t epoch_queued) = 0;
-  virtual int do_command(
-    cmdmap_t cmdmap,
-    ostream& ss,
-    bufferlist& idata,
-    bufferlist& odata,
-    ConnectionRef conn,
-    ceph_tid_t tid) = 0;
+  virtual void do_command(
+    const string_view& prefix,
+    const cmdmap_t& cmdmap,
+    const bufferlist& idata,
+    std::function<void(int,const std::string&,bufferlist&)> on_finish) = 0;
 
   virtual bool agent_work(int max) = 0;
   virtual bool agent_work(int max, int agent_flush_quota) = 0;
index 52386a344ba580c9d8185cbde3fc80b1703814bc..3fffb8d227554fafbf68272ae1655ced9aeb8980 100644 (file)
@@ -927,23 +927,31 @@ PrimaryLogPG::get_pgls_filter(bufferlist::const_iterator& iter)
 
 // ==========================================================
 
-int PrimaryLogPG::do_command(
-  cmdmap_t cmdmap,
-  ostream& ss,
-  bufferlist& idata,
-  bufferlist& odata,
-  ConnectionRef con,
-  ceph_tid_t tid)
-{
-  string prefix;
+void PrimaryLogPG::do_command(
+  const string_view& orig_prefix,
+  const cmdmap_t& cmdmap,
+  const bufferlist& idata,
+  std::function<void(int,const std::string&,bufferlist&)> on_finish)
+{
   string format;
-
   cmd_getval(cct, cmdmap, "format", format);
-  boost::scoped_ptr<Formatter> f(Formatter::create(format, "json-pretty", "json"));
-
+  std::unique_ptr<Formatter> f(Formatter::create(
+                                format, "json-pretty", "json-pretty"));
+  int ret = 0;
+  stringstream ss;   // stderr error message stream
+  bufferlist outbl;  // if empty at end, we'll dump formatter as output
+
+  // get final prefix:
+  // - ceph pg <pgid> foo -> prefix=pg, cmd=foo
+  // - ceph tell <pgid> foo -> prefix=foo
+  string prefix(orig_prefix);
   string command;
   cmd_getval(cct, cmdmap, "cmd", command);
-  if (command == "query") {
+  if (command.size()) {
+    prefix = command;
+  }
+
+  if (prefix == "query") {
     f->open_object_section("pg");
     f->dump_stream("snap_trimq") << snap_trimq;
     f->dump_unsigned("snap_trimq_len", snap_trimq.size());
@@ -960,49 +968,53 @@ int PrimaryLogPG::do_command(
     f->close_section();
 
     f->close_section();
-    f->flush(odata);
-    return 0;
   }
-  else if (command == "mark_unfound_lost") {
+
+  else if (prefix == "mark_unfound_lost") {
     string mulcmd;
     cmd_getval(cct, cmdmap, "mulcmd", mulcmd);
     int mode = -1;
     if (mulcmd == "revert") {
       if (pool.info.is_erasure()) {
        ss << "mode must be 'delete' for ec pool";
-       return -EINVAL;
+       ret = -EINVAL;
+       goto out;
       }
       mode = pg_log_entry_t::LOST_REVERT;
     } else if (mulcmd == "delete") {
       mode = pg_log_entry_t::LOST_DELETE;
     } else {
       ss << "mode must be 'revert' or 'delete'; mark not yet implemented";
-      return -EINVAL;
+      ret = -EINVAL;
+      goto out;
     }
     ceph_assert(mode == pg_log_entry_t::LOST_REVERT ||
-          mode == pg_log_entry_t::LOST_DELETE);
+               mode == pg_log_entry_t::LOST_DELETE);
 
     if (!is_primary()) {
       ss << "not primary";
-      return -EROFS;
+      ret = -EROFS;
+      goto out;
     }
 
     uint64_t unfound = recovery_state.get_missing_loc().num_unfound();
     if (!unfound) {
       ss << "pg has no unfound objects";
-      return 0;  // make command idempotent
+      goto out;  // make command idempotent
     }
 
     if (!recovery_state.all_unfound_are_queried_or_lost(get_osdmap())) {
       ss << "pg has " << unfound
         << " unfound objects but we haven't probed all sources, not marking lost";
-      return -EINVAL;
+      ret = -EINVAL;
+      goto out;
     }
 
-    mark_all_unfound_lost(mode, con, tid);
-    return -EAGAIN;
+    mark_all_unfound_lost(mode, on_finish);
+    return;
   }
-  else if (command == "list_unfound") {
+
+  else if (prefix == "list_unfound") {
     hobject_t offset;
     string offset_json;
     bool show_offset = false;
@@ -1014,7 +1026,8 @@ int PrimaryLogPG::do_command(
        offset.decode(v);
       } catch (std::runtime_error& e) {
        ss << "error parsing offset: " << e.what();
-       return -EINVAL;
+       ret = -EINVAL;
+       goto out;
       }
       show_offset = true;
     }
@@ -1060,14 +1073,21 @@ int PrimaryLogPG::do_command(
     }
     f->dump_bool("more", p != needs_recovery_map.end());
     f->close_section();
-    f->flush(odata);
-    return 0;
   }
 
-  ss << "unknown pg command " << prefix;
-  return -EINVAL;
+  else {
+    ret = -ENOSYS;
+    ss << "prefix '" << prefix << "' not implemented";
+  }
+
+ out:
+  if (ret >= 0 && outbl.length() == 0) {
+    f->flush(outbl);
+  }
+  on_finish(ret, ss.str(), outbl);
 }
 
+
 // ==========================================================
 
 void PrimaryLogPG::do_pg_op(OpRequestRef op)
@@ -11716,8 +11736,7 @@ void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op)
  */
 void PrimaryLogPG::mark_all_unfound_lost(
   int what,
-  ConnectionRef con,
-  ceph_tid_t tid)
+  std::function<void(int,const std::string&,bufferlist&)> on_finish)
 {
   dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
   list<hobject_t> oids;
@@ -11814,7 +11833,7 @@ void PrimaryLogPG::mark_all_unfound_lost(
     log_entries,
     std::move(manager),
     std::optional<std::function<void(void)> >(
-      [this, oids, con, num_unfound, tid]() {
+      [this, oids, num_unfound, on_finish]() {
        if (recovery_state.perform_deletes_during_peering()) {
          for (auto oid : oids) {
            // clear old locations - merge_new_log_entries will have
@@ -11848,11 +11867,8 @@ void PrimaryLogPG::mark_all_unfound_lost(
        string rs = ss.str();
        dout(0) << "do_command r=" << 0 << " " << rs << dendl;
        osd->clog->info() << rs;
-       if (con) {
-         MCommandReply *reply = new MCommandReply(0, rs);
-         reply->set_tid(tid);
-         con->send_message(reply);
-       }
+       bufferlist empty;
+       on_finish(0, rs, empty);
       }),
     OpRequestRef());
 }
index d2af206095c8980eaf016a2547217f540058a9d7..afc70ea0158507db26ebb4b4fa32fe3b835956f8 100644 (file)
@@ -1494,13 +1494,11 @@ public:
               spg_t p);
   ~PrimaryLogPG() override {}
 
-  int do_command(
-    cmdmap_t cmdmap,
-    ostream& ss,
-    bufferlist& idata,
-    bufferlist& odata,
-    ConnectionRef conn,
-    ceph_tid_t tid) override;
+  void do_command(
+    const string_view& prefix,
+    const cmdmap_t& cmdmap,
+    const bufferlist& idata,
+    std::function<void(int,const std::string&,bufferlist&)> on_finish) override;
 
   void clear_cache();
   int get_cache_obj_count() {
@@ -1910,8 +1908,7 @@ public:
 
   void mark_all_unfound_lost(
     int what,
-    ConnectionRef con,
-    ceph_tid_t tid);
+    std::function<void(int,const std::string&,bufferlist&)> on_finish);
   eversion_t pick_newest_available(const hobject_t& oid);
 
   void do_update_log_missing(