objecter = new Objecter(cct, messenger, monclient,
0, 0);
objecter->set_client_incarnation(0); // client always 0, for now.
- writeback_handler = new ObjecterWriteback(objecter);
+ writeback_handler = new ObjecterWriteback(objecter, &objecter_finisher,
+ &client_lock);
objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock,
client_flush_set_callback, // all commit callback
(void*)this,
objectcacher->start();
objecter->init();
+
// ok!
+ messenger->add_dispatcher_head(objecter);
messenger->add_dispatcher_head(this);
int r = monclient->init();
}
switch (m->get_type()) {
- // osd
- case CEPH_MSG_OSD_OPREPLY:
- objecter->handle_osd_op_reply((MOSDOpReply*)m);
- break;
- case CEPH_MSG_OSD_MAP:
- objecter->handle_osd_map((class MOSDMap*)m);
- break;
- case CEPH_MSG_STATFS_REPLY:
- objecter->handle_fs_stats_reply((MStatfsReply*)m);
- break;
-
-
// mounting and mds sessions
case CEPH_MSG_MDS_MAP:
handle_mds_map(static_cast<MMDSMap*>(m));
void Client::ms_handle_connect(Connection *con)
{
ldout(cct, 10) << "ms_handle_connect on " << con->get_peer_addr() << dendl;
- Mutex::Locker l(client_lock);
- objecter->ms_handle_connect(con);
}
bool Client::ms_handle_reset(Connection *con)
{
ldout(cct, 0) << "ms_handle_reset on " << con->get_peer_addr() << dendl;
- Mutex::Locker l(client_lock);
- objecter->ms_handle_reset(con);
return false;
}
ldout(cct, 0) << "ms_handle_remote_reset on " << con->get_peer_addr() << dendl;
Mutex::Locker l(client_lock);
switch (con->get_peer_type()) {
- case CEPH_ENTITY_TYPE_OSD:
- objecter->ms_handle_remote_reset(con);
- break;
-
case CEPH_ENTITY_TYPE_MDS:
{
// kludge to figure out which mds this is; fixme with a Connection* state
class ObjecterWriteback : public WritebackHandler {
public:
- ObjecterWriteback(Objecter *o) : m_objecter(o) {}
+ ObjecterWriteback(Objecter *o, Finisher *fin, Mutex *lock)
+ : m_objecter(o),
+ m_finisher(fin),
+ m_lock(lock) { }
virtual ~ObjecterWriteback() {}
virtual void read(const object_t& oid, const object_locator_t& oloc,
bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq,
Context *onfinish) {
m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0,
- trunc_size, trunc_seq, onfinish);
+ trunc_size, trunc_seq,
+ new C_OnFinisher(new C_Lock(m_lock, onfinish),
+ m_finisher));
}
- virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) {
+ virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off,
+ uint64_t read_len, snapid_t snapid) {
return false;
}
const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
__u32 trunc_seq, Context *oncommit) {
return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0,
- trunc_size, trunc_seq, NULL, oncommit);
+ trunc_size, trunc_seq, NULL,
+ new C_OnFinisher(new C_Lock(m_lock, oncommit),
+ m_finisher));
}
virtual ceph_tid_t lock(const object_t& oid, const object_locator_t& oloc, int op,
int flags, Context *onack, Context *oncommit) {
- return m_objecter->lock(oid, oloc, op, flags, onack, oncommit);
+ return m_objecter->lock(oid, oloc, op, flags, onack,
+ new C_OnFinisher(new C_Lock(m_lock, oncommit),
+ m_finisher));
}
private:
Objecter *m_objecter;
+ Finisher *m_finisher;
+ Mutex *m_lock;
};
#endif