From d3a0d057194f0cb432f5512849e7cbda48081549 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Fri, 25 Oct 2024 09:36:52 -0400 Subject: [PATCH] Major refactor of code to make it work in Spark Cluster mode (#323) * Major refactor to make CDM work in cluster mode * Feature fixes * Fixed guardrail issue * Updated docs for cluster-mode --- README.md | 47 +++++++++----- RELEASE.md | 9 ++- SIT/features/05_guardrail/cdm.fixData.assert | 8 --- .../05_guardrail/cdm.guardrailCheck.assert | 4 +- .../05_guardrail/cdm.migrateData.assert | 4 -- SIT/features/05_guardrail/cdm.txt | 3 - .../05_guardrail/cdm.validateData.assert | 8 --- SIT/features/05_guardrail/execute.sh | 9 --- SIT/features/05_guardrail/expected.cql | 2 +- SIT/features/05_guardrail/expected.out | 23 +++---- SIT/features/05_guardrail/fix.properties | 29 --------- SIT/features/05_guardrail/migrate.properties | 5 -- SIT/features/05_guardrail/setup.cql | 6 -- .../java/com/datastax/cdm/data/PKFactory.java | 4 -- .../com/datastax/cdm/feature/Guardrail.java | 59 +++--------------- .../datastax/cdm/job/AbstractJobSession.java | 62 ++++++++++++------- .../com/datastax/cdm/job/BaseJobSession.java | 10 +-- .../com/datastax/cdm/job/CopyJobSession.java | 24 ++----- .../cdm/job/CopyJobSessionFactory.java | 10 +-- .../com/datastax/cdm/job/DiffJobSession.java | 24 ++----- .../cdm/job/DiffJobSessionFactory.java | 10 +-- .../cdm/job/GuardrailCheckJobSession.java | 40 +++--------- .../job/GuardrailCheckJobSessionFactory.java | 10 +-- .../datastax/cdm/job/IJobSessionFactory.java | 5 +- .../java/com/datastax/cdm/job/JobCounter.java | 3 +- .../cdm/properties/PropertyHelper.java | 3 +- .../com/datastax/cdm/schema/CqlTable.java | 22 ++++--- .../scala/com/datastax/cdm/job/BaseJob.scala | 5 +- .../datastax/cdm/job/BasePartitionJob.scala | 3 +- .../datastax/cdm/job/ConnectionFetcher.scala | 5 +- .../scala/com/datastax/cdm/job/DiffData.scala | 21 +++++-- .../com/datastax/cdm/job/GuardrailCheck.scala | 18 +++++- .../scala/com/datastax/cdm/job/Migrate.scala | 18 +++++- .../datastax/cdm/feature/GuardrailTest.java | 49 +++------------ .../cdm/job/ConnectionFetcherTest.java | 6 +- 35 files changed, 215 insertions(+), 353 deletions(-) delete mode 100644 SIT/features/05_guardrail/cdm.fixData.assert delete mode 100644 SIT/features/05_guardrail/cdm.migrateData.assert delete mode 100644 SIT/features/05_guardrail/cdm.validateData.assert delete mode 100644 SIT/features/05_guardrail/fix.properties diff --git a/README.md b/README.md index b6b971c0..ae5732f1 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,8 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters. -> :warning: Please note this job has been tested with spark version [3.5.3](https://archive.apache.org/dist/spark/spark-3.5.3/) +> [!IMPORTANT] +> Please note this job has been tested with spark version [3.5.3](https://archive.apache.org/dist/spark/spark-3.5.3/) ## Install as a Container - Get the latest image that includes all dependencies from [DockerHub](https://hub.docker.com/r/datastax/cassandra-data-migrator) @@ -17,21 +18,28 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters. - Download the latest jar file from the GitHub [packages area here](https://github.com/datastax/cassandra-data-migrator/packages/1832128) ### Prerequisite -- Install **Java11** (minimum) as Spark binaries are compiled with it. 
-- Install Spark version [`3.5.3`](https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. Spark can be installed by running the following: -
+- **Java11** (minimum) as Spark binaries are compiled with it.
+- **Spark `3.5.x` with Scala `2.13` and Hadoop `3.3`**
+  - Typically installed using [this binary](https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. This simple setup is recommended for most one-time migrations.
+  - However, we recommend a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` (that supports the above-mentioned versions) for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.
+
+Spark can be installed by running the following: -
+
 ```
 wget https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz
 tar -xvzf spark-3.5.3-bin-hadoop3-scala2.13.tgz
 ```

-> :warning: If the above Spark and Scala version is not properly installed, you'll then see a similar exception like below when running the CDM jobs,
+> [!CAUTION]
+> If the above Spark and Scala version does not match, you may see an exception similar to the one below when running the CDM jobs,
 ```
 Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.releaseFence()V
 ```

-# Steps for Data-Migration:
+> [!NOTE]
+> When deploying CDM on a Spark cluster, replace the params `--master "local[*]"` with `--master "spark://master-host:port"` and remove any params (e.g. `--driver-memory`, `--executor-memory`, etc.) related to a single VM run.

-> :warning: Note that Version 4 of the tool is not backward-compatible with .properties files created in previous versions, and that package names have changed.
+# Steps for Data-Migration:

 1. `cdm.properties` file needs to be configured as applicable for the environment. Parameter descriptions and defaults are described in the file. The file can have any name, it does not need to be `cdm.properties`.
 > * A simplified sample properties file configuration can be found here as [cdm.properties](./src/resources/cdm.properties)
@@ -46,7 +54,7 @@ spark-submit --properties-file cdm.properties \
 --class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```

-Note:
+**Note:**
 - Above command generates a log file `logfile_name_*.txt` to avoid log output on the console.
 - Update the memory options (driver & executor memory) based on your use-case
 - To track details of a run in the `target` keyspace, pass param `--conf spark.cdm.trackRun=true`
@@ -98,8 +106,8 @@ spark-submit --properties-file cdm.properties \
 spark.cdm.autocorrect.missing false|true
 spark.cdm.autocorrect.mismatch false|true
 ```
-Note:
-- The validation job will never delete records from target i.e. it only adds or updates data on target
+> [!IMPORTANT]
+> The validation job will never delete records from target i.e. it only adds or updates data on target

 # Rerun (previously incomplete) Migration or Validation
 - You can rerun/resume a Migration or Validation job to complete a previous run that could have stopped (or completed with some errors) for any reasons. This mode will skip any token-ranges from the previous run that were migrated (or validated) successfully. This is done by passing the `spark.cdm.trackRun.previousRunId` param as shown below
@@ -135,6 +143,7 @@ spark-submit --properties-file cdm.properties \
 - Supports adding `constants` as new columns on `Target`
 - Supports expanding `Map` columns on `Origin` into multiple records on `Target`
 - Supports extracting value from a JSON column in `Origin` and map it to a specific field on `Target`
+- Can be deployed on a Spark Cluster or a single VM
 - Fully containerized (Docker and K8s friendly)
 - SSL Support (including custom cipher algorithms)
 - Migrate from any Cassandra `Origin` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra)) to any Cassandra `Target` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra))
@@ -151,21 +160,25 @@ spark-submit --properties-file cdm.properties \
 - If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. If you want to avoid this, we recommend setting `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
 - When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.
+- The Spark Cluster-based deployment currently has a bug: it reports '0' for all count metrics while performing the underlying tasks (Migration, Validation, etc.). We are working to address this in upcoming releases. Note that this issue affects only the Spark cluster deployment, not the single-VM run.

-# Performance FAQ
-- Below recommendations may only be needed while migrating large tables where the default performance is not good enough.
+# Performance recommendations
+The recommendations below are usually only needed when migrating large tables where the default performance is not good enough.
 - Performance bottleneck are usually the result of
   - Low resource availability on `Origin` OR `Target` cluster
   - Low resource availability on CDM VMs, [see recommendations here](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines)
   - Bad schema design which could be caused by out of balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10MB) and/or high column count.
-- Incorrect configuration of below properties
+- Incorrect configuration of the below properties may negatively impact performance
   - `numParts`: Default is 5K, but ideal value is usually around table-size/10MB.
  - `batchSize`: Default is 5, but this should be set to 1 for tables where primary-key=partition-key OR where average row-size is > 20 KB. Similarly, this should be set to a value > 5, if row-size is small (< 1KB) and most partitions have several rows (100+).
-  - `fetchSizeInRows`: Default is 1K & this usually fine. However you can reduce this if your table has many large rows (over 100KB).
-  - `ratelimit`: Default is 20K. Once you set all the other properties appropriately, set this value to the highest possible value that your cluster (origin & target) is able to handle.
-- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impacts performance
-- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on CDM params info provided above and the observed load and throughput on `Origin` and `Target` cluster.
-- Note: For additional performance tuning, refer to details mentioned in [cdm-detailed.properties file here](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties)
+  - `fetchSizeInRows`: Default is 1K and this usually works fine. However, you can reduce this as needed if your table has many large rows (over 100KB).
+  - `ratelimit`: Default is `20000`, but this property should usually be updated (after updating other properties) to the highest possible value that your `origin` and `target` clusters can efficiently handle.
+- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance.
+- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on the CDM params info provided above and the observed load and throughput on the `Origin` and `Target` clusters.
+- Use a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.
+
+> [!NOTE]
+> For additional performance tuning, refer to details mentioned in the [`cdm-detailed.properties` file here](./src/resources/cdm-detailed.properties)

 # Building Jar for local development
 1. Clone this repo
diff --git a/RELEASE.md b/RELEASE.md
index 596efd99..fbac8d40 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,7 +1,10 @@
 # Release Notes
-## [4.6.1] - 2024-10-22
-- Make `trackRun` feature work on all versions of Cassandra/DSE by replacing the `IN` clause on `cdm_run_details` table.
-- Updated `README` docs.
+## [4.7.0] - 2024-10-25 +- CDM refractored to work when deployed on a Spark Cluster +- More performant for large migration efforts (multi-terabytes clusters with several billions of rows) using Spark Cluster (instead of individual VMs) +- No functional changes and fully backward compatible, just refactor to support Spark cluster deployment + +Note: The Spark Cluster based deployment in this release currently has a bug. It reports '0' for all count metrics, while doing underlying tasks (Migration, Validation, etc.). We are working to address this in the upcoming releases. Also note that this issue is only with the Spark cluster deployment and not with the single VM run (i.e. no impact to current users). ## [4.6.0] - 2024-10-18 - Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs. diff --git a/SIT/features/05_guardrail/cdm.fixData.assert b/SIT/features/05_guardrail/cdm.fixData.assert deleted file mode 100644 index c5f87534..00000000 --- a/SIT/features/05_guardrail/cdm.fixData.assert +++ /dev/null @@ -1,8 +0,0 @@ -Read Record Count: 4 -Mismatch Record Count: 0 -Corrected Mismatch Record Count: 0 -Missing Record Count: 0 -Corrected Missing Record Count: 0 -Valid Record Count: 10 -Skipped Record Count: 6 -Error Record Count: 0 diff --git a/SIT/features/05_guardrail/cdm.guardrailCheck.assert b/SIT/features/05_guardrail/cdm.guardrailCheck.assert index 164358c9..9fcfdc2a 100644 --- a/SIT/features/05_guardrail/cdm.guardrailCheck.assert +++ b/SIT/features/05_guardrail/cdm.guardrailCheck.assert @@ -1,4 +1,4 @@ Read Record Count: 4 -Valid Record Count: 10 +Valid Record Count: 1 Skipped Record Count: 0 -Large Record Count: 6 +Large Record Count: 3 diff --git a/SIT/features/05_guardrail/cdm.migrateData.assert b/SIT/features/05_guardrail/cdm.migrateData.assert deleted file mode 100644 index c2ab2607..00000000 --- a/SIT/features/05_guardrail/cdm.migrateData.assert +++ /dev/null @@ -1,4 +0,0 @@ -Read Record Count: 4 -Skipped Record Count: 6 -Write Record Count: 10 -Error Record Count: 0 diff --git a/SIT/features/05_guardrail/cdm.txt b/SIT/features/05_guardrail/cdm.txt index 4d400d2d..ea90d72e 100644 --- a/SIT/features/05_guardrail/cdm.txt +++ b/SIT/features/05_guardrail/cdm.txt @@ -1,4 +1 @@ -migrateData com.datastax.cdm.job.Migrate migrate.properties guardrailCheck com.datastax.cdm.job.GuardrailCheck migrate.properties -validateData com.datastax.cdm.job.DiffData migrate.properties -fixData com.datastax.cdm.job.DiffData fix.properties diff --git a/SIT/features/05_guardrail/cdm.validateData.assert b/SIT/features/05_guardrail/cdm.validateData.assert deleted file mode 100644 index c5f87534..00000000 --- a/SIT/features/05_guardrail/cdm.validateData.assert +++ /dev/null @@ -1,8 +0,0 @@ -Read Record Count: 4 -Mismatch Record Count: 0 -Corrected Mismatch Record Count: 0 -Missing Record Count: 0 -Corrected Missing Record Count: 0 -Valid Record Count: 10 -Skipped Record Count: 6 -Error Record Count: 0 diff --git a/SIT/features/05_guardrail/execute.sh b/SIT/features/05_guardrail/execute.sh index 135416a1..49cdd3dc 100644 --- a/SIT/features/05_guardrail/execute.sh +++ b/SIT/features/05_guardrail/execute.sh @@ -19,12 +19,3 @@ cd "$workingDir" /local/cdm.sh -f cdm.txt -s guardrailCheck -d "$workingDir" > cdm.guardrailCheck.out 2>cdm.guardrailCheck.err /local/cdm-assert.sh -f cdm.guardrailCheck.out -a cdm.guardrailCheck.assert -d "$workingDir" - -/local/cdm.sh -f cdm.txt -s migrateData -d 
"$workingDir" > cdm.migrateData.out 2>cdm.migrateData.err -/local/cdm-assert.sh -f cdm.migrateData.out -a cdm.migrateData.assert -d "$workingDir" - -/local/cdm.sh -f cdm.txt -s validateData -d "$workingDir" > cdm.validateData.out 2>cdm.validateData.err -/local/cdm-assert.sh -f cdm.validateData.out -a cdm.validateData.assert -d "$workingDir" - -/local/cdm.sh -f cdm.txt -s fixData -d "$workingDir" > cdm.fixData.out 2>cdm.fixData.err -/local/cdm-assert.sh -f cdm.fixData.out -a cdm.fixData.assert -d "$workingDir" diff --git a/SIT/features/05_guardrail/expected.cql b/SIT/features/05_guardrail/expected.cql index 5420a2d8..1b99959c 100644 --- a/SIT/features/05_guardrail/expected.cql +++ b/SIT/features/05_guardrail/expected.cql @@ -12,4 +12,4 @@ limitations under the License. */ -SELECT * FROM target.feature_guardrail; +SELECT count(*) FROM origin.feature_guardrail; diff --git a/SIT/features/05_guardrail/expected.out b/SIT/features/05_guardrail/expected.out index 1bcd8f50..18485ae5 100644 --- a/SIT/features/05_guardrail/expected.out +++ b/SIT/features/05_guardrail/expected.out @@ -1,15 +1,10 @@ - key | fruit | fruit_taste | value --------------+---------+-------------+-------- - badMapValue | bananas | squishy | valueA - badMapValue | grapes | sour | valueA - badMapValue | oranges | sweet | valueA - clean | apples | delicious | valueA - clean | bananas | squishy | valueA - clean | grapes | sour | valueA - clean | oranges | sweet | valueA - badMapKey | bananas | squishy | valueA - badMapKey | grapes | sour | valueA - badMapKey | oranges | sweet | valueA - -(10 rows) + count +------- + 4 + +(1 rows) + +Warnings : +Aggregation query used without partition key + diff --git a/SIT/features/05_guardrail/fix.properties b/SIT/features/05_guardrail/fix.properties deleted file mode 100644 index 0046ee48..00000000 --- a/SIT/features/05_guardrail/fix.properties +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -spark.cdm.connect.origin.host cdm-sit-cass -spark.cdm.connect.target.host cdm-sit-cass - -spark.cdm.schema.origin.keyspaceTable origin.feature_guardrail -spark.cdm.schema.target.keyspaceTable target.feature_guardrail -spark.cdm.perfops.numParts 1 - -spark.cdm.feature.explodeMap.origin.name fruits -spark.cdm.feature.explodeMap.target.name.key fruit -spark.cdm.feature.explodeMap.target.name.value fruit_taste - -spark.cdm.feature.guardrail.colSizeInKB 1 - -spark.cdm.autocorrect.missing true -spark.cdm.autocorrect.mismatch true diff --git a/SIT/features/05_guardrail/migrate.properties b/SIT/features/05_guardrail/migrate.properties index 03f801c7..ddc211a8 100644 --- a/SIT/features/05_guardrail/migrate.properties +++ b/SIT/features/05_guardrail/migrate.properties @@ -16,11 +16,6 @@ spark.cdm.connect.origin.host cdm-sit-cass spark.cdm.connect.target.host cdm-sit-cass spark.cdm.schema.origin.keyspaceTable origin.feature_guardrail -spark.cdm.schema.target.keyspaceTable target.feature_guardrail spark.cdm.perfops.numParts 1 -spark.cdm.feature.explodeMap.origin.name fruits -spark.cdm.feature.explodeMap.target.name.key fruit -spark.cdm.feature.explodeMap.target.name.value fruit_taste - spark.cdm.feature.guardrail.colSizeInKB 1 diff --git a/SIT/features/05_guardrail/setup.cql b/SIT/features/05_guardrail/setup.cql index 9179bdc0..09736761 100644 --- a/SIT/features/05_guardrail/setup.cql +++ b/SIT/features/05_guardrail/setup.cql @@ -18,12 +18,6 @@ INSERT INTO origin.feature_guardrail(key,value,fruits) VALUES ('clean','valueA', INSERT INTO origin.feature_guardrail(key,value,fruits) VALUES ('badValue','Lorem ipsum dolor sit amet, consectetur adipiscing elit. Etiam sed commodo enim, eu ullamcorper nunc. Curabitur et risus id ligula commodo convallis. In hac habitasse platea dictumst. Phasellus blandit, felis id elementum facilisis, felis est dictum lectus, a rhoncus massa odio vel dolor. Donec interdum sodales erat, quis facilisis est porttitor sagittis. Aliquam non neque cursus, sodales quam vitae, malesuada risus. Phasellus nec porttitor lacus. Aliquam erat volutpat. Mauris velit massa, luctus ut nunc quis, mollis malesuada purus. Vestibulum non feugiat magna. Nullam mattis vestibulum velit in iaculis. Phasellus aliquet sit amet urna nec volutpat. Phasellus mollis metus ac enim lacinia vehicula. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Mauris eu sapien neque. Nulla eu dolor tellus. Quisque id augue ex. Vivamus nec hendrerit mi, id malesuada orci. Aenean in lectus porta, placerat sem nec, tristique massa. Morbi tristique pulvinar massa eget fermentum. Donec elementum quam a augue vulputate convallis non sit amet velit. Morbi erat felis, aliquam quis justo id, interdum pretium felis. Integer at tristique eros. Nam sed diam posuere, ornare lorem ac, venenatis felis. Donec congue, dolor sed finibus egestas, erat lectus ultrices sem, nec tincidunt eros leo et turpis. Donec placerat dolor ipsum, eu porttitor ipsum luctus eu. Sed ultricies sed arcu nec dictum. Nullam sodales vestibulum nibh, ut dapibus sem placerat vel. Nunc cras amet.', {'apples': 'delicious', 'oranges': 'sweet', 'bananas': 'squishy', 'grapes': 'sour'}); - INSERT INTO origin.feature_guardrail(key,value,fruits) VALUES ('badMapKey','valueA', {'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Etiam sed commodo enim, eu ullamcorper nunc. Curabitur et risus id ligula commodo convallis. In hac habitasse platea dictumst. 
Phasellus blandit, felis id elementum facilisis, felis est dictum lectus, a rhoncus massa odio vel dolor. Donec interdum sodales erat, quis facilisis est porttitor sagittis. Aliquam non neque cursus, sodales quam vitae, malesuada risus. Phasellus nec porttitor lacus. Aliquam erat volutpat. Mauris velit massa, luctus ut nunc quis, mollis malesuada purus. Vestibulum non feugiat magna. Nullam mattis vestibulum velit in iaculis. Phasellus aliquet sit amet urna nec volutpat. Phasellus mollis metus ac enim lacinia vehicula. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Mauris eu sapien neque. Nulla eu dolor tellus. Quisque id augue ex. Vivamus nec hendrerit mi, id malesuada orci. Aenean in lectus porta, placerat sem nec, tristique massa. Morbi tristique pulvinar massa eget fermentum. Donec elementum quam a augue vulputate convallis non sit amet velit. Morbi erat felis, aliquam quis justo id, interdum pretium felis. Integer at tristique eros. Nam sed diam posuere, ornare lorem ac, venenatis felis. Donec congue, dolor sed finibus egestas, erat lectus ultrices sem, nec tincidunt eros leo et turpis. Donec placerat dolor ipsum, eu porttitor ipsum luctus eu. Sed ultricies sed arcu nec dictum. Nullam sodales vestibulum nibh, ut dapibus sem placerat vel. Nunc cras amet.': 'delicious', 'oranges': 'sweet', 'bananas': 'squishy', 'grapes': 'sour'}); - INSERT INTO origin.feature_guardrail(key,value,fruits) VALUES ('badMapValue','valueA', {'apples': 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. Etiam sed commodo enim, eu ullamcorper nunc. Curabitur et risus id ligula commodo convallis. In hac habitasse platea dictumst. Phasellus blandit, felis id elementum facilisis, felis est dictum lectus, a rhoncus massa odio vel dolor. Donec interdum sodales erat, quis facilisis est porttitor sagittis. Aliquam non neque cursus, sodales quam vitae, malesuada risus. Phasellus nec porttitor lacus. Aliquam erat volutpat. Mauris velit massa, luctus ut nunc quis, mollis malesuada purus. Vestibulum non feugiat magna. Nullam mattis vestibulum velit in iaculis. Phasellus aliquet sit amet urna nec volutpat. Phasellus mollis metus ac enim lacinia vehicula. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Mauris eu sapien neque. Nulla eu dolor tellus. Quisque id augue ex. Vivamus nec hendrerit mi, id malesuada orci. Aenean in lectus porta, placerat sem nec, tristique massa. Morbi tristique pulvinar massa eget fermentum. Donec elementum quam a augue vulputate convallis non sit amet velit. Morbi erat felis, aliquam quis justo id, interdum pretium felis. Integer at tristique eros. Nam sed diam posuere, ornare lorem ac, venenatis felis. Donec congue, dolor sed finibus egestas, erat lectus ultrices sem, nec tincidunt eros leo et turpis. Donec placerat dolor ipsum, eu porttitor ipsum luctus eu. Sed ultricies sed arcu nec dictum. Nullam sodales vestibulum nibh, ut dapibus sem placerat vel. 
Nunc cras amet.', 'oranges': 'sweet', 'bananas': 'squishy', 'grapes': 'sour'}); - - -DROP TABLE IF EXISTS target.feature_guardrail; -CREATE TABLE target.feature_guardrail(key text, fruit text, value text, fruit_taste text, PRIMARY KEY ((key), fruit)); diff --git a/src/main/java/com/datastax/cdm/data/PKFactory.java b/src/main/java/com/datastax/cdm/data/PKFactory.java index ae931754..413315cc 100644 --- a/src/main/java/com/datastax/cdm/data/PKFactory.java +++ b/src/main/java/com/datastax/cdm/data/PKFactory.java @@ -118,10 +118,6 @@ public EnhancedPK getTargetPK(Row originRow) { } } - public EnhancedPK toEnhancedPK(List pkValues, List pkClasses) { - return new EnhancedPK(this, pkValues, pkClasses, null, null, null); - } - public String getWhereClause(Side side) { StringBuilder sb; List pkNames; diff --git a/src/main/java/com/datastax/cdm/feature/Guardrail.java b/src/main/java/com/datastax/cdm/feature/Guardrail.java index 75923f42..1bfbb01e 100644 --- a/src/main/java/com/datastax/cdm/feature/Guardrail.java +++ b/src/main/java/com/datastax/cdm/feature/Guardrail.java @@ -22,7 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datastax.cdm.data.Record; import com.datastax.cdm.properties.IPropertyHelper; import com.datastax.cdm.properties.KnownProperties; import com.datastax.cdm.schema.CqlTable; @@ -38,14 +37,7 @@ public class Guardrail extends AbstractFeature { private DecimalFormat decimalFormat = new DecimalFormat("0.###"); private Double colSizeInKB; - private CqlTable originTable; - private CqlTable targetTable; - - private ExplodeMap explodeMap = null; - private int explodeMapIndex = -1; - private int explodeMapKeyIndex = -1; - private int explodeMapValueIndex = -1; @Override public boolean loadProperties(IPropertyHelper propertyHelper) { @@ -79,13 +71,7 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable) logger.error("originTable is null, or is not an origin table"); return false; } - if (null == targetTable || targetTable.isOrigin()) { - logger.error("targetTable is null, or is an origin table"); - return false; - } - this.originTable = originTable; - this.targetTable = targetTable; isValid = true; if (!validateProperties()) { @@ -100,45 +86,26 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable) } private Map check(Map currentChecks, int targetIndex, Object targetValue) { - int colSize = targetTable.byteCount(targetIndex, targetValue); + int colSize = originTable.byteCount(targetIndex, targetValue); if (logTrace) logger.trace("Column {} at targetIndex {} has size {} bytes", - targetTable.getColumnNames(false).get(targetIndex), targetIndex, colSize); + originTable.getColumnNames(false).get(targetIndex), targetIndex, colSize); if (colSize > colSizeInKB * BASE_FACTOR) { if (null == currentChecks) currentChecks = new HashMap(); - currentChecks.put(targetTable.getColumnNames(false).get(targetIndex), colSize); + currentChecks.put(originTable.getColumnNames(false).get(targetIndex), colSize); } return currentChecks; } - public String guardrailChecks(Record record) { + public String guardrailChecks(Row row) { if (!isEnabled) return null; - if (null == record) - return CLEAN_CHECK; - if (null == record.getOriginRow()) - return CLEAN_CHECK; - Map largeColumns = null; - - // As the order of feature loading is not guaranteed, we wait until the first record to figure out the - // explodeMap - if (null == explodeMap) - calcExplodeMap(); - Row row = record.getOriginRow(); + Map largeColumns = null; for (int i = 0; i 
< originTable.getColumnNames(false).size(); i++) { - if (i == explodeMapIndex) { - // Exploded columns are already converted to target type - largeColumns = check(largeColumns, explodeMapKeyIndex, record.getPk().getExplodeMapKey()); - largeColumns = check(largeColumns, explodeMapValueIndex, record.getPk().getExplodeMapValue()); - } else { - int targetIndex = originTable.getCorrespondingIndex(i); - if (targetIndex < 0) - continue; // TTL and WRITETIME columns for example - Object targetObject = originTable.getAndConvertData(i, row); - largeColumns = check(largeColumns, targetIndex, targetObject); - } + Object targetObject = originTable.getAndConvertData(i, row); + largeColumns = check(largeColumns, i, targetObject); } if (null == largeColumns || largeColumns.isEmpty()) @@ -157,16 +124,4 @@ public String guardrailChecks(Record record) { return sb.toString(); } - private void calcExplodeMap() { - this.explodeMap = (ExplodeMap) originTable.getFeature(Featureset.EXPLODE_MAP); - if (null != explodeMap && explodeMap.isEnabled()) { - explodeMapIndex = explodeMap.getOriginColumnIndex(); - explodeMapKeyIndex = explodeMap.getKeyColumnIndex(); - explodeMapValueIndex = explodeMap.getValueColumnIndex(); - if (logDebug) - logger.debug( - "ExplodeMap is enabled. explodeMapIndex={}, explodeMapKeyIndex={}, explodeMapValueIndex={}", - explodeMapIndex, explodeMapKeyIndex, explodeMapValueIndex); - } - } } diff --git a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java index 991ae14a..69853ca6 100644 --- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java +++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java @@ -15,9 +15,9 @@ */ package com.datastax.cdm.job; +import java.math.BigInteger; import java.util.Collection; -import org.apache.spark.SparkConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +29,8 @@ import com.datastax.cdm.feature.Guardrail; import com.datastax.cdm.feature.TrackRun; import com.datastax.cdm.properties.KnownProperties; +import com.datastax.cdm.properties.PropertyHelper; +import com.datastax.cdm.schema.CqlTable; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter; @@ -38,19 +40,18 @@ public abstract class AbstractJobSession extends BaseJobSession { protected EnhancedSession originSession; protected EnhancedSession targetSession; protected Guardrail guardrailFeature; - protected boolean guardrailEnabled; protected JobCounter jobCounter; protected Long printStatsAfter; protected TrackRun trackRunFeature; protected long runId; - protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { - this(originSession, targetSession, sc, false); + protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { + this(originSession, targetSession, propHelper, false); } - protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, + protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper, boolean isJobMigrateRowsFromFile) { - super(sc); + super(propHelper); if (originSession == null) { return; @@ -73,36 +74,49 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate()); logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate()); + CqlTable 
cqlTableOrigin, cqlTableTarget = null; this.originSession = new EnhancedSession(propertyHelper, originSession, true); - this.targetSession = new EnhancedSession(propertyHelper, targetSession, false); - this.originSession.getCqlTable().setOtherCqlTable(this.targetSession.getCqlTable()); - this.targetSession.getCqlTable().setOtherCqlTable(this.originSession.getCqlTable()); - this.originSession.getCqlTable().setFeatureMap(featureMap); - this.targetSession.getCqlTable().setFeatureMap(featureMap); + cqlTableOrigin = this.originSession.getCqlTable(); + cqlTableOrigin.setFeatureMap(featureMap); boolean allFeaturesValid = true; - for (Feature f : featureMap.values()) { - if (!f.initializeAndValidate(this.originSession.getCqlTable(), this.targetSession.getCqlTable())) { - allFeaturesValid = false; - logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName()); + if (targetSession != null) { + this.targetSession = new EnhancedSession(propertyHelper, targetSession, false); + cqlTableTarget = this.targetSession.getCqlTable(); + cqlTableOrigin.setOtherCqlTable(cqlTableTarget); + cqlTableTarget.setOtherCqlTable(cqlTableOrigin); + cqlTableTarget.setFeatureMap(featureMap); + for (Feature f : featureMap.values()) { + if (!f.initializeAndValidate(cqlTableOrigin, cqlTableTarget)) { + allFeaturesValid = false; + logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName()); + } } + + PKFactory pkFactory = new PKFactory(propertyHelper, cqlTableOrigin, cqlTableTarget); + this.originSession.setPKFactory(pkFactory); + this.targetSession.setPKFactory(pkFactory); } + if (!allFeaturesValid) { throw new RuntimeException("One or more features are not valid. Please check the configuration."); } - PKFactory pkFactory = new PKFactory(propertyHelper, this.originSession.getCqlTable(), - this.targetSession.getCqlTable()); - this.originSession.setPKFactory(pkFactory); - this.targetSession.setPKFactory(pkFactory); + this.guardrailFeature = (Guardrail) cqlTableOrigin.getFeature(Featureset.GUARDRAIL_CHECK); + if (!guardrailFeature.initializeAndValidate(cqlTableOrigin, null)) { + allFeaturesValid = false; + logger.error("Feature {} is not valid. 
Please check the configuration.", + guardrailFeature.getClass().getName()); + } + } - // Guardrail is referenced by many jobs, and is evaluated against the target - // table - this.guardrailFeature = (Guardrail) this.targetSession.getCqlTable().getFeature(Featureset.GUARDRAIL_CHECK); - this.guardrailEnabled = this.guardrailFeature.isEnabled(); + public void processSlice(SplitPartitions.Partition slice, TrackRun trackRunFeature, long runId) { + this.trackRunFeature = trackRunFeature; + this.runId = runId; + this.processSlice(slice.getMin(), slice.getMax()); } - public abstract void processSlice(T slice); + protected abstract void processSlice(BigInteger min, BigInteger max); public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) { diff --git a/src/main/java/com/datastax/cdm/job/BaseJobSession.java b/src/main/java/com/datastax/cdm/job/BaseJobSession.java index 6a400d3a..d2b858fa 100644 --- a/src/main/java/com/datastax/cdm/job/BaseJobSession.java +++ b/src/main/java/com/datastax/cdm/job/BaseJobSession.java @@ -20,9 +20,6 @@ import java.util.Map; import org.apache.logging.log4j.ThreadContext; -import org.apache.spark.SparkConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.datastax.cdm.feature.Feature; import com.datastax.cdm.feature.FeatureFactory; @@ -34,14 +31,13 @@ public abstract class BaseJobSession { public static final String THREAD_CONTEXT_LABEL = "ThreadLabel"; protected static final String NEW_LINE = System.lineSeparator(); - private final Logger logger = LoggerFactory.getLogger(this.getClass().getName()); - protected PropertyHelper propertyHelper = PropertyHelper.getInstance(); + protected PropertyHelper propertyHelper; protected Map featureMap; protected RateLimiter rateLimiterOrigin; protected RateLimiter rateLimiterTarget; - protected BaseJobSession(SparkConf sc) { - propertyHelper.initializeSparkConf(sc); + protected BaseJobSession(PropertyHelper propHelper) { + propertyHelper = propHelper; this.featureMap = calcFeatureMap(propertyHelper); ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel()); } diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index 506b71e5..514f14a8 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -21,7 +21,6 @@ import java.util.concurrent.CompletionStage; import org.apache.logging.log4j.ThreadContext; -import org.apache.spark.SparkConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,8 +29,8 @@ import com.datastax.cdm.cql.statement.TargetUpsertStatement; import com.datastax.cdm.data.PKFactory; import com.datastax.cdm.data.Record; -import com.datastax.cdm.feature.Guardrail; import com.datastax.cdm.feature.TrackRun; +import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.BatchStatement; @@ -50,8 +49,8 @@ public class CopyJobSession extends AbstractJobSession { +public class CopyJobSessionFactory implements IJobSessionFactory, Serializable { + private static final long serialVersionUID = 5255029377029801421L; private static CopyJobSession jobSession = null; public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, - SparkConf sc) { + PropertyHelper propHelper) { if (jobSession == null) { synchronized 
(CopyJobSession.class) { if (jobSession == null) { - jobSession = new CopyJobSession(originSession, targetSession, sc); + jobSession = new CopyJobSession(originSession, targetSession, propHelper); } } } diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java index 21d44605..440d5517 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java @@ -26,7 +26,6 @@ import java.util.stream.StreamSupport; import org.apache.logging.log4j.ThreadContext; -import org.apache.spark.SparkConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +40,9 @@ import com.datastax.cdm.feature.ExplodeMap; import com.datastax.cdm.feature.ExtractJson; import com.datastax.cdm.feature.Featureset; -import com.datastax.cdm.feature.Guardrail; import com.datastax.cdm.feature.TrackRun; import com.datastax.cdm.properties.KnownProperties; +import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; @@ -67,8 +66,8 @@ public class DiffJobSession extends CopyJobSession { private ExtractJson extractJsonFeature; private boolean overwriteTarget; - public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { - super(originSession, targetSession, sc); + public DiffJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) { + super(originSession, targetSession, propHelper); this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.VALID, JobCounter.CounterType.MISMATCH, JobCounter.CounterType.CORRECTED_MISMATCH, JobCounter.CounterType.MISSING, JobCounter.CounterType.CORRECTED_MISSING, @@ -118,12 +117,7 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkC logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL()); } - @Override - public void processSlice(SplitPartitions.Partition slice) { - this.getDataAndDiff(slice.getMin(), slice.getMax()); - } - - private void getDataAndDiff(BigInteger min, BigInteger max) { + protected void processSlice(BigInteger min, BigInteger max) { ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max)); logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); if (null != trackRunFeature) @@ -151,16 +145,6 @@ private void getDataAndDiff(BigInteger min, BigInteger max) { jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); } else { for (Record r : pkFactory.toValidRecordList(record)) { - - if (guardrailEnabled) { - String guardrailCheck = guardrailFeature.guardrailChecks(r); - if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) { - logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck); - jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); - continue; - } - } - rateLimiterTarget.acquire(1); CompletionStage targetResult = targetSelectByPKStatement .getAsyncResult(r.getPk()); diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java index 84434a05..3b09b7c1 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSessionFactory.java @@ -15,19 +15,21 @@ */ package com.datastax.cdm.job; -import 
org.apache.spark.SparkConf; +import java.io.Serializable; +import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; -public class DiffJobSessionFactory implements IJobSessionFactory { +public class DiffJobSessionFactory implements IJobSessionFactory, Serializable { + private static final long serialVersionUID = -3543616512495020278L; private static DiffJobSession jobSession = null; public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, - SparkConf sc) { + PropertyHelper propHelper) { if (jobSession == null) { synchronized (DiffJobSession.class) { if (jobSession == null) { - jobSession = new DiffJobSession(originSession, targetSession, sc); + jobSession = new DiffJobSession(originSession, targetSession, propHelper); } } } diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java index 46ea8856..bdcb74f3 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java @@ -18,13 +18,11 @@ import java.math.BigInteger; import org.apache.logging.log4j.ThreadContext; -import org.apache.spark.SparkConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement; -import com.datastax.cdm.data.PKFactory; -import com.datastax.cdm.data.Record; +import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; @@ -33,15 +31,10 @@ public class GuardrailCheckJobSession extends AbstractJobSession { +public class GuardrailCheckJobSessionFactory implements IJobSessionFactory, Serializable { + private static final long serialVersionUID = -4673384128807660843L; private static GuardrailCheckJobSession jobSession = null; public AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, - SparkConf sc) { + PropertyHelper propHelper) { if (jobSession == null) { synchronized (GuardrailCheckJobSession.class) { if (jobSession == null) { - jobSession = new GuardrailCheckJobSession(originSession, targetSession, sc); + jobSession = new GuardrailCheckJobSession(originSession, targetSession, propHelper); } } } diff --git a/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java b/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java index 907eec3a..2e643502 100644 --- a/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java +++ b/src/main/java/com/datastax/cdm/job/IJobSessionFactory.java @@ -15,10 +15,9 @@ */ package com.datastax.cdm.job; -import org.apache.spark.SparkConf; - +import com.datastax.cdm.properties.PropertyHelper; import com.datastax.oss.driver.api.core.CqlSession; public interface IJobSessionFactory { - AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, SparkConf sc); + AbstractJobSession getInstance(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper); } diff --git a/src/main/java/com/datastax/cdm/job/JobCounter.java b/src/main/java/com/datastax/cdm/job/JobCounter.java index 3610fd16..71811eb6 100644 --- a/src/main/java/com/datastax/cdm/job/JobCounter.java +++ b/src/main/java/com/datastax/cdm/job/JobCounter.java @@ -67,7 +67,6 @@ public long getGlobalCounter() { private final HashMap counterMap = new HashMap<>(); // Variables to hold lock objects and 
registered types - private final Object globalLock = new Object(); private final boolean printPerThread; private final long printStatsAfter; private final CounterUnit printCounter = new CounterUnit(); @@ -128,7 +127,7 @@ public void threadIncrement(CounterType counterType) { // Method to increment global counters based on thread-specific counters public void globalIncrement() { - synchronized (globalLock) { + synchronized (this) { for (CounterType type : counterMap.keySet()) { getCounterUnit(type).addThreadToGlobalCounter(); } diff --git a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java index 863ab0ad..cd8cf534 100644 --- a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java +++ b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java @@ -15,6 +15,7 @@ */ package com.datastax.cdm.properties; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -27,7 +28,7 @@ import scala.Tuple2; -public final class PropertyHelper extends KnownProperties implements IPropertyHelper { +public final class PropertyHelper extends KnownProperties implements IPropertyHelper, Serializable { private static PropertyHelper instance = null; public Logger logger = LoggerFactory.getLogger(this.getClass().getName()); diff --git a/src/main/java/com/datastax/cdm/schema/CqlTable.java b/src/main/java/com/datastax/cdm/schema/CqlTable.java index 5c77bd33..79a1c303 100644 --- a/src/main/java/com/datastax/cdm/schema/CqlTable.java +++ b/src/main/java/com/datastax/cdm/schema/CqlTable.java @@ -347,16 +347,20 @@ public Object getAndConvertData(int index, Row row) { return removeNullValuesFromMap(thisObject); } - CqlConversion cqlConversion = this.cqlConversions.get(index); - if (null == cqlConversion) { - if (logTrace) - logger.trace("{} Index:{} not converting:{}", isOrigin ? "origin" : "target", index, thisObject); - return thisObject; + if (null != this.cqlConversions) { + CqlConversion cqlConversion = this.cqlConversions.get(index); + if (null == cqlConversion) { + if (logTrace) + logger.trace("{} Index:{} not converting:{}", isOrigin ? "origin" : "target", index, thisObject); + return thisObject; + } else { + if (logTrace) + logger.trace("{} Index:{} converting:{} via CqlConversion:{}", isOrigin ? "origin" : "target", + index, thisObject, cqlConversion); + return cqlConversion.convert(thisObject); + } } else { - if (logTrace) - logger.trace("{} Index:{} converting:{} via CqlConversion:{}", isOrigin ? 
"origin" : "target", index, - thisObject, cqlConversion); - return cqlConversion.convert(thisObject); + return thisObject; } } diff --git a/src/main/scala/com/datastax/cdm/job/BaseJob.scala b/src/main/scala/com/datastax/cdm/job/BaseJob.scala index 3e04a69e..6354fb75 100644 --- a/src/main/scala/com/datastax/cdm/job/BaseJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BaseJob.scala @@ -43,6 +43,7 @@ abstract class BaseJob[T: ClassTag] extends App { var propertyHelper: PropertyHelper = _ var consistencyLevel: String = _ + var connectionFetcher: ConnectionFetcher = _ var minPartition: BigInteger = _ var maxPartition: BigInteger = _ var coveragePercent: Int = _ @@ -76,7 +77,7 @@ abstract class BaseJob[T: ClassTag] extends App { runId = System.nanoTime(); } consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL) - val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper) + connectionFetcher = new ConnectionFetcher(sc, propertyHelper) originConnection = connectionFetcher.getConnection(Side.ORIGIN, consistencyLevel, runId) targetConnection = connectionFetcher.getConnection(Side.TARGET, consistencyLevel, runId) @@ -109,7 +110,7 @@ abstract class BaseJob[T: ClassTag] extends App { def getParts(pieces: Int): util.Collection[T] def printSummary(): Unit = { if (parts.size() > 0) { - jobFactory.getInstance(null, null, sc).printCounts(true); + jobFactory.getInstance(null, null, propertyHelper).printCounts(true); } } diff --git a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala index 592157ee..2f380866 100644 --- a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala @@ -22,13 +22,14 @@ import com.datastax.cdm.properties.KnownProperties abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] { var trackRunFeature: TrackRun = _ + var keyspaceTableValue: String = _ override def getParts(pieces: Int): util.Collection[SplitPartitions.Partition] = { var keyspaceTable: Option[String] = Option(propertyHelper.getString(KnownProperties.TARGET_KEYSPACE_TABLE)) .filter(_.nonEmpty) .orElse(Option(propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE))) - var keyspaceTableValue: String = keyspaceTable.getOrElse { + keyspaceTableValue = keyspaceTable.getOrElse { throw new RuntimeException("Both " + KnownProperties.TARGET_KEYSPACE_TABLE + " and " + KnownProperties.ORIGIN_KEYSPACE_TABLE + " properties are missing.") } diff --git a/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala b/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala index 0186ceaf..2b723a9e 100644 --- a/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala +++ b/src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala @@ -17,13 +17,13 @@ package com.datastax.cdm.job import com.datastax.cdm.properties.{KnownProperties, IPropertyHelper} import com.datastax.spark.connector.cql.CassandraConnector -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkConf import org.slf4j.{Logger, LoggerFactory} import com.datastax.cdm.data.DataUtility.generateSCB import com.datastax.cdm.data.PKFactory.Side // TODO: CDM-31 - add localDC configuration support -class ConnectionFetcher(sparkContext: SparkContext, propertyHelper: IPropertyHelper) { +class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) extends Serializable { val logger: Logger = LoggerFactory.getLogger(this.getClass.getName) def 
getConnectionDetails(side: Side): ConnectionDetails = { @@ -65,7 +65,6 @@ class ConnectionFetcher(sparkContext: SparkContext, propertyHelper: IPropertyHel def getConnection(side: Side, consistencyLevel: String, runId: Long): CassandraConnector = { val connectionDetails = getConnectionDetails(side) - val config: SparkConf = sparkContext.getConf logger.info("PARAM -- SSL Enabled: "+connectionDetails.sslEnabled); diff --git a/src/main/scala/com/datastax/cdm/job/DiffData.scala b/src/main/scala/com/datastax/cdm/job/DiffData.scala index caaa3cfa..9c4f84eb 100644 --- a/src/main/scala/com/datastax/cdm/job/DiffData.scala +++ b/src/main/scala/com/datastax/cdm/job/DiffData.scala @@ -16,24 +16,37 @@ package com.datastax.cdm.job import com.datastax.cdm.feature.TrackRun +import com.datastax.cdm.data.PKFactory.Side +import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} object DiffData extends BasePartitionJob { setup("Data Validation Job", new DiffJobSessionFactory()) execute() finish() - + protected def execute(): Unit = { if (!parts.isEmpty()) { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.DIFF_DATA))); + jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.DIFF_DATA))); + val bcConnectionFetcher = sContext.broadcast(connectionFetcher) + val bcPropHelper = sContext.broadcast(propertyHelper) + val bcJobFactory = sContext.broadcast(jobFactory) + val bcKeyspaceTableValue = sContext.broadcast(keyspaceTableValue) + val bcRunId = sContext.broadcast(runId) slices.foreach(slice => { + if (null == originConnection) { + originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value)) + } originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc) - .processSlice(slice))) + bcJobFactory.value.getInstance(originSession, targetSession, bcPropHelper.value) + .processSlice(slice, trackRunFeature, bcRunId.value))) }) } } + } diff --git a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala index 6d439cd2..20eccf2a 100644 --- a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala +++ b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala @@ -15,6 +15,9 @@ */ package com.datastax.cdm.job +import com.datastax.cdm.data.PKFactory.Side +import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} + object GuardrailCheck extends BasePartitionJob { setup("Guardrail Check Job", new GuardrailCheckJobSessionFactory()) execute() @@ -22,12 +25,21 @@ object GuardrailCheck extends BasePartitionJob { protected def execute(): Unit = { if (!parts.isEmpty()) { + originConnection.withSessionDo(originSession => + jobFactory.getInstance(originSession, null, propertyHelper)); + val bcConnectionFetcher = sContext.broadcast(connectionFetcher) + val bcPropHelper = sContext.broadcast(propertyHelper) + val bcJobFactory = sContext.broadcast(jobFactory) + 
slices.foreach(slice => { + if (null == originConnection) { + originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0) + } originConnection.withSessionDo(originSession => - targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc) - .processSlice(slice))) + bcJobFactory.value.getInstance(originSession, null, bcPropHelper.value) + .processSlice(slice, null, 0)) }) } } + } diff --git a/src/main/scala/com/datastax/cdm/job/Migrate.scala b/src/main/scala/com/datastax/cdm/job/Migrate.scala index e88709e2..b0002548 100644 --- a/src/main/scala/com/datastax/cdm/job/Migrate.scala +++ b/src/main/scala/com/datastax/cdm/job/Migrate.scala @@ -16,6 +16,8 @@ package com.datastax.cdm.job import com.datastax.cdm.feature.TrackRun +import com.datastax.cdm.data.PKFactory.Side +import com.datastax.cdm.properties.{KnownProperties, PropertyHelper} object Migrate extends BasePartitionJob { setup("Migrate Job", new CopyJobSessionFactory()) @@ -26,13 +28,23 @@ object Migrate extends BasePartitionJob { if (!parts.isEmpty()) { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.MIGRATE))); + jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.MIGRATE))); + val bcConnectionFetcher = sContext.broadcast(connectionFetcher) + val bcPropHelper = sContext.broadcast(propertyHelper) + val bcJobFactory = sContext.broadcast(jobFactory) + val bcKeyspaceTableValue = sContext.broadcast(keyspaceTableValue) + val bcRunId = sContext.broadcast(runId) slices.foreach(slice => { + if (null == originConnection) { + originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value) + trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value)) + } originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc) - .processSlice(slice))) + bcJobFactory.value.getInstance(originSession, targetSession, bcPropHelper.value) + .processSlice(slice, trackRunFeature, bcRunId.value))) }) } } diff --git a/src/test/java/com/datastax/cdm/feature/GuardrailTest.java b/src/test/java/com/datastax/cdm/feature/GuardrailTest.java index e692db4a..c99aeb44 100644 --- a/src/test/java/com/datastax/cdm/feature/GuardrailTest.java +++ b/src/test/java/com/datastax/cdm/feature/GuardrailTest.java @@ -43,7 +43,7 @@ public void smoke_cleanCheck() { guardrail.loadProperties(propertyHelper); guardrail.initializeAndValidate(originTable, targetTable); - String guardrailChecksResult = guardrail.guardrailChecks(record); + String guardrailChecksResult = guardrail.guardrailChecks(originRow); assertEquals(Guardrail.CLEAN_CHECK, guardrailChecksResult, "guardrailChecks"); } @@ -52,7 +52,7 @@ public void smoke_nullWhenDisabled() { guardrail.loadProperties(propertyHelper); guardrail.initializeAndValidate(originTable, targetTable); - String guardrailChecksResult = guardrail.guardrailChecks(record); + String guardrailChecksResult = 
guardrail.guardrailChecks(originRow); assertNull(guardrailChecksResult, "guardrailChecks"); } @@ -62,9 +62,9 @@ public void smoke_exceedCheck() { guardrail.loadProperties(propertyHelper); guardrail.initializeAndValidate(originTable, targetTable); - when(targetTable.byteCount(anyInt(),any())).thenReturn(Guardrail.BASE_FACTOR+1); + when(originTable.byteCount(anyInt(),any())).thenReturn(Guardrail.BASE_FACTOR+1); - String guardrailChecksResult = guardrail.guardrailChecks(record); + String guardrailChecksResult = guardrail.guardrailChecks(originRow); assertTrue(guardrailChecksResult.startsWith("Large columns"), "guardrailChecks"); } @@ -76,42 +76,10 @@ public void smoke_explodeMap() { guardrail.loadProperties(propertyHelper); guardrail.initializeAndValidate(originTable, targetTable); - String guardrailChecksResult = guardrail.guardrailChecks(record); + String guardrailChecksResult = guardrail.guardrailChecks(originRow); assertEquals(Guardrail.CLEAN_CHECK, guardrailChecksResult, "guardrailChecks"); } - @Test - public void explodeMap_KeyExceeds() { - defaultClassVariables(); - commonSetupWithoutDefaultClassVariables(true, false, false); - when(propertyHelper.getNumber(KnownProperties.GUARDRAIL_COLSIZE_KB)).thenReturn(1); - - guardrail.loadProperties(propertyHelper); - guardrail.initializeAndValidate(originTable, targetTable); - - when(targetTable.byteCount(eq(explodeMapFeature.getKeyColumnIndex()), any())) - .thenReturn(Guardrail.BASE_FACTOR + 1); - - String guardrailChecksResult = guardrail.guardrailChecks(record); - assertTrue(guardrailChecksResult.startsWith("Large columns"), "guardrailChecks"); - } - - @Test - public void explodeMap_ValueExceeds() { - defaultClassVariables(); - commonSetupWithoutDefaultClassVariables(true, false, false); - when(propertyHelper.getNumber(KnownProperties.GUARDRAIL_COLSIZE_KB)).thenReturn(1); - - guardrail.loadProperties(propertyHelper); - guardrail.initializeAndValidate(originTable, targetTable); - - when(targetTable.byteCount(eq(explodeMapFeature.getValueColumnIndex()), any())) - .thenReturn(Guardrail.BASE_FACTOR + 1); - - String guardrailChecksResult = guardrail.guardrailChecks(record); - assertTrue(guardrailChecksResult.startsWith("Large columns"), "guardrailChecks"); - } - @Test public void loadProperties_configured() { when(propertyHelper.getNumber(KnownProperties.GUARDRAIL_COLSIZE_KB)).thenReturn(colSizeInKB); @@ -166,8 +134,7 @@ public void initializeAndValidate_invalidOrigin() { @Test public void initializeAndValidate_invalidTarget() { - when(originTable.isOrigin()).thenReturn(true); - when(targetTable.isOrigin()).thenReturn(true); + when(originTable.isOrigin()).thenReturn(false); guardrail.loadProperties(propertyHelper); boolean initializeAndValidateResult = guardrail.initializeAndValidate(originTable, targetTable); @@ -190,7 +157,7 @@ public void checkWhenDisabled() { guardrail.loadProperties(propertyHelper); guardrail.initializeAndValidate(originTable, targetTable); - String guardrailChecksResult = guardrail.guardrailChecks(record); + String guardrailChecksResult = guardrail.guardrailChecks(originRow); assertNull(guardrailChecksResult, "guardrailChecks"); } @@ -212,7 +179,7 @@ public void checkWithNullOriginRow() { guardrail.loadProperties(propertyHelper); guardrail.initializeAndValidate(originTable, targetTable); - String guardrailChecksResult = guardrail.guardrailChecks(record); + String guardrailChecksResult = guardrail.guardrailChecks(originRow); assertEquals(Guardrail.CLEAN_CHECK, guardrailChecksResult, "guardrailChecks"); } diff --git 
a/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java b/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java index 123e3191..ffc224ec 100644 --- a/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java +++ b/src/test/java/com/datastax/cdm/job/ConnectionFetcherTest.java @@ -18,7 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.when; -import org.apache.spark.SparkContext; +import org.apache.spark.SparkConf; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -35,7 +35,7 @@ public class ConnectionFetcherTest extends CommonMocks { IPropertyHelper propertyHelper; @Mock - private SparkContext context; + private SparkConf conf; private ConnectionFetcher cf; @@ -45,7 +45,7 @@ public void setup() { commonSetupWithoutDefaultClassVariables(); MockitoAnnotations.openMocks(this); - cf = new ConnectionFetcher(context, propertyHelper); + cf = new ConnectionFetcher(conf, propertyHelper); } @Test
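The core idea behind the cluster-mode refactor above (broadcast serializable helpers such as `ConnectionFetcher`, `PropertyHelper` and the job-session factories, then rebuild heavyweight connections lazily on each executor) can be illustrated with a minimal, self-contained Scala sketch. This is not CDM code; `ResourceFetcher`, `open()` and the host name are hypothetical stand-ins used only to show the pattern.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for CDM's ConnectionFetcher: cheap, serializable state that
// can be broadcast to executors, each of which then opens its own heavyweight resource.
class ResourceFetcher(host: String) extends Serializable {
  def open(): String = s"session->$host" // imagine a CassandraConnector / CqlSession here
}

object ClusterModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("cluster-mode-sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Broadcast once from the driver; only serializable configuration crosses the wire.
    val bcFetcher = sc.broadcast(new ResourceFetcher("cassandra-host"))

    // Stand-in for CDM's token-range slices.
    val slices = sc.parallelize(1 to 100, numSlices = 10)

    val processed = slices.mapPartitions { part =>
      // Rebuilt inside the executor JVM; a driver-side session captured in the closure
      // would fail with NotSerializableException in cluster mode.
      val session = bcFetcher.value.open()
      part.map(slice => s"$session processed slice $slice")
    }.count()

    println(s"slices processed: $processed")
    spark.stop()
  }
}
```

With `--master "local[*]"` this behaves like the single-VM path; pointed at `--master "spark://master-host:port"` the same closures run on executors, which is why this patch also removes the driver-side `SparkContext` dependency from `ConnectionFetcher`.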
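Relatedly, the job-session factories (`CopyJobSessionFactory`, `DiffJobSessionFactory`, `GuardrailCheckJobSessionFactory`) now implement `Serializable` and hand out a lazily created session per JVM. A rough sketch of that idiom, again with hypothetical names rather than the actual CDM classes:

```scala
// Sketch only: a serializable factory whose expensive session never travels with the
// closure. After deserialization on an executor the transient field is null, so the
// session is rebuilt once per JVM on first use, similar in spirit to the guarded
// initialization in the CDM job-session factories.
class SessionFactorySketch(host: String) extends Serializable {
  @transient private var session: String = _

  def getInstance(): String = synchronized {
    if (session == null) {
      session = s"job-session connected to $host" // e.g. prepare statements, rate limiters, features
    }
    session
  }
}
```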