OPTION(client_readahead_max_periods, 0, OPT_LONGLONG, 4), // as multiple of file layout period (object size * num stripes)
OPTION(client_snapdir, 0, OPT_STR, ".snap"),
OPTION(client_mountpoint, 'r', OPT_STR, "/"),
+ OPTION(client_notify_timeout, 0, OPT_INT, 10), // in seconds
OPTION(fuse_direct_io, 0, OPT_INT, 0),
OPTION(fuse_ll, 0, OPT_BOOL, true),
OPTION(client_oc, 0, OPT_BOOL, true),
OPTION(osd_class_tmp, 0, OPT_STR, "/var/lib/ceph/tmp"),
OPTION(osd_check_for_log_corruption, 0, OPT_BOOL, false),
OPTION(osd_use_stale_snap, 0, OPT_BOOL, false),
+ OPTION(osd_max_notify_timeout, 0, OPT_INT, 30), // max notify timeout in seconds
OPTION(filestore, 0, OPT_BOOL, false),
OPTION(filestore_max_sync_interval, 0, OPT_DOUBLE, 5), // seconds
OPTION(filestore_min_sync_interval, 0, OPT_DOUBLE, .01), // seconds
int client_oc_target_dirty;
long long unsigned client_oc_max_sync_write;
+ int client_notify_timeout;
+
// objecter
bool objecter_buffer_uncommitted;
double objecter_mon_retry_interval;
bool osd_use_stale_snap;
+ int osd_max_notify_timeout;
+
// filestore
bool filestore;
double filestore_max_sync_interval;
int watch(pool_t pool, const std::string& o, uint64_t ver, uint64_t *handle, librados::Rados::WatchCtx *ctx);
int unwatch(pool_t pool, const std::string& o, uint64_t handle);
int notify(pool_t pool, const std::string& o, uint64_t ver);
+ void set_notify_timeout(pool_t pool, uint32_t timeout);
/* assert version for next sync operations */
void set_assert_version(pool_t pool, uint64_t ver);
SnapContext snapc;
uint64_t assert_ver;
eversion_t last_objver;
+ uint32_t notify_timeout;
- PoolCtx(int pid, const char *n, snapid_t s = CEPH_NOSNAP) : poolid(pid), name(n), snap_seq(s), assert_ver(0) {}
+ PoolCtx(int pid, const char *n, snapid_t s = CEPH_NOSNAP) : poolid(pid), name(n), snap_seq(s), assert_ver(0),
+ notify_timeout(g_conf.client_notify_timeout) {}
void set_snap(snapid_t s) {
if (!s)
void set_assert_version(PoolCtx& pool, uint64_t ver) {
pool.assert_ver = ver;
}
+
+ void set_notify_timeout(PoolCtx& pool, uint32_t timeout) {
+ pool.notify_timeout = timeout;
+ }
};
int RadosClient::init()
}
lock.Lock();
register_watcher(pool, oid, ctx, &rd, &cookie);
- rd.notify(cookie, ver);
+ uint32_t prot_ver = 1;
+ uint32_t timeout = pool.notify_timeout;
+ ::encode(prot_ver, inbl);
+ ::encode(timeout, inbl);
+ rd.notify(cookie, ver, inbl);
objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver);
lock.Unlock();
return;
client->set_assert_version(*(RadosClient::PoolCtx *)pool, ver);
}
+
+void Rados::set_notify_timeout(pool_t pool, uint32_t timeout)
+{
+ if (!client)
+ return;
+ client->set_notify_timeout(*(RadosClient::PoolCtx *)pool, timeout);
+}
}
// ---------------------------------------------
case CEPH_OSD_OP_NOTIFY:
{
+ uint32_t ver;
+ uint32_t timeout;
+
+ try {
+ ::decode(ver, bp);
+ ::decode(timeout, bp);
+ } catch (const buffer::error &e) {
+ timeout = 0;
+ }
+ if (!timeout || timeout > g_conf.osd_max_notify_timeout)
+ timeout = g_conf.osd_max_notify_timeout;
dout(0) << "CEPH_OSD_OP_NOTIFY" << dendl;
ObjectContext *obc = ctx->obc;
dout(0) << "ctx->obc=" << (void *)obc << dendl;
obc->ref++;
notif->obc = obc;
notif->timeout = new Watch::C_NotifyTimeout(osd, notif);
- osd->watch_timer.add_event_after(5.0, notif->timeout); /* FIXME: use a configurable timeout here */
+ osd->watch_timer.add_event_after(timeout, notif->timeout);
}
osd->watch_lock.Unlock();
}
ops[s].data.append(method, ops[s].op.cls.method_len);
ops[s].data.append(indata);
}
- void add_watch(int op, uint64_t cookie, uint64_t ver, uint8_t flag) {
+ void add_watch(int op, uint64_t cookie, uint64_t ver, uint8_t flag, bufferlist& inbl) {
int s = ops.size();
ops.resize(s+1);
ops[s].op.op = op;
ops[s].op.watch.cookie = cookie;
ops[s].op.watch.ver = ver;
ops[s].op.watch.flag = flag;
+ ops[s].data.append(inbl);
}
void add_pgls(int op, uint64_t count, uint64_t cookie) {
int s = ops.size();
// watch/notify
void watch(uint64_t cookie, uint64_t ver, bool set) {
- add_watch(CEPH_OSD_OP_WATCH, cookie, ver, (set ? 1 : 0));
+ bufferlist inbl;
+ add_watch(CEPH_OSD_OP_WATCH, cookie, ver, (set ? 1 : 0), inbl);
}
- void notify(uint64_t cookie, uint64_t ver) {
- add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1);
+ void notify(uint64_t cookie, uint64_t ver, bufferlist& inbl) {
+ add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl);
}
void notify_ack(uint64_t notify_id, uint64_t ver) {
- add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
+ bufferlist bl;
+ add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
}
void assert_version(uint64_t ver) {
- add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0);
+ bufferlist bl;
+ add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl);
}
};
cout << "*** press enter to continue ***" << std::endl;
getchar();
+ rados.set_notify_timeout(pool, 7);
r = rados.notify(pool, oid, objver);
cout << "rados.notify returned " << r << std::endl;
cout << "*** press enter to continue ***" << std::endl;