From 804500375b3bdfeca485d752d856d7f90e9a3f1d Mon Sep 17 00:00:00 2001
From: Mikhail2048 <mikhailpolivakha@gmail.com>
Date: Mon, 1 Jan 2024 13:38:28 +0300
Subject: [PATCH 1/3] DATACASS-594 Polishing

---
 .../cassandra/core/cql/CassandraAccessor.java | 12 +--
 .../data/cassandra/core/cql/CqlTemplate.java  |  2 +-
 .../core/cql/util/AbstractCollectionTerm.java | 62 +++++++++++++
 .../cassandra/core/cql/util/ListTerm.java     | 17 ++++
 .../data/cassandra/core/cql/util/SetTerm.java | 17 ++++
 .../core/cql/util/StatementBuilder.java       | 92 ++-----------------
 .../cassandra/core/cql/util/TermFactory.java  |  3 +-
 7 files changed, 114 insertions(+), 91 deletions(-)
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/AbstractCollectionTerm.java
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/ListTerm.java
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/SetTerm.java

diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java
index 2ec24ea2a..1d3b0757a 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java
@@ -85,7 +85,7 @@ public class CassandraAccessor implements InitializingBean {
 	 */
 	private @Nullable ConsistencyLevel serialConsistencyLevel;
 
-	private @Nullable SessionFactory sessionFactory;
+	private SessionFactory sessionFactory;
 
 	/**
 	 * Ensures the Cassandra {@link CqlSession} and exception translator has been propertly set.
@@ -314,7 +314,6 @@ public void setSessionFactory(SessionFactory sessionFactory) {
 	 * @since 2.0
 	 * @see SessionFactory
 	 */
-	@Nullable
 	public SessionFactory getSessionFactory() {
 		return this.sessionFactory;
 	}
@@ -361,11 +360,10 @@ protected Statement<?> applyStatementSettings(Statement<?> statement) {
 		}
 
 		if (keyspace != null) {
-			if (statementToUse instanceof BatchStatement) {
-				statementToUse = ((BatchStatement) statementToUse).setKeyspace(keyspace);
-			}
-			if (statementToUse instanceof SimpleStatement) {
-				statementToUse = ((SimpleStatement) statementToUse).setKeyspace(keyspace);
+			if (statementToUse instanceof BatchStatement bs) {
+				statementToUse = bs.setKeyspace(keyspace);
+			} else if (statementToUse instanceof SimpleStatement ss) {
+				statementToUse = ss.setKeyspace(keyspace);
 			}
 		}
 
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CqlTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CqlTemplate.java
index 7bd5bad1d..364c8149b 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CqlTemplate.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CqlTemplate.java
@@ -620,7 +620,7 @@ private CqlSession getCurrentSession() {
 
 		SessionFactory sessionFactory = getSessionFactory();
 
-		Assert.state(sessionFactory != null, "SessionFactory is null");
+		Assert.notNull(sessionFactory, "SessionFactory is null");
 
 		return sessionFactory.getSession();
 	}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/AbstractCollectionTerm.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/AbstractCollectionTerm.java
new file mode 100644
index 000000000..4df5f009c
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/AbstractCollectionTerm.java
@@ -0,0 +1,62 @@
+package org.springframework.data.cassandra.core.cql.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.jetbrains.annotations.NotNull;
+import org.springframework.lang.NonNull;
+
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+import com.datastax.oss.driver.internal.querybuilder.CqlHelper;
+
+/**
+ * Represents an abstract collection like {@link Term} such as Set, List, Tuple in CQL
+ *
+ * @author Mikhail Polivakha
+ */
+public abstract class AbstractCollectionTerm implements Term {
+
+    @NonNull
+    private final Collection<? extends Term> components;
+
+    /**
+     * @return EnclosingLiterals that are used to render the collection of terms
+     */
+    public abstract EnclosingLiterals enclosingLiterals();
+
+    public AbstractCollectionTerm(Collection<? extends Term> components) {
+        this.components = Optional.ofNullable(components).orElse(Collections.emptySet());
+    }
+
+    @Override
+    public boolean isIdempotent() {
+        return components.stream().allMatch(Term::isIdempotent);
+    }
+
+    @Override
+    public void appendTo(@NotNull StringBuilder builder) {
+        EnclosingLiterals literals = this.enclosingLiterals();
+
+        if (components.isEmpty()) {
+            builder.append(literals.prefix).append(literals.postfix);
+        } else {
+            CqlHelper.append(components, builder, literals.prefix, ",", literals.postfix);
+        }
+    }
+
+    protected static class EnclosingLiterals {
+
+        private final String prefix;
+        private final String postfix;
+
+        protected EnclosingLiterals(String prefix, String postfix) {
+            this.prefix = prefix;
+            this.postfix = postfix;
+        }
+
+        protected static EnclosingLiterals of(String prefix, String postfix) {
+            return new EnclosingLiterals(prefix, postfix);
+        }
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/ListTerm.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/ListTerm.java
new file mode 100644
index 000000000..67eb05909
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/ListTerm.java
@@ -0,0 +1,17 @@
+package org.springframework.data.cassandra.core.cql.util;
+
+import java.util.Collection;
+
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+
+public class ListTerm extends AbstractCollectionTerm {
+
+    public ListTerm(Collection<? extends Term> components) {
+        super(components);
+    }
+
+    @Override
+    public EnclosingLiterals enclosingLiterals() {
+        return EnclosingLiterals.of("[", "]");
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/SetTerm.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/SetTerm.java
new file mode 100644
index 000000000..0763304e1
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/SetTerm.java
@@ -0,0 +1,17 @@
+package org.springframework.data.cassandra.core.cql.util;
+
+import java.util.Collection;
+
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+
+public class SetTerm extends AbstractCollectionTerm {
+
+    public SetTerm(Collection<? extends Term> components) {
+        super(components);
+    }
+
+    @Override
+    public EnclosingLiterals enclosingLiterals() {
+        return EnclosingLiterals.of("{", "}");
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java
index dee29937f..a060d7e71 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java
@@ -25,6 +25,7 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 
 import org.springframework.lang.NonNull;
 import org.springframework.lang.Nullable;
@@ -42,7 +43,7 @@
 /**
  * Functional builder for Cassandra {@link BuildableQuery statements}. Statements are built by applying
  * {@link UnaryOperator builder functions} that get applied when {@link #build() building} the actual
- * {@link SimpleStatement statement}. The {@code StatementBuilder} provides a mutable container for statement creation
+ * {@link SimpleStatement statement}. The {@link StatementBuilder} provides a mutable container for statement creation
  * allowing a functional declaration of actions that are necessary to build a statement. This class helps building CQL
  * statements as a {@link BuildableQuery} classes are typically immutable and require return value tracking across
  * methods that want to apply modifications to a statement.
@@ -289,13 +290,13 @@ public <T> T ifBoundOrInline(Function<Bindings, T> bindingFunction, Supplier<T>
 
 	private SimpleStatement build(SimpleStatementBuilder builder) {
 
-		SimpleStatement statmentToUse = onBuild(builder).build();
+		SimpleStatement statementToUse = onBuild(builder).build();
 
 		for (UnaryOperator<SimpleStatement> operator : onBuilt) {
-			statmentToUse = operator.apply(statmentToUse);
+			statementToUse = operator.apply(statementToUse);
 		}
 
-		return statmentToUse;
+		return statementToUse;
 	}
 
 	private SimpleStatementBuilder onBuild(SimpleStatementBuilder statementBuilder) {
@@ -308,26 +309,13 @@ private SimpleStatementBuilder onBuild(SimpleStatementBuilder statementBuilder)
 	@SuppressWarnings("unchecked")
 	private static Term toLiteralTerms(@Nullable Object value, CodecRegistry codecRegistry) {
 
-		if (value instanceof List) {
+		if (value instanceof Collection<?> c) {
 
-			List<Term> terms = new ArrayList<>();
+			List<Term> mappedTerms = c.stream()
+			.map(o -> toLiteralTerms(o, codecRegistry))
+			.toList();
 
-			for (Object o : (List<Object>) value) {
-				terms.add(toLiteralTerms(o, codecRegistry));
-			}
-
-			return new ListTerm(terms);
-		}
-
-		if (value instanceof Set) {
-
-			List<Term> terms = new ArrayList<>();
-
-			for (Object o : (Set<Object>) value) {
-				terms.add(toLiteralTerms(o, codecRegistry));
-			}
-
-			return new SetTerm(terms);
+			return c instanceof Set ? new SetTerm(mappedTerms) : new ListTerm(mappedTerms);
 		}
 
 		if (value instanceof Map) {
@@ -387,66 +375,6 @@ public enum ParameterHandling {
 		BY_NAME
 	}
 
-	static class ListTerm implements Term {
-
-		private final Collection<? extends Term> components;
-
-		public ListTerm(@NonNull Collection<? extends Term> components) {
-			this.components = components;
-		}
-
-		@Override
-		public void appendTo(@NonNull StringBuilder builder) {
-
-			if (components.isEmpty()) {
-				builder.append("[]");
-				return;
-			}
-
-			CqlHelper.append(components, builder, "[", ",", "]");
-		}
-
-		@Override
-		public boolean isIdempotent() {
-			for (Term component : components) {
-				if (!component.isIdempotent()) {
-					return false;
-				}
-			}
-			return true;
-		}
-	}
-
-	static class SetTerm implements Term {
-
-		private final Collection<? extends Term> components;
-
-		public SetTerm(@NonNull Collection<? extends Term> components) {
-			this.components = components;
-		}
-
-		@Override
-		public void appendTo(@NonNull StringBuilder builder) {
-
-			if (components.isEmpty()) {
-				builder.append("{}");
-				return;
-			}
-
-			CqlHelper.append(components, builder, "{", ",", "}");
-		}
-
-		@Override
-		public boolean isIdempotent() {
-			for (Term component : components) {
-				if (!component.isIdempotent()) {
-					return false;
-				}
-			}
-			return true;
-		}
-	}
-
 	static class MapTerm implements Term {
 
 		private final Map<? extends Term, ? extends Term> components;
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/TermFactory.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/TermFactory.java
index 84e895444..29f78d600 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/TermFactory.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/TermFactory.java
@@ -20,11 +20,12 @@
 
 import org.springframework.lang.Nullable;
 
+import com.datastax.oss.driver.api.querybuilder.BindMarker;
 import com.datastax.oss.driver.api.querybuilder.term.Term;
 
 /**
  * Factory for {@link Term} objects encapsulating a binding {@code value}. Classes implementing this factory interface
- * may return inline terms to render values as part of the query string, or bind markers to supply parameters on
+ * may return inline {@link Term terms} to render values as part of the query string, or {@link BindMarker bind markers} to supply parameters on
  * statement creation/parameter binding.
  * <p>
  * A {@link TermFactory} is typically used with {@link StatementBuilder}.

From 7d1d9e46762d717e4504e4c3d19ae59c616724a0 Mon Sep 17 00:00:00 2001
From: Mikhail2048 <mikhailpolivakha@gmail.com>
Date: Thu, 4 Jan 2024 12:59:50 +0300
Subject: [PATCH 2/3] DATACASS-594 added schema validation capabilities

---
 spring-data-cassandra/pom.xml                 |   6 +
 ...ssandraKeyspaceDoesNotExistsException.java |  15 ++
 ...tiveKeyspaceSetForCqlSessionException.java |  18 ++
 .../CassandraSchemaValidationException.java   |  16 ++
 .../CassandraSchemaValidationProfile.java     |  47 +++++
 .../config/CassandraSchemaValidator.java      | 181 ++++++++++++++++++
 .../cassandra/core/cql/CassandraAccessor.java |   1 +
 .../core/cql/util/StatementBuilder.java       |   3 +-
 ...PrimaryKeyClassEntityMetadataVerifier.java |   2 -
 .../CassandraSchemaValidationProfileTest.java |  30 +++
 .../config/CassandraSchemaValidatorTest.java  | 151 +++++++++++++++
 .../support/IntegrationTestConfig.java        |   3 +-
 .../data/cassandra/support/CqlDataSet.java    |  10 +
 .../test/util/CassandraDelegate.java          |   7 +-
 .../test/util/CassandraExtension.java         |   9 +
 .../cql/session/init/schema-validation.cql    |   4 +
 16 files changed, 495 insertions(+), 8 deletions(-)
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraKeyspaceDoesNotExistsException.java
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraNoActiveKeyspaceSetForCqlSessionException.java
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraSchemaValidationException.java
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfile.java
 create mode 100644 spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidator.java
 create mode 100644 spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfileTest.java
 create mode 100644 spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java
 create mode 100644 spring-data-cassandra/src/test/resources/org/springframework/data/cassandra/core/cql/session/init/schema-validation.cql

diff --git a/spring-data-cassandra/pom.xml b/spring-data-cassandra/pom.xml
index f0f45a269..0add627f1 100644
--- a/spring-data-cassandra/pom.xml
+++ b/spring-data-cassandra/pom.xml
@@ -233,6 +233,12 @@
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-test</artifactId>
+			<version>3.1.6</version>
+		</dependency>
+
 	</dependencies>
 
 </project>
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraKeyspaceDoesNotExistsException.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraKeyspaceDoesNotExistsException.java
new file mode 100644
index 000000000..76a33cd88
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraKeyspaceDoesNotExistsException.java
@@ -0,0 +1,15 @@
+package org.springframework.data.cassandra;
+
+import org.springframework.dao.NonTransientDataAccessException;
+
+/**
+ * The exception to be thrown when keyspace that expected to be present is missing in the cluster
+ *
+ * @author Mikhail Polivakha
+ */
+public class CassandraKeyspaceDoesNotExistsException extends NonTransientDataAccessException {
+
+    public CassandraKeyspaceDoesNotExistsException(String keyspace) {
+        super("Keyspace %s does not exists in the cluster".formatted(keyspace));
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraNoActiveKeyspaceSetForCqlSessionException.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraNoActiveKeyspaceSetForCqlSessionException.java
new file mode 100644
index 000000000..2b4b24639
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraNoActiveKeyspaceSetForCqlSessionException.java
@@ -0,0 +1,18 @@
+package org.springframework.data.cassandra;
+
+import org.springframework.dao.NonTransientDataAccessException;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+
+/**
+ * Exception that is thrown in case {@link CqlSession} has no active keyspace set. This should not
+ * typically happen. This exception means some misconfiguration within framework.
+ *
+ * @author Mikhail Polivakha
+ */
+public class CassandraNoActiveKeyspaceSetForCqlSessionException extends NonTransientDataAccessException {
+
+    public CassandraNoActiveKeyspaceSetForCqlSessionException() {
+        super("There is no active keyspace set for CqlSession");
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraSchemaValidationException.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraSchemaValidationException.java
new file mode 100644
index 000000000..6a4974c3f
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/CassandraSchemaValidationException.java
@@ -0,0 +1,16 @@
+package org.springframework.data.cassandra;
+
+import org.springframework.dao.NonTransientDataAccessException;
+
+/**
+ * The exception that is thrown in case cassandra schema in the particular keyspace does not match
+ * the configuration of the entities inside application.
+ *
+ * @author Mikhail Polivakha
+ */
+public class CassandraSchemaValidationException extends NonTransientDataAccessException {
+
+    public CassandraSchemaValidationException(String message) {
+        super(message);
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfile.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfile.java
new file mode 100644
index 000000000..e3f228b33
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfile.java
@@ -0,0 +1,47 @@
+package org.springframework.data.cassandra.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * Class that encapsulates all the problems encountered during cassandra schema validation
+ *
+ * @author Mikhail Polivakha
+ */
+public class CassandraSchemaValidationProfile {
+
+    private final List<ValidationError> validationErrors;
+
+    public CassandraSchemaValidationProfile(List<ValidationError> validationErrors) {
+        this.validationErrors = validationErrors;
+    }
+
+    public static CassandraSchemaValidationProfile empty() {
+        return new CassandraSchemaValidationProfile(new LinkedList<>());
+    }
+
+    public void addValidationErrors(List<String> message) {
+        if (!CollectionUtils.isEmpty(message)) {
+            this.validationErrors.addAll(message.stream().map(ValidationError::new).collect(Collectors.toSet()));
+        }
+    }
+
+    public record ValidationError(String errorMessage) { }
+
+    public boolean validationFailed() {
+        return !validationErrors.isEmpty();
+    }
+
+    public String renderExceptionMessage() {
+
+        Assert.state(validationFailed(), "Schema validation was successful but error message rendering requested");
+
+        StringBuilder constructedMessage = new StringBuilder("The following errors were encountered during cassandra schema validation:\n");
+        validationErrors.forEach(validationError -> constructedMessage.append("\t- %s\n".formatted(validationError.errorMessage())));
+        return constructedMessage.toString();
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidator.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidator.java
new file mode 100644
index 000000000..7283f3aa6
--- /dev/null
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidator.java
@@ -0,0 +1,181 @@
+package org.springframework.data.cassandra.config;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.data.cassandra.CassandraKeyspaceDoesNotExistsException;
+import org.springframework.data.cassandra.CassandraNoActiveKeyspaceSetForCqlSessionException;
+import org.springframework.data.cassandra.CassandraSchemaValidationException;
+import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity;
+import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
+import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
+import org.springframework.data.cassandra.core.mapping.CassandraPersistentProperty;
+import org.springframework.data.cassandra.core.mapping.CassandraSimpleTypeHolder;
+import org.springframework.data.mapping.PropertyHandler;
+import org.springframework.util.Assert;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
+import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+import com.datastax.oss.driver.api.core.type.DataType;
+
+/**
+ * Class that is responsible to validate cassandra schema inside {@link CqlSession} keyspace.
+ *
+ * @author Mikhail Polivakha
+ */
+public class CassandraSchemaValidator implements SmartInitializingSingleton {
+
+    private static final Log logger = LogFactory.getLog(CassandraSchemaValidator.class);
+
+    private final CqlSessionFactoryBean cqlSessionFactoryBean;
+
+    private final CassandraMappingContext cassandraMappingContext;
+
+    private final boolean strictValidation;
+
+    public CassandraSchemaValidator(
+      CqlSessionFactoryBean cqlSessionFactoryBean,
+      CassandraMappingContext cassandraMappingContext,
+      boolean strictValidation
+    ) {
+        this.strictValidation = strictValidation;
+        this.cqlSessionFactoryBean = cqlSessionFactoryBean;
+        this.cassandraMappingContext = cassandraMappingContext;
+    }
+
+    /**
+     * Here, we only consider {@link CqlSession#getKeyspace() current session keyspace},
+     * because for now there is no way to customize keyspace for {@link CassandraPersistentEntity}.
+     * <p>
+     * See <a href="https://github.com/spring-projects/spring-data-cassandra/issues/921">related issue</a>
+     */
+    @Override
+    public void afterSingletonsInstantiated() {
+        CqlSession session = cqlSessionFactoryBean.getSession();
+
+        CqlIdentifier activeKeyspace = session
+          .getKeyspace()
+          .orElseThrow(CassandraNoActiveKeyspaceSetForCqlSessionException::new);
+
+        KeyspaceMetadata keyspaceMetadata = session
+          .getMetadata()
+          .getKeyspace(activeKeyspace)
+          .orElseThrow(() -> new CassandraKeyspaceDoesNotExistsException(activeKeyspace.asInternal()));
+
+        Collection<BasicCassandraPersistentEntity<?>> persistentEntities = cassandraMappingContext.getPersistentEntities();
+
+        CassandraSchemaValidationProfile validationProfile = CassandraSchemaValidationProfile.empty();
+
+        for (BasicCassandraPersistentEntity<?> persistentEntity : persistentEntities) {
+            validationProfile.addValidationErrors(validatePersistentEntity(keyspaceMetadata, persistentEntity));
+        }
+
+        evaluateValidationResult(validationProfile);
+    }
+
+    private void evaluateValidationResult(CassandraSchemaValidationProfile validationProfile) {
+        if (validationProfile.validationFailed()) {
+            if (strictValidation) {
+                throw new CassandraSchemaValidationException(validationProfile.renderExceptionMessage());
+            } else {
+                if (logger.isErrorEnabled()) {
+                    logger.error(validationProfile.renderExceptionMessage());
+                }
+            }
+        } else {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Cassandra schema validation completed successfully");
+            }
+        }
+    }
+
+    private List<String> validatePersistentEntity(
+      KeyspaceMetadata keyspaceMetadata,
+      BasicCassandraPersistentEntity<?> entity
+    ) {
+
+        if (entity.isTupleType() || entity.isUserDefinedType()) {
+            return List.of();
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Validating persistent entity '%s'".formatted(keyspaceMetadata.getName()));
+        }
+
+        Optional<TableMetadata> table = keyspaceMetadata.getTable(entity.getTableName());
+
+        if (table.isPresent()) {
+            return this.validateProperties(table.get(), entity);
+        } else {
+            return List.of(
+              "Unable to locate target table for persistent entity '%s'. Expected table name is '%s', but no such table in keyspace '%s'".formatted(
+                entity.getName(),
+                entity.getTableName(),
+                keyspaceMetadata.getName()
+              )
+            );
+        }
+    }
+
+    private List<String> validateProperties(TableMetadata tableMetadata, BasicCassandraPersistentEntity<?> entity) {
+
+        List<String> validationErrors = new LinkedList<>();
+
+        entity.doWithProperties((PropertyHandler<CassandraPersistentProperty>) persistentProperty -> {
+            CqlIdentifier expectedColumnName = persistentProperty.getColumnName();
+
+            Assert.notNull(expectedColumnName, "Column cannot not be null at this point");
+
+            Optional<ColumnMetadata> column = tableMetadata.getColumn(expectedColumnName);
+
+            if (column.isPresent()) {
+                ColumnMetadata columnMetadata = column.get();
+                DataType dataTypeExpected = CassandraSimpleTypeHolder.getDataTypeFor(persistentProperty.getRawType());
+
+                if (dataTypeExpected == null) {
+                    validationErrors.add(
+                      "Unable to deduce cassandra data type for property '%s' inside the persistent entity '%s'".formatted(
+                        persistentProperty.getName(),
+                        entity.getName()
+                      )
+                    );
+                } else {
+                    if (!Objects.equals(dataTypeExpected.getProtocolCode(), columnMetadata.getType().getProtocolCode())) {
+                        validationErrors.add(
+                          "Expected '%s' data type for '%s' property in the '%s' persistent entity, but actual data type is '%s'".formatted(
+                            dataTypeExpected,
+                            persistentProperty.getName(),
+                            entity.getName(),
+                            columnMetadata.getType()
+                          )
+                        );
+                    }
+                }
+            } else {
+                validationErrors.add(
+                  "Unable to locate target column for persistent property '%s' in persistent entity '%s'. Expected to see column with name '%s', but there is no such column in table '%s'".formatted(
+                    persistentProperty.getName(),
+                    entity.getName(),
+                    expectedColumnName,
+                    entity.getTableName()
+                  )
+                );
+            }
+        });
+
+        return validationErrors;
+    }
+
+    public boolean isStrictValidation() {
+        return strictValidation;
+    }
+}
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java
index 1d3b0757a..ffb70f716 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CassandraAccessor.java
@@ -45,6 +45,7 @@
  * @author Mark Paluch
  * @author John Blum
  * @author Tomasz Lelek
+ * @author Mikhail Polivakha
  * @see org.springframework.beans.factory.InitializingBean
  * @see com.datastax.oss.driver.api.core.CqlSession
  */
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java
index a060d7e71..061887663 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/util/StatementBuilder.java
@@ -25,7 +25,6 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
-import java.util.stream.Collectors;
 
 import org.springframework.lang.NonNull;
 import org.springframework.lang.Nullable;
@@ -38,7 +37,6 @@
 import com.datastax.oss.driver.api.querybuilder.BuildableQuery;
 import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
 import com.datastax.oss.driver.api.querybuilder.term.Term;
-import com.datastax.oss.driver.internal.querybuilder.CqlHelper;
 
 /**
  * Functional builder for Cassandra {@link BuildableQuery statements}. Statements are built by applying
@@ -67,6 +65,7 @@
  * All methods returning {@link StatementBuilder} point to the same instance. This class is intended for internal use.
  *
  * @author Mark Paluch
+ * @author Mikhail Polivakha
  * @param <S> Statement type
  * @since 3.0
  */
diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/PrimaryKeyClassEntityMetadataVerifier.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/PrimaryKeyClassEntityMetadataVerifier.java
index 5400b4987..0c2a16d77 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/PrimaryKeyClassEntityMetadataVerifier.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/PrimaryKeyClassEntityMetadataVerifier.java
@@ -45,8 +45,6 @@ public void verify(CassandraPersistentEntity<?> entity) throws MappingException
 		List<CassandraPersistentProperty> partitionKeyColumns = new ArrayList<>();
 		List<CassandraPersistentProperty> primaryKeyColumns = new ArrayList<>();
 
-		Class<?> entityType = entity.getType();
-
 		// @Indexed not allowed on type level
 		if (entity.isAnnotationPresent(Indexed.class)) {
 			exceptions.add(new MappingException("@Indexed cannot be used on primary key classes"));
diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfileTest.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfileTest.java
new file mode 100644
index 000000000..1e67a2ba0
--- /dev/null
+++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidationProfileTest.java
@@ -0,0 +1,30 @@
+package org.springframework.data.cassandra.config;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Mikhail Polivakha
+ */
+class CassandraSchemaValidationProfileUnitTest {
+
+    @Test
+    void testRenderingValidationErrorMessageOnSuccessfulValidation() {
+        CassandraSchemaValidationProfile empty = CassandraSchemaValidationProfile.empty();
+
+        assertThatThrownBy(empty::renderExceptionMessage).isInstanceOf(IllegalStateException.class);
+    }
+
+    @Test
+    void testRenderingValidationErrorMessageOnFailedValidation() {
+        CassandraSchemaValidationProfile empty = CassandraSchemaValidationProfile.empty();
+
+        empty.addValidationErrors(List.of("Something went wrong"));
+
+        assertThat(empty.renderExceptionMessage()).contains("- Something went wrong");
+    }
+}
\ No newline at end of file
diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java
new file mode 100644
index 000000000..126872962
--- /dev/null
+++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java
@@ -0,0 +1,151 @@
+package org.springframework.data.cassandra.config;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.cassandra.CassandraSchemaValidationException;
+import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
+import org.springframework.data.cassandra.core.mapping.Column;
+import org.springframework.data.cassandra.core.mapping.Table;
+import org.springframework.data.cassandra.repository.support.IntegrationTestConfig;
+import org.springframework.data.cassandra.support.CqlDataSet;
+import org.springframework.data.cassandra.test.util.CassandraExtension;
+
+/**
+ * @author Mikhail Polivakha
+ */
+class CassandraSchemaValidatorTest {
+
+    @RegisterExtension
+    static CassandraExtension cassandraExtension = new CassandraExtension(
+      CqlDataSet.fromClassPath("org/springframework/data/cassandra/core/cql/session/init/schema-validation.cql")
+    );
+
+    @Configuration
+    static class Config extends IntegrationTestConfig {
+
+        @Bean("cassandraSchemaValidator")
+        CassandraSchemaValidator cassandraSchemaValidator(
+          CqlSessionFactoryBean cqlSessionFactoryBean,
+          CassandraMappingContext cassandraMappingContext,
+          @Value("${validation.mode.strict:true}") Boolean validationModeStrict
+        ) {
+            return new CassandraSchemaValidator(
+              cqlSessionFactoryBean,
+              cassandraMappingContext,
+              validationModeStrict
+            );
+        }
+
+        @Override
+        public SchemaAction getSchemaAction() {
+            return SchemaAction.NONE;
+        }
+
+        @Override
+        protected String getKeyspaceName() {
+            return "validation_keyspace";
+        }
+
+        @Bean
+        CassandraMappingContext cassandraMappingContext() {
+            return new CassandraMappingContext();
+        }
+    }
+
+    @Configuration
+    static class WithAutoCreationConfig extends Config {
+
+        @Override
+        public SchemaAction getSchemaAction() {
+            return SchemaAction.CREATE;
+        }
+    }
+
+    @Table(value = "should_pass")
+    static class ShouldPass {
+
+        @Id
+        private UUID id;
+
+        @Column(value = "name")
+        private String name;
+
+        @Column(value = "some_type")
+        private String type;
+
+        private Integer status;
+
+        private Integer precision;
+    }
+
+    @Table(value = "should_fail")
+    static class ShouldFail {
+
+        @Id
+        private UUID id;
+
+        @Column(value = "name")
+        private String name;
+
+        @Column(value = "some_type")
+        private String type;
+
+        private Integer status;
+
+        private Integer precision;
+
+        private Integer noSuchColumn;
+    }
+
+    @Table(value = "no_such_table")
+    static class NoSuchTable {
+
+        @Id
+        private UUID id;
+    }
+
+    @Test
+    void testValidationFailedWithNoSchemaAction() {
+        try {
+            new AnnotationConfigApplicationContext(Config.class);
+            Assertions.fail(); // Context should not boot
+        } catch (CassandraSchemaValidationException exception) {
+            String message = exception.getMessage();
+            assertThat(message).contains("Expected table name is 'no_such_table', but no such table in keyspace");
+            assertThat(message).contains("Expected 'TEXT' data type for 'name' property");
+            assertThat(message).contains("Unable to locate target column for persistent property 'noSuchColumn'");
+        }
+    }
+
+    @Test
+    void testValidationPassedWithSchemaAutoCreation() {
+        try {
+            System.setProperty("validation.mode.strict", "false");
+            new AnnotationConfigApplicationContext(Config.class);
+            System.clearProperty("validation.mode.strict");
+        } catch (CassandraSchemaValidationException exception) {
+            Assertions.fail(); // Context should load successfully, no exception should be thrown
+        }
+    }
+
+    @Test
+    void testWhenSchemaIsAutoCreatedThenValidationShouldPass() {
+        try {
+            var applicationContext = new AnnotationConfigApplicationContext(WithAutoCreationConfig.class);
+            CassandraSchemaValidator cassandraSchemaValidator = applicationContext.getBean("cassandraSchemaValidator", CassandraSchemaValidator.class);
+            Assertions.assertTrue(cassandraSchemaValidator.isStrictValidation());
+        } catch (CassandraSchemaValidationException exception) {
+            Assertions.fail(); // Context should load successfully, no exception should be thrown
+        }
+    }
+}
\ No newline at end of file
diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/support/IntegrationTestConfig.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/support/IntegrationTestConfig.java
index 39f630e5b..a89aa3b20 100644
--- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/support/IntegrationTestConfig.java
+++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/repository/support/IntegrationTestConfig.java
@@ -37,6 +37,7 @@
  * @author David Webb
  * @author Matthew T. Adams
  * @author Mark Paluch
+ * @author Mikhail Polivakha
  */
 @Configuration
 public class IntegrationTestConfig extends AbstractReactiveCassandraConfiguration {
@@ -88,7 +89,7 @@ protected String getKeyspaceName() {
 
 	@Override
 	protected List<CreateKeyspaceSpecification> getKeyspaceCreations() {
-		return Collections.singletonList(createKeyspace(getKeyspaceName()).withSimpleReplication());
+		return Collections.singletonList(createKeyspace(getKeyspaceName()).ifNotExists().withSimpleReplication());
 	}
 
 	@Override
diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/support/CqlDataSet.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/support/CqlDataSet.java
index 59b293905..5e391dcc9 100644
--- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/support/CqlDataSet.java
+++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/support/CqlDataSet.java
@@ -29,6 +29,7 @@
  * particular keyspace.
  *
  * @author Mark Paluch
+ * @author Mikhail Polivakha
  */
 public class CqlDataSet {
 
@@ -92,4 +93,13 @@ public static CqlDataSet fromClassPath(String resource) {
 		URL url = Resources.getResource(resource);
 		return new CqlDataSet(url, null);
 	}
+
+	/**
+	 * Create a {@link CqlDataSet} from a class-path resource.
+	 */
+	public static CqlDataSet fromClassPath(String resource, String keyspaceName) {
+
+		URL url = Resources.getResource(resource);
+		return new CqlDataSet(url, keyspaceName);
+	}
 }
diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraDelegate.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraDelegate.java
index 28d874a40..fa0570b58 100644
--- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraDelegate.java
+++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraDelegate.java
@@ -49,6 +49,7 @@
  * @author Mark Paluch
  * @author John Blum
  * @author Tomasz Lelek
+ * @author Mikhail Polivakha
  * @see org.springframework.data.cassandra.support.CassandraConnectionProperties
  * @see com.datastax.oss.driver.api.core.CqlSessionBuilder
  * @see com.datastax.oss.driver.api.core.CqlSession
@@ -155,7 +156,7 @@ public CassandraDelegate before(CqlDataSet cqlDataSet) {
 	 * @param cqlDataSet must not be {@literal null}
 	 * @return the rule
 	 */
-	private CassandraDelegate before(InvocationMode invocationMode, CqlDataSet cqlDataSet) {
+	public CassandraDelegate before(InvocationMode invocationMode, CqlDataSet cqlDataSet) {
 
 		Assert.notNull(cqlDataSet, "CqlDataSet must not be null");
 
@@ -346,7 +347,7 @@ private Version getCassandraVersion(CqlSessionBuilder builder) {
 	private String resolveHost() {
 
 		if (isTestcontainers()) {
-			return container.getContainerIpAddress();
+			return container.getHost();
 		}
 
 		return isEmbedded() ? EmbeddedCassandraServerHelper.getHost() : this.properties.getCassandraHost();
@@ -425,7 +426,7 @@ public void execute(CqlDataSet cqlDataSet) {
 
 	private void load(CqlSession session, CqlDataSet cqlDataSet) {
 
-		Optional.of(cqlDataSet.getKeyspaceName()).filter(StringUtils::hasText)
+		Optional.ofNullable(cqlDataSet.getKeyspaceName()).filter(StringUtils::hasText)
 				.filter(
 						keyspaceName -> !keyspaceName.equals(session.getKeyspace().map(CqlIdentifier::toString).orElse("system")))
 				.ifPresent(keyspaceName -> session.execute(String.format(TestKeyspaceDelegate.USE_KEYSPACE_CQL, keyspaceName)));
diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraExtension.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraExtension.java
index a26e6ee0e..fdd9fe079 100644
--- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraExtension.java
+++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/test/util/CassandraExtension.java
@@ -27,6 +27,7 @@
 import org.junit.platform.commons.util.AnnotationUtils;
 import org.junit.platform.commons.util.ReflectionUtils;
 import org.junit.platform.commons.util.StringUtils;
+import org.springframework.data.cassandra.support.CqlDataSet;
 import org.springframework.data.cassandra.support.RandomKeyspaceName;
 import org.springframework.util.Assert;
 
@@ -52,6 +53,7 @@
  * </pre>
  *
  * @author Mark Paluch
+ * @author Mikhail Polivakha
  */
 public class CassandraExtension implements BeforeAllCallback, AfterAllCallback, TestInstancePostProcessor {
 
@@ -61,6 +63,13 @@ public class CassandraExtension implements BeforeAllCallback, AfterAllCallback,
 
 	private final CassandraDelegate delegate = new CassandraDelegate("embedded-cassandra.yaml");
 
+	public CassandraExtension() {
+	}
+
+	public CassandraExtension(CqlDataSet cqlDataSet) {
+		this.delegate.before(CassandraDelegate.InvocationMode.ONCE, cqlDataSet);
+	}
+
 	/**
 	 * Retrieve the keyspace {@link CqlSession} that is associated with the current {@link Thread}.
 	 *
diff --git a/spring-data-cassandra/src/test/resources/org/springframework/data/cassandra/core/cql/session/init/schema-validation.cql b/spring-data-cassandra/src/test/resources/org/springframework/data/cassandra/core/cql/session/init/schema-validation.cql
new file mode 100644
index 000000000..d2b9db53e
--- /dev/null
+++ b/spring-data-cassandra/src/test/resources/org/springframework/data/cassandra/core/cql/session/init/schema-validation.cql
@@ -0,0 +1,4 @@
+CREATE KEYSPACE IF NOT EXISTS validation_keyspace WITH durable_writes = false AND replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
+USE validation_keyspace;
+CREATE TABLE should_pass(id UUID PRIMARY KEY, name TEXT, some_type TEXT, status INT, precision INT);
+CREATE TABLE should_fail(id UUID PRIMARY KEY, name INT, type TEXT, status TEXT, precision INT);

From 3da8a0be02cdc374f77685ba3479b17c4d8df39b Mon Sep 17 00:00:00 2001
From: Mikhail2048 <mikhailpolivakha@gmail.com>
Date: Tue, 9 Jan 2024 20:36:42 +0300
Subject: [PATCH 3/3] DATACASS-594 code review

---
 .../convert}/CassandraSchemaValidator.java    | 34 +++++++++++++------
 .../config/CassandraSchemaValidatorTest.java  |  8 +++--
 2 files changed, 28 insertions(+), 14 deletions(-)
 rename spring-data-cassandra/src/main/java/org/springframework/data/cassandra/{config => core/convert}/CassandraSchemaValidator.java (87%)

diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidator.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/convert/CassandraSchemaValidator.java
similarity index 87%
rename from spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidator.java
rename to spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/convert/CassandraSchemaValidator.java
index 7283f3aa6..95c2359ed 100644
--- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/config/CassandraSchemaValidator.java
+++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/convert/CassandraSchemaValidator.java
@@ -1,4 +1,4 @@
-package org.springframework.data.cassandra.config;
+package org.springframework.data.cassandra.core.convert;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -12,6 +12,7 @@
 import org.springframework.data.cassandra.CassandraKeyspaceDoesNotExistsException;
 import org.springframework.data.cassandra.CassandraNoActiveKeyspaceSetForCqlSessionException;
 import org.springframework.data.cassandra.CassandraSchemaValidationException;
+import org.springframework.data.cassandra.config.CassandraSchemaValidationProfile;
 import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity;
 import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
 import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
@@ -36,20 +37,28 @@ public class CassandraSchemaValidator implements SmartInitializingSingleton {
 
     private static final Log logger = LogFactory.getLog(CassandraSchemaValidator.class);
 
-    private final CqlSessionFactoryBean cqlSessionFactoryBean;
+    private final CqlSession cqlSession;
 
     private final CassandraMappingContext cassandraMappingContext;
 
+    private final ColumnTypeResolver columnTypeResolver;
+
     private final boolean strictValidation;
 
     public CassandraSchemaValidator(
-      CqlSessionFactoryBean cqlSessionFactoryBean,
-      CassandraMappingContext cassandraMappingContext,
+      CqlSession cqlSession,
+      CassandraConverter cassandraConverter,
       boolean strictValidation
     ) {
         this.strictValidation = strictValidation;
-        this.cqlSessionFactoryBean = cqlSessionFactoryBean;
-        this.cassandraMappingContext = cassandraMappingContext;
+        this.cqlSession = cqlSession;
+        this.cassandraMappingContext = cassandraConverter.getMappingContext();
+        this.columnTypeResolver = new DefaultColumnTypeResolver(
+          cassandraMappingContext,
+          SchemaFactory.ShallowUserTypeResolver.INSTANCE,
+          cassandraConverter::getCodecRegistry,
+          cassandraConverter::getCustomConversions
+        );
     }
 
     /**
@@ -60,13 +69,11 @@ public CassandraSchemaValidator(
      */
     @Override
     public void afterSingletonsInstantiated() {
-        CqlSession session = cqlSessionFactoryBean.getSession();
-
-        CqlIdentifier activeKeyspace = session
+        CqlIdentifier activeKeyspace = cqlSession
           .getKeyspace()
           .orElseThrow(CassandraNoActiveKeyspaceSetForCqlSessionException::new);
 
-        KeyspaceMetadata keyspaceMetadata = session
+        KeyspaceMetadata keyspaceMetadata = cqlSession
           .getMetadata()
           .getKeyspace(activeKeyspace)
           .orElseThrow(() -> new CassandraKeyspaceDoesNotExistsException(activeKeyspace.asInternal()));
@@ -131,6 +138,11 @@ private List<String> validateProperties(TableMetadata tableMetadata, BasicCassan
         List<String> validationErrors = new LinkedList<>();
 
         entity.doWithProperties((PropertyHandler<CassandraPersistentProperty>) persistentProperty -> {
+
+            if (persistentProperty.isTransient()) {
+                return;
+            }
+
             CqlIdentifier expectedColumnName = persistentProperty.getColumnName();
 
             Assert.notNull(expectedColumnName, "Column cannot not be null at this point");
@@ -139,7 +151,7 @@ private List<String> validateProperties(TableMetadata tableMetadata, BasicCassan
 
             if (column.isPresent()) {
                 ColumnMetadata columnMetadata = column.get();
-                DataType dataTypeExpected = CassandraSimpleTypeHolder.getDataTypeFor(persistentProperty.getRawType());
+                DataType dataTypeExpected = columnTypeResolver.resolve(persistentProperty).getDataType();
 
                 if (dataTypeExpected == null) {
                     validationErrors.add(
diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java
index 126872962..4fbea44bd 100644
--- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java
+++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/config/CassandraSchemaValidatorTest.java
@@ -13,6 +13,8 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.cassandra.CassandraSchemaValidationException;
+import org.springframework.data.cassandra.core.convert.CassandraConverter;
+import org.springframework.data.cassandra.core.convert.CassandraSchemaValidator;
 import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
 import org.springframework.data.cassandra.core.mapping.Column;
 import org.springframework.data.cassandra.core.mapping.Table;
@@ -36,12 +38,12 @@ static class Config extends IntegrationTestConfig {
         @Bean("cassandraSchemaValidator")
         CassandraSchemaValidator cassandraSchemaValidator(
           CqlSessionFactoryBean cqlSessionFactoryBean,
-          CassandraMappingContext cassandraMappingContext,
+          CassandraConverter cassandraConverter,
           @Value("${validation.mode.strict:true}") Boolean validationModeStrict
         ) {
             return new CassandraSchemaValidator(
-              cqlSessionFactoryBean,
-              cassandraMappingContext,
+              cqlSessionFactoryBean.getSession(),
+              cassandraConverter,
               validationModeStrict
             );
         }