event.bucket_name = s->bucket_name;
event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
- event.object_key = obj->get_name();
+ event.object_key = res.object_name ? *res.object_name : obj->get_name();
event.object_size = size;
event.object_etag = etag;
event.object_versionId = obj->get_instance();
return false;
}
const auto obj = res.object;
- if (!match(filter.s3_filter.key_filter, obj->get_name())) {
+ if (!match(filter.s3_filter.key_filter,
+ res.object_name ? *res.object_name : obj->get_name())) {
return false;
}
const req_state* const s;
size_t size;
rgw::sal::Object* const object;
+ const std::string* const object_name;
KeyValueMap cached_metadata;
- reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, rgw::sal::Object* _object) :
- dpp(_dpp), store(_store), s(_s), object(_object) {}
+ reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s,
+ rgw::sal::Object* _object, const std::string* _object_name) :
+ dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
// dtor doing resource leak guarding
// aborting the reservation if not already committed or aborted
mp.init(s->object->get_name(), upload_id);
- // make reservation for notification if needed
- std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
- s, rgw::notify::ObjectCreatedCompleteMultipartUpload);
- op_ret = res->publish_reserve(this);
- if (op_ret < 0) {
- return;
- }
meta_oid = mp.get_meta();
return;
}
attrs = meta_obj->get_attrs();
+
+ // make reservation for notification if needed
+ std::unique_ptr<rgw::sal::Notification> res = store->get_notification(meta_obj.get(),
+ s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
+ op_ret = res->publish_reserve(this);
+ if (op_ret < 0) {
+ return;
+ }
do {
op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts,
virtual int cluster_stat(RGWClusterStat& stats) = 0;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
virtual std::unique_ptr<Completions> get_completions(void) = 0;
- virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) = 0;
+ virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s,
+ rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) = 0;
virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
std::unique_ptr<Notification> RadosStore::get_notification(rgw::sal::Object* obj,
struct req_state* s,
- rgw::notify::EventType event_type)
+ rgw::notify::EventType event_type,
+ const std::string* object_name)
{
- return std::unique_ptr<Notification>(new RadosNotification(s, this, obj, s, event_type));
+ return std::unique_ptr<Notification>(new RadosNotification(s, this, obj, s, event_type, object_name));
}
std::unique_ptr<GCChain> RadosStore::get_gc_chain(rgw::sal::Object* obj)
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Completions> get_completions(void) override;
- virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) override;
+ virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) override;
virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
rgw::notify::reservation_t res;
public:
- RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, rgw::notify::EventType _type) : Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj) { }
+ RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s,
+ rgw::notify::EventType _type, const std::string* object_name=nullptr) :
+ Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { }
~RadosNotification() = default;
virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override;
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
+ expected_keys = []
# create objects in the bucket
key_name = 'foo'
key = bucket.new_key(key_name)
key.set_metadata(meta_key, meta_value)
key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+ expected_keys.append(key_name)
# create objects in the bucket using COPY
- bucket.copy_key('copy_of_foo', bucket.name, key.name)
+ key_name = 'copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name)
+ expected_keys.append(key_name)
# create another objects in the bucket using COPY
# but override the metadata value
- bucket.copy_key('another_copy_of_foo', bucket.name, key.name, metadata={meta_key: 'kaboom'})
+ key_name = 'another_copy_of_foo'
+ bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
# create objects in the bucket using multi-part upload
fp = tempfile.NamedTemporaryFile(mode='w+b')
- object_size = 10*1024*1024
+ chunk_size = 1024*1024*5 # 5MB
+ object_size = 10*chunk_size
content = bytearray(os.urandom(object_size))
fp.write(content)
fp.flush()
fp.seek(0)
- uploader = bucket.initiate_multipart_upload('multipart_foo',
+ key_name = 'multipart_foo'
+ uploader = bucket.initiate_multipart_upload(key_name,
metadata={meta_key: meta_value})
- uploader.upload_part_from_file(fp, 1)
+ for i in range(1,5):
+ uploader.upload_part_from_file(fp, i, size=chunk_size)
+ fp.seek(i*chunk_size)
uploader.complete_upload()
fp.close()
+ expected_keys.append(key_name)
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
- assert_equal(len(receiver.get_and_reset_events()), 3)
-
+ events = receiver.get_and_reset_events()
+ assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End
+ for event in events:
+ assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
# delete objects
for key in bucket.list():
print('wait for 5sec for the messages...')
time.sleep(5)
# check amqp receiver
- assert_equal(len(receiver.get_and_reset_events()), 3)
+ #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys))
# cleanup
stop_amqp_receiver(receiver, task)