Latency Spike during Spark Structured Streaming #2165
Comments
Pyspark w/ Python bulk api
I tested the same task w/ the Elasticsearch Python API:

import json
import time

from elasticsearch import Elasticsearch, helpers


def updateToESPythonAPI(df, epoch_id):
    es = Elasticsearch([
        'http://esfarm-cluster.~~~~~~~.com:10200',
    ], http_auth=('id', 'pw!'))
    index_name = "structured_python01"
    # Build one bulk action per row, keyed by doc_id.
    data = []
    for row in df.toJSON().collect():
        json_ = json.loads(row)
        data.append(
            {
                "_index": index_name,
                "_id": json_['doc_id'],
                "_source": row,
            }
        )
    count = len(data)
    # Time only the bulk request itself.
    start = time.time()
    helpers.bulk(es, data)
    end = time.time()
    print("ES Bulk Sended !")
    print(f"It took {end - start:.5f} sec sending {count} items. \n")

and it showed no latency spike.
I suspect there's a bug in the elasticsearch-spark-30 connector. Or could the version discrepancy alone cause unexpected behavior?
Pyspark w/ elasticsearch Sink
And w/ Elasticsearch Connector,
Scala Spark + foreachBatch + Batch API
Hi guys, I tested again with Scala Spark (3.1.2-2). It's the same as the Streaming Sink, except that it uses the batch API inside the foreachBatch function.

import org.apache.spark.sql.DataFrame

val esQuery = df
  .writeStream
  .queryName("ES Bulk Update")
  .option("truncate", "false")
  .format("console")
  .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .foreachBatch {
    (batchDF: DataFrame, batchId: Long) =>
      ...
      // Write each micro-batch with the connector's batch (non-streaming) writer.
      batchDF.write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", endpoint)
        .option("es.resource", index)
        .option("es.mapping.id", "doc_id")
        .option("es.net.http.auth.user", username)
        .option("es.net.http.auth.pass", password)
        .option("es.nodes.wan.only", "true")
        .option("es.write.operation", "upsert")
        .mode("append")
        .save()
  }
  .start()

And it displayed no latency spike.
I believe that there's some problem in the
Issue description
I'm using elasticsearch-hadoop to stream data into an Elasticsearch server using Spark Structured Streaming.
But during streaming, Spark shows a repetitive, periodic latency spike in Operation Duration.
Spark
When I tried Structured Streaming w/ the console sink (print to console), it showed stable low latency.
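One way to quantify the spikes outside the Spark UI is to read the per-batch timings from the query's progress reports. The sketch below assumes the progress entries are dictionaries with a `durationMs` map (the shape returned by `StreamingQuery.recentProgress` in PySpark) and that the sink write time appears under the `addBatch` key. The `find_spikes` helper, the sample data, and the 5x-median threshold are illustrative assumptions, not part of the original report.

```python
from statistics import median


def find_spikes(progress, key="addBatch", factor=5.0):
    """Return (batchId, duration_ms) pairs whose `key` duration
    exceeds `factor` times the median duration over all batches."""
    durations = [
        (p["batchId"], p["durationMs"][key])
        for p in progress
        if key in p.get("durationMs", {})
    ]
    if not durations:
        return []
    baseline = median(d for _, d in durations)
    return [(b, d) for b, d in durations if d > factor * baseline]


# Hypothetical sample shaped like StreamingQuery.recentProgress entries.
sample = [
    {"batchId": 0, "durationMs": {"addBatch": 400, "triggerExecution": 520}},
    {"batchId": 1, "durationMs": {"addBatch": 380, "triggerExecution": 500}},
    {"batchId": 2, "durationMs": {"addBatch": 50000, "triggerExecution": 50150}},
    {"batchId": 3, "durationMs": {"addBatch": 420, "triggerExecution": 540}},
]

print(find_spikes(sample))
```

Running the same helper against the console-sink query and the ES-sink query would show whether the spike sits in the sink write (`addBatch`) or elsewhere in the trigger.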
ES
I dug into ES monitoring w/ Grafana, but it shows nothing unusual on the ES side.
Indexing time stays below 50 ms, which does not account for the high latency in streaming (about 50 s).
ES-Hadoop connector
I read the elasticsearch-hadoop code and found that it uses the Bulk API to send the DataFrame to ES w/ the Hadoop HTTP client.
At first, I believed that ES does not respond to Spark fast enough for it to commit the operation, even when indexing has already finished.
But for now, I have no idea what causes this latency spike.
Do you have any case like this?
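Because the connector writes through the Bulk API, its batching settings are one place to experiment when chasing the spike. The sketch below collects the write options used in this issue together with a few es-hadoop bulk settings (`es.batch.size.entries`, `es.batch.size.bytes`, `es.batch.write.refresh`). The helper name `es_sink_options`, the placeholder endpoint and index, and the idea that these settings relate to the spike at all are assumptions to be tested, not a confirmed diagnosis.

```python
def es_sink_options(endpoint, index, user, password,
                    batch_entries=1000, batch_bytes="1mb", refresh=True):
    """Build elasticsearch-hadoop write options as a dict of strings."""
    return {
        # Options taken from the write configuration in this issue.
        "es.nodes": endpoint,
        "es.resource": index,
        "es.mapping.id": "doc_id",
        "es.net.http.auth.user": user,
        "es.net.http.auth.pass": password,
        "es.nodes.wan.only": "true",
        "es.write.operation": "upsert",
        # Bulk sizing per task; the function defaults mirror what I
        # understand to be the connector defaults (1000 entries, 1mb).
        "es.batch.size.entries": str(batch_entries),
        "es.batch.size.bytes": batch_bytes,
        # Whether the connector refreshes the index after a bulk write.
        "es.batch.write.refresh": str(refresh).lower(),
    }


# Placeholder values; replace with the real cluster details.
opts = es_sink_options(
    "http://es.example.com:9200", "my_index", "id", "pw", refresh=False
)
for name in sorted(opts):
    if "pass" not in name:  # avoid printing the password
        print(f"{name} = {opts[name]}")
```

The dict could then be passed with `.options(**opts)` on either the batch writer inside `foreachBatch` or the streaming writer, changing one setting at a time and comparing batch durations between runs.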
Steps to reproduce
Version Info
OS: CentOS 7
JVM: Java 1.8.0_112
Hadoop/Spark: Spark 3.1.2-2, Hadoop 3.1.1.3.1.2-39
ES-Hadoop: 7.12.1 (Scala Spark https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30)
ES: 7.10.1 (Elasticsearch Server)