session->con->send_message2(std::move(r));
}
-MClientRequest::ref Client::build_client_request(MetaRequest *request)
+ref_t<MClientRequest> Client::build_client_request(MetaRequest *request)
{
auto req = MClientRequest::create(request->get_op());
req->set_tid(request->tid);
std::set<ceph_tid_t> flushing_caps_tids;
std::set<Inode*> early_flushing_caps;
- MClientCapRelease::ref release;
+ ceph::ref_t<MClientCapRelease> release;
MetaSession(mds_rank_t mds_num, ConnectionRef con,
const entity_addrvec_t& addrs)
return m->get_type() == MSG_MDS_BEACON;
}
-void Beacon::ms_fast_dispatch2(const Message::ref& m)
+void Beacon::ms_fast_dispatch2(const ref_t<Message>& m)
{
// Fast-dispatch entry point: beacon handling is cheap enough to run
// inline, so simply forward to ms_dispatch2(). Every message routed
// here must be handled; an unhandled one is a fatal wiring bug.
bool handled = ms_dispatch2(m);
ceph_assert(handled);
}
-bool Beacon::ms_dispatch2(const Message::ref& m)
+bool Beacon::ms_dispatch2(const ref_t<Message>& m)
{
if (m->get_type() == MSG_MDS_BEACON) {
if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
bool ms_can_fast_dispatch_any() const override { return true; }
bool ms_can_fast_dispatch2(const cref_t<Message>& m) const override;
- void ms_fast_dispatch2(const Message::ref& m) override;
- bool ms_dispatch2(const Message::ref &m) override;
+ void ms_fast_dispatch2(const ref_t<Message>& m) override;
+ bool ms_dispatch2(const ref_t<Message> &m) override;
void ms_handle_connect(Connection *c) override {}
bool ms_handle_reset(Connection *c) override {return false;}
void ms_handle_remote_reset(Connection *c) override {}
return valid;
}
-void CInode::encode_cap_message(const MClientCaps::ref &m, Capability *cap)
+void CInode::encode_cap_message(const ref_t<MClientCaps> &m, Capability *cap)
{
ceph_assert(cap);
int encode_inodestat(bufferlist& bl, Session *session, SnapRealm *realm,
snapid_t snapid=CEPH_NOSNAP, unsigned max_bytes=0,
int getattr_wants=0);
- void encode_cap_message(const MClientCaps::ref &m, Capability *cap);
+ void encode_cap_message(const ref_t<MClientCaps> &m, Capability *cap);
// -- locks --
MutationRef mut;
unsigned flags;
client_t client;
- MClientCaps::ref ack;
+ ref_t<MClientCaps> ack;
public:
C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m, unsigned f,
- const MClientCaps::ref &ack, client_t c=-1)
+ const ref_t<MClientCaps> &ack, client_t c=-1)
: LockerLogContext(l), in(i), mut(m), flags(f), client(c), ack(ack) {
in->get(CInode::PIN_PTRWAITER);
}
};
void Locker::file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
- client_t client, const MClientCaps::ref &ack)
+ client_t client, const ref_t<MClientCaps> &ack)
{
dout(10) << "file_update_finish on " << *in << dendl;
in->pop_and_dirty_projected_inode(mut->ls);
mdcache->journal_dirty_inode(mut.get(), metablob, in);
}
mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
- UPDATE_SHAREMAX, MClientCaps::ref()));
+ UPDATE_SHAREMAX, ref_t<MClientCaps>()));
wrlock_force(&in->filelock, mut); // wrlock for duration of journal
mut->auth_pin(in);
CInode *sin = mdcache->pick_inode_snap(head_in, snapid - 1);
ceph_assert(sin);
ceph_assert(sin->first <= snapid);
- _do_snap_update(sin, snapid, 0, sin->first - 1, client, MClientCaps::ref(), MClientCaps::ref());
+ _do_snap_update(sin, snapid, 0, sin->first - 1, client, ref_t<MClientCaps>(), ref_t<MClientCaps>());
head_in->remove_need_snapflush(sin, snapid, client);
}
}
session->have_completed_flush(m->get_client_tid())) {
dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
<< " for client." << client << dendl;
- MClientCaps::ref ack;
+ ref_t<MClientCaps> ack;
if (op == CEPH_CAP_OP_FLUSHSNAP) {
ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
} else {
// we can prepare the ack now, since this FLUSHEDSNAP is independent of any
// other cap ops. (except possibly duplicate FLUSHSNAP requests, but worst
// case we get a dup response, so whatever.)
- MClientCaps::ref ack;
+ ref_t<MClientCaps> ack;
if (dirty) {
ack = MClientCaps::create(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, dirty, 0, mds->get_osd_epoch_barrier());
ack->set_snap_follows(follows);
ceph_assert(in->last != CEPH_NOSNAP);
if (in->is_auth() && dirty) {
dout(10) << " updating intermediate snapped inode " << *in << dendl;
- _do_cap_update(in, NULL, dirty, follows, m, MClientCaps::ref());
+ _do_cap_update(in, NULL, dirty, follows, m, ref_t<MClientCaps>());
}
in = mdcache->pick_inode_snap(head_in, in->last);
}
}
// head inode, and cap
- MClientCaps::ref ack;
+ ref_t<MClientCaps> ack;
int caps = m->get_caps();
if (caps & ~cap->issued()) {
/**
* m and ack might be NULL, so don't dereference them unless dirty != 0
*/
-void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const MClientCaps::ref &ack)
+void Locker::_do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack)
{
dout(10) << "_do_snap_update dirty " << ccap_string(dirty)
<< " follows " << follows << " snap " << snap
*/
bool Locker::_do_cap_update(CInode *in, Capability *cap,
int dirty, snapid_t follows,
- const cref_t<MClientCaps> &m, const MClientCaps::ref &ack,
+ const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack,
bool *need_flush)
{
dout(10) << "_do_cap_update dirty " << ccap_string(dirty)
void adjust_cap_wanted(Capability *cap, int wanted, int issue_seq);
void handle_client_caps(const cref_t<MClientCaps> &m);
void _update_cap_fields(CInode *in, int dirty, const cref_t<MClientCaps> &m, CInode::mempool_inode *pi);
- void _do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const MClientCaps::ref &ack);
+ void _do_snap_update(CInode *in, snapid_t snap, int dirty, snapid_t follows, client_t client, const cref_t<MClientCaps> &m, const ref_t<MClientCaps> &ack);
void _do_null_snapflush(CInode *head_in, client_t client, snapid_t last=CEPH_NOSNAP);
bool _do_cap_update(CInode *in, Capability *cap, int dirty, snapid_t follows, const cref_t<MClientCaps> &m,
- const MClientCaps::ref &ack, bool *need_flush=NULL);
+ const ref_t<MClientCaps> &ack, bool *need_flush=NULL);
void handle_client_cap_release(const cref_t<MClientCapRelease> &m);
void _do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, ceph_seq_t mseq, ceph_seq_t seq);
void caps_tick();
void handle_inode_file_caps(const cref_t<MInodeFileCaps> &m);
void file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
- client_t client, const MClientCaps::ref &ack);
+ client_t client, const ref_t<MClientCaps> &ack);
private:
uint64_t calc_new_max_size(CInode::mempool_inode *pi, uint64_t size);
public:
{
dout(10) << "send_slave_resolves" << dendl;
- map<mds_rank_t, MMDSResolve::ref> resolves;
+ map<mds_rank_t, ref_t<MMDSResolve>> resolves;
if (mds->is_resolve()) {
for (map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
return; // not now
}
- map<mds_rank_t, MMDSResolve::ref> resolves;
+ map<mds_rank_t, ref_t<MMDSResolve>> resolves;
for (set<mds_rank_t>::iterator p = recovery_set.begin();
p != recovery_set.end();
++p) {
// send
for (auto &p : resolves) {
- const MMDSResolve::ref &m = p.second;
+ const ref_t<MMDSResolve> &m = p.second;
if (mds->is_resolve()) {
m->add_table_commits(TABLE_SNAP, resolve_snapclient_commits);
} else {
disambiguate_other_imports();
}
- map<mds_rank_t, MMDSCacheRejoin::ref> rejoins;
+ map<mds_rank_t, ref_t<MMDSCacheRejoin>> rejoins;
// if i am rejoining, send a rejoin to everyone.
if (!q.first->is_auth()) {
ceph_assert(q.second == q.first->authority().first);
if (rejoins.count(q.second) == 0) continue;
- const MMDSCacheRejoin::ref &rejoin = rejoins[q.second];
+ const ref_t<MMDSCacheRejoin> &rejoin = rejoins[q.second];
dout(15) << " " << *mdr << " authpin on " << *q.first << dendl;
MDSCacheObjectInfo i;
if (q.is_xlock() && !obj->is_auth()) {
mds_rank_t who = obj->authority().first;
if (rejoins.count(who) == 0) continue;
- const MMDSCacheRejoin::ref &rejoin = rejoins[who];
+ const ref_t<MMDSCacheRejoin> &rejoin = rejoins[who];
dout(15) << " " << *mdr << " xlock on " << *lock << " " << *obj << dendl;
MDSCacheObjectInfo i;
} else if (q.is_remote_wrlock()) {
mds_rank_t who = q.wrlock_target;
if (rejoins.count(who) == 0) continue;
- const MMDSCacheRejoin::ref &rejoin = rejoins[who];
+ const ref_t<MMDSCacheRejoin> &rejoin = rejoins[who];
dout(15) << " " << *mdr << " wrlock on " << *lock << " " << *obj << dendl;
MDSCacheObjectInfo i;
* strong dentries (no connectivity!)
* strong inodes
*/
-void MDCache::rejoin_walk(CDir *dir, const MMDSCacheRejoin::ref &rejoin)
+void MDCache::rejoin_walk(CDir *dir, const ref_t<MMDSCacheRejoin> &rejoin)
{
dout(10) << "rejoin_walk " << *dir << dendl;
mds_rank_t from = mds_rank_t(weak->get_source().num());
// possible response(s)
- MMDSCacheRejoin::ref ack; // if survivor
+ ref_t<MMDSCacheRejoin> ack; // if survivor
set<vinodeno_t> acked_inodes; // if survivor
set<SimpleLock *> gather_locks; // if survivor
bool survivor = false; // am i a survivor?
}
void MDCache::prepare_realm_split(SnapRealm *realm, client_t client, inodeno_t ino,
- map<client_t,MClientSnap::ref>& splits)
+ map<client_t,ref_t<MClientSnap>>& splits)
{
- MClientSnap::ref snap;
+ ref_t<MClientSnap> snap;
auto it = splits.find(client);
if (it != splits.end()) {
snap = it->second;
}
void MDCache::prepare_realm_merge(SnapRealm *realm, SnapRealm *parent_realm,
- map<client_t,MClientSnap::ref>& splits)
+ map<client_t,ref_t<MClientSnap>>& splits)
{
ceph_assert(parent_realm);
}
}
-void MDCache::send_snaps(map<client_t,MClientSnap::ref>& splits)
+void MDCache::send_snaps(map<client_t,ref_t<MClientSnap>>& splits)
{
dout(10) << "send_snaps" << dendl;
realm->open_parents(gather.new_sub())) {
dout(10) << " past parents now open on " << *in << dendl;
- map<client_t,MClientSnap::ref> splits;
+ map<client_t,ref_t<MClientSnap>> splits;
// finish off client snaprealm reconnects?
map<inodeno_t,map<client_t,snapid_t> >::iterator q = reconnected_snaprealms.find(in->ino());
if (q != reconnected_snaprealms.end()) {
}
void MDCache::finish_snaprealm_reconnect(client_t client, SnapRealm *realm, snapid_t seq,
- map<client_t,MClientSnap::ref>& updates)
+ map<client_t,ref_t<MClientSnap>>& updates)
{
if (seq < realm->get_newest_seq()) {
dout(10) << "finish_snaprealm_reconnect client." << client << " has old seq " << seq << " < "
rejoin_unlinked_inodes.clear();
// send acks to everyone in the recovery set
- map<mds_rank_t,MMDSCacheRejoin::ref> acks;
+ map<mds_rank_t,ref_t<MMDSCacheRejoin>> acks;
for (set<mds_rank_t>::iterator p = recovery_set.begin();
p != recovery_set.end();
++p) {
auto from = mds_rank_t(m->get_source().num());
inodeno_t ino = m->ino;
- MMDSOpenInoReply::ref reply;
+ ref_t<MMDSOpenInoReply> reply;
CInode *in = get_inode(ino);
if (in) {
dout(10) << " have " << *in << dendl;
}
set<SnapRealm*> past_children;
- map<client_t, MClientSnap::ref> updates;
+ map<client_t, ref_t<MClientSnap>> updates;
list<SnapRealm*> q;
q.push_back(in->snaprealm);
while (!q.empty()) {
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;
- typedef std::map<mds_rank_t, MCacheExpire::ref> expiremap;
+ typedef std::map<mds_rank_t, ref_t<MCacheExpire>> expiremap;
// my master
MDSRank *mds;
list<SimpleLock*> rejoin_eval_locks;
MDSContext::vec rejoin_waiters;
- void rejoin_walk(CDir *dir, const MMDSCacheRejoin::ref &rejoin);
+ void rejoin_walk(CDir *dir, const ref_t<MMDSCacheRejoin> &rejoin);
void handle_cache_rejoin(const cref_t<MMDSCacheRejoin> &m);
void handle_cache_rejoin_weak(const cref_t<MMDSCacheRejoin> &m);
CInode* rejoin_invent_inode(inodeno_t ino, snapid_t last);
bool process_imported_caps();
void choose_lock_states_and_reconnect_caps();
void prepare_realm_split(SnapRealm *realm, client_t client, inodeno_t ino,
- map<client_t,MClientSnap::ref>& splits);
- void prepare_realm_merge(SnapRealm *realm, SnapRealm *parent_realm, map<client_t,MClientSnap::ref>& splits);
- void send_snaps(map<client_t,MClientSnap::ref>& splits);
+ map<client_t,ref_t<MClientSnap>>& splits);
+ void prepare_realm_merge(SnapRealm *realm, SnapRealm *parent_realm, map<client_t,ref_t<MClientSnap>>& splits);
+ void send_snaps(map<client_t,ref_t<MClientSnap>>& splits);
Capability* rejoin_import_cap(CInode *in, client_t client, const cap_reconnect_t& icr, mds_rank_t frommds);
void finish_snaprealm_reconnect(client_t client, SnapRealm *realm, snapid_t seq,
- map<client_t,MClientSnap::ref>& updates);
+ map<client_t,ref_t<MClientSnap>>& updates);
Capability* try_reconnect_cap(CInode *in, Session *session);
void export_remaining_imported_caps();
-bool MDSDaemon::ms_dispatch2(const Message::ref &m)
+bool MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
{
std::lock_guard l(mds_lock);
if (stopping) {
void wait_for_omap_osds();
private:
- bool ms_dispatch2(const Message::ref &m) override;
+ bool ms_dispatch2(const ref_t<Message> &m) override;
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer) override;
int ms_handle_authentication(Connection *con) override;
KeyStore *ms_get_auth1_authorizer_keystore() override;
active_clients.erase(who);
- list<MMDSTableRequest::ref> rollback;
+ list<ref_t<MMDSTableRequest>> rollback;
for (auto p = pending_notifies.begin(); p != pending_notifies.end(); ) {
auto q = p++;
if (q->second.mds == who) {
struct notify_info_t {
set<mds_rank_t> notify_ack_gather;
mds_rank_t mds;
- MMDSTableRequest::ref reply;
+ ref_t<MMDSTableRequest> reply;
MDSContext *onfinish;
notify_info_t() : reply(NULL), onfinish(NULL) {}
};
ceph_abort();
}
-void Server::finish_reclaim_session(Session *session, const MClientReclaimReply::ref &reply)
+void Server::finish_reclaim_session(Session *session, const ref_t<MClientReclaimReply> &reply)
{
Session *target = session->reclaiming_from;
if (target) {
* include a trace to tracei
* Clean up mdr
*/
-void Server::reply_client_request(MDRequestRef& mdr, const MClientReply::ref &reply)
+void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &reply)
{
ceph_assert(mdr.get());
const cref_t<MClientRequest> &req = mdr->client_request;
*
* trace is in reverse order (i.e. root inode comes last)
*/
-void Server::set_trace_dist(Session *session, const MClientReply::ref &reply,
+void Server::set_trace_dist(Session *session, const ref_t<MClientReply> &reply,
CInode *in, CDentry *dn,
snapid_t snapid,
int dentry_wanted,
struct C_MDS_LoggedLinkRollback : public ServerLogContext {
MutationRef mut;
- map<client_t,MClientSnap::ref> splits;
+ map<client_t,ref_t<MClientSnap>> splits;
C_MDS_LoggedLinkRollback(Server *s, MutationRef& m, MDRequestRef& r,
- map<client_t,MClientSnap::ref>&& _splits) :
+ map<client_t,ref_t<MClientSnap>>&& _splits) :
ServerLogContext(s, r), mut(m), splits(std::move(_splits)) {
}
void finish(int r) override {
else
pi.inode.nlink++;
- map<client_t,MClientSnap::ref> splits;
+ map<client_t,ref_t<MClientSnap>> splits;
if (rollback.snapbl.length() && in->snaprealm) {
bool hadrealm;
auto p = rollback.snapbl.cbegin();
}
void Server::_link_rollback_finish(MutationRef& mut, MDRequestRef& mdr,
- map<client_t,MClientSnap::ref>& splits)
+ map<client_t,ref_t<MClientSnap>>& splits)
{
dout(10) << "_link_rollback_finish" << dendl;
dout(10) << "_logged_slave_rename " << *mdr << dendl;
// prepare ack
- MMDSSlaveRequest::ref reply;
+ ref_t<MMDSSlaveRequest> reply;
if (!mdr->aborted) {
reply = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_RENAMEPREPACK);
if (!mdr->more()->slave_update_journaled)
version_t srcdnpv;
CDentry *destdn;
CDentry *straydn;
- map<client_t,MClientSnap::ref> splits[2];
+ map<client_t,ref_t<MClientSnap>> splits[2];
bool finish_mdr;
C_MDS_LoggedRenameRollback(Server *s, MutationRef& m, MDRequestRef& r,
CDentry *sd, version_t pv, CDentry *dd, CDentry *st,
- map<client_t,MClientSnap::ref> _splits[2], bool f) :
+ map<client_t,ref_t<MClientSnap>> _splits[2], bool f) :
ServerLogContext(s, r), mut(m), srcdn(sd), srcdnpv(pv), destdn(dd),
straydn(st), finish_mdr(f) {
splits[0].swap(_splits[0]);
rollback.orig_src.remote_d_type);
}
- map<client_t,MClientSnap::ref> splits[2];
+ map<client_t,ref_t<MClientSnap>> splits[2];
CInode::mempool_inode *pip = nullptr;
if (in) {
void Server::_rename_rollback_finish(MutationRef& mut, MDRequestRef& mdr, CDentry *srcdn,
version_t srcdnpv, CDentry *destdn, CDentry *straydn,
- map<client_t,MClientSnap::ref> splits[2], bool finish_mdr)
+ map<client_t,ref_t<MClientSnap>> splits[2], bool finish_mdr)
{
dout(10) << "_rename_rollback_finish " << mut->reqid << dendl;
size_t get_num_pending_reclaim() const { return client_reclaim_gather.size(); }
Session *find_session_by_uuid(std::string_view uuid);
void reclaim_session(Session *session, const cref_t<MClientReclaim> &m);
- void finish_reclaim_session(Session *session, const MClientReclaimReply::ref &reply=nullptr);
+ void finish_reclaim_session(Session *session, const ref_t<MClientReclaimReply> &reply=nullptr);
void handle_client_reclaim(const cref_t<MClientReclaim> &m);
void reconnect_clients(MDSContext *reconnect_done_);
void perf_gather_op_latency(const cref_t<MClientRequest> &req, utime_t lat);
void early_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn);
void respond_to_request(MDRequestRef& mdr, int r = 0);
- void set_trace_dist(Session *session, const MClientReply::ref &reply, CInode *in, CDentry *dn,
+ void set_trace_dist(Session *session, const ref_t<MClientReply> &reply, CInode *in, CDentry *dn,
snapid_t snapid,
int num_dentries_wanted,
MDRequestRef& mdr);
void handle_slave_link_prep_ack(MDRequestRef& mdr, const cref_t<MMDSSlaveRequest> &m);
void do_link_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef& mdr);
void _link_rollback_finish(MutationRef& mut, MDRequestRef& mdr,
- map<client_t,MClientSnap::ref>& split);
+ map<client_t,ref_t<MClientSnap>>& split);
// unlink
void handle_client_unlink(MDRequestRef& mdr);
void _commit_slave_rename(MDRequestRef& mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
void do_rename_rollback(bufferlist &rbl, mds_rank_t master, MDRequestRef& mdr, bool finish_mdr=false);
void _rename_rollback_finish(MutationRef& mut, MDRequestRef& mdr, CDentry *srcdn, version_t srcdnpv,
- CDentry *destdn, CDentry *staydn, map<client_t,MClientSnap::ref> splits[2],
+ CDentry *destdn, CDentry *staydn, map<client_t,ref_t<MClientSnap>> splits[2],
bool finish_mdr);
void evict_cap_revoke_non_responders();
const std::set <std::string> &changed);
private:
- void reply_client_request(MDRequestRef& mdr, const MClientReply::ref &reply);
+ void reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &reply);
void flush_session(Session *session, MDSGatherBuilder *gather);
DecayCounter recall_throttle;
entity_addr_t socket_addr;
xlist<Session*>::item item_session_list;
- list<Message::ref> preopen_out_queue; ///< messages for client, queued before they connect
+ list<ref_t<Message>> preopen_out_queue; ///< messages for client, queued before they connect
elist<MDRequestImpl*> requests;
size_t get_request_count();
break;
}
case TABLESERVER_OP_COMMIT:
- server->_commit(tid, MMDSTableRequest::ref());
+ server->_commit(tid, ref_t<MMDSTableRequest>());
server->_note_commit(tid, true);
break;
case TABLESERVER_OP_ROLLBACK:
return (now - marrival.begin()->first);
}
-uint64_t DispatchQueue::pre_dispatch(const Message::ref& m)
+uint64_t DispatchQueue::pre_dispatch(const ref_t<Message>& m)
{
ldout(cct,1) << "<== " << m->get_source_inst()
<< " " << m->get_seq()
return msize;
}
-void DispatchQueue::post_dispatch(const Message::ref& m, uint64_t msize)
+void DispatchQueue::post_dispatch(const ref_t<Message>& m, uint64_t msize)
{
dispatch_throttle_release(msize);
ldout(cct,20) << "done calling dispatch on " << m << dendl;
return msgr->ms_can_fast_dispatch(m);
}
-void DispatchQueue::fast_dispatch(const Message::ref& m)
+void DispatchQueue::fast_dispatch(const ref_t<Message>& m)
{
// Deliver a message directly to the fast dispatchers, bypassing the
// ordered queue: pre_dispatch logs receipt and returns the message
// size, and post_dispatch releases the dispatch throttle for it.
uint64_t msize = pre_dispatch(m);
msgr->ms_fast_dispatch(m);
post_dispatch(m, msize);
}
-void DispatchQueue::fast_preprocess(const Message::ref& m)
+void DispatchQueue::fast_preprocess(const ref_t<Message>& m)
{
// Let the fast dispatchers inspect the message before delivery;
// pure forwarding to the owning Messenger.
msgr->ms_fast_preprocess(m);
}
-void DispatchQueue::enqueue(const Message::ref& m, int priority, uint64_t id)
+void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
{
Mutex::Locker l(lock);
if (stop) {
cond.Signal();
}
-void DispatchQueue::local_delivery(const Message::ref& m, int priority)
+void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
{
m->set_recv_stamp(ceph_clock_now());
Mutex::Locker l(local_delivery_lock);
auto p = std::move(local_messages.front());
local_messages.pop();
local_delivery_lock.Unlock();
- const Message::ref& m = p.first;
+ const ref_t<Message>& m = p.first;
int priority = p.second;
fast_preprocess(m);
if (can_fast_dispatch(m)) {
ceph_abort();
}
} else {
- const Message::ref& m = qitem.get_message();
+ const ref_t<Message>& m = qitem.get_message();
if (stop) {
ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
} else {
i != removed.end();
++i) {
ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
- const Message::ref& m = i->get_message();
+ const ref_t<Message>& m = i->get_message();
remove_arrival(m);
dispatch_throttle_release(m->get_dispatch_throttle_size());
}
}
// make message
- Message::ref m;
+ ref_t<Message> m;
int type = header.type;
switch (type) {
* @param m The Message we are fast dispatching.
* If none of our Dispatchers can handle it, ceph_abort().
*/
- void ms_fast_dispatch(const Message::ref &m) {
+ void ms_fast_dispatch(const ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
for (const auto &dispatcher : fast_dispatchers) {
if (dispatcher->ms_can_fast_dispatch2(m)) {
ceph_abort();
}
void ms_fast_dispatch(Message *m) {
// Raw-pointer compatibility shim: wrap the pointer without bumping
// the refcount (second argument 'false' adopts the caller's ref)
// and forward to the smart-pointer overload.
- return ms_fast_dispatch(Message::ref(m, false)); /* consume ref */
+ return ms_fast_dispatch(ref_t<Message>(m, false)); /* consume ref */
}
/**
*
*/
- void ms_fast_preprocess(const Message::ref &m) {
+ void ms_fast_preprocess(const ref_t<Message> &m) {
for (const auto &dispatcher : fast_dispatchers) {
dispatcher->ms_fast_preprocess2(m);
}
*
* @param m The Message to deliver.
*/
- void ms_deliver_dispatch(const Message::ref &m) {
+ void ms_deliver_dispatch(const ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
for (const auto &dispatcher : dispatchers) {
if (dispatcher->ms_dispatch2(m))
ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
}
void ms_deliver_dispatch(Message *m) {
// Raw-pointer compatibility shim: adopt the caller's reference
// ('false' = no refcount increment) and forward to the
// smart-pointer overload.
- return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */
+ return ms_deliver_dispatch(ref_t<Message>(m, false)); /* consume ref */
}
/**
* Notify each Dispatcher of a new Connection. Call
template<class T>
class MessageDencoderImpl : public Dencoder {
- typename T::ref m_object;
- list<typename T::ref> m_list;
+ ref_t<T> m_object;
+ list<ref_t<T>> m_list;
public:
MessageDencoderImpl() : m_object(T::create()) {}
auto p = bl.cbegin();
p.seek(seek);
try {
- Message::ref n(decode_message(g_ceph_context, 0, p), false);
+ ref_t<Message> n(decode_message(g_ceph_context, 0, p), false);
if (!n)
throw std::runtime_error("failed to decode");
if (n->get_type() != m_object->get_type()) {