#include "config.h"
+void LogClient::log(log_type type, const char *s)
+{
+ string str(s);
+ log(type, str);
+}
+
void LogClient::log(log_type type, stringstream& ss)
{
while (!ss.eof()) {
deque<LogEntry> log_queue;
version_t last_log;
+ void log(log_type type, const char *s);
void log(log_type type, string& s);
void log(log_type type, stringstream& s);
void send_log();
e.type = LOG_INFO;
e.msg = "mkfs";
e.seq = 0;
- stringstream ss;
- ss << e;
- string s;
- getline(ss, s);
- pending_inc.append(s);
- pending_inc.append("\n");
+ e.encode(pending_inc);
}
bool LogMonitor::update_from_paxos()
{
+ version_t paxosv = paxos->get_version();
+
+ if (paxosv == log_version) return true;
+ assert(paxosv >= log_version);
+
+ if (log_version == 0 && paxosv > 1) {
+ log_version = mon->store->get_int("log", "last_consumed");
+ }
+
+ // walk through incrementals
+ while (paxosv > log_version) {
+ bufferlist bl, new_bl;
+ LogEntry le;
+ bool success = paxos->read(log_version+1, bl);
+ assert(success);
+ bufferlist::iterator p = bl.begin();
+
+ le.decode(p);
+ dout(7) << "update_from_paxos applying incremental log " << log_version+1 << " " << le << dendl;
+
+ stringstream ss;
+ ss << le;
+ string s;
+ getline(ss, s);
+ new_bl.append(s);
+ new_bl.append("\n");
+ mon->store->append_bl_ss(new_bl, "out", NULL);
+
+ log_version++;
+ }
+
+ mon->store->put_int(paxosv, "log", "last_consumed");
+
return true;
}
p != m->entries.end();
p++) {
dout(10) << " logging " << *p << dendl;
- stringstream ss;
- ss << *p;
- string s;
- getline(ss, s);
- pending_inc.append(s);
- pending_inc.append("\n");
+ (*p).encode(pending_inc);
}
paxos->wait_for_commit(new C_Log(this, m, m->get_orig_source_inst()));
class LogMonitor : public PaxosService {
private:
bufferlist pending_inc;
+ version_t log_version;
void create_initial();
bool update_from_paxos();
bool prepare_command(MMonCommand *m);
public:
- LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
+ LogMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), log_version(0) { }
void tick(); // check state, take actions
};
return len;
}
-int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync)
+int MonitorStore::write_bl_ss(bufferlist& bl, const char *a, const char *b, bool append, bool sync)
{
char fn[200];
sprintf(fn, "%s/%s", dir.c_str(), a);
}
char tfn[200];
- sprintf(tfn, "%s.new", fn);
- int fd = ::open(tfn, O_WRONLY|O_CREAT, 0644);
+ int fd;
+ if (append) {
+ fd = ::open(fn, O_WRONLY|O_CREAT|O_APPEND, 0644);
+ } else {
+ sprintf(tfn, "%s.new", fn);
+ fd = ::open(tfn, O_WRONLY|O_CREAT, 0644);
+ }
assert(fd >= 0);
// write data
if (sync)
::fsync(fd);
::close(fd);
- ::rename(tfn, fn);
+ if (!append) {
+ ::rename(tfn, fn);
+ }
return 0;
}
+
string dir;
int lock_fd;
+ int write_bl_ss(bufferlist& bl, const char *a, const char *b, bool append, bool sync=true);
public:
MonitorStore(const char *d) : dir(d) { }
~MonitorStore() { }
// ss and sn varieties.
bool exists_bl_ss(const char *a, const char *b=0);
int get_bl_ss(bufferlist& bl, const char *a, const char *b);
- int put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true);
+ int put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true) {
+ return write_bl_ss(bl, a, b, false, sync);
+ }
+ int append_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true) {
+ return write_bl_ss(bl, a, b, true, sync);
+ }
bool exists_bl_sn(const char *a, version_t b) {
char bs[20];
#ifdef __LP64__