]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/logging: add error message when log_record fails
authorYuval Lifshitz <ylifshit@ibm.com>
Thu, 11 Sep 2025 15:22:57 +0000 (15:22 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Thu, 11 Sep 2025 15:22:57 +0000 (15:22 +0000)
when log_record fails in journal mode due to issues in the target
bucket, the result code that the client get will be confusing, since
there is no indication that the issue is wit hte target bucket and not
the source bucket on which the client was operating.
the HTTP error message will be used to convey this information.

Fixes: https://tracker.ceph.com/issues/72543
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/rgw_bucket_logging.cc
src/rgw/rgw_bucket_logging.h
src/rgw/rgw_rest_bucket_logging.cc

index fa3eaf429585a6cbbe54e99e62b59e162942c7c0..290f1f6ec2fdb7300a7c35e9ed1d33cf3eb798b4 100644 (file)
@@ -288,19 +288,19 @@ int new_logging_object(const configuration& conf,
   int ret = target_bucket->set_logging_object_name(obj_name, conf.target_prefix, y, dpp, (old_name == std::nullopt), objv_tracker);
   if (ret == -EEXIST || ret == -ECANCELED) {
    if (ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
+      ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of logging bucket '" <<
         target_bucket_id << "' and prefix '" << conf.target_prefix << "', ret = " << ret << dendl;
       return ret;
     }
-    ldpp_dout(dpp, 20) << "INFO: name already set. got name of logging object '" << obj_name <<  "' of bucket '" <<
+    ldpp_dout(dpp, 20) << "INFO: name already set. got name of logging object '" << obj_name <<  "' of logging bucket '" <<
       target_bucket_id << "' and prefix '" << conf.target_prefix << "'" << dendl;
     return -ECANCELED;
   } else if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to write name of logging object '" << obj_name << "' of bucket '" <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to write name of logging object '" << obj_name << "' of logging bucket '" <<
       target_bucket_id << "'. ret = " << ret << dendl;
     return ret;
   }
-  ldpp_dout(dpp, 20) << "INFO: wrote name of logging object '" << obj_name <<  "' of bucket '" <<
+  ldpp_dout(dpp, 20) << "INFO: wrote name of logging object '" << obj_name <<  "' of logging bucket '" <<
       target_bucket_id << "'" << dendl;
   return 0;
 }
@@ -315,7 +315,7 @@ int commit_logging_object(const configuration& conf,
   std::string target_tenant_name;
   int ret = rgw_parse_url_bucket(conf.target_bucket, tenant_name, target_tenant_name, target_bucket_name);
   if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << conf.target_bucket << "' when committing logging object, ret = "
+    ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket '" << conf.target_bucket << "' when committing logging object, ret = "
       << ret << dendl;
     return ret;
   }
@@ -324,7 +324,7 @@ int commit_logging_object(const configuration& conf,
   ret = driver->load_bucket(dpp, target_bucket_id,
                                &target_bucket, y);
   if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "' when committing logging object, ret = "
+    ldpp_dout(dpp, 1) << "ERROR: failed to get logging bucket '" << target_bucket_id << "' when committing logging object, ret = "
       << ret << dendl;
     return ret;
   }
@@ -338,12 +338,12 @@ int commit_logging_object(const configuration& conf,
     std::string* last_committed) {
   std::string obj_name;
   if (const int ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of logging bucket '" <<
       target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
   }
   if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of bucket '" <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of logging bucket '" <<
       target_bucket->get_key() << "'. ret = " << ret << dendl;
     return ret;
   }
@@ -359,19 +359,20 @@ int rollover_logging_object(const configuration& conf,
     optional_yield y,
     bool must_commit,
     RGWObjVersionTracker* objv_tracker,
-    std::string* last_committed) {
+    std::string* last_committed,
+    std::string* err_message) {
   std::string target_bucket_name;
   std::string target_tenant_name;
   std::ignore = rgw_parse_url_bucket(conf.target_bucket, target_bucket->get_tenant(), target_tenant_name, target_bucket_name);
   if (target_bucket_name != target_bucket->get_name() || target_tenant_name != target_bucket->get_tenant()) {
     ldpp_dout(dpp, 1) << "ERROR: logging bucket name mismatch. conf= '" << conf.target_bucket <<
       "', bucket= '" << target_bucket->get_key() << "'" << dendl;
-    return -EINVAL;
+    return -EINVAL; // this should never happen
   }
 
   auto old_obj = obj_name.empty() ? std::nullopt : std::optional<std::string>(obj_name);
 
-  auto handle_error = [&dpp, &old_obj, &target_bucket](int ret) {
+  auto handle_error = [&dpp, &old_obj, &target_bucket, err_message](int ret) {
     if (ret == -ECANCELED) {
       ldpp_dout(dpp, 20) << "INFO: rollover already performed for logging object '" << old_obj <<  "' to logging bucket '" <<
         target_bucket->get_key() << "'. ret = " << ret << dendl;
@@ -380,6 +381,9 @@ int rollover_logging_object(const configuration& conf,
     if (ret < 0) {
       ldpp_dout(dpp, 1) << "ERROR: failed to rollover logging object '" << old_obj << "' to logging bucket '" <<
         target_bucket->get_key() << "'. ret = " << ret << dendl;
+      if (err_message) {
+        *err_message = fmt::format("Failed to rollover logging object of logging bucket '{}'", target_bucket->get_name());
+      }
     }
     return ret;
   };
@@ -396,6 +400,9 @@ int rollover_logging_object(const configuration& conf,
   }
   if (const int ret = target_bucket->commit_logging_object(*old_obj, y, dpp, conf.target_prefix, last_committed); ret < 0) {
     if (must_commit) {
+      if (err_message) {
+        *err_message = fmt::format("Failed to commit logging object of logging bucket '{}'", target_bucket->get_name());
+      }
       return ret;
     }
     ldpp_dout(dpp, 5) << "WARNING: failed to commit object '" << old_obj << "' to logging bucket '" <<
@@ -464,13 +471,17 @@ int log_record(rgw::sal::Driver* driver,
     bool log_source_bucket) {
   if (!s->bucket) {
     ldpp_dout(dpp, 1) << "ERROR: only bucket operations are logged in bucket logging" << dendl;
-    return -EINVAL;
+    return -EINVAL; // this should never happen
   }
+  auto set_journal_err = [&conf, s](const std::string& err_message) {
+    if (conf.logging_type == LoggingType::Journal) s->err.message = err_message;
+  };
   std::string target_bucket_name;
   std::string target_tenant_name;
   int ret = rgw_parse_url_bucket(conf.target_bucket, s->bucket_tenant, target_tenant_name, target_bucket_name);
   if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to parse target logging bucket '" << conf.target_bucket << "', ret = " << ret << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket name '" << conf.target_bucket << "', ret = " << ret << dendl;
+    set_journal_err(fmt::format("Faild to parse logging bucket name '{}'", conf.target_bucket));
     return ret;
   }
   const rgw_bucket target_bucket_id(target_tenant_name, target_bucket_name);
@@ -478,18 +489,20 @@ int log_record(rgw::sal::Driver* driver,
   ret = driver->load_bucket(dpp, target_bucket_id,
                                &target_bucket, y);
   if (ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "'. ret = " << ret << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: failed to load logging bucket '" << target_bucket_id << "'. ret = " << ret << dendl;
+    set_journal_err(fmt::format("Faild to load logging bucket '{}'", target_bucket_id.bucket_id));
     return ret;
   }
 
   rgw::ARN target_resource_arn(target_bucket_id, conf.target_prefix);
-  if (ret = verify_target_bucket_policy(dpp, target_bucket.get(), target_resource_arn, s); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to verify target logging bucket policy for bucket '" << target_bucket->get_key() <<
-      "'. ret = " << ret << dendl;
-    return -EACCES;
+  std::string err_message;
+  if (ret = verify_target_bucket_policy(dpp, target_bucket.get(), target_resource_arn, s, &err_message); ret < 0) {
+    set_journal_err(err_message);
+    return ret;
   }
 
-  if (ret = verify_target_bucket_attributes(dpp, target_bucket.get()); ret < 0) {
+  if (ret = verify_target_bucket_attributes(dpp, target_bucket.get(), &err_message); ret < 0) {
+    set_journal_err(err_message);
     return ret;
   }
 
@@ -500,9 +513,10 @@ int log_record(rgw::sal::Driver* driver,
   if (ret == 0) {
     const auto time_to_commit = time_from_name(obj_name, dpp) + std::chrono::seconds(conf.obj_roll_time);
     if (ceph::coarse_real_time::clock::now() > time_to_commit) {
-      ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to bucket '" <<
+      ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to logging bucket '" <<
         target_bucket_id << "'" << dendl;
-      if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr); ret < 0) {
+      if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr, &err_message); ret < 0) {
+        set_journal_err(err_message);
         return ret;
       }
     } else {
@@ -512,19 +526,21 @@ int log_record(rgw::sal::Driver* driver,
     // try to create the temporary log object for the first time
     ret = new_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, std::nullopt, &objv_tracker);
     if (ret == 0) {
-      ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << target_bucket_id << "' and prefix '" <<
+      ldpp_dout(dpp, 20) << "INFO: first time logging for logging bucket '" << target_bucket_id << "' and prefix '" <<
         conf.target_prefix << "'" << dendl;
     } else if (ret == -ECANCELED) {
-      ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << target_bucket_id << "' and prefix" <<
+      ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for logging bucket '" << target_bucket_id << "' and prefix" <<
         conf.target_prefix << "'" << dendl;
     } else {
-      ldpp_dout(dpp, 1) << "ERROR: failed to create logging object of bucket '" <<
-        target_bucket_id << "' and prefix '" << conf.target_prefix << "' for the first time. ret = " << ret << dendl;
+      ldpp_dout(dpp, 1) << "ERROR: failed to create first time logging object of logging bucket '" <<
+        target_bucket_id << "' and prefix '" << conf.target_prefix << "'. ret = " << ret << dendl;
+      set_journal_err(fmt::format("Faild create first time logging object of logging bucket '{}'", target_bucket->get_name()));
       return ret;
     }
   } else {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of logging bucket '" <<
       target_bucket_id << "'. ret = " << ret << dendl;
+      set_journal_err(fmt::format("Faild to get name of logging object of logging bucket '{}'", target_bucket->get_name()));
     return ret;
   }
 
@@ -559,7 +575,6 @@ int log_record(rgw::sal::Driver* driver,
     bucket_name = full_bucket_name(s->bucket);
   }
 
-
   switch (conf.logging_type) {
     case LoggingType::Standard:
       record = fmt::format("{} {} [{:%d/%b/%Y:%H:%M:%S %z}] {} {} {} {} {} \"{} {}{}{} HTTP/1.1\" {} {} {} {} {} {} {} \"{}\" {} {} {} {} {} {} {} {} {}",
@@ -607,14 +622,15 @@ int log_record(rgw::sal::Driver* driver,
     case LoggingType::Any:
       ldpp_dout(dpp, 1) << "ERROR: failed to format record when writing to logging object '" <<
         obj_name << "' due to unsupported logging type" << dendl;
-      return -EINVAL;
+      return -EINVAL; // this should never happen
   }
 
   // get quota of the owner of the target bucket
   RGWQuota user_quota;
   if (ret = get_owner_quota_info(dpp, y, driver, target_bucket->get_owner(), user_quota); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get quota of owner of target logging bucket '" <<
+    ldpp_dout(dpp, 1) << "ERROR: failed to get quota of owner of logging bucket '" <<
       target_bucket_id << "' failed. ret = " << ret << dendl;
+    set_journal_err(fmt::format("Faild to get quota of owner of logging bucket '{}'", target_bucket->get_name()));
     return ret;
   }
   // start with system default quota
@@ -631,8 +647,9 @@ int log_record(rgw::sal::Driver* driver,
   }
   // verify there is enough quota to write the record
   if (ret = target_bucket->check_quota(dpp, quota, record.length(), y); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: quota check on target logging bucket '" <<
+    ldpp_dout(dpp, 1) << "ERROR: quota check on logging bucket '" <<
       target_bucket_id << "' failed. ret = " << ret << dendl;
+    set_journal_err(fmt::format("Quota check on logging bucket '{}' failed", target_bucket->get_name()));
     return ret;
   }
 
@@ -643,12 +660,14 @@ int log_record(rgw::sal::Driver* driver,
         async_completion); ret < 0 && ret != -EFBIG) {
     ldpp_dout(dpp, 1) << "ERROR: failed to write record to logging object '" <<
       obj_name << "'. ret = " << ret << dendl;
+    set_journal_err(fmt::format("Failed to write record to logging object of logging bucket {}", target_bucket->get_name()));
     return ret;
   }
   if (ret == -EFBIG) {
-    ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to bucket '" <<
+    ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to logging bucket '" <<
       target_bucket->get_key() << "'" << dendl;
-    if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr); ret < 0 ) {
+    if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, &objv_tracker, nullptr, &err_message); ret < 0 ) {
+      set_journal_err(err_message);
       return ret;
     }
     if (ret = target_bucket->write_logging_object(obj_name,
@@ -658,6 +677,7 @@ int log_record(rgw::sal::Driver* driver,
         async_completion); ret < 0) {
     ldpp_dout(dpp, 1) << "ERROR: failed to write record to logging object '" <<
       obj_name << "'. ret = " << ret << dendl;
+    set_journal_err(fmt::format("Failed to write record to logging object of logging bucket {}", target_bucket->get_name()));
     return ret;
     }
   }
@@ -735,7 +755,7 @@ int update_bucket_logging_sources(const DoutPrefixProvider* dpp, rgw::sal::Drive
   std::unique_ptr<rgw::sal::Bucket> target_bucket;
   const int ret = driver->load_bucket(dpp, target_bucket_id, &target_bucket, y);
   if (ret < 0) {
-    ldpp_dout(dpp, 5) << "WARNING: failed to get target logging bucket '" << target_bucket_id  <<
+    ldpp_dout(dpp, 5) << "WARNING: failed to get logging bucket '" << target_bucket_id  <<
       "' in order to update logging sources. ret = " << ret << dendl;
     return ret;
   }
@@ -887,7 +907,7 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
   }
   rgw_bucket target_bucket_id;
   if (const int ret = get_bucket_id(conf->target_bucket, info.bucket.tenant, target_bucket_id); ret < 0) {
-    ldpp_dout(dpp, 5) << "WARNING: failed to parse target logging bucket '" <<
+    ldpp_dout(dpp, 5) << "WARNING: failed to parse logging bucket '" <<
       conf->target_bucket << "' during cleanup. ret = " << ret << dendl;
     return 0;
   }
@@ -904,7 +924,8 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
 int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
     rgw::sal::Bucket* target_bucket,
     const rgw::ARN& target_resource_arn,
-    req_state* s) {
+    req_state* s,
+    std::string* err_message) {
   // verify target permissions for bucket logging
   // this is implementing the policy based permission granting from:
   // https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-server-access-logging.html#grant-log-delivery-permissions-general
@@ -913,6 +934,9 @@ int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
   const auto policy_it = target_attrs.find(RGW_ATTR_IAM_POLICY);
   if (policy_it == target_attrs.end()) {
     ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket_id << "' must have bucket policy to allow logging" << dendl;
+    if (err_message) {
+      *err_message = fmt::format("Logging bucket '{}' must have policy to allow logging", target_bucket->get_name());
+    }
     return -EACCES;
   }
   try {
@@ -929,33 +953,48 @@ int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
         "' must have a bucket policy that allows logging service principal to put objects in the following resource ARN: '" <<
         target_resource_arn.to_string() << "' from source bucket ARN: '" << source_bucket_arn <<
         "' and source account: '" << source_account << "'" <<  dendl;
+      if (err_message) {
+        *err_message = fmt::format("Logging bucket '{}' policy does not allow logging", target_bucket->get_name());
+      }
       return -EACCES;
     }
   } catch (const rgw::IAM::PolicyParseException& err) {
     ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket '" << target_bucket_id <<
       "' policy. error: " << err.what() << dendl;
+    if (err_message) {
+      *err_message = fmt::format("Logging bucket '{}' has invalid policy", target_bucket->get_name());
+    }
     return -EACCES;
   }
   return 0;
 }
 
-int verify_target_bucket_attributes(const DoutPrefixProvider* dpp, rgw::sal::Bucket* target_bucket) {
+int verify_target_bucket_attributes(const DoutPrefixProvider* dpp, rgw::sal::Bucket* target_bucket, std::string* err_message) {
   const auto& target_info = target_bucket->get_info();
   if (target_info.requester_pays) {
     // target bucket must not have requester pays set on it
-    ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with requester pays" << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket->get_key() << "', is configured with requester pays" << dendl;
+    if (err_message) {
+      *err_message = fmt::format("Logging bucket '{}' is configured with requester pays", target_bucket->get_name());
+    }
     return -EINVAL;
   }
 
   const auto& target_attrs = target_bucket->get_attrs();
   if (target_attrs.find(RGW_ATTR_BUCKET_LOGGING) != target_attrs.end()) {
     // target bucket must not have logging set on it
-    ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with bucket logging" << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket->get_key() << "', is configured with bucket logging" << dendl;
+    if (err_message) {
+      *err_message = fmt::format("Logging bucket '{}' is configured with bucket logging", target_bucket->get_name());
+    }
     return -EINVAL;
   }
   if (target_attrs.find(RGW_ATTR_BUCKET_ENCRYPTION_POLICY) != target_attrs.end()) {
     // verify target bucket does not have encryption
-    ldpp_dout(dpp, 1) << "ERROR: logging target bucket '" << target_bucket->get_key() << "', is configured with encryption" << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: logging bucket '" << target_bucket->get_key() << "', is configured with encryption" << dendl;
+    if (err_message) {
+      *err_message = fmt::format("Logging bucket '{}' is configured with encryption", target_bucket->get_name());
+    }
     return -EINVAL;
   }
   return 0;
@@ -988,13 +1027,13 @@ int get_target_and_conf_from_source(
 
   rgw_bucket target_bucket_id;
   if (const int ret = get_bucket_id(configuration.target_bucket, tenant, target_bucket_id); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: failed to parse logging bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
     return ret;
   }
 
   if (const int ret = driver->load_bucket(dpp, target_bucket_id,
                                &target_bucket, y); ret < 0) {
-    ldpp_dout(dpp, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << ret << dendl;
+    ldpp_dout(dpp, 1) << "ERROR: failed to get logging bucket '" << target_bucket_id << "', ret = " << ret << dendl;
     return ret;
   }
   return 0;
index f27f296f53e0e9698b7005519b7e058fe90351c5..a50f998ed9c9dca4c4968195a0dc740589b9c205 100644 (file)
@@ -173,6 +173,7 @@ int log_record(rgw::sal::Driver* driver,
 // and create a new pending log object
 // if "must_commit" is "false" the function will return success even if the pending log object was not committed
 // if "last_committed" is not null, it will be set to the name of the last committed object
+// optional error message is returned by reference
 int rollover_logging_object(const configuration& conf,
     const std::unique_ptr<rgw::sal::Bucket>& bucket,
     std::string& obj_name,
@@ -182,7 +183,8 @@ int rollover_logging_object(const configuration& conf,
     optional_yield y,
     bool must_commit,
     RGWObjVersionTracker* objv_tracker,
-    std::string* last_committed);
+    std::string* last_committed,
+    std::string* err_message = nullptr);
 
 // commit the pending log object to the log bucket
 // use this for cleanup, when new pending object is not needed
@@ -259,16 +261,21 @@ int source_bucket_cleanup(const DoutPrefixProvider* dpp,
 
 // verify that the target bucket has the correct policy to allow the source bucket to log to it
 // note that this function adds entries to the request state environment
+// optional error message is returned by reference
 int verify_target_bucket_policy(const DoutPrefixProvider* dpp,
     rgw::sal::Bucket* target_bucket,
     const rgw::ARN& target_resource_arn,
-    req_state* s);
+    req_state* s,
+    std::string* err_message = nullptr);
 
 // verify that target bucket does not have:
 // - bucket logging
 // - requester pays
 // - encryption
-int verify_target_bucket_attributes(const DoutPrefixProvider* dpp, rgw::sal::Bucket* target_bucket);
+// optional error message is returned by reference
+int verify_target_bucket_attributes(const DoutPrefixProvider* dpp,
+    rgw::sal::Bucket* target_bucket,
+    std::string* err_message = nullptr);
 
 // given a source bucket this function is parsing the configuration object
 // extracting the target bucket and load it
index 28294119456db51b773ba38f33fb2f9a1a59b2c2..f3af8af09b8157b48f0ac2644ea82563e0be12eb 100644 (file)
@@ -178,13 +178,13 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
 
     rgw_bucket target_bucket_id;
     if (const auto ret = rgw::bucketlogging::get_bucket_id(configuration.target_bucket, s->bucket_tenant, target_bucket_id); ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
+      ldpp_dout(this, 1) << "ERROR: failed to parse logging bucket '" << configuration.target_bucket << "', ret = " << ret << dendl;
       return ret;
     }
 
     if (const auto ret = driver->load_bucket(this, target_bucket_id,
                                  &target_bucket, y); ret < 0) {
-      ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << ret << dendl;
+      ldpp_dout(this, 1) << "ERROR: failed to get logging bucket '" << target_bucket_id << "', ret = " << ret << dendl;
       return ret;
     }
 
@@ -242,14 +242,14 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
     const auto& target_bucket_id = target_bucket->get_key();
     if (target_bucket_id == src_bucket_id) {
       // target bucket must be different from source bucket (cannot change later on)
-      ldpp_dout(this, 1) << "ERROR: target bucket '" << target_bucket_id << "' must be different from source bucket" << dendl;
+      ldpp_dout(this, 1) << "ERROR: logging bucket '" << target_bucket_id << "' must be different from source bucket" << dendl;
       op_ret = -EINVAL;
       return;
     }
     const auto& target_info = target_bucket->get_info();
     if (target_info.zonegroup != src_bucket->get_info().zonegroup) {
       // target bucket must be in the same zonegroup as source bucket (cannot change later on)
-      ldpp_dout(this, 1) << "ERROR: target bucket '" << target_bucket_id << "' zonegroup '" <<
+      ldpp_dout(this, 1) << "ERROR: logging bucket '" << target_bucket_id << "' zonegroup '" <<
         target_info.zonegroup << "' is different from the source bucket '" << src_bucket_id  <<
         "' zonegroup '" << src_bucket->get_info().zonegroup << "'" << dendl;
       op_ret = -EINVAL;
@@ -301,7 +301,7 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
       ldpp_dout(this, 20) << "INFO: new logging configuration added to bucket '" << src_bucket_id << "'. configuration: " <<
         configuration.to_json_str() << dendl;
       if (const auto ret = rgw::bucketlogging::update_bucket_logging_sources(this, target_bucket, src_bucket_id, true, y); ret < 0) {
-        ldpp_dout(this, 1) << "WARNING: failed to add source bucket '" << src_bucket_id << "' to logging sources of target bucket '" <<
+        ldpp_dout(this, 1) << "WARNING: failed to add source bucket '" << src_bucket_id << "' to logging sources of logging bucket '" <<
           target_bucket_id << "', ret = " << ret << dendl;
       }
     } else if (*old_conf != configuration) {
@@ -316,15 +316,15 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp {
       if (old_conf->target_bucket != configuration.target_bucket) {
         rgw_bucket old_target_bucket_id;
         if (const auto ret = rgw::bucketlogging::get_bucket_id(old_conf->target_bucket, s->bucket_tenant, old_target_bucket_id); ret < 0) {
-          ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << old_conf->target_bucket << "', ret = " << ret << dendl;
+          ldpp_dout(this, 1) << "ERROR: failed to parse logging bucket '" << old_conf->target_bucket << "', ret = " << ret << dendl;
           return;
         }
         if (const auto ret = rgw::bucketlogging::update_bucket_logging_sources(this, driver, old_target_bucket_id, src_bucket_id, false, y); ret < 0) {
-          ldpp_dout(this, 1) << "WARNING: failed to remove source bucket '" << src_bucket_id << "' from logging sources of original target bucket '" <<
+          ldpp_dout(this, 1) << "WARNING: failed to remove source bucket '" << src_bucket_id << "' from logging sources of original logging bucket '" <<
             old_target_bucket_id << "', ret = " << ret << dendl;
         }
         if (const auto ret = rgw::bucketlogging::update_bucket_logging_sources(this, target_bucket, src_bucket_id, true, y); ret < 0) {
-          ldpp_dout(this, 1) << "WARNING: failed to add source bucket '" << src_bucket_id << "' to logging sources of target bucket '" <<
+          ldpp_dout(this, 1) << "WARNING: failed to add source bucket '" << src_bucket_id << "' to logging sources of logging bucket '" <<
             target_bucket_id << "', ret = " << ret << dendl;
         }
       }
@@ -385,25 +385,25 @@ class RGWPostBucketLoggingOp : public RGWDefaultResponseOp {
     RGWObjVersionTracker objv_tracker;
     op_ret = target_bucket->get_logging_object_name(obj_name, configuration.target_prefix, y, this, &objv_tracker);
     if (op_ret < 0 && op_ret != -ENOENT) {
-      ldpp_dout(this, 1) << "ERROR: failed to get pending logging object name from target bucket '" << target_bucket_id <<
+      ldpp_dout(this, 1) << "ERROR: failed to get pending logging object name from logging bucket '" << target_bucket_id <<
         "'. error: " << op_ret << dendl;
       return;
     }
     if (op_ret == -ENOENT) {
       // no pending object - nothing to flush
-      ldpp_dout(this, 5) << "INFO: no pending logging object in target bucket '" << target_bucket_id << "'. new object should be created" << dendl;
+      ldpp_dout(this, 5) << "INFO: no pending logging object in logging bucket '" << target_bucket_id << "'. new object should be created" << dendl;
     }
     const auto region = driver->get_zone()->get_zonegroup().get_api_name();
     op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, y, true, &objv_tracker, &old_obj);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "ERROR: failed to flush pending logging object '" << obj_name << "'"
-          << " to target bucket '" << target_bucket_id << "'. "
+          << " to logging bucket '" << target_bucket_id << "'. "
           << " last committed object is '" << old_obj <<
           "'. error: " << op_ret << dendl;
       return;
     }
     ldpp_dout(this, 20) << "INFO: flushed pending logging object '" << old_obj
-                << "' to target bucket '" << target_bucket_id << "'" << dendl;
+                << "' to logging bucket '" << target_bucket_id << "'" << dendl;
   }
 
   void send_response() override {