Custom Electricity Market Management System (MMS) CSV reader library for Apache Spark.
This library can be used to efficiently read MMS data model reports in bulk - e.g. from monthly DVDs: (http://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_08/MMSDM_Historical_Data_SQLLoader/DOCUMENTATION/Participant_Monthly_DVD.pdf)
It uses Spark's DataSource V2 API.
It reads files in AEMO's CSV format: (https://aemo.com.au/-/media/files/market-it-systems/guide-to-csv-data-format-standard.pdf?la=en)
- Partitions large files to avoid out of memory (OOM) errors
- Supports multiple reports per file
- Supports zipped files
- Supports filter pushdown
- Supports column pruning
- Reads report schemas from input files
- Registers {report_type, report_subtype, report_version} as temporary tables
Available from AEMO (Australian Energy Market Operator):
- Prerequisites:
- Maven
- OpenJDK 17
- Spark 3.5.1, prebuilt for Apache Hadoop 3.3 and later (https://spark.apache.org/downloads.html)
- Clone this repository
git clone https://github.com/niftimus/SparkMMS.git
- Compile
cd SparkMMS
mvn install
- Confirm the JAR library is built:
ls -la ./target/SparkMMS-0.4-SNAPSHOT.jar
- Start PySpark
# Ensure SPARK_HOME is set to the directory where Spark has been uncompressed
# export SPARK_HOME = <path_to_spark>
cd SparkMMS
$SPARK_HOME/bin/pyspark --jars ./target/SparkMMS-0.4-SNAPSHOT.jar --packages org.apache.hadoop:hadoop-azure:3.4.0,org.apache.hadoop:hadoop-aws:3.4.0
- Read in a sample file
df = spark \
.read \
.format("com.analyticsanvil.SparkMMS") \
.option("fileName", "./target/test-classes/com/analyticsanvil/test/PUBLIC_DVD_TRADINGLOAD_202010010000.CSV") \
.option("maxRowsPerPartition","50000") \
.option("minSplitFilesize","1000000") \
.load()
- Show row chunks:
df.show()
- Get a single report and show the results:
# Get a new dataframe with the schema of a single report type
def getReport(df, report_type, report_subtype, report_version):
from pyspark.sql.functions import explode
df = df.where(f"report_type = '{report_type}' and report_subtype = '{report_subtype}' and report_version = {report_version}")
tmpDF = df.select("column_headers", explode(df.data).alias("datarow"))
colHeaders = df.select("column_headers").first().column_headers
for idx, colName in enumerate(colHeaders):
tmpDF = tmpDF.withColumn(colName, tmpDF.datarow[idx])
tmpDF = tmpDF.drop("column_headers").drop("datarow")
return tmpDF
d=getReport(df, report_type = 'TRADING', report_subtype = 'UNIT_SOLUTION', report_version = 2)
d.show(20, False)
- Register the report as a temporary table and query using SQL:
# Register all reports available in the dataframe as temporary view in the metastore
def registerAllReports(df):
tmpDF = df.select("report_type","report_subtype","report_version")
tmpDF = tmpDF.dropDuplicates()
reports = tmpDF.collect()
for r in reports:
tmpReportDF = getReport(df,r.report_type,r.report_subtype,r.report_version)
tmpReportDF.createOrReplaceTempView(f"{r.report_type}_{r.report_subtype}_{r.report_version}")
registerAllReports(df)
spark.sql("show tables;").show()
spark.sql("select * from TRADING_UNIT_SOLUTION_2").show()