Cannot list splits for table 'table_name' reading topic 'topic_name' #24783

Open
ayronmax opened this issue Jan 23, 2025 · 3 comments

@ayronmax

Description

I am running a Trino cluster in Docker and using the Kafka connector to connect to Oracle Cloud Infrastructure (OCI) Streaming. While attempting to query a Kafka topic (customer), the query fails with the following error:

trino-coordinator  | 2025-01-23T00:23:49.317Z	DEBUG	dispatcher-query-5	io.trino.execution.QueryStateMachine	Query 20250123_002229_00000_7d8d6 is FAILED
trino-coordinator  | 2025-01-23T00:23:49.317Z	DEBUG	Query-20250123_002229_00000_7d8d6-187	io.trino.execution.QueryStateMachine	Query 20250123_002229_00000_7d8d6 failed
trino-coordinator  | io.trino.spi.TrinoException: Cannot list splits for table 'customer' reading topic 'customer'
trino-coordinator  | 	at io.trino.plugin.kafka.KafkaSplitManager.getSplits(KafkaSplitManager.java:109)
trino-coordinator  | 	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager.getSplits(ClassLoaderSafeConnectorSplitManager.java:51)
trino-coordinator  | 	at io.trino.split.SplitManager.getSplits(SplitManager.java:89)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.createSplitSource(SplitSourceFactory.java:183)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitTableScan(SplitSourceFactory.java:156)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitTableScan(SplitSourceFactory.java:130)
trino-coordinator  | 	at io.trino.sql.planner.plan.TableScanNode.accept(TableScanNode.java:218)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitLimit(SplitSourceFactory.java:378)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitLimit(SplitSourceFactory.java:130)
trino-coordinator  | 	at io.trino.sql.planner.plan.LimitNode.accept(LimitNode.java:123)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory.createSplitSources(SplitSourceFactory.java:110)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler$DistributedStagesScheduler.createStageScheduler(PipelinedQueryScheduler.java:1054)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler$DistributedStagesScheduler.create(PipelinedQueryScheduler.java:930)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler.createDistributedStagesScheduler(PipelinedQueryScheduler.java:320)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler.start(PipelinedQueryScheduler.java:303)
trino-coordinator  | 	at io.trino.execution.SqlQueryExecution.start(SqlQueryExecution.java:446)
trino-coordinator  | 	at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:272)
trino-coordinator  | 	at io.trino.dispatcher.LocalDispatchQuery.startExecution(LocalDispatchQuery.java:150)
trino-coordinator  | 	at io.trino.dispatcher.LocalDispatchQuery.lambda$waitForMinimumWorkers$2(LocalDispatchQuery.java:134)
trino-coordinator  | 	at io.airlift.concurrent.MoreFutures.lambda$addSuccessCallback$12(MoreFutures.java:570)
trino-coordinator  | 	at io.airlift.concurrent.MoreFutures$3.onSuccess(MoreFutures.java:545)
trino-coordinator  | 	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1139)
trino-coordinator  | 	at io.trino.$gen.Trino_468____20250123_002212_2.run(Unknown Source)
trino-coordinator  | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
trino-coordinator  | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
trino-coordinator  | 	at java.base/java.lang.Thread.run(Thread.java:1575)
trino-coordinator  | Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
trino-coordinator  | 
trino-coordinator  | 
trino-coordinator  | 2025-01-23T00:23:49.320Z	INFO	dispatcher-query-5	io.trino.event.QueryMonitor	TIMELINE: Query 20250123_002229_00000_7d8d6 :: FAILED (KAFKA_SPLIT_ERROR) :: elapsed 79439ms :: planning 79439ms :: waiting 0ms :: scheduling 0ms :: running 0ms :: finishing 0ms :: begin 2025-01-23T00:22:29.875Z :: end 2025-01-23T00:23:49.314Z

Error Details

Query ID: 20250123_002229_00000_7d8d6
Error Type: KAFKA_SPLIT_ERROR
Elapsed Time: 79,439 ms

Configuration Details

Kafka Connector Configuration

connector.name=kafka
kafka.nodes=cell-1.streaming.us-ashburn-1.oci.oraclecloud.com:9092
kafka.table-names=customer
kafka.hide-internal-columns=false
kafka.config.resources=${ENV:CONTAINER_TRINO_CONFIG_PATH}/kafka/config/kafka-configuration.properties
kafka.table-description-supplier=FILE
kafka.table-description-dir=${ENV:CONTAINER_TRINO_CONFIG_PATH}/kafka/table_description

Kafka Resources Configuration (kafka-configuration.properties)

kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";

Kafka Table Description

{
   "tableName": "customer",
   "schemaName": "default",
   "topicName": "customer",
   "key": {
       "dataFormat": "raw",
       "fields": [
           {
               "name": "kafka_key",
               "type": "VARCHAR",
               "hidden": "false"
           }
       ]
   },
   "message": {
       "dataFormat": "json",
       "fields": [
           { "name": "row_number", "mapping": "rowNumber", "type": "BIGINT" },
           { "name": "customer_key", "mapping": "customerKey", "type": "BIGINT" },
           { "name": "name", "mapping": "name", "type": "VARCHAR" },
           { "name": "address", "mapping": "address", "type": "VARCHAR" },
           { "name": "nation_key", "mapping": "nationKey", "type": "BIGINT" },
           { "name": "phone", "mapping": "phone", "type": "VARCHAR" },
           { "name": "account_balance", "mapping": "accountBalance", "type": "DOUBLE" },
           { "name": "market_segment", "mapping": "marketSegment", "type": "VARCHAR" },
           { "name": "comment", "mapping": "comment", "type": "VARCHAR" }
       ]
   }
}
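
For reference, this description exposes the topic as table customer in the default schema. Assuming the catalog is registered as kafka (the catalog name depends on the connector properties file and is not stated in the report), the mapped columns could be queried like this:

-- Assumed catalog name 'kafka'; schema, table, and column names come from the description above.
SELECT kafka_key, customer_key, name, account_balance
FROM kafka.default.customer;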

Kafka Topic Data

Topic: customer

Key: 12345 (string)

Value: (JSON)

{
  "rowNumber" : 1,
  "customerKey" : 1001,
  "name" : "John Doe",
  "address" : "123 Elm Street, Springfield",
  "nationKey" : 42,
  "phone" : "+1-800-555-1234",
  "accountBalance" : 12345.67,
  "marketSegment" : "Retail",
  "comment" : "Frequent high-value customer."
}

Steps to Reproduce

  1. Start the Trino cluster in Docker with the provided configurations.
  2. Attempt to query the customer Kafka topic using Trino (an assumed example query is shown after this list).
  3. Observe the error indicating a failure to list splits.
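
The exact statement is not included in the report; based on the LimitNode frames in the stack trace, an assumed reproducer (with the catalog registered as kafka) would be:

-- Assumed query; the original report does not include the statement text.
SELECT * FROM kafka.default.customer LIMIT 10;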

Expected Behavior
Trino should successfully list splits and execute the query against the customer topic.

Actual Behavior
The query fails with a TimeoutException while fetching topic metadata.

Environment Details

  • Trino Version: 468
  • Kafka Version: 3.9
  • OCI Streaming Endpoint: cell-1.streaming.us-ashburn-1.oci.oraclecloud.com:9092

Additional Notes

I have verified the following:

  1. Kafka is reachable from the Trino container.
  2. Topic customer exists and is populated with data.
  3. SASL/SSL configuration is correctly set, and credentials are valid.

Any insights or assistance in resolving this issue would be greatly appreciated!

@ebyhr
Member

ebyhr commented Jan 24, 2025

Do you face this issue with all topics or a specific topic? Can you test with a small topic?

@ayronmax
Author

Thank you for your response. I will test with a smaller topic and let you know the results.

@ayronmax
Author

I tested with a smaller topic using the following configuration:

Table Description:

{
    "tableName": "small_topic",
    "schemaName": "default",
    "topicName": "small_topic",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "type": "VARCHAR",
                "hidden": "false"
            }
        ]
    },
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "row_number",
                "mapping": "rowNumber",
                "type": "BIGINT"
            }
        ]
    }
}

Topic Data:

  • Topic: small_topic
  • Key: 12345 (string)
  • Value:
{
  "rowNumber": 1
}

Despite testing with a smaller dataset, the same error persists.
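
The statement used for this test is likewise not shown; an assumed equivalent, mirroring the reproducer above, would be:

-- Assumed query against the smaller topic; the catalog name 'kafka' is assumed as before.
SELECT * FROM kafka.default.small_topic LIMIT 10;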

Reported Error:

trino-coordinator  | 2025-01-24T15:09:09.335Z	DEBUG	dispatcher-query-7	io.trino.execution.QueryStateMachine	Query 20250124_150809_00013_uvva8 is FAILED
trino-coordinator  | 2025-01-24T15:09:09.335Z	DEBUG	Query-20250124_150809_00013_uvva8-250	io.trino.execution.QueryStateMachine	Query 20250124_150809_00013_uvva8 failed
trino-coordinator  | io.trino.spi.TrinoException: Cannot list splits for table 'small_topic' reading topic 'small_topic'
trino-coordinator  | 	at io.trino.plugin.kafka.KafkaSplitManager.getSplits(KafkaSplitManager.java:109)
trino-coordinator  | 	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager.getSplits(ClassLoaderSafeConnectorSplitManager.java:51)
trino-coordinator  | 	at io.trino.split.SplitManager.getSplits(SplitManager.java:89)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.createSplitSource(SplitSourceFactory.java:183)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitTableScan(SplitSourceFactory.java:156)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitTableScan(SplitSourceFactory.java:130)
trino-coordinator  | 	at io.trino.sql.planner.plan.TableScanNode.accept(TableScanNode.java:218)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitLimit(SplitSourceFactory.java:378)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory$Visitor.visitLimit(SplitSourceFactory.java:130)
trino-coordinator  | 	at io.trino.sql.planner.plan.LimitNode.accept(LimitNode.java:123)
trino-coordinator  | 	at io.trino.sql.planner.SplitSourceFactory.createSplitSources(SplitSourceFactory.java:110)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler$DistributedStagesScheduler.createStageScheduler(PipelinedQueryScheduler.java:1054)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler$DistributedStagesScheduler.create(PipelinedQueryScheduler.java:930)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler.createDistributedStagesScheduler(PipelinedQueryScheduler.java:320)
trino-coordinator  | 	at io.trino.execution.scheduler.PipelinedQueryScheduler.start(PipelinedQueryScheduler.java:303)
trino-coordinator  | 	at io.trino.execution.SqlQueryExecution.start(SqlQueryExecution.java:446)
trino-coordinator  | 	at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:272)
trino-coordinator  | 	at io.trino.dispatcher.LocalDispatchQuery.startExecution(LocalDispatchQuery.java:150)
trino-coordinator  | 	at io.trino.dispatcher.LocalDispatchQuery.lambda$waitForMinimumWorkers$2(LocalDispatchQuery.java:134)
trino-coordinator  | 	at io.airlift.concurrent.MoreFutures.lambda$addSuccessCallback$12(MoreFutures.java:570)
trino-coordinator  | 	at io.airlift.concurrent.MoreFutures$3.onSuccess(MoreFutures.java:545)
trino-coordinator  | 	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1139)
trino-coordinator  | 	at io.trino.$gen.Trino_468____20250124_150651_2.run(Unknown Source)
trino-coordinator  | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
trino-coordinator  | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
trino-coordinator  | 	at java.base/java.lang.Thread.run(Thread.java:1575)
trino-coordinator  | Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
