@@ -38,7 +38,6 @@ import com.builtamont.cassandra.migration.internal.util.logging.LogFactory
3838import com.datastax.driver.core.Cluster
3939import com.datastax.driver.core.Metadata
4040import com.datastax.driver.core.Session
41- import sun.reflect.generics.reflectiveObjects.NotImplementedException
4241
4342/* *
4443 * This is the centre point of Cassandra migration, and for most users, the only class they will ever have to deal with.
@@ -87,24 +86,18 @@ class CassandraMigration : CassandraMigrationConfiguration {
8786 * @return The number of successfully applied migrations.
8887 */
8988 fun migrate (): Int {
90- return execute(object : Action <Int > {
91- override fun execute (session : Session ): Int {
92- Initialize ().run (session, keyspace, MigrationVersion .CURRENT .table)
93-
94- val migrationResolver = createMigrationResolver()
95- val schemaVersionDAO = SchemaVersionDAO (session, keyspace, MigrationVersion .CURRENT .table)
96- val migrate = Migrate (
97- migrationResolver,
98- configs.target,
99- schemaVersionDAO,
100- session,
101- keyspace.cluster.username,
102- configs.isAllowOutOfOrder
103- )
89+ return execute(migrateAction())
90+ }
10491
105- return migrate.run ()
106- }
107- })
92+ /* *
93+ * Starts the database migration. All pending migrations will be applied in order.
94+ * Calling migrate on an up-to-date database has no effect.
95+ *
96+ * @param session The Cassandra connection session.
97+ * @return The number of successfully applied migrations.
98+ */
99+ fun migrate (session : Session ): Int {
100+ return execute(migrateAction(), session)
108101 }
109102
110103 /* *
@@ -114,16 +107,18 @@ class CassandraMigration : CassandraMigrationConfiguration {
114107 * @return All migrations sorted by version, oldest first.
115108 */
116109 fun info (): MigrationInfoService {
117- return execute(object : Action <MigrationInfoService > {
118- override fun execute (session : Session ): MigrationInfoService {
119- val migrationResolver = createMigrationResolver()
120- val schemaVersionDAO = SchemaVersionDAO (session, keyspace, MigrationVersion .CURRENT .table)
121- val migrationInfoService = MigrationInfoServiceImpl (migrationResolver, schemaVersionDAO, configs.target, false , true )
122- migrationInfoService.refresh()
110+ return execute(infoAction())
111+ }
123112
124- return migrationInfoService
125- }
126- })
113+ /* *
114+ * Retrieves the complete information about all the migrations including applied, pending and current migrations with
115+ * details and status.
116+ *
117+ * @param session The Cassandra connection session.
118+ * @return All migrations sorted by version, oldest first.
119+ */
120+ fun info (session : Session ): MigrationInfoService {
121+ return execute(infoAction(), session)
127122 }
128123
129124 /* *
@@ -136,32 +131,46 @@ class CassandraMigration : CassandraMigrationConfiguration {
136131 * * versions have been resolved that haven't been applied yet
137132 */
138133 fun validate () {
139- val validationError = execute(object : Action <String ?> {
140- override fun execute (session : Session ): String? {
141- val migrationResolver = createMigrationResolver()
142- val schemaVersionDAO = SchemaVersionDAO (session, keyspace, MigrationVersion .CURRENT .table)
143- val validate = Validate (migrationResolver, configs.target, schemaVersionDAO, true , false )
144- return validate.run ()
145- }
146- })
134+ val validationError = execute(validateAction())
135+
136+ if (validationError != null ) {
137+ throw CassandraMigrationException (" Validation failed. $validationError " )
138+ }
139+ }
140+
141+ /* *
142+ * Validate applied migrations against resolved ones (on the filesystem or classpath)
143+ * to detect accidental changes that may prevent the schema(s) from being recreated exactly.
144+ *
145+ * Validation fails if:
146+ * * differences in migration names, types or checksums are found
147+ * * versions have been applied that aren't resolved locally anymore
148+ * * versions have been resolved that haven't been applied yet
149+ *
150+ * @param session The Cassandra connection session.
151+ */
152+ fun validate (session : Session ) {
153+ val validationError = execute(validateAction(), session)
147154
148155 if (validationError != null ) {
149- throw CassandraMigrationException (" Validation failed. " + validationError)
156+ throw CassandraMigrationException (" Validation failed. $ validationError" )
150157 }
151158 }
152159
153160 /* *
154161 * Baselines an existing database, excluding all migrations up to and including baselineVersion.
155162 */
156163 fun baseline () {
157- execute(object : Action <Unit > {
158- override fun execute (session : Session ): Unit {
159- val migrationResolver = createMigrationResolver()
160- val schemaVersionDAO = SchemaVersionDAO (session, keyspace, MigrationVersion .CURRENT .table)
161- val baseline = Baseline (migrationResolver, baselineVersion, schemaVersionDAO, baselineDescription, keyspace.cluster.username)
162- baseline.run ()
163- }
164- })
164+ execute(baselineAction())
165+ }
166+
167+ /* *
168+ * Baselines an existing database, excluding all migrations up to and including baselineVersion.
169+ *
170+ * @param session The Cassandra connection session.
171+ */
172+ fun baseline (session : Session ) {
173+ execute(baselineAction(), session)
165174 }
166175
167176 /* *
@@ -179,40 +188,37 @@ class CassandraMigration : CassandraMigrationConfiguration {
179188 var cluster: Cluster ? = null
180189 var session: Session ? = null
181190 try {
182- if ( null == keyspace)
183- throw IllegalArgumentException ( " Unable to establish Cassandra session. Keyspace is not configured. " )
184-
185- if (null == keyspace.cluster )
186- throw IllegalArgumentException (" Unable to establish Cassandra session. Cluster is not configured ." )
191+ // Guard clauses: Cluster and Keyspace must be defined
192+ val errorMsg = " Unable to establish Cassandra session"
193+ if (keyspace == null ) throw IllegalArgumentException ( " $errorMsg . Keyspace is not configured. " )
194+ if (keyspace.cluster == null ) throw IllegalArgumentException ( " $errorMsg . Cluster is not configured. " )
195+ if (keyspace.name.isNullOrEmpty()) throw IllegalArgumentException (" $errorMsg . Keyspace is not specified ." )
187196
197+ // Build the Cluster
188198 val builder = Cluster .Builder ()
189199 builder.addContactPoints(* keyspace.cluster.contactpoints).withPort(keyspace.cluster.port)
190- if (null != keyspace.cluster.username && ! keyspace.cluster.username.trim { it <= ' ' }.isEmpty ()) {
191- if (null != keyspace.cluster.password && ! keyspace.cluster.password.trim { it <= ' ' }.isEmpty ()) {
200+ if (! keyspace.cluster.username.isNullOrBlank ()) {
201+ if (! keyspace.cluster.password.isNullOrBlank ()) {
192202 builder.withCredentials(keyspace.cluster.username, keyspace.cluster.password)
193203 } else {
194204 throw IllegalArgumentException (" Password must be provided with username." )
195205 }
196206 }
197207 cluster = builder.build()
198208
199- val metadata = cluster!! .metadata
200- LOG .info(getConnectionInfo(metadata))
209+ LOG .info(getConnectionInfo(cluster.metadata))
201210
211+ // Create a new Session
202212 session = cluster.newSession()
203- if ( null == keyspace.name || keyspace.name.trim { it <= ' ' }.length == 0 )
204- throw IllegalArgumentException ( " Keyspace not specified. " )
205-
206- val keyspaces = metadata.keyspaces
207- var keyspaceExists = false
208- for (keyspaceMetadata in keyspaces) {
209- if (keyspaceMetadata.name.equals(keyspace.name, ignoreCase = true ))
210- keyspaceExists = true
213+
214+ // Connect to the specific Keyspace context (if already defined )
215+ val keyspaces = cluster.metadata.keyspaces.map { it.name }
216+ val keyspaceExists = keyspaces.first { it.equals(keyspace.name, ignoreCase = true ) }.isNotEmpty()
217+ if ( keyspaceExists) {
218+ session = cluster.connect(keyspace.name)
219+ } else {
220+ throw CassandraMigrationException ( " Keyspace: ${keyspace.name} does not exist. " )
211221 }
212- if (keyspaceExists)
213- session!! .execute(" USE " + keyspace.name)
214- else
215- throw CassandraMigrationException (" Keyspace: " + keyspace.name + " does not exist." )
216222
217223 result = action.execute(session)
218224 } finally {
@@ -229,11 +235,22 @@ class CassandraMigration : CassandraMigrationConfiguration {
229235 } catch (e: Exception ) {
230236 LOG .warn(" Error closing Cassandra cluster" )
231237 }
232-
233238 }
234239 return result
235240 }
236241
242+ /* *
243+ * Executes this command with an existing session, with proper resource handling and cleanup.
244+ *
245+ * @param action The action to execute.
246+ * @param session The Cassandra connection session.
247+ * @param T The action result type.
248+ * @return The action result.
249+ */
250+ internal fun <T > execute (action : Action <T >, session : Session ): T {
251+ return action.execute(session)
252+ }
253+
237254 /* *
238255 * Get Cassandra connection information.
239256 *
@@ -263,6 +280,102 @@ class CassandraMigration : CassandraMigrationConfiguration {
263280 return CompositeMigrationResolver (classLoader, ScriptsLocations (* configs.scriptsLocations), configs.encoding)
264281 }
265282
283+ /* *
284+ * Creates the SchemaVersionDAO.
285+ *
286+ * @return A configured SchemaVersionDAO instance.
287+ */
288+ private fun createSchemaVersionDAO (session : Session ): SchemaVersionDAO {
289+ return SchemaVersionDAO (session, keyspace, MigrationVersion .CURRENT .table)
290+ }
291+
292+ /* *
293+ * @return The database migration action.
294+ */
295+ private fun migrateAction (): Action <Int > {
296+ return object : Action <Int > {
297+ override fun execute (session : Session ): Int {
298+ Initialize ().run (session, keyspace, MigrationVersion .CURRENT .table)
299+
300+ val migrationResolver = createMigrationResolver()
301+ val schemaVersionDAO = createSchemaVersionDAO(session)
302+ val migrate = Migrate (
303+ migrationResolver,
304+ configs.target,
305+ schemaVersionDAO,
306+ session,
307+ keyspace.cluster.username,
308+ configs.isAllowOutOfOrder
309+ )
310+
311+ return migrate.run ()
312+ }
313+ }
314+ }
315+
316+ /* *
317+ * @return The migration info service action.
318+ */
319+ private fun infoAction (): Action <MigrationInfoService > {
320+ return object : Action <MigrationInfoService > {
321+ override fun execute (session : Session ): MigrationInfoService {
322+ val migrationResolver = createMigrationResolver()
323+ val schemaVersionDAO = createSchemaVersionDAO(session)
324+ val migrationInfoService = MigrationInfoServiceImpl (
325+ migrationResolver,
326+ schemaVersionDAO,
327+ configs.target,
328+ outOfOrder = false ,
329+ pendingOrFuture = true
330+ )
331+ migrationInfoService.refresh()
332+
333+ return migrationInfoService
334+ }
335+ }
336+ }
337+
338+ /* *
339+ * @return The migration validation action.
340+ */
341+ private fun validateAction (): Action <String ?> {
342+ return object : Action <String ?> {
343+ override fun execute (session : Session ): String? {
344+ val migrationResolver = createMigrationResolver()
345+ val schemaVersionDAO = createSchemaVersionDAO(session)
346+ val validate = Validate (
347+ migrationResolver,
348+ configs.target,
349+ schemaVersionDAO,
350+ outOfOrder = true ,
351+ pendingOrFuture = false
352+ )
353+
354+ return validate.run ()
355+ }
356+ }
357+ }
358+
359+ /* *
360+ * @return The migration baselining action.
361+ */
362+ private fun baselineAction (): Action <Unit > {
363+ return object : Action <Unit > {
364+ override fun execute (session : Session ): Unit {
365+ val migrationResolver = createMigrationResolver()
366+ val schemaVersionDAO = createSchemaVersionDAO(session)
367+ val baseline = Baseline (
368+ migrationResolver,
369+ baselineVersion,
370+ schemaVersionDAO,
371+ baselineDescription,
372+ keyspace.cluster.username
373+ )
374+ baseline.run ()
375+ }
376+ }
377+ }
378+
266379 /* *
267380 * A Cassandra migration action that can be executed.
268381 *
0 commit comments