Skip to content

Commit

Permalink
Drop primary key constraint
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jul 9, 2024
1 parent c3fa16a commit c824193
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 34 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,16 @@ jobs:
kubectl describe kafkas -n kafka
kubectl describe flinkdeployments
kubectl describe subscriptions
- name: Capture Flink Job Logs
if: always()
run: kubectl logs $(kubectl get pods -l component=jobmanager -o name) $(kubectl get pods -l component=taskmanager -o name) --since=0s
- name: Capture Hoptimator Operator Logs
if: always()
run: kubectl logs $(kubectl get pods -l app=hoptimator-operator -o name)
- name: Capture Flink Operator Logs
if: always()
run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
- name: Capture Flink Job Logs
if: always()
run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)


2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ deploy-dev-environment:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
kubectl create namespace mysql || echo "skipping"
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
Expand Down
4 changes: 2 additions & 2 deletions deploy/samples/subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Subscription
metadata:
name: products
name: names
spec:
sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
database: RAWKAFKA


10 changes: 2 additions & 8 deletions etc/integration-tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ SELECT * FROM DATAGEN.COMPANY;
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;

-- MySQL CDC tables
SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;

-- Test check command
!check not empty SELECT * FROM INVENTORY."products_on_hand";

-- MySQL CDC -> Kafka (via sample subscription "products")
SELECT * FROM RAWKAFKA."products" LIMIT 1;
-- read from sample subscription "names"
SELECT * FROM RAWKAFKA."names" LIMIT 1;

Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ public SqlNode visit(SqlCall call) {

/**
* Implements a CREATE TABLE...WITH... DDL statement.
*
* N.B. the following magic:
* - field 'KEY' is treated as a PRIMARY KEY
*/
class ConnectorImplementor implements ScriptImplementor {
private final String database;
Expand All @@ -192,11 +189,8 @@ public void implement(SqlWriter w) {
(new CompoundIdentifierImplementor(database, name)).implement(w);
SqlWriter.Frame frame1 = w.startList("(", ")");
(new RowTypeSpecImplementor(rowType)).implement(w);
if (rowType.getField("KEY", true, false) != null) {
w.sep(",");
w.literal("PRIMARY KEY (KEY) NOT ENFORCED");
}
w.endList(frame1);
// TODO support PRIMARY KEY for Tables that support it
// TODO support PARTITIONED BY for Tables that support it
w.keyword("WITH");
SqlWriter.Frame frame2 = w.startList("(", ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
.with("KEY", DataType.VARCHAR);
ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
.withPrefix("properties.")
.with("connector", "upsert-kafka")
.with("connector", "kafka")
.with("key.format", "csv")
.with("key.fields", "KEY")
.with("value.format", "csv")
.with("value.fields-include", "EXCEPT_KEY")
.with("topic", x -> x);
Expand Down
14 changes: 0 additions & 14 deletions test-model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,3 @@ schemas:
bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092
group.id: hoptimator-test
auto.offset.reset: earliest

- name: INVENTORY
type: custom
factory: com.linkedin.hoptimator.catalog.mysql.MySqlCdcSchemaFactory
operand:
username: root
password: debezium
hostname: mysql.mysql.svc.cluster.local
port: 3306
database: inventory
urlSuffix: "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
connectorConfig:
scan.incremental.snapshot.enabled: false

0 comments on commit c824193

Please sign in to comment.