if (!mdsmap->is_active(mds)) {
dout(10) << "no address for mds" << mds << ", requesting new mdsmap" << dendl;
- monclient->send_mon_message(new MMDSGetMap(monclient->get_fsid(), mdsmap->get_epoch()));
+ //monclient->send_mon_message(new MMDSGetMap(monclient->get_fsid(), mdsmap->get_epoch()));
waiting_for_mdsmap.push_back(&cond);
cond.Wait(client_lock);
delete oldmap;
delete m;
+
+ monclient->update_sub("mdsmap", mdsmap->get_epoch());
}
void Client::send_reconnect(int mds)
whoami = messenger->get_myname().num();
- signed_ticket = monclient->get_signed_ticket();
-
- objecter->signed_ticket = signed_ticket;
objecter->init();
mounted = true;
dout(2) << "mounted: have osdmap " << osdmap->get_epoch()
<< " and mdsmap " << mdsmap->get_epoch()
<< dendl;
+
+ monclient->update_sub("mdsmap", mdsmap->get_epoch());
+
// hack: get+pin root inode.
// fuse assumes it's always there.
dout(21) << "tick" << dendl;
tick_event = new C_C_Tick(this);
timer.add_event_after(g_conf.client_tick_interval, tick_event);
+
+ monclient->tick();
utime_t now = g_clock.now();
Messenger *messenger;
int whoami;
- bufferlist signed_ticket;
-
// mds sessions
map<int, MDSSession> mds_sessions; // mds -> push seq
map<int, list<Cond*> > waiting_for_session;
lock.Lock();
- objecter->signed_ticket = monclient.get_signed_ticket();
objecter->set_client_incarnation(0);
objecter->init();
#include "include/nstring.h"
#include "messages/MClientMountAck.h"
+#include "messages/MMonSubscribe.h"
+#include "messages/MMonSubscribeAck.h"
#include "common/ConfUtils.h"
#include "MonClient.h"
handle_mount_ack((MClientMountAck*)m);
return true;
+ case CEPH_MSG_MON_SUBSCRIBE_ACK:
+ handle_subscribe_ack((MMonSubscribeAck*)m);
+ return true;
}
return false;
}
mounters++;
- while (signed_ticket.length() == 0 ||
- (!itsme && !mounted)) // non-doers wait a little longer
+ while (!mounted)
mount_cond.Wait(monc_lock);
- if (!itsme) {
- dout(5) << "additional mounter returning" << dendl;
- assert(mounted);
- return 0;
- }
-
- // finish.
- timer.cancel_event(mount_timeout_event);
- mount_timeout_event = 0;
-
- mounted = true;
- mount_cond.SignalAll(); // wake up non-doers
-
+ dout(5) << "mount success" << dendl;
return 0;
}
messenger->reset_myname(entity_name_t::CLIENT(m->client));
- mount_cond.Signal();
+ // finish.
+ timer.cancel_event(mount_timeout_event);
+ mount_timeout_event = 0;
+
+ mounted = true;
+ mount_cond.SignalAll();
+
delete m;
}
messenger->mark_down(monmap.get_inst(oldmon).addr);
monmap.pick_mon(true);
}
+
+
+void MonClient::ms_handle_reset(const entity_addr_t& peer)
+{
+ dout(10) << "ms_handle_reset " << peer << dendl;
+ pick_new_mon();
+ renew_subs();
+}
+
+
+// ---------
+
+void MonClient::renew_subs()
+{
+ if (sub_have.empty()) {
+ dout(10) << "renew_subs - empty" << dendl;
+ return;
+ }
+
+ dout(10) << "renew_subs" << dendl;
+
+ if (sub_renew_sent == utime_t())
+ sub_renew_sent = g_clock.now();
+
+ MMonSubscribe *m = new MMonSubscribe;
+ m->what = sub_have;
+ send_mon_message(m);
+}
+
+void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
+{
+ if (sub_renew_sent != utime_t()) {
+ sub_renew_after = sub_renew_sent;
+ sub_renew_after += 1000.0 * m->interval_ms;
+ dout(10) << "handle_subscribe_ack sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
+ sub_renew_sent = utime_t();
+ } else {
+ dout(10) << "handle_subscribe_ack sent " << sub_renew_sent << ", ignoring" << dendl;
+ }
+}
+
+void MonClient::tick()
+{
+ dout(10) << "tick" << dendl;
+
+ if (!mounted)
+ return;
+
+ utime_t now = g_clock.now();
+ static utime_t last_tick;
+
+ if (now - last_tick < 10.0)
+ return;
+ last_tick = now;
+
+ if (now > sub_renew_after)
+ renew_subs();
+
+ int oldmon = monmap.pick_mon();
+ messenger->send_keepalive(monmap.mon_inst[oldmon]);
+}
class MonMap;
class MMonMap;
class MClientMountAck;
+class MMonSubscribeAck;
class MonClient : public Dispatcher {
public:
entity_addr_t my_addr;
- ClientTicket ticket;
- bufferlist signed_ticket;
-
Context *mount_timeout_event;
Mutex monc_lock;
int mounters;
Cond mount_cond, map_cond;
-
bool dispatch_impl(Message *m);
void handle_monmap(MMonMap *m);
+ void ms_handle_reset(const entity_addr_t& peer);
+
protected:
class C_MountTimeout : public Context {
MonClient *client;
void _try_mount(double timeout);
void _mount_timeout(double timeout);
void handle_mount_ack(MClientMountAck* m);
+
+ // mon subscriptions
+private:
+ map<nstring,version_t> sub_have; // my subs, and current versions
+ utime_t sub_renew_sent, sub_renew_after;
+
+public:
+ void renew_subs();
+ void update_sub(nstring what, version_t have) {
+ bool had = sub_have.count(what);
+ sub_have[what] = have;
+ if (!had)
+ renew_subs();
+ }
+ void handle_subscribe_ack(MMonSubscribeAck* m);
+
public:
MonClient() : messenger(NULL),
+ mount_timeout_event(NULL),
monc_lock("MonClient::monc_lock"),
timer(monc_lock) {
mounted = false;
mounters = 0;
- mount_timeout_event = 0;
}
int build_initial_monmap();
int mount(double mount_timeout);
+ void tick();
+
void send_mon_message(Message *m, bool new_mon=false);
void note_mon_leader(int m) {
monmap.last_mon = m;
void set_messenger(Messenger *m) { messenger = m; }
- bufferlist& get_signed_ticket() { return signed_ticket; }
- ClientTicket& get_ticket() { return ticket; }
};
assert(client_lock.is_locked()); // otherwise event cancellation is unsafe
timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
maybe_request_map();
+
+ //monc->update_sub("osdmap", 0);
}
void Objecter::shutdown()
}
delete m;
+
+ //monc->update_sub("osdmap", osdmap->get_epoch());
}
if (op->onack)
flags |= CEPH_OSD_FLAG_ACK;
- MOSDOp *m = new MOSDOp(signed_ticket, client_inc, op->tid,
+ bufferlist empty_ticket_fixme;
+#warning remove signed ticket ref
+ MOSDOp *m = new MOSDOp(empty_ticket_fixme, client_inc, op->tid,
op->oid, op->layout, osdmap->get_epoch(),
flags);
MonClient *monc;
OSDMap *osdmap;
- bufferlist signed_ticket;
-
private:
tid_t last_tid;