const std::string &object_oid_prefix,
const JournalMetadataPtr &journal_metadata)
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
- m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"),
- m_remove_set_pending(false), m_remove_set(0), m_remove_set_ctx(NULL) {
+ m_journal_metadata(journal_metadata), m_metadata_listener(this),
+ m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false),
+ m_remove_set(0), m_remove_set_ctx(NULL) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+ m_journal_metadata->add_listener(&m_metadata_listener);
}
JournalTrimmer::~JournalTrimmer() {
+ m_journal_metadata->remove_listener(&m_metadata_listener);
+
m_journal_metadata->flush_commit_position();
m_async_op_tracker.wait_for_ops();
}
}
}
-void JournalTrimmer::handle_commit_position_safe(int r) {
- ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+void JournalTrimmer::handle_metadata_updated() {
+ ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
- if (r == 0) {
- // TODO
- /*
- uint8_t splay_width = m_journal_metadata->get_splay_width();
- uint64_t object_set = object_set_position.object_number / splay_width;
-
- JournalMetadata::RegisteredClients registered_clients;
- m_journal_metadata->get_registered_clients(®istered_clients);
-
- bool trim_permitted = true;
- for (JournalMetadata::RegisteredClients::iterator it =
- registered_clients.begin();
- it != registered_clients.end(); ++it) {
- const JournalMetadata::Client &client = *it;
- uint64_t client_object_set = client.commit_position.object_number /
- splay_width;
- if (client.id != m_journal_metadata->get_client_id() &&
- client_object_set < object_set) {
- ldout(m_cct, 20) << "object set " << client_object_set << " still "
- << "in-use by client " << client.id << dendl;
- trim_permitted = false;
- break;
- }
+
+ JournalMetadata::RegisteredClients registered_clients;
+ m_journal_metadata->get_registered_clients(®istered_clients);
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t minimum_set = m_journal_metadata->get_minimum_set();
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ uint64_t minimum_commit_set = active_set;
+ std::string minimum_client_id;
+
+ // TODO: add support for trimming past "laggy" clients
+ for (auto &client : registered_clients) {
+ if (client.commit_position.object_positions.empty()) {
+ // client hasn't recorded any commits
+ minimum_commit_set = minimum_set;
+ minimum_client_id = client.id;
+ break;
}
- if (trim_permitted) {
- trim_objects(object_set_position.object_number / splay_width);
+ for (auto &position : client.commit_position.object_positions) {
+ uint64_t object_set = position.object_number / splay_width;
+ if (object_set < minimum_commit_set) {
+ minimum_client_id = client.id;
+ minimum_commit_set = object_set;
+ }
}
- */
+ }
+
+ if (minimum_commit_set > minimum_set) {
+ trim_objects(minimum_commit_set);
+ } else {
+ ldout(m_cct, 20) << "object set " << minimum_commit_set << " still "
+ << "in-use by client " << minimum_client_id << dendl;
}
}
Mutex::Locker locker(m_lock);
m_remove_set_pending = false;
- if (r == 0 || (r == -ENOENT && m_remove_set_ctx == NULL)) {
+ if (r == -ENOENT) {
+ // no objects within the set existed
+ r = 0;
+ }
+ if (r == 0) {
// advance the minimum set to the next set
m_journal_metadata->set_minimum_set(object_set + 1);
+ uint64_t active_set = m_journal_metadata->get_active_set();
uint64_t minimum_set = m_journal_metadata->get_minimum_set();
- if (m_remove_set > minimum_set) {
+ if (m_remove_set > minimum_set && minimum_set <= active_set) {
m_remove_set_pending = true;
remove_set(minimum_set);
}
- } else if (r == -ENOENT) {
- // no objects within the set existed
- r = 0;
}
- if (m_remove_set_ctx != NULL && !m_remove_set_pending) {
+ if (m_remove_set_ctx != nullptr && !m_remove_set_pending) {
ldout(m_cct, 20) << "completing remove set context" << dendl;
m_remove_set_ctx->complete(r);
+ m_remove_set_ctx = nullptr;
}
}
private:
typedef std::function<Context*()> CreateContext;
+ struct MetadataListener : public JournalMetadata::Listener {
+ JournalTrimmer *journal_trimmmer;
+
+ MetadataListener(JournalTrimmer *journal_trimmmer)
+ : journal_trimmmer(journal_trimmmer) {
+ }
+ void handle_update(JournalMetadata *) {
+ journal_trimmmer->handle_metadata_updated();
+ }
+ };
+
struct C_CommitPositionSafe : public Context {
JournalTrimmer *journal_trimmer;
C_CommitPositionSafe(JournalTrimmer *_journal_trimmer)
: journal_trimmer(_journal_trimmer) {
- Mutex::Locker locker(journal_trimmer->m_lock);
journal_trimmer->m_async_op_tracker.start_op();
}
virtual ~C_CommitPositionSafe() {
}
virtual void finish(int r) {
- journal_trimmer->handle_commit_position_safe(r);
}
};
struct C_RemoveSet : public Context {
std::string m_object_oid_prefix;
JournalMetadataPtr m_journal_metadata;
+ MetadataListener m_metadata_listener;
AsyncOpTracker m_async_op_tracker;
void trim_objects(uint64_t minimum_set);
void remove_set(uint64_t object_set);
- void handle_commit_position_safe(int r);
-
+ void handle_metadata_updated();
void handle_set_removed(int r, uint64_t object_set);
};
uint64_t commit_tid5;
uint64_t commit_tid6;
ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1));
- ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid2));
+ ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid2));
ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid3));
ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid4));
- ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid5));
+ ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid5));
ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid6));
journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);