Skip to content

Commit

Permalink
Major refactor of code to make it work in Spark Cluster mode (#323)
Browse files Browse the repository at this point in the history
* Major refactor to make CDM work in cluster mode
* Feature fixes
* Fixed guardrail issue
* Updated docs for cluster-mode
  • Loading branch information
pravinbhat authored Oct 25, 2024
1 parent d85c43f commit d3a0d05
Show file tree
Hide file tree
Showing 35 changed files with 215 additions and 353 deletions.
47 changes: 30 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

Migrate and Validate Tables between Origin and Target Cassandra Clusters.

> :warning: Please note this job has been tested with spark version [3.5.3](https://archive.apache.org/dist/spark/spark-3.5.3/)
> [!IMPORTANT]
> Please note this job has been tested with spark version [3.5.3](https://archive.apache.org/dist/spark/spark-3.5.3/)
## Install as a Container
- Get the latest image that includes all dependencies from [DockerHub](https://hub.docker.com/r/datastax/cassandra-data-migrator)
Expand All @@ -17,21 +18,28 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.
- Download the latest jar file from the GitHub [packages area here](https://github.com/datastax/cassandra-data-migrator/packages/1832128)

### Prerequisite
- Install **Java11** (minimum) as Spark binaries are compiled with it.
- Install Spark version [`3.5.3`](https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. Spark can be installed by running the following: -
- **Java11** (minimum) as Spark binaries are compiled with it.
- **Spark `3.5.x` with Scala `2.13` and Hadoop `3.3`**
- Typically installed using [this binary](https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. This simple setup is recommended for most one-time migrations.
- However we recommend a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` (that supports the above mentioned versions) for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.

Spark can be installed by running the following: -

```
wget https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz
tar -xvzf spark-3.5.3-bin-hadoop3-scala2.13.tgz
```

> :warning: If the above Spark and Scala version is not properly installed, you'll then see a similar exception like below when running the CDM jobs,
> [!CAUTION]
> If the above Spark and Scala version does not match, you may see an exception similar like below when running the CDM jobs,
```
Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.releaseFence()V
```

# Steps for Data-Migration:
> [!NOTE]
> When deploying CDM on a Spark cluster, replace the params `--master "local[*]"` with `--master "spark://master-host:port"` and remove any params (e.g. `--driver-memory`, `--executor-memory`, etc.) related to a single VM run
> :warning: Note that Version 4 of the tool is not backward-compatible with .properties files created in previous versions, and that package names have changed.
# Steps for Data-Migration:

1. `cdm.properties` file needs to be configured as applicable for the environment. Parameter descriptions and defaults are described in the file. The file can have any name, it does not need to be `cdm.properties`.
> * A simplified sample properties file configuration can be found here as [cdm.properties](./src/resources/cdm.properties)
Expand All @@ -46,7 +54,7 @@ spark-submit --properties-file cdm.properties \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

Note:
**Note:**
- Above command generates a log file `logfile_name_*.txt` to avoid log output on the console.
- Update the memory options (driver & executor memory) based on your use-case
- To track details of a run in the `target` keyspace, pass param `--conf spark.cdm.trackRun=true`
Expand Down Expand Up @@ -98,8 +106,8 @@ spark-submit --properties-file cdm.properties \
spark.cdm.autocorrect.missing false|true
spark.cdm.autocorrect.mismatch false|true
```
Note:
- The validation job will never delete records from target i.e. it only adds or updates data on target
> [!IMPORTANT]
> The validation job will never delete records from target i.e. it only adds or updates data on target
# Rerun (previously incomplete) Migration or Validation
- You can rerun/resume a Migration or Validation job to complete a previous run that could have stopped (or completed with some errors) for any reasons. This mode will skip any token-ranges from the previous run that were migrated (or validated) successfully. This is done by passing the `spark.cdm.trackRun.previousRunId` param as shown below
Expand Down Expand Up @@ -135,6 +143,7 @@ spark-submit --properties-file cdm.properties \
- Supports adding `constants` as new columns on `Target`
- Supports expanding `Map` columns on `Origin` into multiple records on `Target`
- Supports extracting value from a JSON column in `Origin` and map it to a specific field on `Target`
- Can be deployed on a Spark Cluster or a single VM
- Fully containerized (Docker and K8s friendly)
- SSL Support (including custom cipher algorithms)
- Migrate from any Cassandra `Origin` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra)) to any Cassandra `Target` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra))
Expand All @@ -151,21 +160,25 @@ spark-submit --properties-file cdm.properties \
- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. If you want to avoid this, we recommend setting `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
- When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.
- The Spark Cluster based deployment currently has a bug. It reports '0' for all count metrics, while doing underlying tasks (Migration, Validation, etc.). We are working to address this in the upcoming releases. Also note that this issue is only with the Spark cluster deployment and not with the single VM run.

# Performance FAQ
- Below recommendations may only be needed while migrating large tables where the default performance is not good enough.
# Performance recommendations
Below recommendations may only be useful when migrating large tables where the default performance is not good enough
- Performance bottleneck are usually the result of
- Low resource availability on `Origin` OR `Target` cluster
- Low resource availability on CDM VMs, [see recommendations here](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines)
- Bad schema design which could be caused by out of balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10MB) and/or high column count.
- Incorrect configuration of below properties
- Incorrect configuration of below properties may negatively impact performance
- `numParts`: Default is 5K, but ideal value is usually around table-size/10MB.
- `batchSize`: Default is 5, but this should be set to 1 for tables where primary-key=partition-key OR where average row-size is > 20 KB. Similarly, this should be set to a value > 5, if row-size is small (< 1KB) and most partitions have several rows (100+).
- `fetchSizeInRows`: Default is 1K & this usually fine. However you can reduce this if your table has many large rows (over 100KB).
- `ratelimit`: Default is 20K. Once you set all the other properties appropriately, set this value to the highest possible value that your cluster (origin & target) is able to handle.
- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impacts performance
- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on CDM params info provided above and the observed load and throughput on `Origin` and `Target` cluster.
- Note: For additional performance tuning, refer to details mentioned in [cdm-detailed.properties file here](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties)
- `fetchSizeInRows`: Default is 1K and this usually works fine. However you can reduce this as needed if your table has many large rows (over 100KB).
- `ratelimit`: Default is `20000`, but this property should usually be updated (after updating other properties) to the highest possible value that your `origin` and `target` clusters can efficiently handle.
- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance
- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on CDM params info provided above and the observed load and throughput on `Origin` and `Target` clusters
- Use a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.

> [!NOTE]
> For additional performance tuning, refer to details mentioned in the [`cdm-detailed.properties` file here](./src/resources/cdm-detailed.properties)
# Building Jar for local development
1. Clone this repo
Expand Down
9 changes: 6 additions & 3 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Release Notes
## [4.6.1] - 2024-10-22
- Make `trackRun` feature work on all versions of Cassandra/DSE by replacing the `IN` clause on `cdm_run_details` table.
- Updated `README` docs.
## [4.7.0] - 2024-10-25
- CDM refractored to work when deployed on a Spark Cluster
- More performant for large migration efforts (multi-terabytes clusters with several billions of rows) using Spark Cluster (instead of individual VMs)
- No functional changes and fully backward compatible, just refactor to support Spark cluster deployment

Note: The Spark Cluster based deployment in this release currently has a bug. It reports '0' for all count metrics, while doing underlying tasks (Migration, Validation, etc.). We are working to address this in the upcoming releases. Also note that this issue is only with the Spark cluster deployment and not with the single VM run (i.e. no impact to current users).

## [4.6.0] - 2024-10-18
- Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs.
Expand Down
8 changes: 0 additions & 8 deletions SIT/features/05_guardrail/cdm.fixData.assert

This file was deleted.

4 changes: 2 additions & 2 deletions SIT/features/05_guardrail/cdm.guardrailCheck.assert
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Read Record Count: 4
Valid Record Count: 10
Valid Record Count: 1
Skipped Record Count: 0
Large Record Count: 6
Large Record Count: 3
4 changes: 0 additions & 4 deletions SIT/features/05_guardrail/cdm.migrateData.assert

This file was deleted.

3 changes: 0 additions & 3 deletions SIT/features/05_guardrail/cdm.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
migrateData com.datastax.cdm.job.Migrate migrate.properties
guardrailCheck com.datastax.cdm.job.GuardrailCheck migrate.properties
validateData com.datastax.cdm.job.DiffData migrate.properties
fixData com.datastax.cdm.job.DiffData fix.properties
8 changes: 0 additions & 8 deletions SIT/features/05_guardrail/cdm.validateData.assert

This file was deleted.

9 changes: 0 additions & 9 deletions SIT/features/05_guardrail/execute.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,3 @@ cd "$workingDir"

/local/cdm.sh -f cdm.txt -s guardrailCheck -d "$workingDir" > cdm.guardrailCheck.out 2>cdm.guardrailCheck.err
/local/cdm-assert.sh -f cdm.guardrailCheck.out -a cdm.guardrailCheck.assert -d "$workingDir"

/local/cdm.sh -f cdm.txt -s migrateData -d "$workingDir" > cdm.migrateData.out 2>cdm.migrateData.err
/local/cdm-assert.sh -f cdm.migrateData.out -a cdm.migrateData.assert -d "$workingDir"

/local/cdm.sh -f cdm.txt -s validateData -d "$workingDir" > cdm.validateData.out 2>cdm.validateData.err
/local/cdm-assert.sh -f cdm.validateData.out -a cdm.validateData.assert -d "$workingDir"

/local/cdm.sh -f cdm.txt -s fixData -d "$workingDir" > cdm.fixData.out 2>cdm.fixData.err
/local/cdm-assert.sh -f cdm.fixData.out -a cdm.fixData.assert -d "$workingDir"
2 changes: 1 addition & 1 deletion SIT/features/05_guardrail/expected.cql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
limitations under the License.
*/

SELECT * FROM target.feature_guardrail;
SELECT count(*) FROM origin.feature_guardrail;
23 changes: 9 additions & 14 deletions SIT/features/05_guardrail/expected.out
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@

key | fruit | fruit_taste | value
-------------+---------+-------------+--------
badMapValue | bananas | squishy | valueA
badMapValue | grapes | sour | valueA
badMapValue | oranges | sweet | valueA
clean | apples | delicious | valueA
clean | bananas | squishy | valueA
clean | grapes | sour | valueA
clean | oranges | sweet | valueA
badMapKey | bananas | squishy | valueA
badMapKey | grapes | sour | valueA
badMapKey | oranges | sweet | valueA

(10 rows)
count
-------
4

(1 rows)

Warnings :
Aggregation query used without partition key

29 changes: 0 additions & 29 deletions SIT/features/05_guardrail/fix.properties

This file was deleted.

5 changes: 0 additions & 5 deletions SIT/features/05_guardrail/migrate.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ spark.cdm.connect.origin.host cdm-sit-cass
spark.cdm.connect.target.host cdm-sit-cass

spark.cdm.schema.origin.keyspaceTable origin.feature_guardrail
spark.cdm.schema.target.keyspaceTable target.feature_guardrail
spark.cdm.perfops.numParts 1

spark.cdm.feature.explodeMap.origin.name fruits
spark.cdm.feature.explodeMap.target.name.key fruit
spark.cdm.feature.explodeMap.target.name.value fruit_taste

spark.cdm.feature.guardrail.colSizeInKB 1
Loading

0 comments on commit d3a0d05

Please sign in to comment.