]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/logging: use bucket policy for logging
authorYuval Lifshitz <ylifshit@ibm.com>
Thu, 13 Mar 2025 17:34:48 +0000 (17:34 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Thu, 27 Mar 2025 14:01:40 +0000 (14:01 +0000)
verifying that there is policy in place to allow the source bucket
to log to the target bucket. following:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-server-access-logging.html

additional changes:
* verify that:
  * only the bucket owner can enable/disable bucket logging on the bucket
  * src and log bucket are in the same zonegroup
  * log bucket does not have "requester_pays"
  * restricted log bucket definitions don't change
* add owner's display name to the commited object
* unify bucket names in debug logs

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/rgw_bucket_logging.cc
src/rgw/rgw_bucket_logging.h
src/rgw/rgw_rest_bucket_logging.cc

index 0578d2e59581f31cb1ae212a6735270a8acd28a4..c322800ea5a6d14fa20adeee2a6ebf10535bfd15 100644 (file)
@@ -1044,7 +1044,7 @@ int RadosBucket::get_logging_object_name(std::string& obj_name,
   rgw_pool data_pool;
   const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
   if (!store->getRados()->get_obj_data_pool(get_placement_rule(), rgw_obj{get_key(), obj_name_oid}, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
       "' when getting logging object name" << dendl;
     return -EIO;
   }
@@ -1060,6 +1060,10 @@ int RadosBucket::get_logging_object_name(std::string& obj_name,
                                nullptr,
                                nullptr);
   if (ret < 0) {
+    if (ret == -ENOENT) {
+      ldpp_dout(dpp, 20) << "INFO: logging object name '" << obj_name_oid << "' not found. ret = " << ret << dendl;
+      return ret;
+    }
     ldpp_dout(dpp, 1) << "ERROR: failed to get logging object name from '" << obj_name_oid << "'. ret = " << ret << dendl;
     return ret;
   }
@@ -1076,7 +1080,7 @@ int RadosBucket::set_logging_object_name(const std::string& obj_name,
   rgw_pool data_pool;
   const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
   if (!store->getRados()->get_obj_data_pool(get_placement_rule(), rgw_obj{get_key(), obj_name_oid}, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
       "' when setting logging object name"  << dendl;
     return -EIO;
   }
@@ -1108,7 +1112,7 @@ int RadosBucket::remove_logging_object_name(const std::string& prefix,
   rgw_pool data_pool;
   const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
   if (!store->getRados()->get_obj_data_pool(get_placement_rule(), rgw_obj{get_key(), obj_name_oid}, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
       "' when setting logging object name"  << dendl;
     return -EIO;
   }
@@ -1131,7 +1135,7 @@ int RadosBucket::remove_logging_object(const std::string& obj_name, optional_yie
   const auto placement_rule = get_placement_rule();
 
   if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
       "' when deleting logging object"  << dendl;
     return -EIO;
   }
@@ -1150,8 +1154,8 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie
   const auto placement_rule = get_placement_rule();
 
   if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() <<
-      "' when comitting logging object"  << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
+      "' when committing logging object"  << dendl;
     return -EIO;
   }
 
@@ -1169,7 +1173,7 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie
                      dpp,
                      &obj_attrs,
                      nullptr); ret < 0 && ret != -ENOENT) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to read logging data when comitting object '" << temp_obj_name
+    ldpp_dout(dpp, 1) << "ERROR: failed to read logging data when committing object '" << temp_obj_name
       << ". error: " << ret << dendl;
     return ret;
   } else if (ret == -ENOENT) {
@@ -1188,13 +1192,13 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie
                                 nullptr, // no special placment for tail
                                 get_key(),
                                 head_obj); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to create manifest when comitting logging object. error: " <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to create manifest when committing logging object. error: " <<
       ret << dendl;
     return ret;
   }
 
   if (const auto ret = manifest_gen.create_next(size); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to add object to manifest when comitting logging object. error: " <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to add object to manifest when committing logging object. error: " <<
       ret << dendl;
     return ret;
   }
@@ -1224,7 +1228,10 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie
   // TODO: head_obj_wop.meta.ptag
   // the owner of the logging object is the bucket owner
   // not the user that wrote the log that triggered the commit
-  const ACLOwner owner{bucket_info.owner, ""}; // TODO: missing display name
+  ACLOwner owner{bucket_info.owner, ""};
+  if (auto i = get_attrs().find(RGW_ATTR_ACL); i != get_attrs().end()) {
+    std::ignore = store->getRados()->decode_policy(dpp, i->second, &owner);
+  }
   head_obj_wop.meta.owner = owner;
   const auto etag = TOPNSPC::crypto::digest<TOPNSPC::crypto::MD5>(bl_data).to_str();
   bufferlist bl_etag;
@@ -1234,7 +1241,7 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie
   jspan_context trace{false, false};
   if (const auto ret = head_obj_wop.write_meta(0, size, obj_attrs, rctx, trace); ret < 0) {
   ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << temp_obj_name <<
-    "' to bucket id '" << get_info().bucket <<"'. error: " << ret << dendl;
+    "' to bucket '" << get_key() <<"'. error: " << ret << dendl;
     return ret;
   }
   ldpp_dout(dpp, 20) << "INFO: committed logging object '" << temp_obj_name <<
@@ -1272,7 +1279,7 @@ int RadosBucket::write_logging_object(const std::string& obj_name,
   rgw_pool data_pool;
   rgw_obj obj{get_key(), obj_name};
   if (!store->getRados()->get_obj_data_pool(get_placement_rule(), obj, &data_pool)) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_key() <<
       "' when writing logging object" << dendl;
     return -EIO;
   }
index d183b54485f41cf97775744a1b663cc2c45c01f1..a9710dee996cf6443906cd2c5ccfc4b6edb8a68a 100644 (file)
@@ -215,7 +215,7 @@ std::string full_bucket_name(const std::unique_ptr<rgw::sal::Bucket>& bucket) {
 }
 
 int new_logging_object(const configuration& conf,
-    const std::unique_ptr<rgw::sal::Bucket>& bucket,
+    const std::unique_ptr<rgw::sal::Bucket>& target_bucket,
     std::string& obj_name,
     const DoutPrefixProvider *dpp,
     optional_yield y,
@@ -226,7 +226,6 @@ int new_logging_object(const configuration& conf,
   localtime_r(&tt, &t);
 
   const auto unique = unique_string<UniqueStringLength>();
-  const auto old_name = obj_name;
 
   switch (conf.obj_key_format) {
     case KeyFormat::Simple:
@@ -241,32 +240,33 @@ int new_logging_object(const configuration& conf,
         const auto source_region = ""; // TODO
         obj_name = fmt::format("{}{}/{}/{}/{:%Y/%m/%d}/{:%Y-%m-%d-%H-%M-%S}-{}",
           conf.target_prefix,
-          to_string(bucket->get_owner()),
+          to_string(target_bucket->get_owner()),
           source_region,
-          full_bucket_name(bucket),
+          full_bucket_name(target_bucket),
           t,
           t,
           unique);
       }
       break;
   }
-  int ret = bucket->set_logging_object_name(obj_name, conf.target_prefix, y, dpp, init_obj, objv_tracker);
+  const auto& target_bucket_id = target_bucket->get_key();
+  auto ret = target_bucket->set_logging_object_name(obj_name, conf.target_prefix, y, dpp, init_obj, objv_tracker);
   if (ret == -EEXIST || ret == -ECANCELED) {
-   if (ret = bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
+   if (ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
       ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
-        conf.target_bucket << "' and prefix '" << conf.target_prefix << "', ret = " << ret << dendl;
+        target_bucket_id << "' and prefix '" << conf.target_prefix << "', ret = " << ret << dendl;
       return ret;
     }
     ldpp_dout(dpp, 20) << "INFO: name already set. got name of logging object '" << obj_name <<  "' of bucket '" <<
-      conf.target_bucket << "' and prefix '" << conf.target_prefix << "'" << dendl;
+      target_bucket_id << "' and prefix '" << conf.target_prefix << "'" << dendl;
     return -ECANCELED;
   } else if (ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to write name of logging object '" << obj_name << "' of bucket '" <<
-      conf.target_bucket << "'. ret = " << ret << dendl;
+      target_bucket_id << "'. ret = " << ret << dendl;
     return ret;
   }
   ldpp_dout(dpp, 20) << "INFO: wrote name of logging object '" << obj_name <<  "' of bucket '" <<
-      conf.target_bucket << "'" << dendl;
+      target_bucket_id << "'" << dendl;
   return 0;
 }
 
@@ -302,14 +302,14 @@ int commit_logging_object(const configuration& conf,
   std::string obj_name;
   if (const auto ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
-      target_bucket->get_info().bucket << "'. ret = " << ret << dendl;
+      target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
   }
   return target_bucket->commit_logging_object(obj_name, y, dpp);
 }
 
 int rollover_logging_object(const configuration& conf,
-    const std::unique_ptr<rgw::sal::Bucket>& bucket,
+    const std::unique_ptr<rgw::sal::Bucket>& target_bucket,
     std::string& obj_name,
     const DoutPrefixProvider *dpp,
     optional_yield y,
@@ -317,29 +317,29 @@ int rollover_logging_object(const configuration& conf,
     RGWObjVersionTracker* objv_tracker) {
   std::string target_bucket_name;
   std::string target_tenant_name;
-  std::ignore = rgw_parse_url_bucket(conf.target_bucket, bucket->get_tenant(), target_tenant_name, target_bucket_name);
-  if (target_bucket_name != bucket->get_name() || target_tenant_name != bucket->get_tenant()) {
-    ldpp_dout(dpp, 1) << "ERROR: bucket name mismatch. conf= '" << conf.target_bucket <<
-      "', bucket= '" << bucket->get_info().bucket << "'" << dendl;
+  std::ignore = rgw_parse_url_bucket(conf.target_bucket, target_bucket->get_tenant(), target_tenant_name, target_bucket_name);
+  if (target_bucket_name != target_bucket->get_name() || target_tenant_name != target_bucket->get_tenant()) {
+    ldpp_dout(dpp, 1) << "ERROR: logging bucket name mismatch. conf= '" << conf.target_bucket <<
+      "', bucket= '" << target_bucket->get_key() << "'" << dendl;
     return -EINVAL;
   }
   const auto old_obj = obj_name;
-  const auto ret = new_logging_object(conf, bucket, obj_name, dpp, y, false, objv_tracker);
+  const auto ret = new_logging_object(conf, target_bucket, obj_name, dpp, y, false, objv_tracker);
   if (ret == -ECANCELED) {
-    ldpp_dout(dpp, 20) << "INFO: rollover already performed for '" << old_obj <<  "' to bucket '" <<
-      conf.target_bucket << "'. ret = " << ret << dendl;
+    ldpp_dout(dpp, 20) << "INFO: rollover already performed for object '" << old_obj <<  "' to logging bucket '" <<
+      target_bucket->get_key() << "'. ret = " << ret << dendl;
     return 0;
   } else if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to rollover logging object '" << old_obj << "' to bucket '" <<
-      conf.target_bucket << "'. ret = " << ret << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: failed to rollover object '" << old_obj << "' to logging bucket '" <<
+      target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
   }
-  if (const auto ret = bucket->commit_logging_object(old_obj, y, dpp); ret < 0) {
+  if (const auto ret = target_bucket->commit_logging_object(old_obj, y, dpp); ret < 0) {
     if (must_commit) {
       return ret;
     }
-    ldpp_dout(dpp, 5) << "WARNING: failed to commit logging object '" << old_obj << "' to bucket '" <<
-      conf.target_bucket << "'. ret = " << ret << dendl;
+    ldpp_dout(dpp, 5) << "WARNING: failed to commit object '" << old_obj << "' to logging bucket '" <<
+      target_bucket->get_key() << "'. ret = " << ret << dendl;
     // we still want to write the new records to the new object even if commit failed
     // will try to commit again next time
   }
@@ -393,7 +393,7 @@ S3 bucket short (ceph) log record
 
 int log_record(rgw::sal::Driver* driver,
     const sal::Object* obj,
-    const req_state* s,
+    req_state* s,
     const std::string& op_name,
     const std::string& etag,
     size_t size,
@@ -403,14 +403,14 @@ int log_record(rgw::sal::Driver* driver,
     bool async_completion,
     bool log_source_bucket) {
   if (!s->bucket) {
-    ldpp_dout(dpp, 1) << "ERROR: only bucket operations are logged" << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: only bucket operations are logged in bucket logging" << dendl;
     return -EINVAL;
   }
   std::string target_bucket_name;
   std::string target_tenant_name;
   auto ret = rgw_parse_url_bucket(conf.target_bucket, s->bucket_tenant, target_tenant_name, target_bucket_name);
   if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << conf.target_bucket << "', ret = " << ret << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: failed to parse target logging bucket '" << conf.target_bucket << "', ret = " << ret << dendl;
     return ret;
   }
   const rgw_bucket target_bucket_id(target_tenant_name, target_bucket_name);
@@ -421,6 +421,18 @@ int log_record(rgw::sal::Driver* driver,
     ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "'. ret = " << ret << dendl;
     return ret;
   }
+
+  rgw::ARN target_resource_arn(target_bucket_id, conf.target_prefix);
+  if (ret = verify_target_bucket_policy(dpp, target_bucket.get(), target_resource_arn, s); ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to verify target logging bucket policy for bucket '" << target_bucket->get_key() <<
+      "'. ret = " << ret << dendl;
+    return -EACCES;
+  }
+
+  if (ret = verify_target_bucket_attributes(dpp, target_bucket.get()); ret < 0) {
+    return ret;
+  }
+
   std::string obj_name;
   RGWObjVersionTracker objv_tracker;
   ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, &objv_tracker);
@@ -428,7 +440,7 @@ int log_record(rgw::sal::Driver* driver,
     const auto time_to_commit = time_from_name(obj_name, dpp) + std::chrono::seconds(conf.obj_roll_time);
     if (ceph::coarse_real_time::clock::now() > time_to_commit) {
       ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to bucket '" <<
-        conf.target_bucket << "'" << dendl;
+        target_bucket_id << "'" << dendl;
       if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, y, false, &objv_tracker); ret < 0) {
         return ret;
       }
@@ -439,19 +451,19 @@ int log_record(rgw::sal::Driver* driver,
     // try to create the temporary log object for the first time
     ret = new_logging_object(conf, target_bucket, obj_name, dpp, y, true, nullptr);
     if (ret == 0) {
-      ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << conf.target_bucket << "' and prefix '" <<
+      ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << target_bucket_id << "' and prefix '" <<
         conf.target_prefix << "'" << dendl;
     } else if (ret == -ECANCELED) {
-      ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << conf.target_bucket << "' and prefix" <<
+      ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << target_bucket_id << "' and prefix" <<
         conf.target_prefix << "'" << dendl;
     } else {
       ldpp_dout(dpp, 1) << "ERROR: failed to create logging object of bucket '" <<
-        conf.target_bucket << "' and prefix '" << conf.target_prefix << "' for the first time. ret = " << ret << dendl;
+        target_bucket_id << "' and prefix '" << conf.target_prefix << "' for the first time. ret = " << ret << dendl;
       return ret;
     }
   } else {
     ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
-      conf.target_bucket << "'. ret = " << ret << dendl;
+      target_bucket_id << "'. ret = " << ret << dendl;
     return ret;
   }
 
@@ -478,13 +490,12 @@ int log_record(rgw::sal::Driver* driver,
     bucket_owner = to_string(s->src_object->get_bucket()->get_owner());
     bucket_name = s->src_bucket_name;
   } else {
-    bucket_owner = to_string( s->bucket->get_owner());
+    bucket_owner = to_string(s->bucket->get_owner());
     bucket_name = full_bucket_name(s->bucket);
   }
 
-  using namespace rgw::auth::s3;
-  string aws_version("-");
-  string auth_type("-");
+  std::string aws_version("-");
+  std::string auth_type("-");
   rgw::auth::s3::get_aws_version_and_auth_type(s, aws_version, auth_type);
 
   switch (conf.logging_type) {
@@ -547,8 +558,8 @@ int log_record(rgw::sal::Driver* driver,
     return ret;
   }
   if (ret == -EFBIG) {
-    ldpp_dout(dpp, 20) << "WARNING: logging object '" << obj_name << "' is full, will be committed to bucket '" <<
-      conf.target_bucket << "'" << dendl;
+    ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to bucket '" <<
+      target_bucket->get_key() << "'" << dendl;
     if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, y, true, nullptr); ret < 0 ) {
       return ret;
     }
@@ -576,7 +587,7 @@ std::string object_name_oid(const rgw::sal::Bucket* bucket, const std::string& p
 int log_record(rgw::sal::Driver* driver,
     LoggingType type,
     const sal::Object* obj,
-    const req_state* s,
+    req_state* s,
     const std::string& op_name,
     const std::string& etag,
     size_t size,
@@ -607,10 +618,10 @@ int log_record(rgw::sal::Driver* driver,
         return 0;
       }
     }
-    ldpp_dout(dpp, 20) << "INFO: found matching logging configuration of bucket '" << s->bucket->get_info().bucket <<
+    ldpp_dout(dpp, 20) << "INFO: found matching logging configuration of bucket '" << s->bucket->get_key() <<
       "' configuration: " << configuration.to_json_str() << dendl;
     if (auto ret = log_record(driver, obj, s, op_name, etag, size, configuration, dpp, y, async_completion, log_source_bucket); ret < 0) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to perform logging for bucket '" << s->bucket->get_info().bucket <<
+      ldpp_dout(dpp, 1) << "ERROR: failed to perform logging for bucket '" << s->bucket->get_key() <<
         "'. ret=" << ret << dendl;
       return ret;
     }
@@ -636,7 +647,8 @@ int update_bucket_logging_sources(const DoutPrefixProvider* dpp, rgw::sal::Drive
   std::unique_ptr<rgw::sal::Bucket> target_bucket;
   const auto ret = driver->load_bucket(dpp, target_bucket_id, &target_bucket, y);
   if (ret < 0) {
-    ldpp_dout(dpp, 1) << "WARNING: failed to get target bucket '" << target_bucket_id  << "', ret = " << ret << dendl;
+    ldpp_dout(dpp, 5) << "WARNING: failed to get target logging bucket '" << target_bucket_id  <<
+      "' in order to update logging sources. ret = " << ret << dendl;
     return ret;
   }
   return update_bucket_logging_sources(dpp, target_bucket, src_bucket_id, add, y);
@@ -649,7 +661,7 @@ int update_bucket_logging_sources(const DoutPrefixProvider* dpp, std::unique_ptr
     if (iter == attrs.end()) {
       if (!add) {
         ldpp_dout(dpp, 20) << "INFO: no logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
-          << "' for bucket '" << bucket->get_info().bucket << "', nothing to remove" << dendl;
+          << "' for bucket '" << bucket->get_key() << "', nothing to remove" << dendl;
         return 0;
       }
       source_buckets sources{src_bucket_id};
@@ -669,11 +681,11 @@ int update_bucket_logging_sources(const DoutPrefixProvider* dpp, std::unique_ptr
         return bucket->merge_and_store_attrs(dpp, attrs, y);
       }
     } catch (buffer::error& err) {
-      ldpp_dout(dpp, 1) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
-        << "' for bucket '" << bucket->get_info().bucket << "', error: " << err.what() << dendl;
+      ldpp_dout(dpp, 5) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
+        << "' for bucket '" << bucket->get_key() << "' when updating logging sources. error: " << err.what() << dendl;
     }
     ldpp_dout(dpp, 20) << "INFO: logging source '" << src_bucket_id << "' already " <<
-      (add ? "added to" : "removed from") << " bucket '" << bucket->get_info().bucket << "'" << dendl;
+      (add ? "added to" : "removed from") << " bucket '" << bucket->get_key() << "'" << dendl;
     return 0;
   }, y);
 }
@@ -693,8 +705,8 @@ int bucket_deletion_cleanup(const DoutPrefixProvider* dpp,
       for (const auto& source : sources) {
         std::unique_ptr<rgw::sal::Bucket> src_bucket;
         if (const auto ret = driver->load_bucket(dpp, source, &src_bucket, y); ret < 0) {
-          ldpp_dout(dpp, 1) << "WARNING: failed to get logging source bucket '" << source << "' for log bucket '" <<
-            bucket->get_info().bucket << "', ret = " << ret << dendl;
+          ldpp_dout(dpp, 5) << "WARNING: failed to get logging source bucket '" << source << "' for logging bucket '" <<
+            bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
           continue;
         }
         auto& src_attrs = src_bucket->get_attrs();
@@ -706,33 +718,33 @@ int bucket_deletion_cleanup(const DoutPrefixProvider* dpp,
             std::string obj_name;
             RGWObjVersionTracker objv;
             if (const auto ret = bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, &objv); ret < 0) {
-              ldpp_dout(dpp, 1) << "WARNING: failed to get logging object name for log bucket '" << bucket->get_info().bucket <<
-                "', ret = " << ret << dendl;
+              ldpp_dout(dpp, 5) << "WARNING: failed to get logging object name for logging bucket '" << bucket->get_key() <<
+                "' during cleanup. ret = " << ret << dendl;
               continue;
             }
             if (const auto ret = bucket->remove_logging_object(obj_name, y, dpp); ret < 0) {
-              ldpp_dout(dpp, 1) << "WARNING: failed to delete pending logging object '" << obj_name << "' for log bucket '" <<
-                bucket->get_info().bucket << "', ret = " << ret << dendl;
+              ldpp_dout(dpp, 5) << "WARNING: failed to delete pending logging object '" << obj_name << "' for logging bucket '" <<
+                bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
               continue;
             }
-            ldpp_dout(dpp, 20) << "INFO: successfully deleted pending logging object '" << obj_name << "' from deleted log bucket '" <<
-                bucket->get_info().bucket << "'" << dendl;
+            ldpp_dout(dpp, 20) << "INFO: successfully deleted pending logging object '" << obj_name << "' from deleted logging bucket '" <<
+                bucket->get_key() << "'" << dendl;
             if (const auto ret = bucket->remove_logging_object_name(conf.target_prefix, y, dpp, &objv); ret < 0) {
-              ldpp_dout(dpp, 1) << "WARNING: failed to delete object holding bucket logging object name for log bucket '" <<
-                bucket->get_info().bucket << "', ret = " << ret << dendl;
+              ldpp_dout(dpp, 5) << "WARNING: failed to delete object holding bucket logging object name for logging bucket '" <<
+                bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
               continue;
             }
-            ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted log bucket '" <<
-              bucket->get_info().bucket << "'" << dendl;
+            ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted logging bucket '" <<
+              bucket->get_key() << "'" << dendl;
           } catch (buffer::error& err) {
-            ldpp_dout(dpp, 1) << "WARNING: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING
-              << "' of bucket '" << src_bucket->get_info().bucket << "', error: " << err.what() << dendl;
+            ldpp_dout(dpp, 5) << "WARNING: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING
+              << "' of bucket '" << src_bucket->get_key() << "' during cleanup. error: " << err.what() << dendl;
           }
         }
       }
     } catch (buffer::error& err) {
-      ldpp_dout(dpp, 1) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
-        << "' for bucket '" << bucket->get_info().bucket << "', error: " << err.what() << dendl;
+      ldpp_dout(dpp, 5) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
+        << "' for logging bucket '" << bucket->get_key() << "'. error: " << err.what() << dendl;
       return -EIO;
     }
   }
@@ -746,8 +758,7 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
                                    bool remove_attr,
                                    optional_yield y) {
   std::optional<configuration> conf;
-  const auto& info = bucket->get_info();
-  if (const auto ret = retry_raced_bucket_write(dpp, bucket, [dpp, bucket, &conf, &info, remove_attr, y] {
+  if (const auto ret = retry_raced_bucket_write(dpp, bucket, [dpp, bucket, &conf, remove_attr, y] {
     auto& attrs = bucket->get_attrs();
     if (auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING); iter != attrs.end()) {
       try {
@@ -757,8 +768,8 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
         decode(tmp_conf, bl_iter);
         conf = std::move(tmp_conf);
       } catch (buffer::error& err) {
-        ldpp_dout(dpp, 1) << "WARNING: failed to decode existing logging attribute '" << RGW_ATTR_BUCKET_LOGGING
-          << "' of bucket '" << info.bucket << "', error: " << err.what() << dendl;
+        ldpp_dout(dpp, 5) << "WARNING: failed to decode existing logging attribute '" << RGW_ATTR_BUCKET_LOGGING
+          << "' of bucket '" << bucket->get_key() << "' during cleanup. error: " << err.what() << dendl;
         return -EIO;
       }
       if (remove_attr) {
@@ -770,8 +781,8 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
     return 0;
   }, y); ret < 0) {
     if (remove_attr) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to remove logging attribute '" << RGW_ATTR_BUCKET_LOGGING << "' from bucket '" <<
-        info.bucket << "', ret = " << ret << dendl;
+      ldpp_dout(dpp, 5) << "WARNING: failed to remove logging attribute '" << RGW_ATTR_BUCKET_LOGGING <<
+        "' from bucket '" << bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
     }
     return ret;
   }
@@ -779,25 +790,86 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
     // no logging attribute found
     return 0;
   }
+  const auto& info = bucket->get_info();
   if (const auto ret = commit_logging_object(*conf, dpp, driver, info.bucket.tenant, y); ret < 0) {
-    ldpp_dout(dpp, 1) << "WARNING: could not commit pending logging object of bucket '" <<
-      info.bucket << "', ret = " << ret << dendl;
+    ldpp_dout(dpp, 5) << "WARNING: could not commit pending logging object of bucket '" <<
+      bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
   } else {
-    ldpp_dout(dpp, 20) << "INFO: successfully committed pending logging object of bucket '" << info.bucket << "'" << dendl;
+    ldpp_dout(dpp, 20) << "INFO: successfully committed pending logging object of bucket '" << bucket->get_key() << "'" << dendl;
   }
   rgw_bucket target_bucket_id;
-  rgw_bucket src_bucket_id{info.bucket.tenant, info.bucket.name};
   if (const auto ret = get_bucket_id(conf->target_bucket, info.bucket.tenant, target_bucket_id); ret < 0) {
-    ldpp_dout(dpp, 1) << "WARNING: failed to parse target bucket '" << conf->target_bucket << "', ret = " << ret << dendl;
+    ldpp_dout(dpp, 5) << "WARNING: failed to parse target logging bucket '" <<
+      conf->target_bucket << "' during cleanup. ret = " << ret << dendl;
     return 0;
   }
-  if (const auto ret = update_bucket_logging_sources(dpp, driver, target_bucket_id, src_bucket_id, false, y); ret < 0) {
-    ldpp_dout(dpp, 1) << "WARNING: could not update bucket logging source '" <<
-      info.bucket << "', ret = " << ret << dendl;
+  if (const auto ret = update_bucket_logging_sources(dpp, driver, target_bucket_id, bucket->get_key(), false, y); ret < 0) {
+    ldpp_dout(dpp, 5) << "WARNING: could not update bucket logging source '" <<
+      bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
     return 0;
   }
   ldpp_dout(dpp, 20) << "INFO: successfully updated bucket logging source '" <<
-    info.bucket << "'"<< dendl;
+    bucket->get_key() << "' during cleanup"<< dendl;
+  return 0;
+}
+
+int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
+    rgw::sal::Bucket* target_bucket,
+    const rgw::ARN& target_resource_arn,
+    req_state* s) {
+  // verify target permissions for bucket logging
+  // this is implementing the policy based permission granting from:
+  // https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-server-access-logging.html#grant-log-delivery-permissions-general
+  const auto& target_bucket_id = target_bucket->get_key();
+  const auto& target_attrs = target_bucket->get_attrs();
+  const auto policy_it = target_attrs.find(RGW_ATTR_IAM_POLICY);
+  if (policy_it == target_attrs.end()) {
+    ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket_id << "' must have bucket policy to allow logging" << dendl;
+    return -EACCES;
+  }
+  try {
+    const rgw::IAM::Policy policy{s->cct, &target_bucket_id.tenant, policy_it->second.to_str(), false};
+    ldpp_dout(dpp, 20) << "INFO: logging bucket '" << target_bucket_id <<
+      "' policy: " << policy << dendl;
+    rgw::auth::ServiceIdentity ident(rgw::bucketlogging::service_principal);
+    const auto source_bucket_arn = rgw::ARN(s->bucket->get_key()).to_string();
+    const auto source_account = to_string(s->bucket_owner.id);
+    s->env.emplace("aws:SourceArn", source_bucket_arn);
+    s->env.emplace("aws:SourceAccount", source_account);
+    if (policy.eval(s->env, ident, rgw::IAM::s3PutObject, target_resource_arn) != rgw::IAM::Effect::Allow) {
+      ldpp_dout(dpp, 1) << "ERROR: logging bucket: '" << target_bucket_id <<
+        "' must have a bucket policy that allows logging service principal to put objects in the following resource ARN: '" <<
+        target_resource_arn.to_string() << "' from source bucket ARN: '" << source_bucket_arn <<
+        "' and source account: '" << source_account << "'" <<  dendl;
+      return -EACCES;
+    }
+  } catch (const rgw::IAM::PolicyParseException& err) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket '" << target_bucket_id <<
+      "' policy. error: " << err.what() << dendl;
+    return -EACCES;
+  }
+  return 0;
+}
+
+int verify_target_bucket_attributes(const DoutPrefixProvider* dpp, rgw::sal::Bucket* target_bucket) {
+  const auto& target_info = target_bucket->get_info();
+  if (target_info.requester_pays) {
+    // target bucket must not have requester pays set on it
+    ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with requester pays" << dendl;
+    return -EINVAL;
+  }
+
+  const auto& target_attrs = target_bucket->get_attrs();
+  if (target_attrs.find(RGW_ATTR_BUCKET_LOGGING) != target_attrs.end()) {
+    // target bucket must not have logging set on it
+    ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with bucket logging" << dendl;
+    return -EINVAL;
+  }
+  if (target_attrs.find(RGW_ATTR_BUCKET_ENCRYPTION_POLICY) != target_attrs.end()) {
+    // verify target bucket does not have encryption
+    ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with encryption" << dendl;
+    return -EINVAL;
+  }
   return 0;
 }
 
index d4c2e28b4752445988fbf6cffd4865e14016b0a4..857edba5260ddb0ddf69b398298248f06fb2357e 100644 (file)
@@ -159,13 +159,13 @@ inline std::string to_string(const Records& records) {
 // log a bucket logging record according to the configuration
 int log_record(rgw::sal::Driver* driver,
     const sal::Object* obj,
-    const req_state* s, 
-    const std::string& op_name, 
-    const std::string& etag, 
-    size_t size, 
+    req_state* s,
+    const std::string& op_name,
+    const std::string& etag,
+    size_t size,
     const configuration& conf,
-    const DoutPrefixProvider *dpp, 
-    optional_yield y, 
+    const DoutPrefixProvider *dpp,
+    optional_yield y,
     bool async_completion,
     bool log_source_bucket);
 
@@ -208,12 +208,12 @@ std::string object_name_oid(const rgw::sal::Bucket* bucket, const std::string& p
 int log_record(rgw::sal::Driver* driver,
     LoggingType type,
     const sal::Object* obj,
-    const req_state* s, 
-    const std::string& op_name, 
-    const std::string& etag, 
-    size_t size, 
-    const DoutPrefixProvider *dpp, 
-    optional_yield y, 
+    req_state* s,
+    const std::string& op_name,
+    const std::string& etag,
+    size_t size,
+    const DoutPrefixProvider *dpp,
+    optional_yield y,
     bool async_completion,
     bool log_source_bucket);
 
@@ -248,5 +248,19 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
                                    sal::Bucket* bucket,
                                    bool remove_attr,
                                    optional_yield y);
+
+// verify that the target bucket has the correct policy to allow the source bucket to log to it
+// note that this function adds entries to the request state environment
+int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
+    rgw::sal::Bucket* target_bucket,
+    const rgw::ARN& target_resource_arn,
+    req_state* s);
+
+// verify that target bucket does not have:
+// - bucket logging
+// - requester pays
+// - encryption
+int verify_target_bucket_attributes(const DoutPrefixProvider* dpp, rgw::sal::Bucket* target_bucket);
+
 } // namespace rgw::bucketlogging
 
index 505107d89679c85ded58139cc74e4e22b81718ad..60a4b5b2b66bfddad47a75ed1b712b42330f45bf 100644 (file)
@@ -21,7 +21,7 @@ namespace {
     if (!exists) {
       ldpp_dout(dpp, 1) << "ERROR: missing required param 'logging'" << dendl;
       return -EINVAL;
-    } 
+    }
     if (no_value.length() > 0) {
       ldpp_dout(dpp, 1) << "ERROR: param 'logging' should not have any value" << dendl;
       return -EINVAL;
@@ -58,14 +58,17 @@ public:
       return;
     }
 
-    const rgw_bucket src_bucket_id(s->bucket_tenant, s->bucket_name);
     std::unique_ptr<rgw::sal::Bucket> src_bucket;
-    op_ret = driver->load_bucket(this, src_bucket_id,
+    {
+      const rgw_bucket src_bucket_id(s->bucket_tenant, s->bucket_name);
+      op_ret = driver->load_bucket(this, src_bucket_id,
                                  &src_bucket, y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl;
-      return;
+      if (op_ret < 0) {
+        ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl;
+        return;
+      }
     }
+    const auto src_bucket_id = src_bucket->get_key();
     if (auto iter = src_bucket->get_attrs().find(RGW_ATTR_BUCKET_LOGGING); iter != src_bucket->get_attrs().end()) {
       try {
         configuration.enabled = true;
@@ -80,7 +83,7 @@ public:
       ldpp_dout(this, 5) << "WARNING: no logging configuration on bucket '" << src_bucket_id << "'" << dendl;
       return;
     }
-    ldpp_dout(this, 20) << "INFO: found logging configuration on bucket '" << src_bucket_id << "'" 
+    ldpp_dout(this, 20) << "INFO: found logging configuration on bucket '" << src_bucket_id << "'"
       << "'. configuration: " << configuration.to_json_str() << dendl;
   }
 
@@ -102,112 +105,139 @@ public:
 // PUT /<bucket name>/?logging
 // actual configuration is XML encoded in the body of the message
 class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
-  int verify_permission(optional_yield y) override {
-    auto [has_s3_existing_tag, has_s3_resource_tag] = rgw_check_policy_condition(this, s, false);
-    if (has_s3_resource_tag)
-      rgw_iam_add_buckettags(this, s);
-
-    if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketLogging)) {
-      return -EACCES;
-    }
-
-    return 0;
-  }
-
-  const char* name() const override { return "put_bucket_logging"; }
-  RGWOpType get_type() override { return RGW_OP_PUT_BUCKET_LOGGING; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+  // following data members are set in verify_permission()
+  // and usd in execute()
+  rgw::bucketlogging::configuration configuration;
+  std::unique_ptr<rgw::sal::Bucket> target_bucket;
 
-  void execute(optional_yield y) override { 
-    op_ret = verify_bucket_logging_params(this, s);
-    if (op_ret < 0) {
-      return;
+  int init_processing(optional_yield y) override {
+    if (const auto ret = verify_bucket_logging_params(this, s); ret < 0) {
+      return ret;
     }
 
     const auto max_size = s->cct->_conf->rgw_max_put_param_size;
     bufferlist data;
-    std::tie(op_ret, data) = read_all_input(s, max_size, false);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to read XML logging payload, ret = " << op_ret << dendl;
-      return;
+    int ret = 0;
+    if (std::tie(ret, data) = read_all_input(s, max_size, false); ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to read XML logging payload, ret = " << ret << dendl;
+      return ret;
     }
     if (data.length() == 0) {
       ldpp_dout(this, 1) << "ERROR: XML logging payload missing" << dendl;
-      op_ret = -EINVAL;
-      return;
+      return -EINVAL;
     }
 
     RGWXMLDecoder::XMLParser parser;
     if (!parser.init()){
       ldpp_dout(this, 1) << "ERROR: failed to initialize XML parser" << dendl;
-      op_ret = -EINVAL;
-      return;
+      return -EINVAL;
     }
     if (!parser.parse(data.c_str(), data.length(), 1)) {
       ldpp_dout(this, 1) << "ERROR: failed to parse XML logging payload" << dendl;
-      op_ret = -ERR_MALFORMED_XML;
-      return;
+      return -ERR_MALFORMED_XML;
     }
-    rgw::bucketlogging::configuration configuration;
     configuration.default_obj_roll_time = get_cct()->_conf->rgw_bucket_logging_obj_roll_time;
     try {
       RGWXMLDecoder::decode_xml("BucketLoggingStatus", configuration, &parser, true);
     } catch (RGWXMLDecoder::err& err) {
       ldpp_dout(this, 1) << "ERROR: failed to parse XML logging payload. error: " << err << dendl;
-      op_ret = -ERR_MALFORMED_XML;
-      return;
+      return -ERR_MALFORMED_XML;
     }
 
-    const rgw_bucket src_bucket_id(s->bucket_tenant, s->bucket_name);
-    std::unique_ptr<rgw::sal::Bucket> src_bucket;
-    op_ret = driver->load_bucket(this, src_bucket_id,
-                                 &src_bucket, y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl;
-      return;
+    if (!configuration.enabled) {
+      // when disabling logging, there is no target bucket
+      return 0;
+    }
+
+    rgw_bucket target_bucket_id;
+    if (const auto ret = rgw::bucketlogging::get_bucket_id(configuration.target_bucket, s->bucket_tenant, target_bucket_id); ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
+      return ret;
+    }
+
+    if (const auto ret = driver->load_bucket(this, target_bucket_id,
+                                 &target_bucket, y); ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << ret << dendl;
+      return ret;
+    }
+
+    return 0;
+  }
+
+  int verify_permission(optional_yield y) override {
+    auto [has_s3_existing_tag, has_s3_resource_tag] = rgw_check_policy_condition(this, s, false);
+    if (has_s3_resource_tag)
+      rgw_iam_add_buckettags(this, s);
+
+    if (!s->auth.identity->is_owner_of(s->bucket->get_info().owner)) {
+      ldpp_dout(this, 1) << "ERROR: user '" << s->auth.identity->get_aclowner().id << "' is not the owner of bucket '" <<
+        s->bucket_name << "'. owner is '" << s->bucket->get_info().owner << "'" << dendl;
+      return -EACCES;
+    }
+
+    if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketLogging)) {
+      return -EACCES;
     }
 
     if (!configuration.enabled) {
-      op_ret = rgw::bucketlogging::source_bucket_cleanup(this, driver, src_bucket.get(), true, y);
-      return;
+      // when disabling logging, there is no target bucket
+      return 0;
     }
 
-    // set logging configuration
-    rgw_bucket target_bucket_id;
-    if (op_ret = rgw::bucketlogging::get_bucket_id(configuration.target_bucket, s->bucket_tenant, target_bucket_id); op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << op_ret << dendl;
+    const auto target_resource_arn = rgw::ARN(target_bucket->get_key(), configuration.target_prefix);
+    return rgw::bucketlogging::verify_target_bucket_policy(this, target_bucket.get(), target_resource_arn, s);
+  }
+
+  const char* name() const override { return "put_bucket_logging"; }
+  RGWOpType get_type() override { return RGW_OP_PUT_BUCKET_LOGGING; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+  void execute(optional_yield y) override {
+
+    std::unique_ptr<rgw::sal::Bucket> src_bucket;
+    {
+      const rgw_bucket src_bucket_id{s->bucket_tenant, s->bucket_name};
+      op_ret = driver->load_bucket(this, src_bucket_id,
+                                   &src_bucket, y);
+      if (op_ret < 0) {
+        ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl;
+        return;
+      }
+    }
+
+    if (!configuration.enabled) {
+      op_ret = rgw::bucketlogging::source_bucket_cleanup(this, driver, src_bucket.get(), true, y);
       return;
     }
 
+    const auto src_bucket_id = src_bucket->get_key();
+    const auto& target_bucket_id = target_bucket->get_key();
     if (target_bucket_id == src_bucket_id) {
+      // target bucket must be different from source bucket (cannot change later on)
       ldpp_dout(this, 1) << "ERROR: target bucket '" << target_bucket_id << "' must be different from source bucket" << dendl;
       op_ret = -EINVAL;
       return;
     }
-    std::unique_ptr<rgw::sal::Bucket> target_bucket;
-    op_ret = driver->load_bucket(this, target_bucket_id,
-                                 &target_bucket, y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << op_ret << dendl;
-      return;
-    }
-    auto& target_attrs = target_bucket->get_attrs();
-    if (target_attrs.find(RGW_ATTR_BUCKET_LOGGING) != target_attrs.end()) {
-      // target bucket must not have logging set on it
-      ldpp_dout(this, 1) << "ERROR: logging target bucket '" << target_bucket_id << "', is configured with bucket logging" << dendl;
+    const auto& target_info = target_bucket->get_info();
+    if (target_info.zonegroup != src_bucket->get_info().zonegroup) {
+      // target bucket must be in the same zonegroup as source bucket (cannot change later on)
+      ldpp_dout(this, 1) << "ERROR: target bucket '" << target_bucket_id << "' zonegroup '" <<
+        target_info.zonegroup << "' is different from the source bucket '" << src_bucket_id  <<
+        "' zonegroup '" << src_bucket->get_info().zonegroup << "'" << dendl;
       op_ret = -EINVAL;
       return;
     }
-    // verify target bucket does not have encryption
-    if (target_attrs.find(RGW_ATTR_BUCKET_ENCRYPTION_POLICY) != target_attrs.end()) {
-      ldpp_dout(this, 1) << "ERROR: logging target bucket '" << target_bucket_id << "', is configured with encryption" << dendl;
-      op_ret = -EINVAL;
+
+    // following checks need to also be done on every write to the log bucket
+    if (const auto ret = rgw::bucketlogging::verify_target_bucket_attributes(this, target_bucket.get()); ret < 0) {
+      op_ret = ret;
       return;
     }
+
     std::optional<rgw::bucketlogging::configuration> old_conf;
     bufferlist conf_bl;
     encode(configuration, conf_bl);
-    op_ret = retry_raced_bucket_write(this, src_bucket.get(), [this, &conf_bl, &src_bucket, &old_conf, &configuration, y] {
+    op_ret = retry_raced_bucket_write(this, src_bucket.get(), [this, &conf_bl, &src_bucket, &old_conf, y] {
       auto& attrs = src_bucket->get_attrs();
       auto it = attrs.find(RGW_ATTR_BUCKET_LOGGING);
       if (it != attrs.end()) {
@@ -218,7 +248,7 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
           old_conf = std::move(tmp_conf);
         } catch (buffer::error& err) {
           ldpp_dout(this, 1) << "WARNING: failed to decode existing logging attribute '" << RGW_ATTR_BUCKET_LOGGING
-              << "' for bucket '" << src_bucket->get_info().bucket << "', error: " << err.what() << dendl;
+              << "' for bucket '" << src_bucket->get_key() << "', error: " << err.what() << dendl;
         }
         if (!old_conf || (old_conf && *old_conf != configuration)) {
           // conf changed (or was unknown) - update
@@ -233,7 +263,7 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
       return src_bucket->merge_and_store_attrs(this, attrs, y);
     }, y);
     if (op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to set logging attribute '" << RGW_ATTR_BUCKET_LOGGING << "' to bucket '" << 
+      ldpp_dout(this, 1) << "ERROR: failed to set logging attribute '" << RGW_ATTR_BUCKET_LOGGING << "' to bucket '" <<
         src_bucket_id << "', ret = " << op_ret << dendl;
       return;
     }
@@ -247,11 +277,11 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
     } else if (*old_conf != configuration) {
       // conf changed - do cleanup
       if (const auto ret = commit_logging_object(*old_conf, target_bucket, this, y); ret < 0) {
-        ldpp_dout(this, 1) << "WARNING: could not commit pending logging object when updating logging configuration of bucket '" << 
-          src_bucket->get_info().bucket << "', ret = " << ret << dendl;
+        ldpp_dout(this, 1) << "WARNING: could not commit pending logging object when updating logging configuration of bucket '" <<
+          src_bucket->get_key() << "', ret = " << ret << dendl;
       } else {
-        ldpp_dout(this, 20) << "INFO: committed pending logging object when updating logging configuration of bucket '" << 
-          src_bucket->get_info().bucket << "'" << dendl;
+        ldpp_dout(this, 20) << "INFO: committed pending logging object when updating logging configuration of bucket '" <<
+          src_bucket->get_key() << "'" << dendl;
       }
       if (old_conf->target_bucket != configuration.target_bucket) {
         rgw_bucket old_target_bucket_id;
@@ -278,65 +308,74 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
 
 // Post /<bucket name>/?logging
 class RGWPostBucketLoggingOp : public RGWDefaultResponseOp {
-  int verify_permission(optional_yield y) override {
-    auto [has_s3_existing_tag, has_s3_resource_tag] = rgw_check_policy_condition(this, s, false);
-    if (has_s3_resource_tag)
-      rgw_iam_add_buckettags(this, s);
+  // following data members are set in verify_permission()
+  // and usd in execute()
+  rgw::bucketlogging::configuration configuration;
+  std::unique_ptr<rgw::sal::Bucket> target_bucket;
 
-    if (!verify_bucket_permission(this, s, rgw::IAM::s3PostBucketLogging)) {
-      return -EACCES;
+  int init_processing(optional_yield y) override {
+    if (const auto ret = verify_bucket_logging_params(this, s); ret < 0) {
+      return ret;
     }
 
-    return 0;
-  }
-
-  const char* name() const override { return "post_bucket_logging"; }
-  RGWOpType get_type() override { return RGW_OP_POST_BUCKET_LOGGING; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-
-  void execute(optional_yield y) override {
-    op_ret = verify_bucket_logging_params(this, s);
-    if (op_ret < 0) {
-      return;
-    }
-
-    const rgw_bucket src_bucket_id(s->bucket_tenant, s->bucket_name);
     std::unique_ptr<rgw::sal::Bucket> src_bucket;
-    op_ret = driver->load_bucket(this, src_bucket_id,
-                                 &src_bucket, y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl;
-      return;
+    {
+      const rgw_bucket src_bucket_id{s->bucket_tenant, s->bucket_name};
+      if (const auto ret = driver->load_bucket(this, src_bucket_id,
+                                 &src_bucket, y); ret < 0) {
+        ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << ret << dendl;
+        return ret;
+      }
     }
+    const auto src_bucket_id = src_bucket->get_key();
     const auto& bucket_attrs = src_bucket->get_attrs();
     auto iter = bucket_attrs.find(RGW_ATTR_BUCKET_LOGGING);
     if (iter == bucket_attrs.end()) {
       ldpp_dout(this, 1) << "WARNING: no logging configured on bucket '" << src_bucket_id << "'" << dendl;
-      return;
+      return 0;
     }
-    rgw::bucketlogging::configuration configuration;
     try {
       configuration.enabled = true;
       decode(configuration, iter->second);
     } catch (buffer::error& err) {
       ldpp_dout(this, 1) << "WARNING: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING
         << "' for bucket '" << src_bucket_id << "', error: " << err.what() << dendl;
-      op_ret = -EINVAL;
-      return;
+      return -EINVAL;
     }
 
     rgw_bucket target_bucket_id;
-    if (op_ret = rgw::bucketlogging::get_bucket_id(configuration.target_bucket, s->bucket_tenant, target_bucket_id); op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << op_ret << dendl;
-      return;
+    if (const auto ret = rgw::bucketlogging::get_bucket_id(configuration.target_bucket, s->bucket_tenant, target_bucket_id); ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
+      return ret;
     }
-    std::unique_ptr<rgw::sal::Bucket> target_bucket;
-    op_ret = driver->load_bucket(this, target_bucket_id,
-                                 &target_bucket, y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << op_ret << dendl;
-      return;
+
+    if (const auto ret = driver->load_bucket(this, target_bucket_id,
+                                 &target_bucket, y); ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << ret << dendl;
+      return ret;
     }
+    return 0;
+  }
+
+  int verify_permission(optional_yield y) override {
+    auto [has_s3_existing_tag, has_s3_resource_tag] = rgw_check_policy_condition(this, s, false);
+    if (has_s3_resource_tag)
+      rgw_iam_add_buckettags(this, s);
+
+    if (!verify_bucket_permission(this, s, rgw::IAM::s3PostBucketLogging)) {
+      return -EACCES;
+    }
+
+    const auto target_resource_arn = rgw::ARN(target_bucket->get_key(), configuration.target_prefix);
+    return rgw::bucketlogging::verify_target_bucket_policy(this, target_bucket.get(), target_resource_arn, s);
+  }
+
+  const char* name() const override { return "post_bucket_logging"; }
+  RGWOpType get_type() override { return RGW_OP_POST_BUCKET_LOGGING; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+  void execute(optional_yield y) override {
+    const auto& target_bucket_id = target_bucket->get_key();
     std::string obj_name;
     RGWObjVersionTracker objv_tracker;
     op_ret = target_bucket->get_logging_object_name(obj_name, configuration.target_prefix, null_yield, this, &objv_tracker);
@@ -352,7 +391,7 @@ class RGWPostBucketLoggingOp : public RGWDefaultResponseOp {
       return;
     }
     ldpp_dout(this, 20) << "INFO: flushed pending logging object '" << old_obj
-                << "' to target bucket '" << configuration.target_bucket << "'" << dendl;
+                << "' to target bucket '" << target_bucket_id << "'" << dendl;
   }
 };