case Transaction::OP_NOP:
f->dump_string("op_name", "nop");
break;
+ case Transaction::OP_CREATE:
+ {
+ coll_t cid = i.get_cid(op->cid);
+ ghobject_t oid = i.get_oid(op->oid);
+ f->dump_string("op_name", "create");
+ f->dump_stream("collection") << cid;
+ f->dump_stream("oid") << oid;
+ }
+ break;
+
case Transaction::OP_TOUCH:
{
coll_t cid = i.get_cid(op->cid);
public:
enum {
OP_NOP = 0,
+ OP_CREATE = 7, // cid, oid
OP_TOUCH = 9, // cid, oid
OP_WRITE = 10, // cid, oid, offset, len, bl
OP_ZERO = 11, // cid, oid, offset, len
case OP_NOP:
break;
+ case OP_CREATE:
case OP_TOUCH:
case OP_REMOVE:
case OP_SETATTR:
_op->op = OP_NOP;
data.ops++;
}
+ /**
+ * create
+ *
+ * create an object that does not yet exist
+ * (behavior is undefined if the object already exists)
+ */
+ void create(const coll_t& cid, const ghobject_t& oid) {
+ Op* _op = _get_next_op();
+ _op->op = OP_CREATE;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ data.ops++;
+ }
/**
* touch
*
BlueStore::OnodeRef BlueStore::Collection::get_onode(
const ghobject_t& oid,
- bool create)
+ bool create,
+ bool is_createop)
{
ceph_assert(create ? lock.is_wlocked() : lock.is_locked());
<< pretty_binary_string(key) << dendl;
bufferlist v;
- int r = store->db->get(PREFIX_OBJ, key.c_str(), key.size(), &v);
- ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
+ int r = -ENOENT;
Onode *on;
+ if (!is_createop) {
+ r = store->db->get(PREFIX_OBJ, key.c_str(), key.size(), &v);
+ ldout(store->cct, 20) << " r " << r << " v.len " << v.length() << dendl;
+ }
if (v.length() == 0) {
ceph_assert(r == -ENOENT);
if (!store->cct->_conf->bluestore_debug_misc &&
// these operations implicity create the object
bool create = false;
if (op->op == Transaction::OP_TOUCH ||
+ op->op == Transaction::OP_CREATE ||
op->op == Transaction::OP_WRITE ||
op->op == Transaction::OP_ZERO) {
create = true;
OnodeRef &o = ovec[op->oid];
if (!o) {
ghobject_t oid = i.get_oid(op->oid);
- o = c->get_onode(oid, create);
+ o = c->get_onode(oid, create, op->op == Transaction::OP_CREATE);
}
if (!create && (!o || !o->exists)) {
dout(10) << __func__ << " op " << op->op << " got ENOENT on "
}
switch (op->op) {
+ case Transaction::OP_CREATE:
case Transaction::OP_TOUCH:
r = _touch(txc, c, o);
break;
pool_opts_t pool_opts;
ContextQueue *commit_queue;
- OnodeRef get_onode(const ghobject_t& oid, bool create);
+ OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
// the terminology is confusing here, sorry!
//
case Transaction::OP_NOP:
break;
case Transaction::OP_TOUCH:
+ case Transaction::OP_CREATE:
{
const coll_t &_cid = i.get_cid(op->cid);
const ghobject_t &oid = i.get_oid(op->oid);
// these operations implicity create the object
bool create = false;
if (op->op == Transaction::OP_TOUCH ||
+ op->op == Transaction::OP_CREATE ||
op->op == Transaction::OP_WRITE ||
op->op == Transaction::OP_ZERO) {
create = true;
switch (op->op) {
case Transaction::OP_TOUCH:
+ case Transaction::OP_CREATE:
r = _touch(txc, c, o);
break;
case Transaction::OP_NOP:
break;
case Transaction::OP_TOUCH:
+ case Transaction::OP_CREATE:
{
coll_t cid = i.get_cid(op->cid);
ghobject_t oid = i.get_oid(op->oid);
&trans,
&(op->temp_added),
&(op->temp_cleared),
- get_parent()->get_dpp());
+ get_parent()->get_dpp(),
+ get_osdmap()->require_osd_release);
}
dout(20) << __func__ << ": " << cache << dendl;
map<shard_id_t, ObjectStore::Transaction> *transactions,
set<hobject_t> *temp_added,
set<hobject_t> *temp_removed,
- DoutPrefixProvider *dpp)
+ DoutPrefixProvider *dpp,
+ const ceph_release_t require_osd_release)
{
ceph_assert(written_map);
ceph_assert(transactions);
[&](const PGTransaction::ObjectOperation::Init::None &) {},
[&](const PGTransaction::ObjectOperation::Init::Create &op) {
for (auto &&st: *transactions) {
- st.second.touch(
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ if (require_osd_release >= ceph_release_t::nautilus) {
+ st.second.create(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ } else {
+ st.second.touch(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
}
},
[&](const PGTransaction::ObjectOperation::Init::Clone &op) {
map<shard_id_t, ObjectStore::Transaction> *transactions,
set<hobject_t> *temp_added,
set<hobject_t> *temp_removed,
- DoutPrefixProvider *dpp);
+ DoutPrefixProvider *dpp,
+ const ceph_release_t require_osd_release = ceph_release_t::unknown);
};
#endif
vector<pg_log_entry_t> &log_entries,
ObjectStore::Transaction *t,
set<hobject_t> *added,
- set<hobject_t> *removed)
+ set<hobject_t> *removed,
+ const ceph_release_t require_osd_release = ceph_release_t::unknown )
{
ceph_assert(t);
ceph_assert(added);
[&](const PGTransaction::ObjectOperation::Init::None &) {
},
[&](const PGTransaction::ObjectOperation::Init::Create &op) {
- t->touch(coll, goid);
+ if (require_osd_release >= ceph_release_t::nautilus) {
+ t->create(coll, goid);
+ } else {
+ t->touch(coll, goid);
+ }
},
[&](const PGTransaction::ObjectOperation::Init::Clone &op) {
t->clone(
log_entries,
&op_t,
&added,
- &removed);
+ &removed,
+ get_osdmap()->require_osd_release);
ceph_assert(added.size() <= 1);
ceph_assert(removed.size() <= 1);