From 91623599a4d476312a8904870113636ca7ba1bc9 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 21 Apr 2025 10:47:51 -0400 Subject: [PATCH 1/6] Added quartz scheduler to schedule maintenance job. --- ice-rest-catalog/pom.xml | 24 +++ .../com/altinity/ice/rest/catalog/Main.java | 74 +++++++ .../internal/maintenance/MaintenanceJob.java | 25 +++ .../maintenance/MaintenanceScheduler.java | 92 +++++++++ .../QuartzMaintenanceScheduler.java | 187 ++++++++++++++++++ 5 files changed, 402 insertions(+) create mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java create mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java create mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java diff --git a/ice-rest-catalog/pom.xml b/ice-rest-catalog/pom.xml index 3fcdb74..54c05e7 100644 --- a/ice-rest-catalog/pom.xml +++ b/ice-rest-catalog/pom.xml @@ -17,6 +17,7 @@ 6.1.0 1.3.6 3.46.1.0 + 2.5.0 @@ -231,6 +232,29 @@ jackson-annotations 2.18.2 + + + org.quartz-scheduler + quartz + ${quartz.version} + + + org.slf4j + slf4j-api + + + + + org.quartz-scheduler + quartz-jobs + ${quartz.version} + + + org.slf4j + slf4j-api + + + diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java index e7f5546..d89233f 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java @@ -3,6 +3,7 @@ import com.altinity.ice.rest.catalog.internal.jetty.AuthorizationHandler; import com.altinity.ice.rest.catalog.internal.jetty.PlainErrorHandler; import com.altinity.ice.rest.catalog.internal.jetty.ServerConfig; +import com.altinity.ice.rest.catalog.internal.maintenance.QuartzMaintenanceScheduler; import io.prometheus.metrics.exporter.servlet.jakarta.PrometheusMetricsServlet; import io.prometheus.metrics.instrumentation.jvm.JvmMetrics; import jakarta.servlet.http.HttpServlet; @@ -12,7 +13,9 @@ import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.util.Map; +import java.util.Properties; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.rest.RESTCatalogAdapter; @@ -25,6 +28,7 @@ import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; @@ -52,6 +56,21 @@ public String[] getVersion() { description = "/path/to/config.yaml ($CWD/.ice-rest-catalog.yaml by default)") String configFile; + @CommandLine.Option( + names = {"--maintenance-interval"}, + description = "Maintenance interval in hours (default: 24)") + static Long maintenanceInterval; + + @CommandLine.Option( + names = {"--maintenance-time-unit"}, + description = "Maintenance time unit (HOURS, DAYS, MINUTES) (default: HOURS)") + static String maintenanceTimeUnit; + + @CommandLine.Option( + names = {"--maintenance-cron"}, + description = "Maintenance cron expression (overrides interval and time-unit)") + static String maintenanceCron; + private Main() {} private static Server createServer(int port, Catalog catalog, Map config) { @@ -162,6 +181,9 @@ public Integer call() throws Exception { Catalog catalog = CatalogUtil.buildIcebergCatalog("rest_backend", config, null); + // Initialize and start the maintenance scheduler + initializeMaintenanceScheduler(catalog, config); + // TODO: replace with uds (jetty-unixdomain-server is all that is needed here but in ice you'll // need to implement custom org.apache.iceberg.rest.RESTClient) String adminPort = config.get("ice.admin.port"); @@ -183,6 +205,58 @@ public Integer call() throws Exception { return 0; } + private static void initializeMaintenanceScheduler(Catalog catalog, Map config) { + try { + // Create Quartz properties if needed + Properties quartzProperties = new Properties(); + // Add any custom Quartz properties from config + for (Map.Entry entry : config.entrySet()) { + if (entry.getKey().startsWith("ice.quartz.")) { + String quartzKey = entry.getKey().substring("ice.quartz.".length()); + quartzProperties.setProperty(quartzKey, entry.getValue()); + } + } + + // Create the scheduler + QuartzMaintenanceScheduler scheduler = + new QuartzMaintenanceScheduler(catalog, quartzProperties); + + // Configure the schedule + if (maintenanceCron != null && !maintenanceCron.isEmpty()) { + // Use cron expression if provided + scheduler.setMaintenanceSchedule(maintenanceCron); + logger.info("Maintenance schedule set to cron expression: {}", maintenanceCron); + } else { + // Use interval and time unit + Long interval = + maintenanceInterval != null + ? maintenanceInterval + : PropertyUtil.propertyAsLong(config, "ice.maintenance.interval", 24L); + + String timeUnitStr = + maintenanceTimeUnit != null + ? maintenanceTimeUnit + : config.getOrDefault("ice.maintenance.time-unit", "HOURS"); + + try { + TimeUnit timeUnit = TimeUnit.valueOf(timeUnitStr.toUpperCase()); + scheduler.setMaintenanceSchedule(interval, timeUnit); + logger.info("Maintenance schedule set to: {} {}", interval, timeUnit); + } catch (IllegalArgumentException e) { + logger.warn("Invalid maintenance time unit: {}. Using default: HOURS", timeUnitStr); + scheduler.setMaintenanceSchedule(interval, TimeUnit.HOURS); + } + } + + // Start the scheduler + scheduler.startScheduledMaintenance(); + logger.info("Maintenance scheduler started"); + + } catch (SchedulerException e) { + logger.error("Failed to initialize maintenance scheduler", e); + } + } + public static void main(String[] args) throws Exception { int exitCode = new CommandLine(new Main()).execute(args); System.exit(exitCode); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java new file mode 100644 index 0000000..e067fb3 --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java @@ -0,0 +1,25 @@ +package com.altinity.ice.rest.catalog.internal.maintenance; + +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MaintenanceJob implements Job { + private static final Logger logger = LoggerFactory.getLogger(MaintenanceJob.class); + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + logger.info("Maintenance job triggered by Quartz scheduler"); + + // Get the scheduler from the context + QuartzMaintenanceScheduler scheduler = + (QuartzMaintenanceScheduler) context.getMergedJobDataMap().get("scheduler"); + if (scheduler != null) { + scheduler.performMaintenance(); + } else { + logger.error("Maintenance scheduler not found in job context"); + } + } +} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java new file mode 100644 index 0000000..bc4b1e8 --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -0,0 +1,92 @@ +package com.altinity.ice.rest.catalog.internal.maintenance; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MaintenanceScheduler { + private static final Logger logger = LoggerFactory.getLogger(MaintenanceScheduler.class); + + private final ScheduledExecutorService scheduler; + private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false); + private long maintenanceInterval = 24; // Default 24 hours + private TimeUnit maintenanceTimeUnit = TimeUnit.HOURS; + + public MaintenanceScheduler() { + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + public void startScheduledMaintenance() { + logger.info( + "Starting scheduled maintenance with interval: {} {}", + maintenanceInterval, + maintenanceTimeUnit); + scheduler.scheduleAtFixedRate( + this::performMaintenance, maintenanceInterval, maintenanceInterval, maintenanceTimeUnit); + } + + public void stopScheduledMaintenance() { + logger.info("Stopping scheduled maintenance"); + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + public void setMaintenanceSchedule(long interval, TimeUnit timeUnit) { + this.maintenanceInterval = interval; + this.maintenanceTimeUnit = timeUnit; + logger.info("Maintenance schedule updated to: {} {}", interval, timeUnit); + + // Restart the scheduler with new settings + stopScheduledMaintenance(); + startScheduledMaintenance(); + } + + public Map getMaintenanceSchedule() { + Map schedule = new HashMap<>(); + schedule.put("interval", maintenanceInterval); + schedule.put("timeUnit", maintenanceTimeUnit.name()); + return schedule; + } + + public boolean isInMaintenanceMode() { + return isMaintenanceMode.get(); + } + + public void setMaintenanceMode(boolean enabled) { + isMaintenanceMode.set(enabled); + logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled"); + } + + private void performMaintenance() { + if (isMaintenanceMode.get()) { + logger.info("Skipping maintenance task as system is already in maintenance mode"); + return; + } + + try { + logger.info("Starting scheduled maintenance task"); + setMaintenanceMode(true); + + // Perform maintenance tasks here + // For example: cleanup old files, optimize tables, etc. + + logger.info("Scheduled maintenance task completed successfully"); + } catch (Exception e) { + logger.error("Error during scheduled maintenance task", e); + } finally { + setMaintenanceMode(false); + } + } +} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java new file mode 100644 index 0000000..0924f7a --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java @@ -0,0 +1,187 @@ +package com.altinity.ice.rest.catalog.internal.maintenance; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.catalog.Catalog; +import org.quartz.CronScheduleBuilder; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SchedulerFactory; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QuartzMaintenanceScheduler { + private static final Logger logger = LoggerFactory.getLogger(QuartzMaintenanceScheduler.class); + private static final String MAINTENANCE_JOB_NAME = "MaintenanceJob"; + private static final String MAINTENANCE_GROUP_NAME = "MaintenanceGroup"; + private static final String MAINTENANCE_TRIGGER_NAME = "MaintenanceTrigger"; + + private final Scheduler scheduler; + private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false); + private String cronExpression = "0 0 0 * * ?"; // Default: Run at midnight every day + private final Catalog catalog; + + public QuartzMaintenanceScheduler(Catalog catalog) throws SchedulerException { + this.catalog = catalog; + SchedulerFactory schedulerFactory = new StdSchedulerFactory(); + this.scheduler = schedulerFactory.getScheduler(); + } + + public QuartzMaintenanceScheduler(Catalog catalog, Properties quartzProperties) + throws SchedulerException { + this.catalog = catalog; + SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties); + this.scheduler = schedulerFactory.getScheduler(); + } + + public void startScheduledMaintenance() throws SchedulerException { + logger.info("Starting scheduled maintenance with cron expression: {}", cronExpression); + + // Create the job + JobDetail job = + JobBuilder.newJob(MaintenanceJob.class) + .withIdentity(MAINTENANCE_JOB_NAME, MAINTENANCE_GROUP_NAME) + .build(); + + // Add scheduler to job data map + job.getJobDataMap().put("scheduler", this); + + // Create the trigger + Trigger trigger = + TriggerBuilder.newTrigger() + .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) + .build(); + + // Schedule the job + scheduler.scheduleJob(job, trigger); + + // Start the scheduler + scheduler.start(); + } + + public void stopScheduledMaintenance() throws SchedulerException { + logger.info("Stopping scheduled maintenance"); + scheduler.shutdown(true); + } + + public void setMaintenanceSchedule(String cronExpression) throws SchedulerException { + this.cronExpression = cronExpression; + logger.info("Maintenance schedule updated to cron expression: {}", cronExpression); + + // Update the trigger + JobKey jobKey = JobKey.jobKey(MAINTENANCE_JOB_NAME, MAINTENANCE_GROUP_NAME); + if (scheduler.checkExists(jobKey)) { + Trigger newTrigger = + TriggerBuilder.newTrigger() + .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) + .build(); + + scheduler.rescheduleJob( + TriggerBuilder.newTrigger() + .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) + .build() + .getKey(), + newTrigger); + } else { + // If job doesn't exist, create it + JobDetail job = + JobBuilder.newJob(MaintenanceJob.class) + .withIdentity(MAINTENANCE_JOB_NAME, MAINTENANCE_GROUP_NAME) + .build(); + + // Add scheduler to job data map + job.getJobDataMap().put("scheduler", this); + + Trigger trigger = + TriggerBuilder.newTrigger() + .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) + .build(); + + scheduler.scheduleJob(job, trigger); + } + } + + public void setMaintenanceSchedule(long interval, TimeUnit timeUnit) throws SchedulerException { + // Convert interval and timeUnit to cron expression + String cronExpression = convertToCronExpression(interval, timeUnit); + setMaintenanceSchedule(cronExpression); + } + + private String convertToCronExpression(long interval, TimeUnit timeUnit) { + // Default to daily at midnight + if (interval <= 0) { + return "0 0 0 * * ?"; + } + + switch (timeUnit) { + case MINUTES: + return "0 */" + interval + " * * * ?"; + case HOURS: + return "0 0 */" + interval + " * * ?"; + case DAYS: + return "0 0 0 */" + interval + " * ?"; + default: + return "0 0 0 * * ?"; // Default to daily at midnight + } + } + + public Map getMaintenanceSchedule() { + Map schedule = new HashMap<>(); + schedule.put("cronExpression", cronExpression); + return schedule; + } + + public boolean isInMaintenanceMode() { + return isMaintenanceMode.get(); + } + + public void setMaintenanceMode(boolean enabled) { + isMaintenanceMode.set(enabled); + logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled"); + } + + public void performMaintenance() { + if (isMaintenanceMode.get()) { + logger.info("Skipping maintenance task as system is already in maintenance mode"); + return; + } + + try { + logger.info("Starting scheduled maintenance task"); + setMaintenanceMode(true); + + // Perform maintenance tasks on the catalog + if (catalog != null) { + logger.info("Performing maintenance on catalog: {}", catalog.name()); + // Example maintenance operations: + // - Clean up old metadata files + // - Optimize table layouts + // - Vacuum old snapshots + // - Compact small files + + // For now, just log that we're performing maintenance + logger.info("Maintenance operations completed for catalog: {}", catalog.name()); + } else { + logger.warn("No catalog available for maintenance operations"); + } + + logger.info("Scheduled maintenance task completed successfully"); + } catch (Exception e) { + logger.error("Error during scheduled maintenance task", e); + } finally { + setMaintenanceMode(false); + } + } +} From e4b025b8847240745e994971c36b1748ea953b88 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 21 Apr 2025 15:58:32 -0400 Subject: [PATCH 2/6] Added logic to perform maintenance functions, expireSnapshots and rewriteManifests --- .../QuartzMaintenanceScheduler.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java index 0924f7a..14e6539 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java @@ -1,11 +1,18 @@ package com.altinity.ice.rest.catalog.internal.maintenance; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.Action; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; import org.quartz.CronScheduleBuilder; import org.quartz.JobBuilder; import org.quartz.JobDetail; @@ -172,6 +179,39 @@ public void performMaintenance() { // - Compact small files // For now, just log that we're performing maintenance + // get the list of namespaces. + List namespaces; + if (catalog instanceof SupportsNamespaces) { + SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + namespaces = nsCatalog.listNamespaces(); + for (Namespace ns : namespaces) { + logger.debug("Namespace: " + ns); + } + } else { + logger.error("Catalog does not support namespace operations."); + return; + } // Iterate through namespace + + for (Namespace namespace : namespaces) { + // Get the table + List tables = catalog.listTables(namespace); + for (TableIdentifier tableIdent : tables) { + long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30); + // Get the table + Table table = catalog.loadTable(tableIdent); + // Expire snapshots older than 30 days + table.expireSnapshots() + .expireOlderThan(olderThanMillis) + .commit(); + // Remove Orphan Files + + // Rewrite Manifests + table.rewriteManifests() + .rewriteIf(manifest -> true) + .commit(); + } + + } logger.info("Maintenance operations completed for catalog: {}", catalog.name()); } else { logger.warn("No catalog available for maintenance operations"); From 56640d48d5bb4c4d4042eac09208eb2e099d5c57 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 23 Apr 2025 12:44:47 -0400 Subject: [PATCH 3/6] Use skeduler library instead of quartz to setup maintenance schedules. --- ice-rest-catalog/pom.xml | 30 +-- .../com/altinity/ice/rest/catalog/Main.java | 74 +----- .../rest/catalog/internal/config/Config.java | 2 + .../internal/maintenance/MaintenanceJob.java | 25 -- .../maintenance/MaintenanceScheduler.java | 132 +++++++---- .../QuartzMaintenanceScheduler.java | 220 ------------------ 6 files changed, 111 insertions(+), 372 deletions(-) delete mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java delete mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java diff --git a/ice-rest-catalog/pom.xml b/ice-rest-catalog/pom.xml index d264354..a0a63f2 100644 --- a/ice-rest-catalog/pom.xml +++ b/ice-rest-catalog/pom.xml @@ -17,7 +17,6 @@ 6.1.0 1.3.6 3.46.1.0 - 2.5.0 @@ -232,35 +231,18 @@ jackson-annotations 2.18.2 - - - org.quartz-scheduler - quartz - ${quartz.version} - - - org.slf4j - slf4j-api - - - - - org.quartz-scheduler - quartz-jobs - ${quartz.version} - - - org.slf4j - slf4j-api - - - org.testng testng 7.9.0 test + + com.github.shyiko.skedule + skedule + 0.4.0 + kalvanized + diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java index e61f351..a012ebb 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java @@ -6,7 +6,7 @@ import com.altinity.ice.rest.catalog.internal.config.Config; import com.altinity.ice.rest.catalog.internal.jetty.PlainErrorHandler; import com.altinity.ice.rest.catalog.internal.jetty.ServerConfig; -import com.altinity.ice.rest.catalog.internal.maintenance.QuartzMaintenanceScheduler; +import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceScheduler; import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter; import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler; import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler; @@ -24,10 +24,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.aws.s3.S3FileIOProperties; @@ -39,7 +37,6 @@ import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; -import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; @@ -71,19 +68,11 @@ public String[] getVersion() { String configFile; @CommandLine.Option( - names = {"--maintenance-interval"}, - description = "Maintenance interval in hours (default: 24)") - static Long maintenanceInterval; - - @CommandLine.Option( - names = {"--maintenance-time-unit"}, - description = "Maintenance time unit (HOURS, DAYS, MINUTES) (default: HOURS)") - static String maintenanceTimeUnit; - - @CommandLine.Option( - names = {"--maintenance-cron"}, - description = "Maintenance cron expression (overrides interval and time-unit)") - static String maintenanceCron; + names = "--maintenance-interval", + description = + "Maintenance interval in human-friendly format (e.g. 'every day', 'every monday 09:00')", + defaultValue = "every day 00:00") + private String maintenanceInterval; private Main() {} @@ -319,55 +308,14 @@ public Integer call() throws Exception { return 0; } - private static void initializeMaintenanceScheduler(Catalog catalog, Map config) { + private void initializeMaintenanceScheduler(Catalog catalog, Map config) { try { - // Create Quartz properties if needed - Properties quartzProperties = new Properties(); - // Add any custom Quartz properties from config - for (Map.Entry entry : config.entrySet()) { - if (entry.getKey().startsWith("ice.quartz.")) { - String quartzKey = entry.getKey().substring("ice.quartz.".length()); - quartzProperties.setProperty(quartzKey, entry.getValue()); - } - } - - // Create the scheduler - QuartzMaintenanceScheduler scheduler = - new QuartzMaintenanceScheduler(catalog, quartzProperties); - - // Configure the schedule - if (maintenanceCron != null && !maintenanceCron.isEmpty()) { - // Use cron expression if provided - scheduler.setMaintenanceSchedule(maintenanceCron); - logger.info("Maintenance schedule set to cron expression: {}", maintenanceCron); - } else { - // Use interval and time unit - Long interval = - maintenanceInterval != null - ? maintenanceInterval - : PropertyUtil.propertyAsLong(config, "ice.maintenance.interval", 24L); - - String timeUnitStr = - maintenanceTimeUnit != null - ? maintenanceTimeUnit - : config.getOrDefault("ice.maintenance.time-unit", "HOURS"); - - try { - TimeUnit timeUnit = TimeUnit.valueOf(timeUnitStr.toUpperCase()); - scheduler.setMaintenanceSchedule(interval, timeUnit); - logger.info("Maintenance schedule set to: {} {}", interval, timeUnit); - } catch (IllegalArgumentException e) { - logger.warn("Invalid maintenance time unit: {}. Using default: HOURS", timeUnitStr); - scheduler.setMaintenanceSchedule(interval, TimeUnit.HOURS); - } - } - - // Start the scheduler + MaintenanceScheduler scheduler = new MaintenanceScheduler(catalog, config); + scheduler.setMaintenanceSchedule(maintenanceInterval); scheduler.startScheduledMaintenance(); - logger.info("Maintenance scheduler started"); - - } catch (SchedulerException e) { + } catch (Exception e) { logger.error("Failed to initialize maintenance scheduler", e); + throw new RuntimeException(e); } } diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java index 83a953b..26a7d54 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java @@ -41,6 +41,8 @@ private Config() {} public static final String OPTION_ANONYMOUS_ACCESS = "ice.anonymous.access"; public static final String OPTION_ANONYMOUS_ACCESS_CONFIG = "ice.anonymous.access.config"; // format: param=value&... + public static final String OPTION_SNAPSHOT_EXPIRATION_DAYS = + "ice.maintenance.snapshot.expiration.days"; // TODO: return Config, not Map // https://py.iceberg.apache.org/configuration/#setting-configuration-values diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java deleted file mode 100644 index e067fb3..0000000 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.altinity.ice.rest.catalog.internal.maintenance; - -import org.quartz.Job; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MaintenanceJob implements Job { - private static final Logger logger = LoggerFactory.getLogger(MaintenanceJob.class); - - @Override - public void execute(JobExecutionContext context) throws JobExecutionException { - logger.info("Maintenance job triggered by Quartz scheduler"); - - // Get the scheduler from the context - QuartzMaintenanceScheduler scheduler = - (QuartzMaintenanceScheduler) context.getMergedJobDataMap().get("scheduler"); - if (scheduler != null) { - scheduler.performMaintenance(); - } else { - logger.error("Maintenance scheduler not found in job context"); - } - } -} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index bc4b1e8..6c7b949 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -1,67 +1,79 @@ package com.altinity.ice.rest.catalog.internal.maintenance; -import java.util.HashMap; +import com.altinity.ice.rest.catalog.internal.config.Config; +import com.github.shyiko.skedule.Schedule; +import java.time.LocalTime; +import java.time.ZonedDateTime; +import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MaintenanceScheduler { private static final Logger logger = LoggerFactory.getLogger(MaintenanceScheduler.class); + private static final int DEFAULT_EXPIRATION_DAYS = 30; - private final ScheduledExecutorService scheduler; + private final Catalog catalog; private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false); - private long maintenanceInterval = 24; // Default 24 hours - private TimeUnit maintenanceTimeUnit = TimeUnit.HOURS; + private final ScheduledExecutorService executor; + private final Map config; + private ScheduledFuture currentTask; + private Schedule schedule; - public MaintenanceScheduler() { - this.scheduler = Executors.newSingleThreadScheduledExecutor(); + public MaintenanceScheduler(Catalog catalog, Map config) { + this.catalog = catalog; + this.config = config; + this.executor = new ScheduledThreadPoolExecutor(1); + ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true); + // Default schedule: every day at midnight + this.schedule = Schedule.at(LocalTime.MIDNIGHT).everyDay(); } public void startScheduledMaintenance() { - logger.info( - "Starting scheduled maintenance with interval: {} {}", - maintenanceInterval, - maintenanceTimeUnit); - scheduler.scheduleAtFixedRate( - this::performMaintenance, maintenanceInterval, maintenanceInterval, maintenanceTimeUnit); + scheduleNextMaintenance(); } public void stopScheduledMaintenance() { - logger.info("Stopping scheduled maintenance"); - scheduler.shutdown(); - try { - if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { - scheduler.shutdownNow(); - } - } catch (InterruptedException e) { - scheduler.shutdownNow(); - Thread.currentThread().interrupt(); + if (currentTask != null) { + currentTask.cancel(false); } + executor.shutdown(); } - public void setMaintenanceSchedule(long interval, TimeUnit timeUnit) { - this.maintenanceInterval = interval; - this.maintenanceTimeUnit = timeUnit; - logger.info("Maintenance schedule updated to: {} {}", interval, timeUnit); + private void scheduleNextMaintenance() { + if (currentTask != null) { + currentTask.cancel(false); + } - // Restart the scheduler with new settings - stopScheduledMaintenance(); - startScheduledMaintenance(); - } + ZonedDateTime now = ZonedDateTime.now(); + ZonedDateTime next = schedule.next(now); - public Map getMaintenanceSchedule() { - Map schedule = new HashMap<>(); - schedule.put("interval", maintenanceInterval); - schedule.put("timeUnit", maintenanceTimeUnit.name()); - return schedule; + long delay = next.toEpochSecond() - now.toEpochSecond(); + currentTask = + executor.schedule( + () -> { + performMaintenance(); + scheduleNextMaintenance(); // Schedule next run + }, + delay, + TimeUnit.SECONDS); + + logger.info("Next maintenance scheduled for: {}", next); } - public boolean isInMaintenanceMode() { - return isMaintenanceMode.get(); + public void setMaintenanceSchedule(String scheduleExpression) { + this.schedule = Schedule.parse(scheduleExpression); + scheduleNextMaintenance(); } public void setMaintenanceMode(boolean enabled) { @@ -69,7 +81,7 @@ public void setMaintenanceMode(boolean enabled) { logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled"); } - private void performMaintenance() { + public void performMaintenance() { if (isMaintenanceMode.get()) { logger.info("Skipping maintenance task as system is already in maintenance mode"); return; @@ -79,8 +91,48 @@ private void performMaintenance() { logger.info("Starting scheduled maintenance task"); setMaintenanceMode(true); - // Perform maintenance tasks here - // For example: cleanup old files, optimize tables, etc. + if (catalog != null) { + logger.info("Performing maintenance on catalog: {}", catalog.name()); + List namespaces; + if (catalog instanceof SupportsNamespaces) { + SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + namespaces = nsCatalog.listNamespaces(); + for (Namespace ns : namespaces) { + logger.debug("Namespace: " + ns); + } + } else { + logger.error("Catalog does not support namespace operations."); + return; + } + + for (Namespace namespace : namespaces) { + List tables = catalog.listTables(namespace); + for (TableIdentifier tableIdent : tables) { + int expirationDays = DEFAULT_EXPIRATION_DAYS; + String configuredDays = config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS); + if (configuredDays != null) { + try { + expirationDays = Integer.parseInt(configuredDays); + logger.debug("Using configured snapshot expiration days: {}", expirationDays); + } catch (NumberFormatException e) { + logger.warn( + "Invalid value for {}: {}. Using default of {} days", + Config.OPTION_SNAPSHOT_EXPIRATION_DAYS, + configuredDays, + DEFAULT_EXPIRATION_DAYS); + } + } + long olderThanMillis = + System.currentTimeMillis() - TimeUnit.DAYS.toMillis(expirationDays); + Table table = catalog.loadTable(tableIdent); + table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); + table.rewriteManifests().rewriteIf(manifest -> true).commit(); + } + } + logger.info("Maintenance operations completed for catalog: {}", catalog.name()); + } else { + logger.warn("No catalog available for maintenance operations"); + } logger.info("Scheduled maintenance task completed successfully"); } catch (Exception e) { diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java deleted file mode 100644 index d48ec7b..0000000 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/QuartzMaintenanceScheduler.java +++ /dev/null @@ -1,220 +0,0 @@ -package com.altinity.ice.rest.catalog.internal.maintenance; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; -import org.apache.iceberg.catalog.TableIdentifier; -import org.quartz.CronScheduleBuilder; -import org.quartz.JobBuilder; -import org.quartz.JobDetail; -import org.quartz.JobKey; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.SchedulerFactory; -import org.quartz.Trigger; -import org.quartz.TriggerBuilder; -import org.quartz.impl.StdSchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class QuartzMaintenanceScheduler { - private static final Logger logger = LoggerFactory.getLogger(QuartzMaintenanceScheduler.class); - private static final String MAINTENANCE_JOB_NAME = "MaintenanceJob"; - private static final String MAINTENANCE_GROUP_NAME = "MaintenanceGroup"; - private static final String MAINTENANCE_TRIGGER_NAME = "MaintenanceTrigger"; - - private final Scheduler scheduler; - private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false); - private String cronExpression = "0 0 0 * * ?"; // Default: Run at midnight every day - private final Catalog catalog; - - public QuartzMaintenanceScheduler(Catalog catalog) throws SchedulerException { - this.catalog = catalog; - SchedulerFactory schedulerFactory = new StdSchedulerFactory(); - this.scheduler = schedulerFactory.getScheduler(); - } - - public QuartzMaintenanceScheduler(Catalog catalog, Properties quartzProperties) - throws SchedulerException { - this.catalog = catalog; - SchedulerFactory schedulerFactory = new StdSchedulerFactory(quartzProperties); - this.scheduler = schedulerFactory.getScheduler(); - } - - public void startScheduledMaintenance() throws SchedulerException { - logger.info("Starting scheduled maintenance with cron expression: {}", cronExpression); - - // Create the job - JobDetail job = - JobBuilder.newJob(MaintenanceJob.class) - .withIdentity(MAINTENANCE_JOB_NAME, MAINTENANCE_GROUP_NAME) - .build(); - - // Add scheduler to job data map - job.getJobDataMap().put("scheduler", this); - - // Create the trigger - Trigger trigger = - TriggerBuilder.newTrigger() - .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) - .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) - .build(); - - // Schedule the job - scheduler.scheduleJob(job, trigger); - - // Start the scheduler - scheduler.start(); - } - - public void stopScheduledMaintenance() throws SchedulerException { - logger.info("Stopping scheduled maintenance"); - scheduler.shutdown(true); - } - - public void setMaintenanceSchedule(String cronExpression) throws SchedulerException { - this.cronExpression = cronExpression; - logger.info("Maintenance schedule updated to cron expression: {}", cronExpression); - - // Update the trigger - JobKey jobKey = JobKey.jobKey(MAINTENANCE_JOB_NAME, MAINTENANCE_GROUP_NAME); - if (scheduler.checkExists(jobKey)) { - Trigger newTrigger = - TriggerBuilder.newTrigger() - .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) - .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) - .build(); - - scheduler.rescheduleJob( - TriggerBuilder.newTrigger() - .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) - .build() - .getKey(), - newTrigger); - } else { - // If job doesn't exist, create it - JobDetail job = - JobBuilder.newJob(MaintenanceJob.class) - .withIdentity(MAINTENANCE_JOB_NAME, MAINTENANCE_GROUP_NAME) - .build(); - - // Add scheduler to job data map - job.getJobDataMap().put("scheduler", this); - - Trigger trigger = - TriggerBuilder.newTrigger() - .withIdentity(MAINTENANCE_TRIGGER_NAME, MAINTENANCE_GROUP_NAME) - .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) - .build(); - - scheduler.scheduleJob(job, trigger); - } - } - - public void setMaintenanceSchedule(long interval, TimeUnit timeUnit) throws SchedulerException { - // Convert interval and timeUnit to cron expression - String cronExpression = convertToCronExpression(interval, timeUnit); - setMaintenanceSchedule(cronExpression); - } - - private String convertToCronExpression(long interval, TimeUnit timeUnit) { - // Default to daily at midnight - if (interval <= 0) { - return "0 0 0 * * ?"; - } - - switch (timeUnit) { - case MINUTES: - return "0 */" + interval + " * * * ?"; - case HOURS: - return "0 0 */" + interval + " * * ?"; - case DAYS: - return "0 0 0 */" + interval + " * ?"; - default: - return "0 0 0 * * ?"; // Default to daily at midnight - } - } - - public Map getMaintenanceSchedule() { - Map schedule = new HashMap<>(); - schedule.put("cronExpression", cronExpression); - return schedule; - } - - public boolean isInMaintenanceMode() { - return isMaintenanceMode.get(); - } - - public void setMaintenanceMode(boolean enabled) { - isMaintenanceMode.set(enabled); - logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled"); - } - - public void performMaintenance() { - if (isMaintenanceMode.get()) { - logger.info("Skipping maintenance task as system is already in maintenance mode"); - return; - } - - try { - logger.info("Starting scheduled maintenance task"); - setMaintenanceMode(true); - - // Perform maintenance tasks on the catalog - if (catalog != null) { - logger.info("Performing maintenance on catalog: {}", catalog.name()); - // Example maintenance operations: - // - Clean up old metadata files - // - Optimize table layouts - // - Vacuum old snapshots - // - Compact small files - - // For now, just log that we're performing maintenance - // get the list of namespaces. - List namespaces; - if (catalog instanceof SupportsNamespaces) { - SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; - namespaces = nsCatalog.listNamespaces(); - for (Namespace ns : namespaces) { - logger.debug("Namespace: " + ns); - } - } else { - logger.error("Catalog does not support namespace operations."); - return; - } // Iterate through namespace - - for (Namespace namespace : namespaces) { - // Get the table - List tables = catalog.listTables(namespace); - for (TableIdentifier tableIdent : tables) { - long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30); - // Get the table - Table table = catalog.loadTable(tableIdent); - // Expire snapshots older than 30 days - table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); - // Remove Orphan Files - - // Rewrite Manifests - table.rewriteManifests().rewriteIf(manifest -> true).commit(); - } - } - logger.info("Maintenance operations completed for catalog: {}", catalog.name()); - } else { - logger.warn("No catalog available for maintenance operations"); - } - - logger.info("Scheduled maintenance task completed successfully"); - } catch (Exception e) { - logger.error("Error during scheduled maintenance task", e); - } finally { - setMaintenanceMode(false); - } - } -} From 189ab193df90d239b86b211e510e89f23b513a65 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 24 Apr 2025 16:51:09 -0400 Subject: [PATCH 4/6] ice-catalog: review comments, added synchronization to prevent currentTask accessed from multiple threads. --- .../com/altinity/ice/rest/catalog/Main.java | 13 ++- .../maintenance/MaintenanceScheduler.java | 84 ++++++++++--------- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java index a012ebb..c399d8f 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java @@ -70,8 +70,7 @@ public String[] getVersion() { @CommandLine.Option( names = "--maintenance-interval", description = - "Maintenance interval in human-friendly format (e.g. 'every day', 'every monday 09:00')", - defaultValue = "every day 00:00") + "Maintenance interval in human-friendly format (e.g. 'every day', 'every monday 09:00'). Leave empty to disable maintenance.") private String maintenanceInterval; private Main() {} @@ -309,10 +308,16 @@ public Integer call() throws Exception { } private void initializeMaintenanceScheduler(Catalog catalog, Map config) { + if (maintenanceInterval == null || maintenanceInterval.trim().isEmpty()) { + logger.info("Maintenance scheduler is disabled (no maintenance interval specified)"); + return; + } + try { - MaintenanceScheduler scheduler = new MaintenanceScheduler(catalog, config); - scheduler.setMaintenanceSchedule(maintenanceInterval); + MaintenanceScheduler scheduler = + new MaintenanceScheduler(catalog, config, maintenanceInterval); scheduler.startScheduledMaintenance(); + logger.info("Maintenance scheduler initialized with interval: {}", maintenanceInterval); } catch (Exception e) { logger.error("Failed to initialize maintenance scheduler", e); throw new RuntimeException(e); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index 6c7b949..ecaaafa 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -2,7 +2,6 @@ import com.altinity.ice.rest.catalog.internal.config.Config; import com.github.shyiko.skedule.Schedule; -import java.time.LocalTime; import java.time.ZonedDateTime; import java.util.List; import java.util.Map; @@ -26,17 +25,21 @@ public class MaintenanceScheduler { private final Catalog catalog; private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false); private final ScheduledExecutorService executor; - private final Map config; + private final Schedule schedule; + private final Object taskLock = new Object(); + private ScheduledFuture currentTask; - private Schedule schedule; + private volatile int expirationDays; + private final String configuredDays; - public MaintenanceScheduler(Catalog catalog, Map config) { + public MaintenanceScheduler( + Catalog catalog, Map config, String maintenanceInterval) { this.catalog = catalog; - this.config = config; this.executor = new ScheduledThreadPoolExecutor(1); ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true); - // Default schedule: every day at midnight - this.schedule = Schedule.at(LocalTime.MIDNIGHT).everyDay(); + this.schedule = Schedule.parse(maintenanceInterval); + this.expirationDays = DEFAULT_EXPIRATION_DAYS; + this.configuredDays = config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS); } public void startScheduledMaintenance() { @@ -44,41 +47,35 @@ public void startScheduledMaintenance() { } public void stopScheduledMaintenance() { - if (currentTask != null) { - currentTask.cancel(false); + synchronized (taskLock) { + if (currentTask != null) { + currentTask.cancel(false); + } + executor.shutdown(); } - executor.shutdown(); } private void scheduleNextMaintenance() { - if (currentTask != null) { - currentTask.cancel(false); - } - - ZonedDateTime now = ZonedDateTime.now(); - ZonedDateTime next = schedule.next(now); + synchronized (taskLock) { + if (currentTask != null) { + currentTask.cancel(false); + } - long delay = next.toEpochSecond() - now.toEpochSecond(); - currentTask = - executor.schedule( - () -> { - performMaintenance(); - scheduleNextMaintenance(); // Schedule next run - }, - delay, - TimeUnit.SECONDS); + ZonedDateTime now = ZonedDateTime.now(); + ZonedDateTime next = schedule.next(now); - logger.info("Next maintenance scheduled for: {}", next); - } + long delay = next.toEpochSecond() - now.toEpochSecond(); + currentTask = + executor.schedule( + () -> { + performMaintenance(); + scheduleNextMaintenance(); // Schedule next run + }, + delay, + TimeUnit.SECONDS); - public void setMaintenanceSchedule(String scheduleExpression) { - this.schedule = Schedule.parse(scheduleExpression); - scheduleNextMaintenance(); - } - - public void setMaintenanceMode(boolean enabled) { - isMaintenanceMode.set(enabled); - logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled"); + logger.info("Next maintenance scheduled for: {}", next); + } } public void performMaintenance() { @@ -108,8 +105,7 @@ public void performMaintenance() { for (Namespace namespace : namespaces) { List tables = catalog.listTables(namespace); for (TableIdentifier tableIdent : tables) { - int expirationDays = DEFAULT_EXPIRATION_DAYS; - String configuredDays = config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS); + if (configuredDays != null) { try { expirationDays = Integer.parseInt(configuredDays); @@ -125,8 +121,15 @@ public void performMaintenance() { long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(expirationDays); Table table = catalog.loadTable(tableIdent); - table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); + + // Check if table has any snapshots before performing maintenance + if (table.currentSnapshot() == null) { + logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent); + continue; + } + table.rewriteManifests().rewriteIf(manifest -> true).commit(); + table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); } } logger.info("Maintenance operations completed for catalog: {}", catalog.name()); @@ -141,4 +144,9 @@ public void performMaintenance() { setMaintenanceMode(false); } } + + private void setMaintenanceMode(boolean enabled) { + isMaintenanceMode.set(enabled); + logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled"); + } } From 590b6365295101ec2e798f85ed93fe9cc5bb127a Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 24 Apr 2025 16:58:30 -0400 Subject: [PATCH 5/6] ice-catalog: review comments, added synchronization to prevent currentTask accessed from multiple threads. --- .../rest/catalog/internal/maintenance/MaintenanceScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index ecaaafa..f01a7a1 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -27,7 +27,7 @@ public class MaintenanceScheduler { private final ScheduledExecutorService executor; private final Schedule schedule; private final Object taskLock = new Object(); - + private ScheduledFuture currentTask; private volatile int expirationDays; private final String configuredDays; From 76eb25b963f9f5687865bbe533cb36059ca8de3a Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 28 Apr 2025 14:01:29 -0400 Subject: [PATCH 6/6] ice: Moved logic of reading config value of snapshot expiration days to constructor. --- examples/scratch/.ice-rest-catalog.yaml | 1 + .../maintenance/MaintenanceScheduler.java | 26 ++++++------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml index 2f076b9..f5c73af 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -5,3 +5,4 @@ s3.access-key-id: "miniouser" s3.secret-access-key: "miniopassword" ice.s3.region: minio ice.token: foo +ice.maintenance.snapshot.expiration.days: 20 diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index f01a7a1..3fc06b3 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -29,8 +29,7 @@ public class MaintenanceScheduler { private final Object taskLock = new Object(); private ScheduledFuture currentTask; - private volatile int expirationDays; - private final String configuredDays; + private final Integer snapshotExpirationDays; public MaintenanceScheduler( Catalog catalog, Map config, String maintenanceInterval) { @@ -38,8 +37,12 @@ public MaintenanceScheduler( this.executor = new ScheduledThreadPoolExecutor(1); ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true); this.schedule = Schedule.parse(maintenanceInterval); - this.expirationDays = DEFAULT_EXPIRATION_DAYS; - this.configuredDays = config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS); + if (config.containsKey(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS)) { + this.snapshotExpirationDays = + Integer.parseInt(config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS)); + } else { + this.snapshotExpirationDays = DEFAULT_EXPIRATION_DAYS; + } } public void startScheduledMaintenance() { @@ -105,21 +108,8 @@ public void performMaintenance() { for (Namespace namespace : namespaces) { List tables = catalog.listTables(namespace); for (TableIdentifier tableIdent : tables) { - - if (configuredDays != null) { - try { - expirationDays = Integer.parseInt(configuredDays); - logger.debug("Using configured snapshot expiration days: {}", expirationDays); - } catch (NumberFormatException e) { - logger.warn( - "Invalid value for {}: {}. Using default of {} days", - Config.OPTION_SNAPSHOT_EXPIRATION_DAYS, - configuredDays, - DEFAULT_EXPIRATION_DAYS); - } - } long olderThanMillis = - System.currentTimeMillis() - TimeUnit.DAYS.toMillis(expirationDays); + System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays); Table table = catalog.loadTable(tableIdent); // Check if table has any snapshots before performing maintenance