getpid());
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
- getpid());
+ getpid(), CEPH_FEATURES_ALL);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
getpid());
Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
- uint64_t nonce)
+ uint64_t nonce, uint64_t features)
{
int r = -1;
if (type == "random")
r = rand() % 2; // random does not include xio
if (r == 0 || type == "simple")
- return new SimpleMessenger(cct, name, lname, nonce);
+ return new SimpleMessenger(cct, name, lname, nonce, features);
else if ((r == 1 || type == "async") &&
cct->check_experimental_feature_enabled("ms-type-async"))
- return new AsyncMessenger(cct, name, lname, nonce);
+ return new AsyncMessenger(cct, name, lname, nonce, features);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
- return new XioMessenger(cct, name, lname, nonce);
+ return new XioMessenger(cct, name, lname, nonce, features);
#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return NULL;
* @param name entity name to register
* @param lname logical name of the messenger in this process (e.g., "client")
* @param nonce nonce value to uniquely identify this instance on the current host
+ * @param features bits for the local connection
*/
static Messenger *create(CephContext *cct,
const string &type,
entity_name_t name,
string lname,
- uint64_t nonce);
+ uint64_t nonce,
+ uint64_t features = 0);
/**
* @defgroup Accessors
*/
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce)
+ string mname, uint64_t _nonce, uint64_t features)
: SimplePolicyMessenger(cct, name,mname, _nonce),
processor(this, cct, _nonce),
lock("AsyncMessenger::lock"),
cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
Worker *w = pool->get_worker();
local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
+ local_features = features;
init_local_connection();
}
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce);
+ string mname, uint64_t _nonce, uint64_t features);
/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
assert(lock.is_locked());
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
+ local_connection->set_features(local_features);
ms_deliver_handle_fast_connect(local_connection.get());
}
/// con used for sending messages to ourselves
ConnectionRef local_connection;
+ uint64_t local_features;
/**
* @defgroup AsyncMessenger internals
*/
SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce)
+ string mname, uint64_t _nonce, uint64_t features)
: SimplePolicyMessenger(cct, name,mname, _nonce),
accepter(this, _nonce),
dispatch_queue(cct, this),
local_connection(new PipeConnection(cct, this))
{
ceph_spin_init(&global_seq_lock);
+ local_features = features;
init_local_connection();
}
{
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
+ local_connection->set_features(local_features);
ms_deliver_handle_fast_connect(local_connection.get());
}
* @param name The name to assign ourselves
* _nonce A unique ID to use for this SimpleMessenger. It should not
* be a value that will be repeated if the daemon restarts.
+ * features The local features bits for the local_connection
*/
SimpleMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce);
+ string mname, uint64_t _nonce, uint64_t features);
/**
* Destroy the SimpleMessenger. Pretty simple since all the work is done
/// con used for sending messages to ourselves
ConnectionRef local_connection;
+ uint64_t local_features;
/**
* @defgroup SimpleMessenger internals
int get_proto_version(int peer_type, bool connect);
/**
- * Fill in the address and peer type for the local connection, which
+ * Fill in the features, address and peer type for the local connection, which
* is used for delivering messages back to ourself.
*/
void init_local_connection();
/* XioMessenger */
XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce,
- DispatchStrategy *ds)
+ DispatchStrategy *ds, uint64_t features)
: SimplePolicyMessenger(cct, name, mname, _nonce),
nsessions(0),
shutdown_called(false),
/* update class instance count */
nInstances.inc();
+ loop_con.set_features(features);
+
} /* ctor */
int XioMessenger::pool_hint(uint32_t dsize) {
public:
XioMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t nonce,
- DispatchStrategy* ds = new QueueStrategy(1));
+ DispatchStrategy* ds = new QueueStrategy(1), uint64_t features);
virtual ~XioMessenger();