if (op->objver)
*op->objver = m->get_version();
+ if (op->reply_epoch)
+ *op->reply_epoch = m->get_map_epoch();
// got data?
if (op->outbl) {
dout(10) << "The placement groups have changed, restarting with " << pg_num << dendl;
list_context->current_pg = 0;
list_context->cookie = 0;
+ list_context->current_pg_epoch = 0;
list_context->starting_pg_num = pg_num;
}
if (list_context->current_pg == pg_num){ //this context got all the way through
}
ObjectOperation op;
- op.pg_ls(list_context->max_entries, list_context->filter, list_context->cookie);
+ op.pg_ls(list_context->max_entries, list_context->filter, list_context->cookie,
+ list_context->current_pg_epoch);
bufferlist *bl = new bufferlist();
C_List *onack = new C_List(list_context, onfinish, bl, this);
o->priority = op.priority;
o->snapid = list_context->pool_snap_seq;
o->outbl = bl;
+ o->reply_epoch = &onack->epoch;
o->pgid = pg_t(list_context->current_pg, list_context->pool_id, -1);
op_submit(o);
}
-void Objecter::_list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish)
+void Objecter::_list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish, epoch_t reply_epoch)
{
dout(10) << "_list_reply" << dendl;
::decode(extra_info, iter);
}
list_context->cookie = (uint64_t)response.handle;
+ if (!list_context->current_pg_epoch) {
+ // first pgls result, set epoch marker
+ dout(20) << "first pgls piece, reply_epoch is " << reply_epoch << dendl;
+ list_context->current_pg_epoch = reply_epoch;
+ }
int response_size = response.entries.size();
dout(20) << "response.entries.size " << response_size
return;
}
}
- //if we make this this far, there are no objects left in the current pg, but we want more!
+
+ // if we make this this far, there are no objects left in the current pg, but we want more!
++list_context->current_pg;
+ list_context->current_pg_epoch = 0;
dout(20) << "emptied current pg, moving on to next one:" << list_context->current_pg << dendl;
- if(list_context->current_pg < list_context->starting_pg_num){ //we have more pgs to go through
+ if (list_context->current_pg < list_context->starting_pg_num){ // we have more pgs to go through
list_context->cookie = 0;
delete bl;
list_objects(list_context, final_finish);
return;
}
- //if we make it this far, there are no more pgs
+ // if we make it this far, there are no more pgs
dout(20) << "out of pgs, returning to" << final_finish << dendl;
list_context->at_end = true;
delete bl;
ops[s].op.watch.flag = flag;
ops[s].data.append(inbl);
}
- void add_pgls(int op, uint64_t count, uint64_t cookie) {
+ void add_pgls(int op, uint64_t count, uint64_t cookie, epoch_t start_epoch) {
int s = ops.size();
ops.resize(s+1);
ops[s].op.op = op;
ops[s].op.pgls.count = count;
ops[s].op.pgls.cookie = cookie;
+ ops[s].op.pgls.start_epoch = start_epoch;
}
- void add_pgls_filter(int op, uint64_t count, bufferlist& filter, uint64_t cookie) {
+ void add_pgls_filter(int op, uint64_t count, bufferlist& filter, uint64_t cookie, epoch_t start_epoch) {
int s = ops.size();
ops.resize(s+1);
ops[s].op.op = op;
ops[s].op.pgls.count = count;
ops[s].op.pgls.cookie = cookie;
+ ops[s].op.pgls.start_epoch = start_epoch;
string cname = "pg";
string mname = "filter";
::encode(cname, ops[s].data);
// ------
// pg
- void pg_ls(uint64_t count, bufferlist& filter, uint64_t cookie) {
+ void pg_ls(uint64_t count, bufferlist& filter, uint64_t cookie, epoch_t start_epoch) {
if (filter.length() == 0)
- add_pgls(CEPH_OSD_OP_PGLS, count, cookie);
+ add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
else
- add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie);
+ add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie, start_epoch);
flags |= CEPH_OSD_FLAG_PGOP;
}
bool paused;
eversion_t *objver;
+ epoch_t *reply_epoch;
utime_t stamp;
used_replica(false), con(NULL),
snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
- paused(false), objver(ov) {
+ paused(false), objver(ov), reply_epoch(NULL) {
ops.swap(op);
}
};
struct ListContext {
int current_pg;
uint64_t cookie;
+ epoch_t current_pg_epoch;
int starting_pg_num;
bool at_end;
bufferlist extra_info;
- ListContext() : current_pg(0), cookie(0), starting_pg_num(0),
+ ListContext() : current_pg(0), cookie(0), current_pg_epoch(0), starting_pg_num(0),
at_end(false), pool_id(0),
pool_snap_seq(0), max_entries(0) {}
};
Context *final_finish;
bufferlist *bl;
Objecter *objecter;
+ epoch_t epoch;
C_List(ListContext *lc, Context * finish, bufferlist *b, Objecter *ob) :
- list_context(lc), final_finish(finish), bl(b), objecter(ob) {}
+ list_context(lc), final_finish(finish), bl(b), objecter(ob), epoch(0) {}
void finish(int r) {
if (r >= 0) {
- objecter->_list_reply(list_context, bl, final_finish);
+ objecter->_list_reply(list_context, bl, final_finish, epoch);
} else {
final_finish->finish(r);
delete final_finish;
void reopen_session(OSDSession *session);
void close_session(OSDSession *session);
- void _list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish);
+ void _list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish,
+ epoch_t reply_epoch);
void resend_mon_ops();