assert(in->last == CEPH_NOSNAP);
}
+ __u64 period = in->inode.layout.fl_object_size * in->inode.layout.fl_stripe_count;
__u64 to = MAX(in->inode.size, in->inode.get_max_size());
- dout(10) << "purge_stray 0~" << to << " snapc " << snapc << " on " << *in << dendl;
+ __u64 num = (to + period - 1) / period;
+ dout(10) << "purge_stray 0~" << to << " objects 0~" << num << " snapc " << snapc << " on " << *in << dendl;
if (to)
- mds->filer->remove(in->inode.ino, &in->inode.layout, *snapc,
- 0, to, g_clock.now(), 0,
- 0, new C_MDC_PurgeStrayPurged(this, dn));
+ mds->filer->purge_range(in->inode.ino, &in->inode.layout, *snapc,
+ 0, num, g_clock.now(), 0,
+ new C_MDC_PurgeStrayPurged(this, dn));
else
_purge_stray_purged(dn);
}
}
+// -----------------------
+
+struct PurgeRange {
+ inodeno_t ino;
+ ceph_file_layout layout;
+ SnapContext snapc;
+ __u64 first, num;
+ utime_t mtime;
+ int flags;
+ Context *oncommit;
+ int uncommitted;
+};
+
+int Filer::purge_range(inodeno_t ino,
+ ceph_file_layout *layout,
+ const SnapContext& snapc,
+ __u64 first_obj, __u64 num_obj,
+ utime_t mtime,
+ int flags,
+ Context *oncommit)
+{
+ assert(num_obj > 0);
+
+ // single object? easy!
+ if (num_obj == 1) {
+ object_t oid = file_object_t(ino, first_obj);
+ ceph_object_layout ol = objecter->osdmap->file_to_object_layout(oid, *layout);
+ objecter->remove(oid, ol, snapc, mtime, flags, NULL, oncommit);
+ return 0;
+ }
+
+ // lots! let's do this in pieces.
+ PurgeRange *pr = new PurgeRange;
+ pr->ino = ino;
+ pr->layout = *layout;
+ pr->snapc = snapc;
+ pr->first = first_obj;
+ pr->num = num_obj;
+ pr->mtime = mtime;
+ pr->flags = flags;
+ pr->oncommit = oncommit;
+ pr->uncommitted = 0;
+
+ _do_purge_range(pr, 0);
+ return 0;
+}
+
+struct C_PurgeRange : public Context {
+ Filer *filer;
+ PurgeRange *pr;
+ C_PurgeRange(Filer *f, PurgeRange *p) : filer(f), pr(p) {}
+ void finish(int r) {
+ filer->_do_purge_range(pr, 1);
+ }
+};
+
+void Filer::_do_purge_range(PurgeRange *pr, int fin)
+{
+ pr->uncommitted -= fin;
+ dout(10) << "_do_purge_range " << pr->ino << " objects " << pr->first << "~" << pr->num
+ << " uncommitted " << pr->uncommitted << dendl;
+
+ if (pr->num == 0 && pr->uncommitted == 0) {
+ pr->oncommit->finish(0);
+ delete pr->oncommit;
+ delete pr;
+ return;
+ }
+
+ int max = 10 - pr->uncommitted;
+ while (pr->num > 0 && max > 0) {
+ object_t oid = file_object_t(pr->ino, pr->first);
+ ceph_object_layout ol = objecter->osdmap->file_to_object_layout(oid, pr->layout);
+ objecter->remove(oid, ol, pr->snapc, pr->mtime, pr->flags,
+ NULL, new C_PurgeRange(this, pr));
+ pr->uncommitted++;
+ pr->first++;
+ pr->num--;
+ max--;
+ }
+}
+
+
+// -----------------------
+
void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout,
__u64 offset, __u64 len,
vector<ObjectExtent>& extents)
return 0;
}
- int remove(inodeno_t ino,
- ceph_file_layout *layout,
- const SnapContext& snapc,
- __u64 offset,
- __u64 len,
- utime_t mtime,
- int flags,
- Context *onack,
- Context *oncommit) {
- vector<ObjectExtent> extents;
- file_to_extents(ino, layout, offset, len, extents);
- if (extents.size() == 1) {
- objecter->remove(extents[0].oid, extents[0].layout,
- snapc, mtime, flags, onack, oncommit);
- } else {
- C_Gather *gack = 0, *gcom = 0;
- if (onack)
- gack = new C_Gather(onack);
- if (oncommit)
- gcom = new C_Gather(oncommit);
- for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++)
- objecter->remove(p->oid, p->layout,
- snapc, mtime, flags,
- gack ? gack->new_sub():0,
- gcom ? gcom->new_sub():0);
- }
- return 0;
- }
+ // purge range of ino.### objects
+ int purge_range(inodeno_t ino,
+ ceph_file_layout *layout,
+ const SnapContext& snapc,
+ __u64 first_obj, __u64 num_obj,
+ utime_t mtime,
+ int flags,
+ Context *oncommit);
+ void _do_purge_range(class PurgeRange *pr, int fin);
/*
* probe
void Journaler::trim()
{
+ __u64 period = layout.fl_stripe_count * layout.fl_object_size;
+
__s64 trim_to = last_committed.expire_pos;
- trim_to -= trim_to % (layout.fl_stripe_count * layout.fl_object_size);
+ trim_to -= trim_to % period;
dout(10) << "trim last_commited head was " << last_committed
<< ", can trim to " << trim_to
<< dendl;
<< ", trimmed/trimming/expire are "
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
<< dendl;
-
+
+ // delete range of objects
+ __u64 first = trimming_pos / period;
+ __u64 num = (trim_to - trimming_pos) / period;
SnapContext snapc;
- filer.remove(ino, &layout, snapc,
- trimming_pos, trim_to-trimming_pos, g_clock.now(), 0,
- NULL, new C_Trim(this, trim_to));
+ filer.purge_range(ino, &layout, snapc, first, num, g_clock.now(), 0,
+ new C_Trim(this, trim_to));
trimming_pos = trim_to;
}