Skip to content

Added quartz scheduler to schedule maintenance job. #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/scratch/.ice-rest-catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ s3.access-key-id: "miniouser"
s3.secret-access-key: "miniopassword"
ice.s3.region: minio
ice.token: foo
ice.maintenance.snapshot.expiration.days: 20
6 changes: 6 additions & 0 deletions ice-rest-catalog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@
<version>7.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.shyiko.skedule</groupId>
<artifactId>skedule</artifactId>
<version>0.4.0</version>
<classifier>kalvanized</classifier>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.altinity.ice.rest.catalog.internal.config.Config;
import com.altinity.ice.rest.catalog.internal.jetty.PlainErrorHandler;
import com.altinity.ice.rest.catalog.internal.jetty.ServerConfig;
import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceScheduler;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler;
Expand Down Expand Up @@ -66,6 +67,12 @@ public String[] getVersion() {
description = "/path/to/config.yaml ($CWD/.ice-rest-catalog.yaml by default)")
String configFile;

// Schedule expression for background maintenance, supplied on the command line.
// A null or blank value disables the scheduler (see initializeMaintenanceScheduler); any other
// value is passed to MaintenanceScheduler, which parses it with Schedule.parse.
@CommandLine.Option(
names = "--maintenance-interval",
description =
"Maintenance interval in human-friendly format (e.g. 'every day', 'every monday 09:00'). Leave empty to disable maintenance.")
private String maintenanceInterval;

// Private constructor: the only instantiation visible here is in main(), which wraps a new Main
// in a CommandLine and executes it.
private Main() {}

private static Server createServer(int port, Catalog catalog, Map<String, String> config) {
Expand Down Expand Up @@ -276,6 +283,9 @@ public Integer call() throws Exception {

Catalog catalog = CatalogUtil.buildIcebergCatalog("rest_backend", config, null);

// Initialize and start the maintenance scheduler
initializeMaintenanceScheduler(catalog, config);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to make maintenance optional (e.g. disabled when maintenanceInterval is empty).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic added inside the function:

```java
  private void initializeMaintenanceScheduler(Catalog catalog, Map<String, String> config) {
    if (maintenanceInterval == null || maintenanceInterval.trim().isEmpty()) {
      logger.info("Maintenance scheduler is disabled (no maintenance interval specified)");
      return;
    }
```


// TODO: replace with uds (jetty-unixdomain-server is all that is needed here but in ice you'll
// need to implement custom org.apache.iceberg.rest.RESTClient)
String adminPort = config.get(Config.OPTION_ADMIN_PORT);
Expand All @@ -297,6 +307,23 @@ public Integer call() throws Exception {
return 0;
}

/**
 * Starts background maintenance for the given catalog when a maintenance interval was supplied.
 *
 * <p>When {@code maintenanceInterval} is null or contains only whitespace, nothing is started and
 * an informational message is logged instead.
 *
 * @param catalog catalog handed to the {@link MaintenanceScheduler}
 * @param config configuration map handed to the {@link MaintenanceScheduler}
 * @throws RuntimeException wrapping any exception raised while creating or starting the scheduler
 */
private void initializeMaintenanceScheduler(Catalog catalog, Map<String, String> config) {
  final String interval = maintenanceInterval;
  final boolean intervalProvided = interval != null && !interval.trim().isEmpty();
  if (!intervalProvided) {
    logger.info("Maintenance scheduler is disabled (no maintenance interval specified)");
    return;
  }

  try {
    new MaintenanceScheduler(catalog, config, interval).startScheduledMaintenance();
    logger.info("Maintenance scheduler initialized with interval: {}", interval);
  } catch (Exception startupFailure) {
    // Log with full context here, then surface the failure to the caller unchecked.
    logger.error("Failed to initialize maintenance scheduler", startupFailure);
    throw new RuntimeException(startupFailure);
  }
}

public static void main(String[] args) throws Exception {
int exitCode = new CommandLine(new Main()).execute(args);
System.exit(exitCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ private Config() {}
public static final String OPTION_ANONYMOUS_ACCESS = "ice.anonymous.access";
public static final String OPTION_ANONYMOUS_ACCESS_CONFIG =
"ice.anonymous.access.config"; // format: param=value&...
// Config key holding the snapshot age threshold, in days, used by MaintenanceScheduler when
// expiring snapshots; MaintenanceScheduler falls back to 30 days when the key is absent.
public static final String OPTION_SNAPSHOT_EXPIRATION_DAYS =
"ice.maintenance.snapshot.expiration.days";

// TODO: return Config, not Map
// https://py.iceberg.apache.org/configuration/#setting-configuration-values
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.altinity.ice.rest.catalog.internal.maintenance;

import com.altinity.ice.rest.catalog.internal.config.Config;
import com.github.shyiko.skedule.Schedule;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs periodic maintenance over every table reachable from a catalog: for each table that has a
 * current snapshot, manifests are rewritten and snapshots older than the configured number of
 * days are expired.
 *
 * <p>Run times come from a {@link Schedule} parsed from a human-friendly expression. Work is
 * executed on a single-threaded scheduled executor; each run schedules the following one.
 *
 * <p>Scheduling state ({@code currentTask} and executor shutdown) is guarded by {@code taskLock};
 * overlapping maintenance runs are prevented with an atomic flag.
 */
public class MaintenanceScheduler {
  private static final Logger logger = LoggerFactory.getLogger(MaintenanceScheduler.class);
  private static final int DEFAULT_EXPIRATION_DAYS = 30;

  private final Catalog catalog;
  private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false);
  private final ScheduledExecutorService executor;
  private final Schedule schedule;
  private final Object taskLock = new Object();

  // Guarded by taskLock.
  private ScheduledFuture<?> currentTask;
  private final int snapshotExpirationDays;

  /**
   * Creates a scheduler; nothing runs until {@link #startScheduledMaintenance()} is called.
   *
   * @param catalog catalog whose tables are maintained; when {@code null}, each run only logs a
   *     warning
   * @param config configuration map; {@link Config#OPTION_SNAPSHOT_EXPIRATION_DAYS} is read from
   *     it, defaulting to 30 days when absent
   * @param maintenanceInterval schedule expression understood by {@link Schedule#parse}
   * @throws IllegalArgumentException if the configured expiration is not a non-negative integer
   */
  public MaintenanceScheduler(
      Catalog catalog, Map<String, String> config, String maintenanceInterval) {
    this.catalog = catalog;
    this.schedule = Schedule.parse(maintenanceInterval);
    this.snapshotExpirationDays = parseExpirationDays(config);

    // Configure through the concrete type first so no downcast of the field is needed.
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    pool.setRemoveOnCancelPolicy(true);
    this.executor = pool;
  }

  /** Reads and validates the snapshot expiration (in days) from the configuration map. */
  private static int parseExpirationDays(Map<String, String> config) {
    String raw = config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS);
    if (raw == null) {
      return DEFAULT_EXPIRATION_DAYS;
    }
    int days;
    try {
      days = Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          Config.OPTION_SNAPSHOT_EXPIRATION_DAYS
              + " must be an integer number of days, got: '"
              + raw
              + "'",
          e);
    }
    // A negative value would move the cutoff into the future and expire every snapshot that is
    // eligible for expiration, which is almost certainly a misconfiguration.
    if (days < 0) {
      throw new IllegalArgumentException(
          Config.OPTION_SNAPSHOT_EXPIRATION_DAYS + " must not be negative, got: " + days);
    }
    return days;
  }

  /** Schedules the first maintenance run at the next time produced by the schedule. */
  public void startScheduledMaintenance() {
    scheduleNextMaintenance();
  }

  /**
   * Cancels the pending run (without interrupting one that is already executing) and shuts the
   * executor down. A run in progress will not schedule a successor afterwards.
   */
  public void stopScheduledMaintenance() {
    synchronized (taskLock) {
      if (currentTask != null) {
        currentTask.cancel(false);
        currentTask = null;
      }
      executor.shutdown();
    }
  }

  /** Replaces any pending run with one at the next time produced by the schedule. */
  private void scheduleNextMaintenance() {
    synchronized (taskLock) {
      if (currentTask != null) {
        currentTask.cancel(false);
      }

      ZonedDateTime now = ZonedDateTime.now();
      ZonedDateTime next = schedule.next(now);

      long delay = next.toEpochSecond() - now.toEpochSecond();
      currentTask = executor.schedule(this::runAndReschedule, delay, TimeUnit.SECONDS);

      logger.info("Next maintenance scheduled for: {}", next);
    }
  }

  /** Body of the scheduled task: one maintenance run followed by scheduling of the next. */
  private void runAndReschedule() {
    try {
      performMaintenance();
    } finally {
      // Reschedule even if the run ended abnormally, but never after stopScheduledMaintenance():
      // the check and the scheduling happen under the same lock that guards shutdown, so the
      // executor cannot be shut down in between.
      synchronized (taskLock) {
        if (!executor.isShutdown()) {
          scheduleNextMaintenance();
        }
      }
    }
  }

  /**
   * Performs one maintenance pass over all tables of the catalog.
   *
   * <p>If another pass is already in progress the call returns immediately. Failures are logged
   * and never propagated; a failure on one table does not prevent the remaining tables from being
   * processed.
   */
  public void performMaintenance() {
    // Claim the flag atomically; a separate get() followed by set(true) would allow two
    // concurrent callers to both pass the check.
    if (!isMaintenanceMode.compareAndSet(false, true)) {
      logger.info("Skipping maintenance task as system is already in maintenance mode");
      return;
    }

    try {
      logger.info("Maintenance mode enabled");
      logger.info("Starting scheduled maintenance task");

      int failedTables = 0;
      if (catalog != null) {
        logger.info("Performing maintenance on catalog: {}", catalog.name());
        if (!(catalog instanceof SupportsNamespaces)) {
          logger.error("Catalog does not support namespace operations.");
          return;
        }

        // NOTE(review): listNamespaces() without an argument presumably returns only top-level
        // namespaces, so tables in nested namespaces may not be visited - confirm.
        List<Namespace> namespaces = ((SupportsNamespaces) catalog).listNamespaces();
        for (Namespace ns : namespaces) {
          logger.debug("Namespace: {}", ns);
        }

        failedTables = maintainTables(namespaces);
        logger.info("Maintenance operations completed for catalog: {}", catalog.name());
      } else {
        logger.warn("No catalog available for maintenance operations");
      }

      if (failedTables == 0) {
        logger.info("Scheduled maintenance task completed successfully");
      } else {
        logger.warn("Scheduled maintenance task completed with {} failed table(s)", failedTables);
      }
    } catch (Exception e) {
      logger.error("Error during scheduled maintenance task", e);
    } finally {
      isMaintenanceMode.set(false);
      logger.info("Maintenance mode disabled");
    }
  }

  /**
   * Maintains every table in the given namespaces and returns how many tables failed.
   *
   * <p>A failure while listing the tables of a namespace is not handled here and propagates to
   * the caller.
   */
  private int maintainTables(List<Namespace> namespaces) {
    // One cutoff for the whole pass, so all tables are judged against the same instant.
    long olderThanMillis =
        System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays);

    int failedTables = 0;
    for (Namespace namespace : namespaces) {
      for (TableIdentifier tableIdent : catalog.listTables(namespace)) {
        try {
          maintainTable(tableIdent, olderThanMillis);
        } catch (RuntimeException e) {
          failedTables++;
          logger.error(
              "Maintenance failed for table {}, continuing with remaining tables", tableIdent, e);
        }
      }
    }
    return failedTables;
  }

  /** Rewrites manifests and expires old snapshots for one table; tables without snapshots are skipped. */
  private void maintainTable(TableIdentifier tableIdent, long olderThanMillis) {
    Table table = catalog.loadTable(tableIdent);

    if (table.currentSnapshot() == null) {
      logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent);
      return;
    }

    table.rewriteManifests().rewriteIf(manifest -> true).commit();
    table.expireSnapshots().expireOlderThan(olderThanMillis).commit();
  }
}