*
*/
+#include <atomic>
#include <iostream>
#include <unistd.h>
#include <stdlib.h>
class FakeDispatcher : public Dispatcher {
public:
struct Session : public RefCountedObject {
- Mutex lock;
- uint64_t count;
+ atomic<uint64_t> count;
ConnectionRef con;
- explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), lock("FakeDispatcher::Session::lock"), count(0), con(c) {
+ explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
}
uint64_t get_count() { return count; }
};
lock.Unlock();
}
void ms_handle_fast_accept(Connection *con) {
- Mutex::Locker l(lock);
Session *s = static_cast<Session*>(con->get_priv());
if (!s) {
s = new Session(con);
s->put();
}
bool ms_dispatch(Message *m) {
- Mutex::Locker l(lock);
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
if (!s) {
s = new Session(m->get_connection());
m->get_connection()->set_priv(s->get());
}
s->put();
- Mutex::Locker l1(s->lock);
s->count++;
lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
if (is_server) {
reply_message(m);
}
+ Mutex::Locker l(lock);
got_new = true;
cond.Signal();
m->put();
got_remote_reset = true;
}
void ms_fast_dispatch(Message *m) {
- Mutex::Locker l(lock);
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
if (!s) {
s = new Session(m->get_connection());
m->get_connection()->set_priv(s->get());
}
s->put();
- Mutex::Locker l1(s->lock);
s->count++;
lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
if (is_server) {
} else if (loopback) {
assert(m->get_source().is_client());
}
+ m->put();
+ Mutex::Locker l(lock);
got_new = true;
cond.Signal();
- m->put();
}
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
return ;
}
- Mutex::Locker l(lock);
uint64_t i;
bool reply;
assert(m->get_middle().length());
if (reply) {
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
reply_message(m, i);
- } else if (sent.count(i)) {
- lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
- ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
- ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
- conn_sent[m->get_connection()].pop_front();
- sent.erase(i);
+ m->put();
+ Mutex::Locker l(lock);
+ got_new = true;
+ cond.Signal();
+ } else {
+ Mutex::Locker l(lock);
+ if (sent.count(i)) {
+ lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
+ ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
+ ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
+ conn_sent[m->get_connection()].pop_front();
+ sent.erase(i);
+ }
+ m->put();
+ got_new = true;
+ cond.Signal();
}
- got_new = true;
- cond.Signal();
- m->put();
}
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,