name=$1
port=$2
+if [ ! -z "$RGW_FRONTEND_THREADS" ]; then
+ set_frontend_threads="num_threads=$RGW_FRONTEND_THREADS"
+fi
+
shift 2
run_root=$script_root/run/$name
-k $run_root/keyring auth get-or-create client.rgw.$port mon \
'allow rw' osd 'allow rwx' mgr 'allow rw' >> $run_root/keyring
-$vstart_path/mrun $name radosgw --rgw-frontends="$rgw_frontend port=$port" \
+$vstart_path/mrun $name radosgw --rgw-frontends="$rgw_frontend port=$port $set_frontend_threads" \
-n client.rgw.$port --pid-file=$pidfile \
--admin-socket=$asokfile "$@" --log-file=$logfile
rgw_sync_module_log.cc
rgw_sync_module_pubsub.cc
rgw_pubsub_push.cc
+ rgw_notify.cc
+ rgw_notify_event_type.cc
rgw_sync_module_pubsub_rest.cc
rgw_sync_trace.cc
rgw_trim_bilog.cc
#include "services/svc_datalog_rados.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta_be_otp.h"
+#include "services/svc_zone.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
string sub_dest_bucket;
string sub_push_endpoint;
string event_id;
- set<string, ltstr_nocase> event_types;
+ rgw::notify::EventTypeList event_types;
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
} else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
event_id = val;
} else if (ceph_argparse_witharg(args, i, &val, "--event-type", "--event-types", (char*)NULL)) {
- get_str_set(val, ",", event_types);
+ rgw::notify::from_string_list(val, event_types);
} else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
// do nothing
} else if (strncmp(*i, "-", 1) == 0) {
#include "include/compat.h"
#include "rgw_amqp.h"
-#include <atomic>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
};
};
+std::string to_string(const connection_id_t& id) {
+ return id.host+":"+"/"+id.vhost;
+}
+
// connection_t state cleaner
// could be used for automatic cleanup when getting out of scope
class ConnectionCleaner {
amqp_bytes_free(reply_to_queue);
reply_to_queue = amqp_empty_bytes;
// fire all remaining callbacks
- std::for_each(callbacks.begin(), callbacks.end(), [s](auto& cb_tag) {
- cb_tag.cb(s);
+ std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
+ cb_tag.cb(status);
+ ldout(cct, 20) << "AMQP destroy: invoking callback with tag=" << cb_tag.tag << dendl;
});
+ callbacks.clear();
delivery_tag = 1;
}
if (rc == AMQP_STATUS_OK) {
auto const q_len = conn->callbacks.size();
if (q_len < max_inflight) {
+ ldout(conn->cct, 20) << "AMQP publish (with callback, tag=" << conn->delivery_tag << "): OK. Queue has: " << q_len << " callbacks" << dendl;
conn->callbacks.emplace_back(conn->delivery_tag++, message->cb);
- ldout(conn->cct, 20) << "AMQP publish (" << reinterpret_cast<unsigned char*>(&message->cb) << "): OK. Queue has: " << q_len << " callbacks" << dendl;
} else {
// immediately invoke callback with error
- ldout(conn->cct, 1) << "AMQP publish (" << reinterpret_cast<unsigned char*>(&message->cb) << "): failed with error: callback queue full" << dendl;
+ ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: callback queue full" << dendl;
message->cb(RGW_AMQP_STATUS_MAX_INFLIGHT);
}
} else {
// an error occurred, close connection
// it will be retied by the main loop
- ldout(conn->cct, 1) << "AMQP publish (" << reinterpret_cast<unsigned char*>(&message->cb) << "): failed with error: " << status_to_string(rc) << dendl;
+ ldout(conn->cct, 1) << "AMQP publish (with callback): failed with error: " << status_to_string(rc) << dendl;
conn->destroy(rc);
// immediately invoke callback with error
message->cb(rc);
info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
info.user = const_cast<char*>(conn->user.c_str());
info.password = const_cast<char*>(conn->password.c_str());
- ldout(conn->cct, 10) << "AMQP run: retry connection" << dendl;
+ ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
if (create_connection(conn, info)->is_ok() == false) {
- ldout(conn->cct, 1) << "AMQP run: connection retry failed" << dendl;
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
+ } else {
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
}
INCREMENT_AND_CONTINUE(conn_it);
}
if (rc != AMQP_STATUS_OK) {
// an error occurred, close connection
// it will be retied by the main loop
- ldout(conn->cct, 1) << "AMQP run: connection read error" << dendl;
+ ldout(conn->cct, 1) << "AMQP run: connection read error: " << status_to_string(rc) << dendl;
conn->destroy(rc);
INCREMENT_AND_CONTINUE(conn_it);
}
const auto& callbacks_end = conn->callbacks.end();
const auto& callbacks_begin = conn->callbacks.begin();
- const auto it = std::find(callbacks_begin, callbacks_end, tag);
- if (it != callbacks_end) {
+ const auto tag_it = std::find(callbacks_begin, callbacks_end, tag);
+ if (tag_it != callbacks_end) {
if (multiple) {
// n/ack all up to (and including) the tag
- ldout(conn->cct, 20) << "AMQP run: multiple n/acks received" << dendl;
- for (auto rit = it; rit >= callbacks_begin; --rit) {
- rit->cb(result);
- conn->callbacks.erase(rit);
+ ldout(conn->cct, 20) << "AMQP run: multiple n/acks received with tag=" << tag << " and result=" << result << dendl;
+ auto it = callbacks_begin;
+ while (it->tag <= tag && it != conn->callbacks.end()) {
+ ldout(conn->cct, 20) << "AMQP run: invoking callback with tag=" << it->tag << dendl;
+ it->cb(result);
+ it = conn->callbacks.erase(it);
}
} else {
// n/ack a specific tag
- ldout(conn->cct, 20) << "AMQP run: n/acks received" << dendl;
- it->cb(result);
- conn->callbacks.erase(it);
+ ldout(conn->cct, 20) << "AMQP run: n/ack received, invoking callback with tag=" << tag << " and result=" << result << dendl;
+ tag_it->cb(result);
+ conn->callbacks.erase(tag_it);
}
} else {
// TODO add counter for acks with no callback
- ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received" << dendl;
+ ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
}
// just increment the iterator
++conn_it;
std::string message;
};
-
-
/* Helper class used for RGWHTTPArgs parsing */
class NameVal
{
- string str;
- string name;
- string val;
+ const std::string str;
+ std::string name;
+ std::string val;
public:
- explicit NameVal(string nv) : str(nv) {}
+ explicit NameVal(const std::string& nv) : str(nv) {}
int parse();
- string& get_name() { return name; }
- string& get_val() { return val; }
+ std::string& get_name() { return name; }
+ std::string& get_val() { return val; }
};
/** Stores the XML arguments associated with the HTTP request in req_state*/
class RGWHTTPArgs {
- string str, empty_str;
- map<string, string> val_map;
- map<string, string> sys_val_map;
- map<string, string> sub_resources;
- bool has_resp_modifier;
- bool admin_subresource_added;
+ std::string str, empty_str;
+ std::map<std::string, std::string> val_map;
+ std::map<std::string, std::string> sys_val_map;
+ std::map<std::string, std::string> sub_resources;
+ bool has_resp_modifier = false;
+ bool admin_subresource_added = false;
public:
- RGWHTTPArgs() : has_resp_modifier(false), admin_subresource_added(false) {}
+ RGWHTTPArgs() = default;
+ explicit RGWHTTPArgs(const std::string& s) {
+ set(s);
+ parse();
+ }
/** Set the arguments; as received */
- void set(string s) {
+ void set(const std::string& s) {
has_resp_modifier = false;
val_map.clear();
sub_resources.clear();
}
/** parse the received arguments */
int parse();
- void append(const string& name, const string& val);
+ void append(const std::string& name, const string& val);
/** Get the value for a specific argument parameter */
- const string& get(const string& name, bool *exists = NULL) const;
+ const string& get(const std::string& name, bool *exists = NULL) const;
boost::optional<const std::string&>
get_optional(const std::string& name) const;
- int get_bool(const string& name, bool *val, bool *exists);
+ int get_bool(const std::string& name, bool *val, bool *exists);
int get_bool(const char *name, bool *val, bool *exists);
void get_bool(const char *name, bool *val, bool def_val);
int get_int(const char *name, int *val, int def_val);
/** Get the value for specific system argument parameter */
- std::string sys_get(const string& name, bool *exists = nullptr) const;
+ std::string sys_get(const std::string& name, bool *exists = nullptr) const;
/** see if a parameter is contained in this RGWHTTPArgs */
bool exists(const char *name) const {
bool sub_resource_exists(const char *name) const {
return (sub_resources.find(name) != std::end(sub_resources));
}
- map<string, string>& get_params() {
+ std::map<std::string, std::string>& get_params() {
return val_map;
}
const std::map<std::string, std::string>& get_sub_resources() const {
return has_resp_modifier;
}
void set_system() { /* make all system params visible */
- map<string, string>::iterator iter;
+ std::map<std::string, std::string>::iterator iter;
for (iter = sys_val_map.begin(); iter != sys_val_map.end(); ++iter) {
val_map[iter->first] = iter->second;
}
}
- const string& get_str() {
+ const std::string& get_str() {
return str;
}
}; // RGWHTTPArgs
#include "rgw_frontend.h"
#include "rgw_http_client_curl.h"
#include "rgw_perf_counters.h"
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+#include "rgw_amqp.h"
+#endif
#if defined(WITH_RADOSGW_BEAST_FRONTEND)
#include "rgw_asio_frontend.h"
#endif /* WITH_RADOSGW_BEAST_FRONTEND */
}
}
+ if (pubsub_enabled) {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ if (!rgw::amqp::init(cct.get())) {
+ dout(1) << "ERROR: failed to initialize AMQP manager" << dendl;
+ }
+#endif
+ }
+
if (apis_map.count("swift") > 0) {
RGWRESTMgr_SWIFT* const swift_resource = new RGWRESTMgr_SWIFT;
rgw_http_client_cleanup();
rgw::curl::cleanup_curl();
g_conf().remove_observer(&implicit_tenant_context);
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ rgw::amqp::shutdown();
+#endif
rgw_perf_stop(g_ceph_context);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_notify.h"
+#include <memory>
+#include <boost/algorithm/hex.hpp>
+#include "rgw_pubsub.h"
+#include "rgw_pubsub_push.h"
+#include "rgw_perf_counters.h"
+#include "common/dout.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace rgw::notify {
+
+// populate record from request
+void populate_record_from_request(const req_state *s,
+ const ceph::real_time& mtime,
+ const std::string& etag,
+ EventType event_type,
+ rgw_pubsub_s3_record& record) {
+ record.eventVersion = "2.1";
+ record.eventSource = "aws:s3";
+ record.eventTime = mtime;
+ record.eventName = to_string(event_type);
+ record.userIdentity = s->user->user_id.id; // user that triggered the change
+ record.sourceIPAddress = ""; // IP address of client that triggered the change: TODO
+ record.x_amz_request_id = s->req_id; // request ID of the original change
+ record.x_amz_id_2 = s->host_id; // RGW on which the change was made
+ record.s3SchemaVersion = "1.0";
+ // configurationId is filled from subscription configuration
+ record.bucket_name = s->bucket_name;
+ record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
+ record.bucket_arn = to_string(rgw::ARN(s->bucket));
+ record.object_key = s->object.name;
+ record.object_size = s->obj_size;
+ record.object_etag = etag;
+ record.object_versionId = s->object.instance;
+ // use timestamp as per key sequence id (hex encoded)
+ const utime_t ts(real_clock::now());
+ boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
+ std::back_inserter(record.object_sequencer));
+ // event ID is rgw extension (not in the S3 spec), used for acking the event
+ // same format is used in both S3 compliant and Ceph specific events
+ // not used in case of push-only mode
+ record.id = "";
+ record.bucket_id = s->bucket.bucket_id;
+ // pass meta data
+ record.x_meta_map = s->info.x_meta_map;
+}
+
+bool filter(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event_type) {
+ // if event list exists, and none of the events in the list matches the event type, filter the message
+ if (filter.events.size() && std::find(filter.events.begin(), filter.events.end(), event_type) == filter.events.end()) {
+ return true;
+ }
+ // TODO: add filter by compliant conf: object name, prefix, suffix
+ // TODO: add extra filtering criteria: object size, ToD, metadata, ...
+ return false;
+}
+
+int publish(const req_state* s,
+ const ceph::real_time& mtime,
+ const std::string& etag,
+ EventType event_type,
+ rgw::sal::RGWRadosStore* store) {
+ RGWUserPubSub ps_user(store, s->user->user_id);
+ RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket);
+ rgw_pubsub_bucket_topics bucket_topics;
+ auto rc = ps_bucket.get_topics(&bucket_topics);
+ if (rc < 0) {
+ // failed to fetch bucket topics
+ return rc;
+ }
+ rgw_pubsub_s3_record record;
+ populate_record_from_request(s, mtime, etag, event_type, record);
+ bool event_handled = false;
+ bool event_should_be_handled = false;
+ for (const auto& bucket_topic : bucket_topics.topics) {
+ const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
+ const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
+ if (filter(topic_filter, s, event_type)) {
+ // topic does not apply to req_state
+ continue;
+ }
+ event_should_be_handled = true;
+ try {
+ // TODO add endpoint LRU cache
+ const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint,
+ topic_cfg.dest.arn_topic,
+ RGWHTTPArgs(topic_cfg.dest.push_endpoint_args),
+ s->cct);
+ const std::string push_endpoint_str = push_endpoint->to_str();
+ ldout(s->cct, 20) << "push endpoint created: " << push_endpoint_str << dendl;
+ auto rc = push_endpoint->send_to_completion_async(s->cct, record, s->yield);
+ if (rc < 0) {
+ // bail out on first error
+ // TODO: add conf for bail out policy
+ ldout(s->cct, 1) << "push to endpoint " << push_endpoint_str << " failed, with error: " << rc << dendl;
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return rc;
+ }
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
+ ldout(s->cct, 20) << "successfull push to endpoint " << push_endpoint_str << dendl;
+ event_handled = true;
+ } catch (const RGWPubSubEndpoint::configuration_error& e) {
+ ldout(s->cct, 1) << "ERROR: failed to create push endpoint: "
+ << topic_cfg.dest.push_endpoint << " due to: " << e.what() << dendl;
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ return -EINVAL;
+ }
+ }
+
+ if (event_should_be_handled) {
+ // not counting events with no notifications or events that are filtered
+ // counting a single event, regardless of the number of notifications it sends
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
+ if (!event_handled) {
+ // all notifications for this event failed
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
+ }
+ }
+
+ return 0;
+}
+
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+#include "common/ceph_time.h"
+#include "rgw_notify_event_type.h"
+
+// forward declarations
+class CephContext;
+namespace rgw::sal {
+ class RGWRadosStore;
+}
+class RGWRados;
+class req_state;
+
+namespace rgw::notify {
+
+// publish notification
+int publish(const req_state* s,
+ const ceph::real_time& mtime,
+ const std::string& etag,
+ EventType event_type,
+ rgw::sal::RGWRadosStore* store);
+
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_notify_event_type.h"
+#include "include/str_list.h"
+
+namespace rgw::notify {
+
+ std::string to_string(EventType t) {
+ switch (t) {
+ case ObjectCreated:
+ return "s3:ObjectCreated:*";
+ case ObjectCreatedPut:
+ return "s3:ObjectCreated:Put";
+ case ObjectCreatedPost:
+ return "s3:ObjectCreated:Post";
+ case ObjectCreatedCopy:
+ return "s3:ObjectCreated:Copy";
+ case ObjectRemoved:
+ return "s3:ObjectRemoved:*";
+ case ObjectRemovedDelete:
+ return "s3:ObjectRemoved:Delete";
+ case ObjectRemovedDeleteMarkerCreated:
+ return "s3:ObjectRemoved:DeleteMarkerCreated";
+ case UnknownEvent:
+ return "s3:UnknownEvet";
+ }
+ return "s3:UnknownEvent";
+ }
+
+ std::string to_ceph_string(EventType t) {
+ switch (t) {
+ case ObjectCreated:
+ case ObjectCreatedPut:
+ case ObjectCreatedPost:
+ case ObjectCreatedCopy:
+ return "OBJECT_CREATE";
+ case ObjectRemoved:
+ case ObjectRemovedDelete:
+ return "OBJECT_DELETE";
+ case ObjectRemovedDeleteMarkerCreated:
+ return "DELETE_MARKER_CREATE";
+ case UnknownEvent:
+ return "UNKNOWN_EVENT";
+ }
+ return "UNKNOWN_EVENT";
+ }
+
+ EventType from_string(const std::string& s) {
+ if (s == "s3:ObjectCreated:*" || s == "OBJECT_CREATE")
+ return ObjectCreated;
+ if (s == "s3:ObjectCreated:Put")
+ return ObjectCreatedPut;
+ if (s == "s3:ObjectCreated:Post")
+ return ObjectCreatedPost;
+ if (s == "s3:ObjectCreated:Copy")
+ return ObjectCreatedCopy;
+ if (s == "s3:ObjectRemoved:*" || s == "OBJECT_DELETE")
+ return ObjectRemoved;
+ if (s == "s3:ObjectRemoved:Delete")
+ return ObjectRemovedDelete;
+ if (s == "s3:ObjectRemoved:DeleteMarkerCreated" || s == "DELETE_MARKER_CREATE")
+ return ObjectRemovedDeleteMarkerCreated;
+ return UnknownEvent;
+ }
+
+bool operator==(EventType lhs, EventType rhs) {
+ return lhs & rhs;
+}
+
+void from_string_list(const std::string& string_list, EventTypeList& event_list) {
+ event_list.clear();
+ ceph::for_each_substr(string_list, ",", [&event_list] (auto token) {
+ event_list.push_back(rgw::notify::from_string(std::string(token.begin(), token.end())));
+ });
+}
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+#include <string>
+#include <vector>
+
+namespace rgw::notify {
+ enum EventType {
+ ObjectCreated = 0xF,
+ ObjectCreatedPut = 0x1,
+ ObjectCreatedPost = 0x2,
+ ObjectCreatedCopy = 0x4,
+ ObjectRemoved = 0xF0,
+ ObjectRemovedDelete = 0x10,
+ ObjectRemovedDeleteMarkerCreated = 0x20,
+ UnknownEvent = 0x100
+ };
+
+ using EventTypeList = std::vector<EventType>;
+
+ // two event types are considered equal if their bits intersect
+ bool operator==(EventType lhs, EventType rhs);
+
+ std::string to_string(EventType t);
+
+ std::string to_ceph_string(EventType t);
+
+ EventType from_string(const std::string& s);
+
+ // create a vector of event types from comma separated list of event types
+ void from_string_list(const std::string& string_list, EventTypeList& event_list);
+}
+
#include "rgw_putobj_processor.h"
#include "rgw_crypt.h"
#include "rgw_perf_counters.h"
+#include "rgw_notify.h"
+#include "rgw_notify_event_type.h"
#include "services/svc_zone.h"
#include "services/svc_quota.h"
return;
}
}
+
+ // send request to notification manager
+ const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedPut, store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+ // this should be global conf (probably returnign a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
}
int RGWPostObj::verify_permission()
return;
}
} while (is_next_file_to_upload());
+ // send request to notification manager
+ const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectCreatedPost, store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+ // this should be global conf (probably returnign a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
}
} else {
op_ret = -EINVAL;
}
+ // TODO: add etag calculation
+ std::string etag;
+ const auto ret = rgw::notify::publish(s, real_time(), etag, rgw::notify::ObjectRemovedDelete, store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+ // this should be global conf (probably returnign a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
}
bool RGWCopyObj::parse_copy_location(const boost::string_view& url_src,
copy_obj_progress_cb, (void *)this,
this,
s->yield);
+
+ // TODO: use s3:ObjectCreated:Copy
+ const auto ret = rgw::notify::publish(s, mtime, etag, rgw::notify::ObjectCreatedCopy, store);
+ if (ret < 0) {
+ ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
+ // TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
+ // this should be global conf (probably returnign a different handler)
+ // so we don't need to read the configured values before we perform it
+ }
}
int RGWGetACLs::verify_permission()
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
+#include "services/svc_zone.h"
#include "rgw_b64.h"
#include "rgw_sal.h"
#include "rgw_pubsub.h"
#define dout_subsys ceph_subsys_rgw
+void do_decode_xml_obj(rgw::notify::EventTypeList& l, const string& name, XMLObj *obj)
+{
+ l.clear();
+
+ XMLObjIter iter = obj->find(name);
+ XMLObj *o;
+
+ while ((o = iter.get_next())) {
+ std::string val;
+ decode_xml_obj(val, o);
+ l.push_back(rgw::notify::from_string(val));
+ }
+}
+
bool rgw_pubsub_s3_notification::decode_xml(XMLObj *obj) {
const auto throw_if_missing = true;
RGWXMLDecoder::decode_xml("Id", id, obj, throw_if_missing);
do_decode_xml_obj(events, "Event", obj);
if (events.empty()) {
// if no events are provided, we assume all events
- events.push_back("s3:ObjectCreated:*");
- events.push_back("s3:ObjectRemoved:*");
+ events.push_back(rgw::notify::ObjectCreated);
+ events.push_back(rgw::notify::ObjectRemoved);
}
return true;
}
::encode_xml("Id", id, f);
::encode_xml("Topic", topic_arn.c_str(), f);
for (const auto& event : events) {
- ::encode_xml("Event", event, f);
+ ::encode_xml("Event", rgw::notify::to_string(event), f);
}
}
encode_json("awsRegion", awsRegion, f);
utime_t ut(eventTime);
encode_json("eventTime", ut, f);
- if (eventName == "OBJECT_CREATE") {
- encode_json("eventName", "ObjectCreated", f);
- }
- else if (eventName == "OBJECT_DELETE") {
- encode_json("eventName", "ObjectRemoved", f);
- } else {
- encode_json("eventName", "UNKNOWN_EVENT", f);
- }
+ encode_json("eventName", eventName, f);
{
Formatter::ObjectSection s(*f, "userIdentity");
encode_json("principalId", userIdentity, f);
encode_json("principalId", bucket_ownerIdentity, f);
}
encode_json("arn", bucket_arn, f);
+ encode_json("id", bucket_id, f);
}
{
Formatter::ObjectSection sub_s(*f, "object");
encode_json("etag", object_etag, f);
encode_json("versionId", object_versionId, f);
encode_json("sequencer", object_sequencer, f);
+ encode_json("metadata", x_meta_map, f);
}
}
encode_json("eventId", id, f);
void rgw_pubsub_event::dump(Formatter *f) const
{
encode_json("id", id, f);
- encode_json("event", event, f);
+ encode_json("event", event_name, f);
utime_t ut(timestamp);
encode_json("timestamp", ut, f);
encode_json("info", info, f);
encode_xml("TopicArn", arn, f);
}
+void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatter *f)
+{
+ f->open_array_section(name);
+ for (auto iter = l.cbegin(); iter != l.cend(); ++iter) {
+ f->dump_string("obj", rgw::notify::to_ceph_string(*iter));
+ }
+ f->close_section();
+}
+
void rgw_pubsub_topic_filter::dump(Formatter *f) const
{
encode_json("topic", topic, f);
encode_json("oid_prefix", oid_prefix, f);
encode_json("push_endpoint", push_endpoint, f);
encode_json("push_endpoint_args", push_endpoint_args, f);
- encode_json("arn_topic", arn_topic, f);
+ encode_json("push_endpoint_topic", arn_topic, f);
}
void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
{
encode_xml("EndpointAddress", push_endpoint, f);
encode_xml("EndpointArgs", push_endpoint_args, f);
- encode_xml("TopicArn", arn_topic, f);
+ encode_xml("EndpointTopic", arn_topic, f);
}
void rgw_pubsub_sub_config::dump(Formatter *f) const
encode_json("s3_id", s3_id, f);
}
-RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore *_store, const rgw_user& _user) : store(_store),
- user(_user),
- obj_ctx(store->svc()->sysobj->init_obj_ctx())
-{
- get_user_meta_obj(&user_meta_obj);
-}
-
-void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
-}
-
-void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
-}
-
-void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
- *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
+RGWUserPubSub::RGWUserPubSub(rgw::sal::RGWRadosStore* _store, const rgw_user& _user) :
+ store(_store),
+ user(_user),
+ obj_ctx(store->svc()->sysobj->init_obj_ctx()) {
+ get_user_meta_obj(&user_meta_obj);
}
int RGWUserPubSub::remove(const rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
int RGWUserPubSub::read_user_topics(rgw_pubsub_user_topics *result, RGWObjVersionTracker *objv_tracker)
{
int ret = read(user_meta_obj, result, objv_tracker);
- if (ret < 0 && ret != -ENOENT) {
- ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ if (ret < 0) {
+ ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
return ret;
}
return 0;
return 0;
}
-int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const EventTypeList& events)
+int RGWUserPubSub::get_topic(const string& name, rgw_pubsub_topic *result)
+{
+ rgw_pubsub_user_topics topics;
+ int ret = get_user_topics(&topics);
+ if (ret < 0) {
+ ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
+ return ret;
+ }
+
+ auto iter = topics.topics.find(name);
+ if (iter == topics.topics.end()) {
+ ldout(store->ctx(), 1) << "ERROR: topic not found" << dendl;
+ return -ENOENT;
+ }
+
+ *result = iter->second.topic;
+ return 0;
+}
+
+int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name)
{
rgw_pubsub_topic_subs user_topic_info;
rgw::sal::RGWRadosStore *store = ps->store;
auto& topic_filter = bucket_topics.topics[topic_name];
topic_filter.topic = user_topic_info.topic;
topic_filter.events = events;
+ topic_filter.s3_id = notif_name;
ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
int ret = read_user_topics(&topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
+ // its not an error if not topics exist, we create one
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
}
-
+
rgw_pubsub_topic_subs& new_topic = topics.topics[name];
new_topic.topic.user = user;
new_topic.topic.name = name;
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
return ret;
+ } else if (ret == -ENOENT) {
+ // its not an error if no topics exist, just a no-op
+ ldout(store->ctx(), 10) << "WARNING: failed to read topics info, deletion is a no-op: ret=" << ret << dendl;
+ return 0;
}
topics.topics.erase(name);
int ret = ps->read_user_topics(&topics, &user_objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
- return ret;
+ return ret != -ENOENT ? ret : -EINVAL;
}
auto iter = topics.topics.find(topic);
if (iter == topics.topics.end()) {
ldout(store->ctx(), 1) << "ERROR: cannot add subscription to topic: topic not found" << dendl;
- return -ENOENT;
+ return -EINVAL;
}
auto& t = iter->second;
int ret = ps->read_user_topics(&topics, &objv_tracker);
if (ret < 0) {
- ldout(store->ctx(), 1) << "ERROR: failed to read topics info: ret=" << ret << dendl;
- }
-
- if (ret >= 0) {
+ // not an error - could be that topic was already deleted
+ ldout(store->ctx(), 10) << "WARNING: failed to read topics info: ret=" << ret << dendl;
+ } else {
auto iter = topics.topics.find(topic);
if (iter != topics.topics.end()) {
auto& t = iter->second;
return 0;
}
+void RGWUserPubSub::get_user_meta_obj(rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, user_meta_oid());
+}
+
+void RGWUserPubSub::get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, bucket_meta_oid(bucket));
+}
+
+void RGWUserPubSub::get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const {
+ *obj = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sub_meta_oid(name));
+}
+
template<typename EventType>
void RGWUserPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
list.dump(f);
#ifndef CEPH_RGW_PUBSUB_H
#define CEPH_RGW_PUBSUB_H
+#include "rgw_sal.h"
+#include "services/svc_sys_obj.h"
#include "rgw_tools.h"
#include "rgw_zone.h"
#include "rgw_rados.h"
-#include "services/svc_sys_obj.h"
-#include "services/svc_zone.h"
+#include "rgw_notify_event_type.h"
class XMLObj;
// notification id
std::string id;
// types of events
- std::list<std::string> events;
+ rgw::notify::EventTypeList events;
// topic ARN
std::string topic_arn;
"principalId":""
},
"arn":""
+ "id": ""
},
"object":{
"key":"",
"size": ,
"eTag":"",
"versionId":"",
- "sequencer": ""
+ "sequencer": "",
+ "metadata": ""
}
},
"eventId":"",
// used to store a globally unique identifier of the event
// that could be used for acking
std::string id;
+ // this is an rgw extension holding the internal bucket id
+ std::string bucket_id;
+ // meta data
+ std::map<std::string, std::string> x_meta_map;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(eventVersion, bl);
encode(eventSource, bl);
encode(awsRegion, bl);
encode(object_versionId, bl);
encode(object_sequencer, bl);
encode(id, bl);
+ encode(bucket_id, bl);
+ encode(x_meta_map, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(eventVersion, bl);
decode(eventSource, bl);
decode(awsRegion, bl);
decode(object_versionId, bl);
decode(object_sequencer, bl);
decode(id, bl);
+ if (struct_v >= 2) {
+ decode(bucket_id, bl);
+ decode(x_meta_map, bl);
+ }
DECODE_FINISH(bl);
}
struct rgw_pubsub_event {
constexpr static const char* const json_type_single = "event";
constexpr static const char* const json_type_plural = "events";
- string id;
- string event;
- string source;
+ std::string id;
+ std::string event_name;
+ std::string source;
ceph::real_time timestamp;
JSONFormattable info;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(id, bl);
- encode(event, bl);
+ encode(event_name, bl);
encode(source, bl);
encode(timestamp, bl);
encode(info, bl);
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
decode(id, bl);
- decode(event, bl);
+ decode(event_name, bl);
decode(source, bl);
decode(timestamp, bl);
decode(info, bl);
};
WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)
-
struct rgw_pubsub_sub_config {
rgw_user user;
std::string name;
};
WRITE_CLASS_ENCODER(rgw_pubsub_topic_subs)
-typedef std::set<std::string, ltstr_nocase> EventTypeList;
-
struct rgw_pubsub_topic_filter {
rgw_pubsub_topic topic;
- EventTypeList events;
+ rgw::notify::EventTypeList events;
+ std::string s3_id;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(topic, bl);
- encode(events, bl);
+ // events are stored as a vector of strings
+ std::vector<std::string> tmp_events;
+ const auto converter = s3_id.empty() ? rgw::notify::to_ceph_string : rgw::notify::to_string;
+ std::transform(events.begin(), events.end(), std::back_inserter(tmp_events), converter);
+ encode(tmp_events, bl);
+ encode(s3_id, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(topic, bl);
- decode(events, bl);
+ // events are stored as a vector of strings
+ events.clear();
+ std::vector<std::string> tmp_events;
+ decode(tmp_events, bl);
+ std::transform(tmp_events.begin(), tmp_events.end(), std::back_inserter(events), rgw::notify::from_string);
+ if (struct_v >= 2) {
+ decode(s3_id, bl);
+ }
DECODE_FINISH(bl);
}
WRITE_CLASS_ENCODER(rgw_pubsub_topic_filter)
struct rgw_pubsub_bucket_topics {
- std::map<string, rgw_pubsub_topic_filter> topics;
+ std::map<std::string, rgw_pubsub_topic_filter> topics;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
rgw_raw_obj user_meta_obj;
- string user_meta_oid() const {
+ std::string user_meta_oid() const {
return pubsub_user_oid_prefix + user.to_str();
}
- string bucket_meta_oid(const rgw_bucket& bucket) const {
+ std::string bucket_meta_oid(const rgw_bucket& bucket) const {
return pubsub_user_oid_prefix + user.to_str() + ".bucket." + bucket.name + "/" + bucket.bucket_id;
}
- string sub_meta_oid(const string& name) const {
+ std::string sub_meta_oid(const string& name) const {
return pubsub_user_oid_prefix + user.to_str() + ".sub." + name;
}
// return 0 on success or if no topic was associated with the bucket, error code otherwise
int get_topics(rgw_pubsub_bucket_topics *result);
// adds a topic + filter (event list) to a bucket
+ // assigning a notification name is optional (needed for S3 compatible notifications)
// if the topic already exist on the bucket, the filter event list may be updated
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
- int create_notification(const string& topic_name, const EventTypeList& events);
+ int create_notification(const string& topic_name, const rgw::notify::EventTypeList& events, const std::string& notif_name="");
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
// return -ENOENT if the topic does not exists
int write_sub(const rgw_pubsub_sub_config& sub_conf, RGWObjVersionTracker *objv_tracker);
int remove_sub(RGWObjVersionTracker *objv_tracker);
public:
- Sub(RGWUserPubSub *_ps, const string& _sub) : ps(_ps), sub(_sub) {
+ Sub(RGWUserPubSub *_ps, const std::string& _sub) : ps(_ps), sub(_sub) {
ps->get_sub_meta_obj(sub, &sub_meta_obj);
}
void get_user_meta_obj(rgw_raw_obj *obj) const;
void get_bucket_meta_obj(const rgw_bucket& bucket, rgw_raw_obj *obj) const;
+
void get_sub_meta_obj(const string& name, rgw_raw_obj *obj) const;
// get all topics defined for the user and populate them into "result"
// return 0 on success or if no topics exist, error code otherwise
int get_user_topics(rgw_pubsub_user_topics *result);
- // get a topic by its name and populate it into "result"
+ // get a topic with its subscriptions by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
int get_topic(const string& name, rgw_pubsub_topic_subs *result);
+ // get a topic with by its name and populate it into "result"
+ // return -ENOENT if the topic does not exists
+ // return 0 on success, error code otherwise
+ int get_topic(const string& name, rgw_pubsub_topic *result);
// create a topic with a name only
// if the topic already exists it is a no-op (considered success)
// return 0 on success, error code otherwise
#include <algorithm>
#include "include/buffer_fwd.h"
#include "common/Formatter.h"
+#include "common/async/completion.h"
#include "rgw_common.h"
#include "rgw_data_sync.h"
#include "rgw_pubsub.h"
} else {
ack_level = std::atoi(str_ack_level.c_str());
if (ack_level < 100 || ack_level >= 600) {
- throw configuration_error("HTTP/S: invalid http-ack-level " + str_ack_level);
+ throw configuration_error("HTTP/S: invalid http-ack-level: " + str_ack_level);
}
}
return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
}
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ bufferlist read_bl;
+ RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
+ const auto post_data = json_format_pubsub_event(record);
+ request.set_post_data(post_data);
+ request.set_send_length(post_data.length());
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
+ const auto rc = RGWHTTP::process(&request, y);
+ if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
+ // TODO: use read_bl to process return code and handle according to ack level
+ return rc;
+ }
+
std::string to_str() const override {
std::string str("HTTP/S Endpoint");
str += "\nURI: " + endpoint;
ACK_LEVEL_BROKER,
ACK_LEVEL_ROUTEABLE
};
+ CephContext* const cct;
const std::string endpoint;
const std::string topic;
amqp::connection_ptr_t conn;
// This coroutine ends when it send the message and does not wait for an ack
class NoAckPublishCR : public RGWCoroutine {
private:
- RGWDataSyncEnv* const sync_env;
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
public:
- NoAckPublishCR(RGWDataSyncEnv* _sync_env,
+ NoAckPublishCR(CephContext* cct,
const std::string& _topic,
amqp::connection_ptr_t& _conn,
const std::string& _message) :
- RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ RGWCoroutine(cct),
topic(_topic), conn(_conn), message(_message) {}
// send message to endpoint, without waiting for reply
// note that it does not wait for an ack fron the end client
class AckPublishCR : public RGWCoroutine, public RGWIOProvider {
private:
- RGWDataSyncEnv* const sync_env;
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
const ack_level_t ack_level; // TODO not used for now
public:
- AckPublishCR(RGWDataSyncEnv* _sync_env,
+ AckPublishCR(CephContext* cct,
const std::string& _topic,
amqp::connection_ptr_t& _conn,
const std::string& _message,
ack_level_t _ack_level) :
- RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ RGWCoroutine(cct),
topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
// send message to endpoint, waiting for reply
RGWPubSubAMQPEndpoint(const std::string& _endpoint,
const std::string& _topic,
const RGWHTTPArgs& args,
- CephContext* cct) :
+ CephContext* _cct) :
+ cct(_cct),
endpoint(_endpoint),
topic(_topic),
conn(amqp::connect(endpoint, get_exchange(args))) {
+ if (!conn) {
+ throw configuration_error("AMQP: failed to create connection to: " + endpoint);
+ }
bool exists;
// get ack level
str_ack_level = args.get("amqp-ack-level", &exists);
} else if (str_ack_level == "routable") {
ack_level = ACK_LEVEL_ROUTEABLE;
} else {
- throw configuration_error("HTTP: invalid amqp-ack-level " + str_ack_level);
+ throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
}
}
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
if (ack_level == ACK_LEVEL_NONE) {
- return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(event));
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags
// but the same mechanism
- return new AckPublishCR(env, topic, conn, json_format_pubsub_event(event), ack_level);
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
}
}
RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ ceph_assert(conn);
if (ack_level == ACK_LEVEL_NONE) {
- return new NoAckPublishCR(env, topic, conn, json_format_pubsub_event(record));
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
} else {
// TODO: currently broker and routable are the same - this will require different flags
// but the same mechanism
- return new AckPublishCR(env, topic, conn, json_format_pubsub_event(record), ack_level);
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
+ }
+ }
+
+ // this allows waiting untill "finish()" is called from a different thread
+ // waiting could be blocking the waiting thread or yielding, depending
+ // with compilation flag support and whether the optional_yield is set
+ class Waiter {
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ std::unique_ptr<Completion> completion = nullptr;
+ int ret;
+
+ mutable std::atomic<bool> done = false;
+ mutable std::mutex lock;
+ mutable std::condition_variable cond;
+
+ template <typename ExecutionContext, typename CompletionToken>
+ auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ {
+ std::unique_lock l{lock};
+ completion = Completion::create(ctx.get_executor(), std::move(handler));
+ }
+ return init.result.get();
+ }
+
+ public:
+ int wait(optional_yield y) {
+ if (done) {
+ return ret;
+ }
+#ifdef HAVE_BOOST_CONTEXT
+ if (y) {
+ auto& io_ctx = y.get_io_context();
+ auto& yield_ctx = y.get_yield_context();
+ boost::system::error_code ec;
+ async_wait(io_ctx, yield_ctx[ec]);
+ return -ec.value();
+ }
+#endif
+ std::unique_lock l(lock);
+ cond.wait(l, [this]{return (done==true);});
+ return ret;
+ }
+
+ void finish(int r) {
+ std::unique_lock l{lock};
+ ret = r;
+ done = true;
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
+ } else {
+ cond.notify_all();
+ }
+ }
+ };
+
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ ceph_assert(conn);
+ if (ack_level == ACK_LEVEL_NONE) {
+ return amqp::publish(conn, topic, json_format_pubsub_event(record));
+ } else {
+ // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
+ // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
+ auto w = std::unique_ptr<Waiter>(new Waiter);
+ const auto rc = amqp::publish_with_confirm(conn,
+ topic,
+ json_format_pubsub_event(record),
+ std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ if (rc < 0) {
+ // failed to publish, does not wait for reply
+ return rc;
+ }
+ return w->wait(y);
}
}
if (version == AMQP_0_9_1) {
return Ptr(new RGWPubSubAMQPEndpoint(endpoint, topic, args, cct));
} else if (version == AMQP_1_0) {
- throw configuration_error("amqp v1.0 not supported");
+ throw configuration_error("AMQP: v1.0 not supported");
return nullptr;
} else {
- throw configuration_error("unknown amqp version " + version);
+ throw configuration_error("AMQP: unknown version: " + version);
return nullptr;
}
} else if (schema == "amqps") {
- throw configuration_error("amqps not supported");
+ throw configuration_error("AMQP: ssl not supported");
return nullptr;
#endif
}
#include <memory>
#include <stdexcept>
#include "include/buffer_fwd.h"
+#include "common/async/yield_context.h"
// TODO the env should be used as a template parameter to differentiate the source that triggers the pushes
class RGWDataSyncEnv;
static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
// this method is used in order to send notification (Ceph specific) and wait for completion
- // in async manner via a coroutine
+ // in async manner via a coroutine when invoked in the data sync environment
virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) = 0;
// this method is used in order to send notification (S3 compliant) and wait for completion
- // in async manner via a coroutine
+ // in async manner via a coroutine when invoked in the data sync environment
virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) = 0;
+ // this method is used in order to send notification (S3 compliant) and wait for completion
+ // in async manner via a coroutine when invoked in the frontend environment
+ virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) = 0;
+
// present as string
virtual std::string to_str() const { return ""; }
#include <algorithm>
#include <boost/tokenizer.hpp>
+#include <optional>
#include "rgw_rest_pubsub_common.h"
#include "rgw_rest_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_rest_s3.h"
#include "rgw_arn.h"
#include "rgw_auth_s3.h"
+#include "services/svc_zone.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
return RGW_Auth_S3::authorize(dpp, store, auth_registry, s);
}
-namespace {
-// conversion functions between S3 and GCP style event names
-std::string s3_to_gcp_event(const std::string& event) {
- if (event == "s3:ObjectCreated:*") {
- return "OBJECT_CREATE";
- }
- if (event == "s3:ObjectRemoved:*") {
- return "OBJECT_DELETE";
- }
- return "UNKNOWN_EVENT";
-}
-std::string gcp_to_s3_event(const std::string& event) {
- if (event == "OBJECT_CREATE") {
- return "s3:ObjectCreated:";
- }
- if (event == "OBJECT_DELETE") {
- return "s3:ObjectRemoved:";
- }
- return "UNKNOWN_EVENT";
-}
+namespace {
// return a unique topic by prefexing with the notification name: <notification>_<topic>
std::string topic_to_unique(const std::string& topic, const std::string& notification) {
return notification + "_" + topic;
}
return unique_topic.substr(notification.length() + 1);
}
+
+// from list of bucket topics, find the one that was auto-generated by a notification
+auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
+ auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
+ return it != bucket_topics.topics.end() ?
+ std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
+ std::nullopt;
+}
}
// command (S3 compliant): PUT /<bucket name>?notification
ceph_assert(b);
std::string data_bucket_prefix = "";
std::string data_oid_prefix = "";
+ bool push_only = true;
if (store->getRados()->get_sync_module()) {
const auto psmodule = dynamic_cast<RGWPSSyncModuleInstance*>(store->getRados()->get_sync_module().get());
if (psmodule) {
const auto& conf = psmodule->get_effective_conf();
data_bucket_prefix = conf["data_bucket_prefix"];
data_oid_prefix = conf["data_oid_prefix"];
+ // TODO: allow "push-only" on PS zone as well
+ push_only = false;
}
}
for (const auto& c : configurations.list) {
- const auto& sub_name = c.id;
- if (sub_name.empty()) {
+ const auto& notif_name = c.id;
+ if (notif_name.empty()) {
ldout(s->cct, 1) << "missing notification id" << dendl;
op_ret = -EINVAL;
return;
}
if (c.topic_arn.empty()) {
- ldout(s->cct, 1) << "missing topic ARN" << dendl;
+ ldout(s->cct, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
op_ret = -EINVAL;
return;
}
const auto arn = rgw::ARN::parse(c.topic_arn);
if (!arn || arn->resource.empty()) {
- ldout(s->cct, 1) << "topic ARN has invalid format:" << c.topic_arn << dendl;
+ ldout(s->cct, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
+ ldout(s->cct, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
op_ret = -EINVAL;
return;
}
const auto topic_name = arn->resource;
// get topic information. destination information is stored in the topic
- rgw_pubsub_topic_subs topic_info;
+ rgw_pubsub_topic topic_info;
op_ret = ups->get_topic(topic_name, &topic_info);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
// create unique topic name. this has 2 reasons:
// (1) topics cannot be shared between different S3 notifications because they hold the filter information
// (2) make topic clneaup easier, when notification is removed
- const auto unique_topic_name = topic_to_unique(topic_name, sub_name);
- // generate the internal topic, no need to store destination info in thr unique topic
+ const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
+ // generate the internal topic. destination is stored here for the "push-only" case
+ // when no subscription exists
// ARN is cached to make the "GET" method faster
- op_ret = ups->create_topic(unique_topic_name, rgw_pubsub_sub_dest(), topic_info.topic.arn);
+ op_ret = ups->create_topic(unique_topic_name, topic_info.dest, topic_info.arn);
if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to auto-generate topic '" << unique_topic_name <<
+ ldout(s->cct, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
return;
}
+ ldout(s->cct, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
// generate the notification
- EventTypeList events;
- std::transform(c.events.begin(), c.events.end(), std::inserter(events, events.begin()), s3_to_gcp_event);
- op_ret = b->create_notification(unique_topic_name, events);
+ rgw::notify::EventTypeList events;
+ op_ret = b->create_notification(unique_topic_name, c.events, notif_name);
if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to auto-generate notification on topic '" << unique_topic_name <<
+ ldout(s->cct, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
"', ret=" << op_ret << dendl;
// rollback generated topic (ignore return value)
ups->remove_topic(unique_topic_name);
return;
}
-
- // generate the subscription with destination information from the original topic
- rgw_pubsub_sub_dest dest = topic_info.topic.dest;
- dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
- dest.oid_prefix = data_oid_prefix + sub_name + "/";
- auto sub = ups->get_sub(sub_name);
- op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
- // rollback generated notification (ignore return value)
- b->remove_notification(unique_topic_name);
- // rollback generated topic (ignore return value)
- ups->remove_topic(unique_topic_name);
- return;
+ ldout(s->cct, 20) << "successfully auto-generated notification for unique topic'" << unique_topic_name << "'" << dendl;
+
+ if (!push_only) {
+ // generate the subscription with destination information from the original topic
+ rgw_pubsub_sub_dest dest = topic_info.dest;
+ dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name;
+ dest.oid_prefix = data_oid_prefix + notif_name + "/";
+ auto sub = ups->get_sub(notif_name);
+ op_ret = sub->subscribe(unique_topic_name, dest, notif_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl;
+ // rollback generated notification (ignore return value)
+ b->remove_notification(unique_topic_name);
+ // rollback generated topic (ignore return value)
+ ups->remove_topic(unique_topic_name);
+ return;
+ }
+ ldout(s->cct, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl;
}
- ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
}
}
// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
private:
- std::string sub_name;
+ std::string notif_name;
int get_params() override {
bool exists;
- sub_name = s->info.args.get("notification", &exists);
+ notif_name = s->info.args.get("notification", &exists);
if (!exists) {
ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
return -EINVAL;
return 0;
}
- void delete_notification(const std::string& _sub_name, const RGWUserPubSub::BucketRef& b, bool must_delete) {
- auto sub = ups->get_sub(_sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- if (sub_conf.s3_id.empty()) {
- if (must_delete) {
- op_ret = -ENOENT;
- ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
- }
- return;
- }
- const auto& sub_topic_name = sub_conf.topic;
- op_ret = sub->unsubscribe(sub_topic_name);
+ void remove_notification_by_topic(const std::string& topic_name, const RGWUserPubSub::BucketRef& b) {
+ op_ret = b->remove_notification(topic_name);
if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove auto-generated subscription, ret=" << op_ret << dendl;
- return;
- }
- op_ret = b->remove_notification(sub_topic_name);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove auto-generated notification, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
}
- op_ret = ups->remove_topic(sub_topic_name);
+ op_ret = ups->remove_topic(topic_name);
if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to remove auto-generated topic, ret=" << op_ret << dendl;
+ ldout(s->cct, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
}
- return;
}
public:
auto b = ups->get_bucket(bucket_info.bucket);
ceph_assert(b);
- if (!sub_name.empty()) {
+ // get all topics on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = b->get_topics(&bucket_topics);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+ return;
+ }
+
+ if (!notif_name.empty()) {
// delete a specific notification
- delete_notification(sub_name, b, true);
+ const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
+ if (unique_topic) {
+ // remove the auto generated subscription according to notification name (if exist)
+ const auto unique_topic_name = unique_topic->get().topic.name;
+ auto sub = ups->get_sub(notif_name);
+ op_ret = sub->unsubscribe(unique_topic_name);
+ if (op_ret < 0 && op_ret != -ENOENT) {
+ ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ remove_notification_by_topic(unique_topic_name, b);
+ return;
+ }
+ // notification to be removed is not found - considered success
+ ldout(s->cct, 20) << "notification '" << notif_name << "' already removed" << dendl;
return;
}
- // delete all notifications on a bucket
- rgw_pubsub_bucket_topics bucket_topics;
- b->get_topics(&bucket_topics);
- // loop through all topics of the bucket
+ // delete all notification of on a bucket
for (const auto& topic : bucket_topics.topics) {
- // for each topic get all subscriptions
+ // remove the auto generated subscription of the topic (if exist)
rgw_pubsub_topic_subs topic_subs;
- ups->get_topic(topic.first, &topic_subs);
- // loop through all subscriptions
+ op_ret = ups->get_topic(topic.first, &topic_subs);
for (const auto& topic_sub_name : topic_subs.subs) {
- delete_notification(topic_sub_name, b, false);
+ auto sub = ups->get_sub(topic_sub_name);
+ rgw_pubsub_sub_config sub_conf;
+ op_ret = sub->get_conf(&sub_conf);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl;
+ return;
+ }
+ if (!sub_conf.s3_id.empty()) {
+ // S3 notification, has autogenerated subscription
+ const auto& sub_topic_name = sub_conf.topic;
+ op_ret = sub->unsubscribe(sub_topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ }
}
+ remove_notification_by_topic(topic.first, b);
}
}
// command (S3 compliant): GET /bucket?notification[=<notification-id>]
class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
private:
- std::string sub_name;
+ std::string notif_name;
rgw_pubsub_s3_notifications notifications;
int get_params() override {
bool exists;
- sub_name = s->info.args.get("notification", &exists);
+ notif_name = s->info.args.get("notification", &exists);
if (!exists) {
ldout(s->cct, 1) << "missing required param 'notification'" << dendl;
return -EINVAL;
return 0;
}
- void add_notification_to_list(const rgw_pubsub_sub_config& sub_conf,
- const EventTypeList& events,
- const std::string& topic_arn);
+ void add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter);
public:
void execute() override;
const char* name() const override { return "pubsub_notifications_get_s3"; }
};
-void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_sub_config& sub_conf,
- const EventTypeList& events,
- const std::string& topic_arn) {
+void RGWPSListNotifs_ObjStore_S3::add_notification_to_list(const rgw_pubsub_topic_filter& topic_filter) {
rgw_pubsub_s3_notification notification;
- notification.id = sub_conf.s3_id;
- notification.topic_arn = topic_arn,
- std::transform(events.begin(), events.end(), std::back_inserter(notification.events), gcp_to_s3_event);
+ notification.id = topic_filter.s3_id;
+ notification.topic_arn = topic_filter.topic.arn;
+ notification.events = topic_filter.events;
notifications.list.push_back(notification);
}
ups.emplace(store, s->owner.get_id());
auto b = ups->get_bucket(bucket_info.bucket);
ceph_assert(b);
- if (!sub_name.empty()) {
+
+ // get all topics on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = b->get_topics(&bucket_topics);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (!notif_name.empty()) {
// get info of a specific notification
- auto sub = ups->get_sub(sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- if (sub_conf.s3_id.empty()) {
- op_ret = -ENOENT;
- ldout(s->cct, 1) << "notification does not have an ID, ret=" << op_ret << dendl;
+ const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
+ if (unique_topic) {
+ add_notification_to_list(unique_topic->get());
return;
}
- rgw_pubsub_bucket_topics bucket_topics;
- op_ret = b->get_topics(&bucket_topics);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- const auto topic_it = bucket_topics.topics.find(sub_conf.topic);
- if (topic_it == bucket_topics.topics.end()) {
- op_ret = -ENOENT;
- ldout(s->cct, 1) << "notification does not have topic information, ret=" << op_ret << dendl;
- return;
- }
- add_notification_to_list(sub_conf, topic_it->second.events, topic_it->second.topic.arn);
+ op_ret = -ENOENT;
+ ldout(s->cct, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
return;
}
- // get info on all s3 notifications of the bucket
- rgw_pubsub_bucket_topics bucket_topics;
- b->get_topics(&bucket_topics);
// loop through all topics of the bucket
for (const auto& topic : bucket_topics.topics) {
- // for each topic get all subscriptions
- rgw_pubsub_topic_subs topic_subs;
- ups->get_topic(topic.first, &topic_subs);
- const auto& events = topic.second.events;
- const auto& topic_arn = topic.second.topic.arn;
- // loop through all subscriptions
- for (const auto& topic_sub_name : topic_subs.subs) {
- // get info of a specific notification
- auto sub = ups->get_sub(topic_sub_name);
- rgw_pubsub_sub_config sub_conf;
- op_ret = sub->get_conf(&sub_conf);
- if (op_ret < 0) {
- ldout(s->cct, 1) << "failed to get notification info, ret=" << op_ret << dendl;
- return;
- }
- if (sub_conf.s3_id.empty()) {
+ if (topic.second.s3_id.empty()) {
// not an s3 notification
continue;
- }
- add_notification_to_list(sub_conf, events, topic_arn);
}
+ add_notification_to_list(topic.second);
}
}
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
+#include "services/svc_zone.h"
#include "rgw_common.h"
#include "rgw_coroutine.h"
#include "rgw_sync_module.h"
#include "rgw_op.h"
#include "rgw_pubsub.h"
#include "rgw_pubsub_push.h"
+#include "rgw_notify_event_type.h"
#include "rgw_perf_counters.h"
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
#include "rgw_amqp.h"
}
};
-enum RGWPubSubEventType {
- UNKNOWN_EVENT = 0,
- OBJECT_CREATE = 1,
- OBJECT_DELETE = 2,
- DELETE_MARKER_CREATE = 3,
-};
-
-#define EVENT_NAME_OBJECT_CREATE "OBJECT_CREATE"
-#define EVENT_NAME_OBJECT_DELETE "OBJECT_DELETE"
-#define EVENT_NAME_OBJECT_DELETE_MARKER_CREATE "DELETE_MARKER_CREATE"
-#define EVENT_NAME_UNKNOWN "UNKNOWN_EVENT"
-
-static const char *get_event_name(const RGWPubSubEventType& val)
-{
- switch (val) {
- case OBJECT_CREATE:
- return EVENT_NAME_OBJECT_CREATE;
- case OBJECT_DELETE:
- return EVENT_NAME_OBJECT_DELETE;
- case DELETE_MARKER_CREATE:
- return EVENT_NAME_OBJECT_DELETE_MARKER_CREATE;
- default:
- return "EVENT_NAME_UNKNOWN";
- };
-}
-
using PSConfigRef = std::shared_ptr<PSConfig>;
template<typename EventType>
using EventRef = std::shared_ptr<EventType>;
const rgw_obj_key& key,
const ceph::real_time& mtime,
const std::vector<std::pair<std::string, std::string> > *attrs,
- const string& event_name,
+ rgw::notify::EventType event_type,
EventRef<rgw_pubsub_event> *event) {
*event = std::make_shared<rgw_pubsub_event>();
EventRef<rgw_pubsub_event>& e = *event;
- e->event = event_name;
+ e->event_name = rgw::notify::to_ceph_string(event_type);
e->source = bucket.name + "/" + key.name;
e->timestamp = real_clock::now();
const rgw_obj_key& key,
const ceph::real_time& mtime,
const std::vector<std::pair<std::string, std::string> > *attrs,
- const string& event_name,
+ rgw::notify::EventType event_type,
EventRef<rgw_pubsub_s3_record> *record) {
*record = std::make_shared<rgw_pubsub_s3_record>();
r->eventVersion = "2.1";
r->eventSource = "aws:s3";
r->eventTime = mtime;
- r->eventName = event_name;
- r->userIdentity = ""; // user that triggered the change: not supported yet
- r->sourceIPAddress = ""; // IP address of client that triggered the change: not supported yet
- r->x_amz_request_id = ""; // request ID of the original change: not supported yet
- r->x_amz_id_2 = ""; // RGW on which the change was made: not supported yet
+ r->eventName = rgw::notify::to_string(event_type);
+ r->userIdentity = ""; // user that triggered the change: not supported in sync module
+ r->sourceIPAddress = ""; // IP address of client that triggered the change: not supported in sync module
+ r->x_amz_request_id = ""; // request ID of the original change: not supported in sync module
+ r->x_amz_id_2 = ""; // RGW on which the change was made: not supported in sync module
r->s3SchemaVersion = "1.0";
// configurationId is filled from subscription configuration
r->bucket_name = bucket.name;
r->bucket_ownerIdentity = owner.to_str();
r->bucket_arn = to_string(rgw::ARN(bucket));
+ r->bucket_id = bucket.bucket_id; // rgw extension
r->object_key = key.name;
- r->object_size = 0; // not supported yet
+ r->object_size = 0; // not supported in sync module
objstore_event oevent(bucket, key, mtime, attrs);
r->object_etag = oevent.get_hash();
r->object_versionId = key.instance;
rgw_user owner;
rgw_bucket bucket;
rgw_obj_key key;
- string event_name;
+ rgw::notify::EventType event_type;
RGWUserPubSub ups;
const rgw_user& _owner,
const rgw_bucket& _bucket,
const rgw_obj_key& _key,
- const string& _event_name,
+ rgw::notify::EventType _event_type,
TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
owner(_owner),
bucket(_bucket),
key(_key),
- event_name(_event_name),
+ event_type(_event_type),
ups(_sync_env->store, owner),
topics(_topics) {
*topics = std::make_shared<vector<PSTopicConfigRef> >();
for (auto& titer : bucket_topics.topics) {
auto& topic_filter = titer.second;
auto& info = topic_filter.topic;
+ // if event list is defined but event does not match any in the list, we skip to the next one
if (!topic_filter.events.empty() &&
- topic_filter.events.find(event_name) == topic_filter.events.end()) {
+ std::find(topic_filter.events.begin(), topic_filter.events.end(), event_type) == topic_filter.events.end()) {
continue;
}
- shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
+ std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
tc->name = info.name;
tc->subs = user_topics.topics[info.name].subs;
(*topics)->push_back(tc);
make_event_ref(sync_env->cct,
bucket_info.bucket, key,
mtime, &attrs,
- EVENT_NAME_OBJECT_CREATE, &event);
+ rgw::notify::ObjectCreated, &event);
make_s3_record_ref(sync_env->cct,
bucket_info.bucket, bucket_info.owner, key,
mtime, &attrs,
- EVENT_NAME_OBJECT_CREATE, &record);
+ rgw::notify::ObjectCreated, &record);
}
yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics));
reenter(this) {
yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
bucket_info.bucket, key,
- EVENT_NAME_OBJECT_CREATE,
+ rgw::notify::ObjectCreated,
&topics));
if (retcode < 0) {
ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
rgw_bucket bucket;
rgw_obj_key key;
ceph::real_time mtime;
- string event_name;
+ rgw::notify::EventType event_type;
EventRef<rgw_pubsub_event> event;
EventRef<rgw_pubsub_s3_record> record;
TopicsRef topics;
RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
- RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct),
+ rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
owner(_bucket_info.owner),
bucket(_bucket_info.bucket),
key(_key),
- mtime(_mtime), event_name(get_event_name(_event_type)) {}
+ mtime(_mtime), event_type(_event_type) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_name, &topics));
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_type, &topics));
if (retcode < 0) {
ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
return set_cr_error(retcode);
make_event_ref(sync_env->cct,
bucket, key,
mtime, nullptr,
- event_name, &event);
+ event_type, &event);
make_s3_record_ref(sync_env->cct,
bucket, owner, key,
mtime, nullptr,
- event_name, &record);
+ event_type, &record);
yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics));
if (retcode < 0) {
return set_cr_error(retcode);
rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket <<
" k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE);
+ return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete);
}
RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket <<
" k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
+ return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
}
PSConfigRef& get_conf() { return conf; }
}
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
if (!rgw::amqp::init(cct)) {
- ldout(cct, 1) << "ERROR: failed to initialize AMQP server in pubsub sync module" << dendl;
+ ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
}
#endif
}
#include "rgw_rest.h"
#include "rgw_rest_s3.h"
#include "rgw_arn.h"
+#include "rgw_zone.h"
+#include "services/svc_zone.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
private:
std::string topic_name;
- std::set<std::string, ltstr_nocase> events;
+ rgw::notify::EventTypeList events;
int get_params() override {
bool exists;
return -EINVAL;
}
- string events_str = s->info.args.get("events", &exists);
+ std::string events_str = s->info.args.get("events", &exists);
if (exists) {
- get_str_set(events_str, ",", events);
+ rgw::notify::from_string_list(events_str, events);
+ if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {
+ ldout(s->cct, 1) << "invalid event type in list: " << events_str << dendl;
+ return -EINVAL;
+ }
+ } else {
+ // if no events are provided, we assume all events
+ events.push_back(rgw::notify::ObjectCreated);
+ events.push_back(rgw::notify::ObjectRemoved);
}
return notif_bucket_path(s->object.name, bucket_name);
}
VALID_PASSWORD = password;
}
+std::atomic<unsigned> g_tag_skip = 0;
+std::atomic<int> g_multiple = 0;
+
+void set_multiple(unsigned tag_skip) {
+ g_multiple = 1;
+ g_tag_skip = tag_skip;
+}
+
+void reset_multiple() {
+ g_multiple = 0;
+ g_tag_skip = 0;
+}
+
bool FAIL_NEXT_WRITE(false);
bool FAIL_NEXT_READ(false);
bool REPLY_ACK(true);
// "wait" for queue
usleep(tv->tv_sec*1000000+tv->tv_usec);
// read from queue
- if (REPLY_ACK) {
- if (state->ack_list.pop(state->ack)) {
- decoded_frame->frame_type = AMQP_FRAME_METHOD;
+ if (g_multiple) {
+ // pop multiples and reply once at the end
+ for (auto i = 0U; i < g_tag_skip; ++i) {
+ if (REPLY_ACK && !state->ack_list.pop(state->ack)) {
+ // queue is empty
+ return AMQP_STATUS_TIMEOUT;
+ } else if (!REPLY_ACK && !state->nack_list.pop(state->nack)) {
+ // queue is empty
+ return AMQP_STATUS_TIMEOUT;
+ }
+ }
+ if (REPLY_ACK) {
+ state->ack.multiple = g_multiple;
decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
decoded_frame->payload.method.decoded = &state->ack;
- state->reply.reply_type = AMQP_RESPONSE_NORMAL;
- return AMQP_STATUS_OK;
} else {
- // queue is empty
- return AMQP_STATUS_TIMEOUT;
- }
- } else {
- if (state->nack_list.pop(state->nack)) {
- decoded_frame->frame_type = AMQP_FRAME_METHOD;
+ state->nack.multiple = g_multiple;
decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
decoded_frame->payload.method.decoded = &state->nack;
- state->reply.reply_type = AMQP_RESPONSE_NORMAL;
- return AMQP_STATUS_OK;
- } else {
- // queue is empty
- return AMQP_STATUS_TIMEOUT;
}
+ decoded_frame->frame_type = AMQP_FRAME_METHOD;
+ state->reply.reply_type = AMQP_RESPONSE_NORMAL;
+ reset_multiple();
+ return AMQP_STATUS_OK;
+ }
+ // pop replies one by one
+ if (REPLY_ACK && state->ack_list.pop(state->ack)) {
+ state->ack.multiple = g_multiple;
+ decoded_frame->frame_type = AMQP_FRAME_METHOD;
+ decoded_frame->payload.method.id = AMQP_BASIC_ACK_METHOD;
+ decoded_frame->payload.method.decoded = &state->ack;
+ state->reply.reply_type = AMQP_RESPONSE_NORMAL;
+ return AMQP_STATUS_OK;
+ } else if (!REPLY_ACK && state->nack_list.pop(state->nack)) {
+ state->nack.multiple = g_multiple;
+ decoded_frame->frame_type = AMQP_FRAME_METHOD;
+ decoded_frame->payload.method.id = AMQP_BASIC_NACK_METHOD;
+ decoded_frame->payload.method.decoded = &state->nack;
+ state->reply.reply_type = AMQP_RESPONSE_NORMAL;
+ return AMQP_STATUS_OK;
+ } else {
+ // queue is empty
+ return AMQP_STATUS_TIMEOUT;
}
}
return AMQP_STATUS_CONNECTION_CLOSED;
void set_valid_host(const std::string& host);
void set_valid_vhost(const std::string& vhost);
void set_valid_user(const std::string& user, const std::string& password);
+void set_multiple(unsigned tag);
+void reset_multiple();
extern bool FAIL_NEXT_WRITE; // default "false"
extern bool FAIL_NEXT_READ; // default "false"
import subprocess
import socket
import time
+import os
from .tests import get_realm, \
ZonegroupConns, \
zonegroup_meta_checkpoint, \
# configure logging for the tests module
log = logging.getLogger(__name__)
-skip_push_tests = True
+skip_push_tests = False
####################################
# utility functions for pubsub tests
####################################
+def set_contents_from_string(key, content):
+ try:
+ key.set_contents_from_string(content)
+ except Exception as e:
+ print 'Error: ' + str(e)
+
+
# HTTP endpoint functions
# multithreaded streaming server, based on: https://stackoverflow.com/questions/46210672/
def get_events(self):
return self.httpd.events
+ def reset_events(self):
+ self.httpd.events = []
+
class StreamingHTTPServer:
"""multi-threaded http server class also holding list of events received into the handler
each thread has its own server, and all servers share the same socket"""
- def __init__(self, host, port, num_workers=20):
+ def __init__(self, host, port, num_workers=100):
addr = (host, port)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(addr)
- # maximum of 10 connection backlog on the listener
- self.sock.listen(20)
+ self.sock.listen(num_workers)
self.workers = [HTTPServerThread(i, self.sock, addr) for i in range(num_workers)]
def verify_s3_events(self, keys, exact_match=False, deletions=False):
events = []
for worker in self.workers:
events += worker.get_events()
+ worker.reset_events()
verify_s3_records_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
def verify_events(self, keys, exact_match=False, deletions=False):
events = []
for worker in self.workers:
events += worker.get_events()
+ worker.reset_events()
verify_events_by_elements(events, keys, exact_match=exact_match, deletions=deletions)
def close(self):
for record in records:
if record['s3']['bucket']['name'] == key.bucket.name and \
record['s3']['object']['key'] == key.name:
- if deletions and record['eventName'] == 'ObjectRemoved':
+ if deletions and 'ObjectRemoved' in record['eventName']:
key_found = True
break
- elif not deletions and record['eventName'] == 'ObjectCreated':
+ elif not deletions and 'ObjectCreated' in record['eventName']:
key_found = True
break
if not key_found:
# log.info('rabbitmq directories already removed')
-def init_env():
+def init_env(require_ps=True):
"""initialize the environment"""
- check_ps_configured()
+ if require_ps:
+ check_ps_configured()
realm = get_realm()
zonegroup = realm.master_zonegroup()
zones.append(conn)
assert_not_equal(len(zones), 0)
- assert_not_equal(len(ps_zones), 0)
+ if require_ps:
+ assert_not_equal(len(ps_zones), 0)
return zones, ps_zones
# delete the bucket
zones[0].delete_bucket(bucket_name)
+def test_ps_s3_topic_on_master():
+ """ test s3 notification set/get/delete on master """
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ bucket_name = gen_bucket_name()
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topics
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf1.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')
+
+ endpoint_address = 'http://127.0.0.1:9001'
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf2.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
+ endpoint_address = 'http://127.0.0.1:9002'
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf3.set_config()
+ assert_equal(topic_arn,
+ 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
+
+ try:
+ # get topic 3
+ result, status = topic_conf3.get_config()
+ assert_equal(status, 200)
+ assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
+ assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
+ # Note that endpoint args may be ordered differently in the result
+
+ # delete topic 1
+ result = topic_conf1.del_config()
+ assert_equal(status, 200)
+
+ # try to get a deleted topic (1)
+ _, status = topic_conf1.get_config()
+ assert_equal(status, 404)
+
+ # get the remaining 2 topics
+ result = topic_conf1.get_list()
+ assert_equal(len(result['Topics']), 2)
+
+ # delete topics
+ result = topic_conf2.del_config()
+ # TODO: should be 200OK
+ # assert_equal(status, 200)
+ result = topic_conf3.del_config()
+ # TODO: should be 200OK
+ # assert_equal(status, 200)
+
+ # get topic list, make sure it is empty
+ result = topic_conf1.get_list()
+ assert_equal(len(result['Topics']), 0)
+ except AssertionError as e:
+ # topics are stored at user level, so cleanup is needed
+ # to prevent failures in consequent runs
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ topic_conf3.del_config()
+ raise e
+
def test_ps_s3_notification_on_master():
""" test s3 notification set/get/delete on master """
- zones, _ = init_env()
+ zones, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
bucket_name = gen_bucket_name()
- # create bucket on the first of the rados zones
- zones[0].create_bucket(bucket_name)
+ # create bucket
+ bucket = zones[0].create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
+ topic_conf_list = [{'Id': notification_name+'_1',
'TopicArn': topic_arn,
'Events': ['s3:ObjectCreated:*']
+ },
+ {'Id': notification_name+'_2',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:*']
+ },
+ {'Id': notification_name+'_3',
+ 'TopicArn': topic_arn,
+ 'Events': []
}]
s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
- response, status = s3_notification_conf.set_config()
+ _, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
# get notifications on a bucket
+ response, status = s3_notification_conf.get_config(notification=notification_name+'_1')
+ assert_equal(status/100, 2)
+ assert_equal(response['NotificationConfiguration']['TopicConfiguration']['Topic'], topic_arn)
+
+ # delete specific notifications
+ _, status = s3_notification_conf.del_config(notification=notification_name+'_1')
+ assert_equal(status/100, 2)
+
+ # get the remaining 2 notifications on a bucket
response, status = s3_notification_conf.get_config()
assert_equal(status/100, 2)
- assert_equal(len(response['TopicConfigurations']), 1)
+ assert_equal(len(response['TopicConfigurations']), 2)
assert_equal(response['TopicConfigurations'][0]['TopicArn'], topic_arn)
+ assert_equal(response['TopicConfigurations'][1]['TopicArn'], topic_arn)
- # delete specific notifications
- _, status = s3_notification_conf.del_config(notification=notification_name)
+ # delete remaining notifications
+ _, status = s3_notification_conf.del_config()
assert_equal(status/100, 2)
+ # make sure that the notifications are now deleted
+ _, status = s3_notification_conf.get_config()
+
# cleanup
topic_conf.del_config()
# delete the bucket
zones[0].delete_bucket(bucket_name)
+def test_ps_s3_notification_errors_on_master():
+ """ test s3 notification set/get/delete on master """
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+ bucket_name = gen_bucket_name()
+ # create bucket
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+ # create s3 topic
+ endpoint_address = 'amqp://127.0.0.1:7001'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+
+ # create s3 notification with invalid event name
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Kaboom']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ try:
+ result, status = s3_notification_conf.set_config()
+ except Exception as error:
+ print str(error) + ' - is expected'
+ else:
+ assert False, 'invalid event name is expected to fail'
+
+ # create s3 notification with missing name
+ topic_conf_list = [{'Id': '',
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print str(error) + ' - is expected'
+ else:
+ assert False, 'missing notification name is expected to fail'
+
+ # create s3 notification with invalid topic ARN
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': 'kaboom',
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print str(error) + ' - is expected'
+ else:
+ assert False, 'invalid ARN is expected to fail'
+
+ # create s3 notification with wrong bucket
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, 'kaboom', topic_conf_list)
+ try:
+ _, _ = s3_notification_conf.set_config()
+ except Exception as error:
+ print str(error) + ' - is expected'
+ else:
+ assert False, 'unknown bucket is expected to fail'
+
+ # cleanup
+ topic_conf.del_config()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+
+
+def test_objcet_timing():
+ return SkipTest("only used in manual testing")
+ zones, _ = init_env(require_ps=False)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ # create objects in the bucket (async)
+ print 'creating objects...'
+ number_of_objects = 1000
+ client_threads = []
+ start_time = time.time()
+ content = str(bytearray(os.urandom(1024*1024)))
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for object creation: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'total number of objects: ' + str(len(list(bucket.list())))
+
+ print 'deleting objects...'
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ # cleanup
+ zones[0].delete_bucket(bucket_name)
+
+
+def test_ps_s3_notification_push_amqp_on_master():
+ """ test pushing amqp s3 notification on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
+
+ # start amqp receivers
+ exchange = 'ex1'
+ task1, receiver1 = create_amqp_receiver_thread(exchange, topic_name1)
+ task2, receiver2 = create_amqp_receiver_thread(exchange, topic_name2)
+ task1.start()
+ task2.start()
+
+ # create two s3 topic
+ endpoint_address = 'amqp://' + hostname
+ # with acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf1 = PSTopicS3(zones[0].conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf1.set_config()
+ # without acks from broker
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
+ topic_conf2 = PSTopicS3(zones[0].conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf2.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name+'_1', 'TopicArn': topic_arn1,
+ 'Events': []
+ },
+ {'Id': notification_name+'_2', 'TopicArn': topic_arn2,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket (async)
+ number_of_objects = 100
+ client_threads = []
+ start_time = time.time()
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ content = 'bar'
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for creation + qmqp notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check amqp receiver
+ keys = list(bucket.list())
+ print 'total number of objects: ' + str(len(keys))
+ receiver1.verify_s3_events(keys, exact_match=True)
+ receiver2.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check amqp receiver 1 for deletions
+ receiver1.verify_s3_events(keys, exact_match=True, deletions=True)
+ # check amqp receiver 2 has no deletions
+ try:
+ receiver1.verify_s3_events(keys, exact_match=False, deletions=True)
+ except:
+ pass
+ else:
+ err = 'amqp receiver 2 should have no deletions'
+ assert False, err
+
+
+ # cleanup
+ stop_amqp_receiver(receiver1, task1)
+ stop_amqp_receiver(receiver2, task2)
+ s3_notification_conf.del_config()
+ topic_conf1.del_config()
+ topic_conf2.del_config()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
+
+def test_ps_s3_notification_push_http_on_master():
+ """ test pushing http s3 notification on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 10
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': []
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ client_threads = []
+ start_time = time.time()
+ content = 'bar'
+ for i in range(number_of_objects):
+ key = bucket.new_key(str(i))
+ thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check http receiver
+ keys = list(bucket.list())
+ print 'total number of objects: ' + str(len(keys))
+ http_server.verify_s3_events(keys, exact_match=True)
+
+ # delete objects from the bucket
+ client_threads = []
+ start_time = time.time()
+ for key in bucket.list():
+ thr = threading.Thread(target = key.delete, args=())
+ thr.start()
+ client_threads.append(thr)
+ [thr.join() for thr in client_threads]
+
+ time_diff = time.time() - start_time
+ print 'average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds'
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check http receiver
+ http_server.verify_s3_events(keys, exact_match=True, deletions=True)
+
+ # cleanup
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ http_server.close()
+
+
def test_ps_topic():
""" test set/get/delete of topic """
_, ps_zones = init_env()
assert_equal(parsed_result['Code'], 'NoSuchKey')
-def test_ps_s3_topic():
- """ test set/get/delete of s3 topic """
- zones, _ = init_env()
- realm = get_realm()
- zonegroup = realm.master_zonegroup()
- bucket_name = gen_bucket_name()
- topic_name = bucket_name+TOPIC_SUFFIX
-
- # create topic
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name)
- topic_arn = topic_conf.set_config()
- assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
-
- # list topics
- result = topic_conf.get_list()
- assert len(result['Topics']) > 0
-
- # get topic
- result, status = topic_conf.get_config()
- assert_equal(status/100, 2)
- assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
-
- # delete topic
- topic_conf.del_config()
-
-
def test_ps_topic_with_endpoint():
""" test set topic with endpoint"""
_, ps_zones = init_env()
topic_conf.del_config()
-def test_ps_s3_topic_with_endpoint():
- """ test set/get/delete of s3 topic with endpoint """
- zones, _ = init_env()
- realm = get_realm()
- zonegroup = realm.master_zonegroup()
- bucket_name = gen_bucket_name()
- topic_name = bucket_name+TOPIC_SUFFIX
-
- # create topic
- endpoint_address = 'amqp://127.0.0.1:7001'
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args)
- topic_arn = topic_conf.set_config()
- assert_equal(topic_arn,
- 'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name)
-
- # get topic
- result, status = topic_conf.get_config()
- assert_equal(status/100, 2)
- assert_equal(topic_arn, result['GetTopicResponse']['GetTopicResult']['Topic']['TopicArn'])
- assert_equal(endpoint_address, result['GetTopicResponse']['GetTopicResult']['Topic']['EndPoint']['EndpointAddress'])
- # Note that endpoint args may be ordered differently in the result
-
- # delete topic
- topic_conf.del_config()
-
-
def test_ps_notification():
""" test set/get/delete of notification """
zones, ps_zones = init_env()
zones[0].delete_bucket(bucket_name)
+def test_ps_s3_creation_triggers_on_master():
+ """ test object creation s3 notifications in using put/copy/post on master"""
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy', 's3:ObjectCreated:Post']
+ }]
+
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket using PUT
+ key = bucket.new_key('put')
+ key.set_contents_from_string('bar')
+ # create objects in the bucket using COPY
+ bucket.copy_key('copy', bucket.name, key.name)
+ # create objects in the bucket using multi-part upload
+ fp = tempfile.TemporaryFile(mode='w')
+ fp.write('bar')
+ fp.close()
+ uploader = bucket.initiate_multipart_upload('multipart')
+ fp = tempfile.TemporaryFile(mode='r')
+ uploader.upload_part_from_file(fp, 1)
+ uploader.complete_upload()
+ fp.close()
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check amqp receiver
+ keys = list(bucket.list())
+ receiver.verify_s3_events(keys, exact_match=True)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ for key in bucket.list():
+ key.delete()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
+
def test_ps_versioned_deletion():
""" test notification of deletion markers """
zones, ps_zones = init_env()
# wait for sync
zone_meta_checkpoint(ps_zones[0].zone)
# create notifications
+ # TODO use 'DELETE_MARKER_CREATE'
+ event_type = 'OBJECT_DELETE'
notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
- topic_name, "OBJECT_DELETE")
+ topic_name, event_type)
_, status = notification_conf.set_config()
assert_equal(status/100, 2)
# create subscription
parsed_result = json.loads(result)
for event in parsed_result['events']:
log.debug('Event key: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
- assert_equal(str(event['event']), 'OBJECT_DELETE')
+ assert_equal(str(event['event']), event_type)
# TODO: verify we have exactly 2 events
assert len(parsed_result['events']) >= 2
topic_conf.del_config()
+def test_ps_s3_metadata_on_master():
+ """ test s3 notification of metadata on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectCreated:*']
+ }]
+
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ key = bucket.new_key('foo')
+ key.set_metadata('meta1', 'This is my metadata value')
+ key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ keys = list(bucket.list())
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+ # check amqp receiver
+ receiver.verify_s3_events(keys, exact_match=True)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ for key in bucket.list():
+ key.delete()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
+
+def test_ps_s3_versioned_deletion_on_master():
+ """ test s3 notification of deletion markers on master """
+ if skip_push_tests:
+ return SkipTest("PubSub push tests don't run in teuthology")
+ hostname = get_ip()
+ proc = init_rabbitmq()
+ if proc is None:
+ return SkipTest('end2end amqp tests require rabbitmq-server installed')
+ zones, _ = init_env(require_ps=False)
+ realm = get_realm()
+ zonegroup = realm.master_zonegroup()
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = zones[0].create_bucket(bucket_name)
+ bucket.configure_versioning(True)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # start amqp receiver
+ exchange = 'ex1'
+ task, receiver = create_amqp_receiver_thread(exchange, topic_name)
+ task.start()
+
+ # create s3 topic
+ endpoint_address = 'amqp://' + hostname
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ # TODO use s3:ObjectRemoved:DeleteMarkerCreated once supported in the code
+ topic_conf_list = [{'Id': notification_name,'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectRemoved:Delete', 's3:ObjectCreated:Put']
+ }]
+ s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # create objects in the bucket
+ key = bucket.new_key('foo')
+ key.set_contents_from_string('bar')
+ v1 = key.version_id
+ key.set_contents_from_string('kaboom')
+ v2 = key.version_id
+ keys = list(bucket.list())
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check amqp receiver
+ # Note: should not do exact match in case of versioned objects
+ receiver.verify_s3_events(keys, exact_match=False)
+ # set delete markers
+ bucket.delete_key(key.name, version_id=v2)
+ bucket.delete_key(key.name, version_id=v1)
+
+ print 'wait for 5sec for the messages...'
+ time.sleep(5)
+
+ # check amqp receiver
+ # Note: should not do exact match in case of versioned objects
+ receiver.verify_s3_events(keys, exact_match=False, deletions=True)
+
+ # cleanup
+ stop_amqp_receiver(receiver, task)
+ s3_notification_conf.del_config()
+ topic_conf.del_config()
+ # delete the bucket
+ zones[0].delete_bucket(bucket_name)
+ clean_rabbitmq(proc)
+
+
def test_ps_push_http():
""" test pushing to http endpoint """
if skip_push_tests:
# create subscription
sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
topic_name, endpoint='amqp://'+hostname,
- endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=none')
+ endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker')
_, status = sub_conf.set_config()
assert_equal(status/100, 2)
# create objects in the bucket
keys = list(bucket.list())
# TODO: use exact match
verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
- http_server.verify_s3_events(keys, exact_match=True)
+ http_server.verify_s3_events(keys, exact_match=False)
# cleanup
stop_amqp_receiver(receiver, amqp_task)
def del_config(self):
"""delete topic"""
- self.client.delete_topic(TopicArn=self.topic_arn)
+ result = self.client.delete_topic(TopicArn=self.topic_arn)
+ return result['ResponseMetadata']['HTTPStatusCode']
def get_list(self):
"""list all topics"""
```
$ nosetests test_multi.py:<specific_test_name>
```
+To run miltiple tests based on wildcard string, use the following format:
+```
+$ nosetests test_multi.py -m "<wildcard string>"
+```
Note that the test to run, does not have to be inside the `test_multi.py` file.
Note that different options for running specific and multiple tests exists in the [nose documentation](https://nose.readthedocs.io/en/latest/usage.html#options), as well as other options to control the execution of the tests.
## Configuration
+### Environment Variables
+Following RGW environment variables are taken into consideration when running the tests:
+ - `RGW_FRONTEND`: used to change frontend to 'civetweb' or 'beast' (default)
+ - `RGW_VALGRIND`: used to run the radosgw under valgrind. e.g. RGW_VALGRIND=yes
+Other environment variables used to configure elements other than RGW can also be used as they are used in vstart.sh. E.g. MON, OSD, MGR, MSD
The configuration file for the run has 3 sections:
### Default
This section holds the following parameters:
*TODO*
### Cloud
*TODO*
+### PubSub
+*TODO*
## Writing Tests
New tests should be added into the `/path/to/ceph/src/test/rgw/rgw_multi` subdirectory.
- Base classes are in: `/path/to/ceph/src/test/rgw/rgw_multi/multisite.py`
std::atomic<bool> callback_invoked = false;
+std::atomic<int> callbacks_invoked = 0;
+
// note: because these callback are shared among different "publish" calls
// they should be used on different connections
callback_invoked = true;
}
+void my_callback_expect_multiple_acks(int rc) {
+ EXPECT_EQ(0, rc);
+ ++callbacks_invoked;
+}
+
+class dynamic_callback_wrapper {
+ dynamic_callback_wrapper() = default;
+public:
+ static dynamic_callback_wrapper* create() {
+ return new dynamic_callback_wrapper;
+ }
+ void callback(int rc) {
+ EXPECT_EQ(0, rc);
+ ++callbacks_invoked;
+ delete this;
+ }
+};
+
+
TEST_F(TestAMQP, ReceiveAck)
{
callback_invoked = false;
amqp_mock::set_valid_host("localhost");
}
+TEST_F(TestAMQP, ReceiveMultipleAck)
+{
+ callbacks_invoked = 0;
+ const std::string host("localhost1");
+ amqp_mock::set_valid_host(host);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ EXPECT_TRUE(conn);
+ const auto NUMBER_OF_CALLS = 100;
+ for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
+ auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+ EXPECT_EQ(rc, 0);
+ }
+ std::this_thread::sleep_for(wait_time);
+ EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+ callbacks_invoked = 0;
+ amqp_mock::set_valid_host("localhost");
+}
+
+TEST_F(TestAMQP, ReceiveAckForMultiple)
+{
+ callbacks_invoked = 0;
+ const std::string host("localhost1");
+ amqp_mock::set_valid_host(host);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ EXPECT_TRUE(conn);
+ amqp_mock::set_multiple(59);
+ const auto NUMBER_OF_CALLS = 100;
+ for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
+ auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_multiple_acks);
+ EXPECT_EQ(rc, 0);
+ }
+ std::this_thread::sleep_for(wait_time);
+ EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+ callbacks_invoked = 0;
+ amqp_mock::set_valid_host("localhost");
+}
+
+TEST_F(TestAMQP, DynamicCallback)
+{
+ callbacks_invoked = 0;
+ const std::string host("localhost1");
+ amqp_mock::set_valid_host(host);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ EXPECT_TRUE(conn);
+ amqp_mock::set_multiple(59);
+ const auto NUMBER_OF_CALLS = 100;
+ for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
+ auto rc = publish_with_confirm(conn, "topic", "message",
+ std::bind(&dynamic_callback_wrapper::callback, dynamic_callback_wrapper::create(), std::placeholders::_1));
+ EXPECT_EQ(rc, 0);
+ }
+ std::this_thread::sleep_for(wait_time);
+ EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+ callbacks_invoked = 0;
+ amqp_mock::set_valid_host("localhost");
+}
+
TEST_F(TestAMQP, ReceiveNack)
{
callback_invoked = false;
amqp_mock::set_valid_host("localhost");
}
+int fail_after = -1;
+int recover_after = -1;
+bool expect_zero_rc = true;
+
+void my_callback_triggering_failure(int rc) {
+ if (expect_zero_rc) {
+ EXPECT_EQ(rc, 0);
+ } else {
+ EXPECT_NE(rc, 0);
+ }
+ ++callbacks_invoked;
+ if (fail_after == callbacks_invoked) {
+ amqp_mock::FAIL_NEXT_READ = true;
+ expect_zero_rc = false;
+
+ }
+ if (recover_after == callbacks_invoked) {
+ amqp_mock::FAIL_NEXT_READ = false;
+ }
+}
+
+TEST_F(TestAMQP, AcksWithReconnect)
+{
+ callbacks_invoked = 0;
+ const std::string host("localhost1");
+ amqp_mock::set_valid_host(host);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ EXPECT_TRUE(conn);
+ amqp_mock::set_multiple(59);
+ // failure will take effect after: max(59, 70)
+ fail_after = 70;
+ // all callback are flushed during failure, so, recover will take effect after: max(90, 100)
+ recover_after = 90;
+ const auto NUMBER_OF_CALLS = 100;
+ for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
+ auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
+ EXPECT_EQ(rc, 0);
+ }
+ // connection failes before multiple acks
+ std::this_thread::sleep_for(wait_time);
+ EXPECT_EQ(callbacks_invoked, NUMBER_OF_CALLS);
+ // publish more mesages
+ expect_zero_rc = true;
+ for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
+ auto rc = publish_with_confirm(conn, "topic", "message", my_callback_triggering_failure);
+ EXPECT_EQ(rc, 0);
+ }
+ std::this_thread::sleep_for(wait_time);
+ EXPECT_EQ(callbacks_invoked, 2*NUMBER_OF_CALLS);
+ callbacks_invoked = 0;
+ amqp_mock::set_valid_host("localhost");
+ fail_after = -1;
+}
+