]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/tools/perf_crimson_msgr: configure clients and client connections
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 31 May 2023 07:31:59 +0000 (15:31 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 3 Jul 2023 05:50:18 +0000 (13:50 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/tools/perf_crimson_msgr.cc

index d5e28737391ade53b4770a91f383dd914c45c492..9761e8db3bfe470c5c7fecf3da27f83f90aa21ec 100644 (file)
@@ -4,6 +4,7 @@
 #include <map>
 #include <random>
 #include <boost/program_options.hpp>
+#include <boost/iterator/counting_iterator.hpp>
 
 #include <seastar/core/app-template.hh>
 #include <seastar/core/do_with.hh>
@@ -80,7 +81,8 @@ struct client_config {
   unsigned block_size;
   unsigned ramptime;
   unsigned msgtime;
-  unsigned jobs;
+  unsigned num_clients;
+  unsigned num_conns;
   unsigned depth;
 
   std::string str() const {
@@ -89,7 +91,8 @@ struct client_config {
         << "](bs=" << block_size
         << ", ramptime=" << ramptime
         << ", msgtime=" << msgtime
-        << ", jobs=" << jobs
+        << ", num_clients=" << num_clients
+        << ", num_conns=" << num_conns
         << ", depth=" << depth
         << ")";
     return out.str();
@@ -105,7 +108,10 @@ struct client_config {
     conf.block_size = options["client-bs"].as<unsigned>();
     conf.ramptime = options["ramptime"].as<unsigned>();
     conf.msgtime = options["msgtime"].as<unsigned>();
-    conf.jobs = options["client-jobs"].as<unsigned>();
+    conf.num_clients = options["clients"].as<unsigned>();
+    ceph_assert_always(conf.num_clients > 0);
+    conf.num_conns = options["conns-per-client"].as<unsigned>();
+    ceph_assert_always(conf.num_conns > 0);
     conf.depth = options["depth"].as<unsigned>();
     return conf;
   }
@@ -299,10 +305,12 @@ static seastar::future<> run(
         }
 
         void finish_connecting() {
+          ceph_assert_always(connected_time == mono_clock::zero());
           connected_time = mono_clock::now();
         }
 
         void start_collect() {
+          ceph_assert_always(connected_time != mono_clock::zero());
           start_time = mono_clock::now();
           start_count = received_count;
           sampled_count = 0u;
@@ -315,7 +323,6 @@ static seastar::future<> run(
           finish_time = mono_clock::now();
         }
       };
-      ConnStats conn_stats;
 
       struct PeriodStats {
         mono_time start_time = mono_clock::zero();
@@ -327,7 +334,6 @@ static seastar::future<> run(
         mono_time finish_time = mono_clock::zero();
         unsigned finish_count = 0u;
         unsigned depth = 0u;
-        double reactor_utilization = 0;
 
         void start_collect(unsigned received_count) {
           start_time = mono_clock::now();
@@ -345,31 +351,12 @@ static seastar::future<> run(
           snapshot.finish_time = mono_clock::now();
           snapshot.finish_count = received_count;
           snapshot.depth = _depth;
-          snapshot.reactor_utilization = get_reactor_utilization();
 
           start_collect(received_count);
         }
       };
-      PeriodStats period_stats;
 
-      const seastar::shard_id sid;
-      const unsigned id;
-      std::string lname;
-      const std::optional<unsigned> server_sid;
-
-      const unsigned jobs;
-      crimson::net::MessengerRef msgr;
-      const unsigned msg_len;
-      bufferlist msg_data;
-      const unsigned nr_depth;
-      seastar::semaphore depth;
-      std::vector<mono_time> time_msgs_sent;
-      crimson::auth::DummyAuthClientServer dummy_auth;
-
-      unsigned sent_count = 0u;
-      crimson::net::ConnectionRef active_conn = nullptr;
-
-      struct ClientStats {
+      struct JobReport {
         std::string name;
         unsigned depth = 0;
         double connect_time_s = 0;
@@ -379,6 +366,16 @@ static seastar::future<> run(
         double iops = 0;
         double throughput_mbps = 0;
 
+        void account(const JobReport &stats) {
+          depth += stats.depth;
+          connect_time_s += stats.connect_time_s;
+          total_msgs += stats.total_msgs;
+          messaging_time_s += stats.messaging_time_s;
+          latency_ms += stats.latency_ms;
+          iops += stats.iops;
+          throughput_mbps += stats.throughput_mbps;
+        }
+
         void report() const {
           auto str = fmt::format(
             "{}(depth={}):\n"
@@ -396,65 +393,111 @@ static seastar::future<> run(
         }
       };
 
-      bool stop_send = false;
-      seastar::promise<ClientStats> stopped_send_promise;
+      struct ConnectionPriv : public crimson::net::Connection::user_private_t {
+        unsigned index;
+        ConnectionPriv(unsigned i) : index{i} {}
+      };
+
+      struct ConnState {
+        crimson::net::MessengerRef msgr;
+        ConnStats conn_stats;
+        PeriodStats period_stats;
+        seastar::semaphore depth;
+        std::vector<mono_time> time_msgs_sent;
+        unsigned sent_count = 0u;
+        crimson::net::ConnectionRef active_conn;
+        bool stop_send = false;
+        seastar::promise<JobReport> stopped_send_promise;
+
+        ConnState(std::size_t _depth)
+          : depth{_depth},
+            time_msgs_sent{_depth, mono_clock::zero()} {}
+
+        unsigned get_current_units() const {
+          ceph_assert(depth.available_units() >= 0);
+          return depth.current();
+        }
+
+        seastar::future<JobReport> stop_dispatch_messages() {
+          stop_send = true;
+          depth.broken(DepthBroken());
+          return stopped_send_promise.get_future();
+        }
+      };
 
-      Client(unsigned jobs,
+      const seastar::shard_id sid;
+      const unsigned id;
+      const std::optional<unsigned> server_sid;
+
+      const unsigned num_clients;
+      const unsigned num_conns;
+      const unsigned msg_len;
+      bufferlist msg_data;
+      const unsigned nr_depth;
+      crimson::auth::DummyAuthClientServer dummy_auth;
+
+      std::vector<ConnState> conn_states;
+
+      Client(unsigned num_clients,
+             unsigned num_conns,
              unsigned msg_len,
              unsigned _depth,
              std::optional<unsigned> server_sid)
         : sid{seastar::this_shard_id()},
-          id{sid + jobs - seastar::smp::count},
+          id{sid + num_clients - seastar::smp::count},
           server_sid{server_sid},
-          jobs{jobs},
+          num_clients{num_clients},
+          num_conns{num_conns},
           msg_len{msg_len},
-          nr_depth{_depth},
-          depth{_depth},
-          time_msgs_sent{_depth, mono_clock::zero()} {
+          nr_depth{_depth} {
         if (is_active()) {
           assert(sid > 0);
-          lname = "client";
-          lname += std::to_string(id);
-          lname += "@";
-          lname += std::to_string(sid);
-        } else {
-          lname = "invalid_client";
+          for (unsigned i = 0; i < num_conns; ++i) {
+            conn_states.emplace_back(nr_depth);
+          }
         }
         msg_data.append_zero(msg_len);
       }
 
-      unsigned get_current_depth() const {
-        ceph_assert(depth.available_units() >= 0);
-        return nr_depth - depth.current();
+      std::string get_name(unsigned i) {
+        return fmt::format("client{}Conn{}@{}", id, i, sid);
       }
 
       void ms_handle_connect(
           crimson::net::ConnectionRef conn,
           seastar::shard_id new_shard) override {
         ceph_assert_always(new_shard == seastar::this_shard_id());
-        conn_stats.finish_connecting();
+        assert(is_active());
+        unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+        auto &conn_state = conn_states[index];
+        conn_state.conn_stats.finish_connecting();
       }
 
       std::optional<seastar::future<>> ms_dispatch(
-          crimson::net::ConnectionRef, MessageRef m) override {
+          crimson::net::ConnectionRef conn, MessageRef m) override {
         assert(is_active());
         // server replies with MOSDOp to generate server-side write workload
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
+        unsigned index = static_cast<ConnectionPriv&>(conn->get_user_private()).index;
+        assert(index < num_conns);
+        auto &conn_state = conn_states[index];
+
         auto msg_id = m->get_tid();
         if (msg_id % SAMPLE_RATE == 0) {
-          auto index = msg_id % time_msgs_sent.size();
-          ceph_assert(time_msgs_sent[index] != mono_clock::zero());
-          std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
-          conn_stats.sampled_total_lat_s += cur_latency.count();
-          ++(conn_stats.sampled_count);
-          period_stats.sampled_total_lat_s += cur_latency.count();
-          ++(period_stats.sampled_count);
-          time_msgs_sent[index] = mono_clock::zero();
+          auto msg_index = msg_id % conn_state.time_msgs_sent.size();
+          ceph_assert(conn_state.time_msgs_sent[msg_index] != mono_clock::zero());
+          std::chrono::duration<double> cur_latency =
+              mono_clock::now() - conn_state.time_msgs_sent[msg_index];
+          conn_state.conn_stats.sampled_total_lat_s += cur_latency.count();
+          ++(conn_state.conn_stats.sampled_count);
+          conn_state.period_stats.sampled_total_lat_s += cur_latency.count();
+          ++(conn_state.period_stats.sampled_count);
+          conn_state.time_msgs_sent[msg_index] = mono_clock::zero();
         }
 
-        ++(conn_stats.received_count);
-        depth.signal(1);
+        ++(conn_state.conn_stats.received_count);
+        conn_state.depth.signal(1);
 
         return {seastar::now()};
       }
@@ -462,20 +505,27 @@ static seastar::future<> run(
       // should start messenger at this shard?
       bool is_active() {
         ceph_assert(seastar::this_shard_id() == sid);
-        ceph_assert(seastar::smp::count > jobs);
-        return sid + jobs >= seastar::smp::count;
+        ceph_assert(seastar::smp::count > num_clients);
+        return sid + num_clients >= seastar::smp::count;
       }
 
       seastar::future<> init() {
-        return container().invoke_on_all([] (auto& client) {
+        return container().invoke_on_all([](auto& client) {
           if (client.is_active()) {
-            client.msgr = crimson::net::Messenger::create(
-                entity_name_t::OSD(client.sid),
-                client.lname, client.sid, true);
-            client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
-            client.msgr->set_auth_client(&client.dummy_auth);
-            client.msgr->set_auth_server(&client.dummy_auth);
-            return client.msgr->start({&client});
+            return seastar::do_for_each(
+                boost::make_counting_iterator(0u),
+                boost::make_counting_iterator(client.num_conns),
+                [&client](auto i) {
+              auto &conn_state = client.conn_states[i];
+              std::string name = client.get_name(i);
+              conn_state.msgr = crimson::net::Messenger::create(
+                  entity_name_t::OSD(client.id * client.num_conns + i),
+                  name, client.id * client.num_conns + i, true);
+              conn_state.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+              conn_state.msgr->set_auth_client(&client.dummy_auth);
+              conn_state.msgr->set_auth_server(&client.dummy_auth);
+              return conn_state.msgr->start({&client});
+            });
           }
           return seastar::now();
         });
@@ -483,56 +533,87 @@ static seastar::future<> run(
 
       seastar::future<> shutdown() {
         return seastar::do_with(
-            std::vector<ClientStats>(jobs),
+            std::vector<JobReport>(num_clients * num_conns),
             [this](auto &all_stats) {
           return container().invoke_on_all([&all_stats](auto& client) {
             if (!client.is_active()) {
               return seastar::now();
             }
 
-            logger().info("{} shutdown...", client.lname);
-            ceph_assert(client.msgr);
-            client.msgr->stop();
-            return seastar::when_all(
-              client.stop_dispatch_messages(
-              ).then([&all_stats, &client](auto stats) {
-                all_stats[client.id] = stats;
-              }),
-              client.msgr->shutdown()
-            ).discard_result();
-          }).then([&all_stats] {
-            auto nr_clients = all_stats.size();
-            ClientStats summary;
-            summary.name = "AllClients" + std::to_string(nr_clients);
-            for (const auto &stats : all_stats) {
+            return seastar::parallel_for_each(
+                boost::make_counting_iterator(0u),
+                boost::make_counting_iterator(client.num_conns),
+                [&all_stats, &client](auto i) {
+              logger().info("{} shutdown...", client.get_name(i));
+              auto &conn_state = client.conn_states[i];
+              return conn_state.stop_dispatch_messages(
+              ).then([&all_stats, &client, i](auto stats) {
+                all_stats[client.id * client.num_conns + i] = stats;
+              });
+            }).then([&client] {
+              return seastar::do_for_each(
+                  boost::make_counting_iterator(0u),
+                  boost::make_counting_iterator(client.num_conns),
+                  [&client](auto i) {
+                auto &conn_state = client.conn_states[i];
+                ceph_assert(conn_state.msgr);
+                conn_state.msgr->stop();
+                return conn_state.msgr->shutdown();
+              });
+            });
+          }).then([&all_stats, this] {
+            auto nr_jobs = all_stats.size();
+            JobReport summary;
+            std::vector<JobReport> clients(num_clients);
+
+            for (unsigned i = 0; i < nr_jobs; ++i) {
+              auto &stats = all_stats[i];
               stats.report();
-              summary.depth += stats.depth;
-              summary.connect_time_s += stats.connect_time_s;
-              summary.total_msgs += stats.total_msgs;
-              summary.messaging_time_s += stats.messaging_time_s;
-              summary.latency_ms += stats.latency_ms;
-              summary.iops += stats.iops;
-              summary.throughput_mbps += stats.throughput_mbps;
+              clients[i / num_conns].account(stats);
+              summary.account(stats);
             }
-            summary.connect_time_s /= nr_clients;
-            summary.messaging_time_s /= nr_clients;
-            summary.latency_ms /= nr_clients;
+
+            std::cout << std::endl;
+            std::cout << "per client:" << std::endl;
+            for (unsigned i = 0; i < num_clients; ++i) {
+              auto &stats = clients[i];
+              stats.name = fmt::format("client{}", i);
+              stats.connect_time_s /= num_conns;
+              stats.messaging_time_s /= num_conns;
+              stats.latency_ms /= num_conns;
+              stats.report();
+            }
+
+            std::cout << std::endl;
+            summary.name = fmt::format("all", nr_jobs);
+            summary.connect_time_s /= nr_jobs;
+            summary.messaging_time_s /= nr_jobs;
+            summary.latency_ms /= nr_jobs;
             summary.report();
           });
         });
       }
 
       seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) {
-        return container().invoke_on_all([peer_addr] (auto& client) {
+        return container().invoke_on_all([peer_addr](auto& client) {
           // start clients in active cores
           if (client.is_active()) {
-            client.conn_stats.start_connecting();
-            client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              auto &conn_state = client.conn_states[i];
+              conn_state.conn_stats.start_connecting();
+              conn_state.active_conn = conn_state.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+              conn_state.active_conn->set_user_private(
+                  std::make_unique<ConnectionPriv>(i));
+            }
             // make sure handshake won't hurt the performance
             return seastar::sleep(1s).then([&client] {
-              if (client.conn_stats.connected_time == mono_clock::zero()) {
-                logger().error("\n{} not connected after 1s!\n", client.lname);
-                ceph_assert(false);
+              for (unsigned i = 0; i < client.num_conns; ++i) {
+                auto &conn_state = client.conn_states[i];
+                if (conn_state.conn_stats.connected_time == mono_clock::zero()) {
+                  logger().error("\n{} not connected after 1s!\n",
+                                 client.get_name(i));
+                  ceph_assert(false);
+                }
               }
             });
           }
@@ -543,31 +624,39 @@ static seastar::future<> run(
      private:
       class TimerReport {
        private:
-        const unsigned jobs;
+        const unsigned num_clients;
+        const unsigned num_conns;
         const unsigned msgtime;
         const unsigned bytes_of_block;
 
         unsigned elapsed = 0u;
         std::vector<PeriodStats> snaps;
         std::vector<ConnStats> summaries;
+        std::vector<double> client_reactor_utilizations;
         std::optional<double> server_reactor_utilization;
 
        public:
-        TimerReport(unsigned jobs, unsigned msgtime, unsigned bs)
-          : jobs{jobs},
+        TimerReport(unsigned num_clients, unsigned num_conns, unsigned msgtime, unsigned bs)
+          : num_clients{num_clients},
+            num_conns{num_conns},
             msgtime{msgtime},
             bytes_of_block{bs},
-            snaps{jobs},
-            summaries{jobs} {}
+            snaps{num_clients * num_conns},
+            summaries{num_clients * num_conns},
+            client_reactor_utilizations(num_clients) {}
 
         unsigned get_elapsed() const { return elapsed; }
 
-        PeriodStats& get_snap_by_job(unsigned client_id) {
-          return snaps[client_id];
+        PeriodStats& get_snap(unsigned client_id, unsigned i) {
+          return snaps[client_id * num_conns + i];
         }
 
-        ConnStats& get_summary_by_job(unsigned client_id) {
-          return summaries[client_id];
+        ConnStats& get_summary(unsigned client_id, unsigned i) {
+          return summaries[client_id * num_conns + i];
+        }
+
+        void set_client_reactor_utilization(unsigned client_id, double ru) {
+          client_reactor_utilizations[client_id] = ru;
         }
 
         void set_server_reactor_utilization(double ru) {
@@ -608,7 +697,7 @@ static seastar::future<> run(
             sampled_count += snap.sampled_count;
             sampled_total_lat_s += snap.sampled_total_lat_s;
           }
-          double elapsed_s = elapsed_d.count() / jobs;
+          double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
           double iops = ops/elapsed_s;
           std::ostringstream sout;
           sout << setfill(' ')
@@ -621,8 +710,8 @@ static seastar::future<> run(
           if (server_reactor_utilization.has_value()) {
             sout << *server_reactor_utilization << " -- ";
           }
-          for (const auto& snap : snaps) {
-            sout << snap.reactor_utilization << ",";
+          for (double cru : client_reactor_utilizations) {
+            sout << cru << ",";
           }
           std::cout << sout.str() << std::endl;
         }
@@ -638,7 +727,7 @@ static seastar::future<> run(
             sampled_count += summary.sampled_count;
             sampled_total_lat_s += summary.sampled_total_lat_s;
           }
-          double elapsed_s = elapsed_d.count() / jobs;
+          double elapsed_s = elapsed_d.count() / (num_clients * num_conns);
           double iops = ops / elapsed_s;
           std::ostringstream sout;
           sout << "--------------"
@@ -658,11 +747,15 @@ static seastar::future<> run(
       seastar::future<> report_period(TimerReport& report) {
         return container().invoke_on_all([&report] (auto& client) {
           if (client.is_active()) {
-            PeriodStats& snap = report.get_snap_by_job(client.id);
-            client.period_stats.reset_period(
-                client.conn_stats.received_count,
-                client.get_current_depth(),
-                snap);
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              auto &conn_state = client.conn_states[i];
+              PeriodStats& snap = report.get_snap(client.id, i);
+              conn_state.period_stats.reset_period(
+                  conn_state.conn_stats.received_count,
+                  client.nr_depth - conn_state.get_current_units(),
+                  snap);
+            }
+            report.set_client_reactor_utilization(client.id, get_reactor_utilization());
           }
           if (client.server_sid.has_value() &&
               seastar::this_shard_id() == *client.server_sid) {
@@ -677,8 +770,11 @@ static seastar::future<> run(
       seastar::future<> report_summary(TimerReport& report) {
         return container().invoke_on_all([&report] (auto& client) {
           if (client.is_active()) {
-            ConnStats& summary = report.get_summary_by_job(client.id);
-            summary.prepare_summary(client.conn_stats);
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              auto &conn_state = client.conn_states[i];
+              ConnStats& summary = report.get_summary(client.id, i);
+              summary.prepare_summary(conn_state.conn_stats);
+            }
           }
         }).then([&report] {
           report.report_summary();
@@ -687,10 +783,13 @@ static seastar::future<> run(
 
      public:
       seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) {
-        logger().info("[all clients]: start sending MOSDOps from {} clients", jobs);
+        logger().info("[all clients]: start sending MOSDOps from {} clients * {} conns",
+                      num_clients, num_conns);
         return container().invoke_on_all([] (auto& client) {
           if (client.is_active()) {
-            client.do_dispatch_messages(client.active_conn.get());
+            for (unsigned i = 0; i < client.num_conns; ++i) {
+              client.do_dispatch_messages(i);
+            }
           }
         }).then([ramptime] {
           logger().info("[all clients]: ramping up {} seconds...", ramptime);
@@ -698,14 +797,18 @@ static seastar::future<> run(
         }).then([this] {
           return container().invoke_on_all([] (auto& client) {
             if (client.is_active()) {
-              client.conn_stats.start_collect();
-              client.period_stats.start_collect(client.conn_stats.received_count);
+              for (unsigned i = 0; i < client.num_conns; ++i) {
+                auto &conn_state = client.conn_states[i];
+                conn_state.conn_stats.start_collect();
+                conn_state.period_stats.start_collect(conn_state.conn_stats.received_count);
+              }
             }
           });
         }).then([this, msgtime] {
           logger().info("[all clients]: reporting {} seconds...\n", msgtime);
           return seastar::do_with(
-              TimerReport(jobs, msgtime, msg_len), [this] (auto& report) {
+              TimerReport(num_clients, num_conns, msgtime, msg_len),
+              [this](auto& report) {
             report.report_header();
             return seastar::do_until(
               [&report] { return report.should_stop(); },
@@ -735,9 +838,11 @@ static seastar::future<> run(
       }
 
      private:
-      seastar::future<> send_msg(crimson::net::Connection* conn) {
+      seastar::future<> send_msg(ConnState &conn_state) {
         ceph_assert(seastar::this_shard_id() == sid);
-        return depth.wait(1).then([this, conn] {
+        conn_state.sent_count += 1;
+        return conn_state.depth.wait(1
+        ).then([this, &conn_state] {
           const static pg_t pgid;
           const static object_locator_t oloc;
           const static hobject_t hobj(object_t(), oloc.key, CEPH_NOSNAP, pgid.ps(),
@@ -747,59 +852,66 @@ static seastar::future<> run(
           bufferlist data(msg_data);
           m->write(0, msg_len, data);
           // use tid as the identity of each round
-          m->set_tid(sent_count);
+          m->set_tid(conn_state.sent_count);
 
           // sample message latency
-          if (unlikely(sent_count % SAMPLE_RATE == 0)) {
-            auto index = sent_count % time_msgs_sent.size();
-            ceph_assert(time_msgs_sent[index] == mono_clock::zero());
-            time_msgs_sent[index] = mono_clock::now();
+          if (unlikely(conn_state.sent_count % SAMPLE_RATE == 0)) {
+            auto index = conn_state.sent_count % conn_state.time_msgs_sent.size();
+            ceph_assert(conn_state.time_msgs_sent[index] == mono_clock::zero());
+            conn_state.time_msgs_sent[index] = mono_clock::now();
           }
 
-          return conn->send(std::move(m));
+          return conn_state.active_conn->send(std::move(m));
         });
       }
 
       class DepthBroken: public std::exception {};
 
-      seastar::future<ClientStats> stop_dispatch_messages() {
-        stop_send = true;
-        depth.broken(DepthBroken());
-        return stopped_send_promise.get_future();
+      seastar::future<JobReport> stop_dispatch_messages(unsigned i) {
+        auto &conn_state = conn_states[i];
+        conn_state.stop_send = true;
+        conn_state.depth.broken(DepthBroken());
+        return conn_state.stopped_send_promise.get_future();
       }
 
-      void do_dispatch_messages(crimson::net::Connection* conn) {
+      void do_dispatch_messages(unsigned i) {
         ceph_assert(seastar::this_shard_id() == sid);
-        ceph_assert(sent_count == 0);
-        conn_stats.start_time = mono_clock::now();
+        auto &conn_state = conn_states[i];
+        ceph_assert(conn_state.sent_count == 0);
+        conn_state.conn_stats.start_time = mono_clock::now();
         // forwarded to stopped_send_promise
         (void) seastar::do_until(
-          [this] { return stop_send; },
-          [this, conn] {
-            sent_count += 1;
-            return send_msg(conn);
-          }
+          [&conn_state] { return conn_state.stop_send; },
+          [this, &conn_state] { return send_msg(conn_state); }
         ).handle_exception_type([] (const DepthBroken& e) {
           // ok, stopped by stop_dispatch_messages()
-        }).then([this, conn] {
-          logger().info("{}: stopped sending OSDOPs", *conn);
-
-          std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
-          std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
-          unsigned ops = conn_stats.received_count - conn_stats.start_count;
-
-          ClientStats stats;
-          stats.name = lname;
+        }).then([this, &conn_state, i] {
+          std::string name = get_name(i);
+          logger().info("{} {}: stopped sending OSDOPs",
+                        name, *conn_state.active_conn);
+
+          std::chrono::duration<double> dur_conn =
+              conn_state.conn_stats.connected_time -
+              conn_state.conn_stats.connecting_time;
+          std::chrono::duration<double> dur_msg =
+              mono_clock::now() - conn_state.conn_stats.start_time;
+          unsigned ops =
+              conn_state.conn_stats.received_count -
+              conn_state.conn_stats.start_count;
+
+          JobReport stats;
+          stats.name = name;
           stats.depth = nr_depth;
           stats.connect_time_s = dur_conn.count();
           stats.total_msgs = ops;
           stats.messaging_time_s = dur_msg.count();
           stats.latency_ms =
-            conn_stats.sampled_total_lat_s / conn_stats.sampled_count * 1000;
+              conn_state.conn_stats.sampled_total_lat_s /
+              conn_state.conn_stats.sampled_count * 1000;
           stats.iops = ops / dur_msg.count();
           stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576;
 
-          stopped_send_promise.set_value(stats);
+          conn_state.stopped_send_promise.set_value(stats);
         });
       }
     };
@@ -819,7 +931,8 @@ static seastar::future<> run(
         server_conf.block_size,
         server_needs_report),
       create_sharded<test_state::Client>(
-        client_conf.jobs,
+        client_conf.num_clients,
+        client_conf.num_conns,
         client_conf.block_size,
         client_conf.depth,
         server_sid),
@@ -839,9 +952,9 @@ static seastar::future<> run(
     if (mode == perf_mode_t::both) {
       logger().info("\nperf settings:\n  smp={}\n  {}\n  {}\n",
                     seastar::smp::count, client_conf.str(), server_conf.str());
-      ceph_assert(seastar::smp::count > client_conf.jobs);
-      ceph_assert(client_conf.jobs > 0);
-      ceph_assert(seastar::smp::count > server_conf.core + client_conf.jobs);
+      ceph_assert(seastar::smp::count > client_conf.num_clients);
+      ceph_assert(client_conf.num_clients > 0);
+      ceph_assert(seastar::smp::count > server_conf.core + client_conf.num_clients);
       return seastar::when_all_succeed(
         server->init(server_conf.addr),
         client->init()
@@ -858,8 +971,8 @@ static seastar::future<> run(
     } else if (mode == perf_mode_t::client) {
       logger().info("\nperf settings:\n  smp={}\n  {}\n",
                     seastar::smp::count, client_conf.str());
-      ceph_assert(seastar::smp::count > client_conf.jobs);
-      ceph_assert(client_conf.jobs > 0);
+      ceph_assert(seastar::smp::count > client_conf.num_clients);
+      ceph_assert(client_conf.num_clients > 0);
       return client->init(
       ).then([client, addr = client_conf.server_addr] {
         return client->connect_wait_verify(addr);
@@ -900,8 +1013,10 @@ int main(int argc, char** argv)
      "seconds of client ramp-up time")
     ("msgtime", bpo::value<unsigned>()->default_value(15),
      "seconds of client messaging time")
-    ("client-jobs", bpo::value<unsigned>()->default_value(1),
-     "number of client jobs (messengers)")
+    ("clients", bpo::value<unsigned>()->default_value(1),
+     "number of client messengers")
+    ("conns-per-client", bpo::value<unsigned>()->default_value(1),
+     "number of connections per client")
     ("client-bs", bpo::value<unsigned>()->default_value(4096),
      "client block size")
     ("depth", bpo::value<unsigned>()->default_value(512),