#include "common/Clock.h"
+#include "common/Timer.h"
#include "common/utf8.h"
#include "rgw_log.h"
return o;
}
+/* usage logger */
+class UsageLogger {
+ CephContext *cct;
+ map<rgw_user_bucket, RGWUsageInfo> usage_map;
+ Mutex lock;
+ int32_t num_entries;
+ Mutex timer_lock;
+ SafeTimer timer;
+ utime_t round_timestamp;
+
+ class C_UsageLogTimeout : public Context {
+ CephContext *cct;
+ UsageLogger *logger;
+ public:
+ C_UsageLogTimeout(CephContext *_cct, UsageLogger *_l) : cct(_cct), logger(_l) {}
+ void finish(int r) {
+ logger->flush();
+ logger->set_timer();
+ }
+ };
+
+ void set_timer() {
+ timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(cct, this));
+ }
+public:
+
+ UsageLogger(CephContext *_cct) : cct(_cct), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) {
+ timer.init();
+ Mutex::Locker l(timer_lock);
+ set_timer();
+ utime_t ts = ceph_clock_now(cct);
+ recalc_round_timestamp(ts);
+ }
+
+ ~UsageLogger() {
+ Mutex::Locker l(timer_lock);
+ flush();
+ timer.cancel_all_events();
+ timer.shutdown();
+ }
+
+ void recalc_round_timestamp(utime_t& ts) {
+ struct tm bdt;
+ time_t tt = ts.sec();
+ gmtime_r(&tt, &bdt);
+ bdt.tm_sec = 0;
+ bdt.tm_min = 0;
+ tt = mktime(&bdt);
+ round_timestamp = utime_t(tt, 0);
+ }
+
+ void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
+ if (timestamp.sec() > round_timestamp + 3600)
+ recalc_round_timestamp(timestamp);
+ entry.epoch = round_timestamp.sec();
+ lock.Lock();
+ bool account;
+ rgw_user_bucket ub(entry.owner, entry.bucket);
+ usage_map[ub].insert(round_timestamp, entry, &account);
+ if (account)
+ num_entries++;
+ bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
+ lock.Unlock();
+ if (need_flush) {
+ Mutex::Locker l(timer_lock);
+ flush();
+ }
+ }
+
+ void flush() {
+ map<rgw_user_bucket, RGWUsageInfo> old_map;
+ lock.Lock();
+ old_map.swap(usage_map);
+ num_entries = 0;
+ lock.Unlock();
+
+ rgwstore->log_usage(old_map);
+ }
+};
+
+static UsageLogger *usage_logger = NULL;
+
+void rgw_log_usage_init(CephContext *cct)
+{
+ usage_logger = new UsageLogger(cct);
+}
+
+void rgw_log_usage_finalize()
+{
+ delete usage_logger;
+ usage_logger = NULL;
+}
+
+static void log_usage(struct req_state *s)
+{
+ if (!usage_logger)
+ return;
+
+ if (s->bucket_owner.empty())
+ return;
+
+ rgw_usage_log_entry entry(s->bucket_owner, s->bucket.name, s->bytes_sent, s->bytes_received);
+
+ utime_t ts = ceph_clock_now(s->cct);
+
+ usage_logger->insert(ts, entry);
+}
+
int rgw_log_op(struct req_state *s)
{
struct rgw_log_entry entry;
string bucket_id;
+ if (s->enable_usage_log)
+ log_usage(s);
+
if (!s->enable_ops_log)
return 0;