-
Notifications
You must be signed in to change notification settings - Fork 718
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARKNLP-1098] Adding PDF reader support
- Loading branch information
Showing
18 changed files
with
713 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/* | ||
* Copyright 2017-2025 John Snow Labs | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.johnsnowlabs.reader | ||
|
||
import com.johnsnowlabs.nlp.util.io.ResourceHelper | ||
import org.apache.pdfbox.pdmodel.PDDocument | ||
import org.apache.pdfbox.text.{PDFTextStripper, TextPosition} | ||
import org.apache.spark.sql.DataFrame | ||
import org.apache.spark.sql.functions.{col, udf} | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.collection.mutable | ||
|
||
class PdfReader extends Serializable { | ||
|
||
private val spark = ResourceHelper.spark | ||
import spark.implicits._ | ||
|
||
private val MAX_CHARACTER_BEFORE_HEADER = 1000 | ||
|
||
def pdf(filePath: String): DataFrame = { | ||
if (ResourceHelper.validFile(filePath)) { | ||
val binaryFilesRDD = spark.sparkContext.binaryFiles(filePath) | ||
val byteArrayRDD = binaryFilesRDD.map { case (path, portableDataStream) => | ||
val byteArray = portableDataStream.toArray() | ||
(path, byteArray) | ||
} | ||
byteArrayRDD | ||
.toDF("path", "content") | ||
.withColumn("pdf", parsePdfUDF(col("content"))) | ||
// .select(col("path"), col("pdf")) //TODO: Add parameter to output content bytes | ||
} else throw new IllegalArgumentException(s"Invalid filePath: $filePath") | ||
} | ||
|
||
private val parsePdfUDF = udf((data: Array[Byte]) => { | ||
pdfToText(data) | ||
}) | ||
|
||
private def pdfToText(content: Array[Byte]): Seq[HTMLElement] = { | ||
val validPdf = checkAndFixPdf(content) | ||
val pdfDoc = PDDocument.load(validPdf) | ||
val numPages = pdfDoc.getNumberOfPages | ||
require(numPages >= 1, "pdf input stream cannot be empty") | ||
|
||
val result = pdfboxMethod(pdfDoc, 0, numPages - 1) | ||
pdfDoc.close() | ||
result | ||
} | ||
|
||
private def pdfboxMethod( | ||
pdfDoc: => PDDocument, | ||
startPage: Int, | ||
endPage: Int): Seq[HTMLElement] = { | ||
val metadata = mutable.Map[String, String]() | ||
metadata += ("fileName" -> pdfDoc.getDocumentInformation.getTitle) | ||
extractText(pdfDoc, startPage, endPage, metadata) | ||
} | ||
|
||
private def checkAndFixPdf(content: Array[Byte]): Array[Byte] = { | ||
val pdfStartIndex = new String( | ||
content.slice(0, Math.min(MAX_CHARACTER_BEFORE_HEADER, content.length))).indexOf("%PDF") | ||
if (pdfStartIndex == -1) throw new RuntimeException("Pdf document is not valid") | ||
val validContent = content.slice(pdfStartIndex, content.length) | ||
validContent | ||
} | ||
|
||
private def extractText( | ||
pdfDoc: PDDocument, | ||
startPage: Int, | ||
endPage: Int, | ||
metadata: mutable.Map[String, String]): Seq[HTMLElement] = { | ||
class CustomPDFTextStripper extends PDFTextStripper { | ||
val textDetails = new java.util.ArrayList[(String, Float, String)]() | ||
|
||
override protected def processTextPosition(text: TextPosition): Unit = { | ||
val char = text.getUnicode | ||
val y = text.getY | ||
val fontName = text.getFont.getName | ||
textDetails.add((char, y, fontName)) | ||
super.processTextPosition(text) | ||
} | ||
} | ||
|
||
val pdfStripper = new CustomPDFTextStripper | ||
pdfStripper.setSortByPosition(true) | ||
pdfStripper.setStartPage(startPage + 1) | ||
pdfStripper.setEndPage(endPage + 1) | ||
pdfStripper.getText(pdfDoc) | ||
|
||
val groupedByLine = pdfStripper.textDetails.asScala.groupBy { case (_, y, _) => y } | ||
val elements = groupedByLine.flatMap { case (_, chars) => | ||
val lineText = chars.map(_._1).mkString | ||
val isBold = chars.exists { case (_, _, fontName) => | ||
fontName.toLowerCase.contains("bold") || fontName.toLowerCase.contains("bd") | ||
} | ||
val elementType = if (isBold) { | ||
ElementType.TITLE | ||
} else { | ||
ElementType.NARRATIVE_TEXT | ||
} | ||
if (lineText.trim.nonEmpty) Some(HTMLElement(elementType, lineText, metadata)) else None | ||
} | ||
|
||
elements.toSeq | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
/* | ||
* Copyright 2017-2025 John Snow Labs | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.johnsnowlabs.reader | ||
|
||
import com.johnsnowlabs.nlp.IAnnotation | ||
import com.johnsnowlabs.reader.util.pdf._ | ||
import org.apache.pdfbox.pdmodel.PDDocument | ||
import org.apache.pdfbox.text.PDFTextStripper | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.ml.Transformer | ||
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} | ||
import org.apache.spark.ml.param.{BooleanParam, IntParam, Param, ParamMap} | ||
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable} | ||
import org.apache.spark.sql.expressions.UserDefinedFunction | ||
import org.apache.spark.sql.functions.{col, posexplode_outer, udf} | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.sql.{DataFrame, Dataset} | ||
|
||
import scala.util.{Failure, Success, Try} | ||
|
||
class PdfToText(override val uid: String) | ||
extends Transformer | ||
with DefaultParamsWritable | ||
with HasInputValidator | ||
with HasInputCol | ||
with HasOutputCol | ||
with HasLocalProcess | ||
with PdfToTextTrait { | ||
|
||
def this() = this(Identifiable.randomUID("PDF_TO_TEXT_TRANSFORMER")) | ||
|
||
override def copy(extra: ParamMap): Transformer = defaultCopy(extra) | ||
|
||
protected def outputDataType: StructType = new StructType() | ||
.add($(outputCol), StringType) | ||
.add("height_dimension", IntegerType) | ||
.add("width_dimension", IntegerType) | ||
.add($(inputCol), BinaryType) | ||
.add("exception", StringType) | ||
.add($(pageNumCol), IntegerType) | ||
|
||
override def transformSchema(schema: StructType): StructType = { | ||
// Add the return fields | ||
validateInputCol(schema, $(inputCol), BinaryType) | ||
validateInputCol(schema, $(originCol), StringType) | ||
schema | ||
.add(StructField($(outputCol), StringType, nullable = false)) | ||
.add(StructField($(pageNumCol), IntegerType, nullable = false)) | ||
} | ||
|
||
final val pageNumCol = new Param[String](this, "pageNumCol", "Page number output column name.") | ||
final val originCol = | ||
new Param[String](this, "originCol", "Input column name with original path of file.") | ||
final val partitionNum = new IntParam(this, "partitionNum", "Number of partitions.") | ||
final val storeSplittedPdf = | ||
new BooleanParam(this, "storeSplittedPdf", "Force to store splitted pdf.") | ||
|
||
/** @group getParam */ | ||
def setOriginCol(value: String): this.type = set(originCol, value) | ||
|
||
/** @group setParam */ | ||
def setInputCol(value: String): this.type = set(inputCol, value) | ||
|
||
/** @group setParam */ | ||
def setOutputCol(value: String): this.type = set(outputCol, value) | ||
|
||
/** @group getParam */ | ||
def setPartitionNum(value: Int): this.type = set(partitionNum, value) | ||
|
||
/** @group setParam */ | ||
def setStoreSplittedPdf(value: Boolean): this.type = set(storeSplittedPdf, value) | ||
|
||
setDefault( | ||
inputCol -> "content", | ||
outputCol -> "text", | ||
pageNumCol -> "pagenum", | ||
originCol -> "path", | ||
partitionNum -> 0, | ||
storeSplittedPdf -> false) | ||
|
||
private def transformUDF: UserDefinedFunction = udf( | ||
(path: String, content: Array[Byte]) => { | ||
doProcess(content) | ||
}, | ||
ArrayType(outputDataType)) | ||
|
||
private def doProcess( | ||
content: Array[Byte]): Seq[(String, Int, Int, Array[Byte], String, Int)] = { | ||
val pagesTry = Try(pdfToText(content, $(storeSplittedPdf))) | ||
|
||
pagesTry match { | ||
case Failure(_) => | ||
Seq() | ||
case Success(content) => | ||
content | ||
} | ||
} | ||
|
||
override def transform(df: Dataset[_]): DataFrame = { | ||
transformSchema(df.schema) | ||
|
||
val selCols1 = df.columns | ||
.filterNot(_ == $(inputCol)) | ||
.map(col) :+ posexplode_outer(transformUDF(df.col($(originCol)), df.col($(inputCol)))) | ||
.as(Seq("tmp_num", "tmp_result")) | ||
val selCols = df.columns | ||
.filterNot(_ == $(inputCol)) | ||
.map(col) :+ col("tmp_result.*") | ||
|
||
var result = df.select(selCols1: _*) | ||
result = result | ||
.select(selCols: _*) | ||
$(partitionNum) match { | ||
case 0 => result | ||
case _ => result.repartition($(partitionNum)) | ||
} | ||
} | ||
|
||
override def localProcess( | ||
input: Array[Map[String, Seq[IAnnotation]]]): Array[Map[String, Seq[IAnnotation]]] = { | ||
input.flatMap { case lightRecord => | ||
val pdfs = lightRecord.getOrElse( | ||
getOrDefault(inputCol), | ||
throw new RuntimeException(s"Column not found ${getOrDefault(inputCol)}")) | ||
|
||
pdfs flatMap { case BinaryFile(bytes, path) => | ||
doProcess(bytes).zipWithIndex.map { case ((text, _, _, content, exception, _), pageNum) => | ||
val metadata = | ||
Map("exception" -> exception, "sourcePath" -> path, "pageNum" -> pageNum.toString) | ||
|
||
val result = lightRecord ++ Map( | ||
getOutputCol -> Seq(OcrText(text, metadata, content)), | ||
getOrDefault(pageNumCol) -> Seq(PageNum(pageNum))) | ||
result | ||
} | ||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
trait PdfToTextTrait extends Logging with PdfUtils { | ||
|
||
/* | ||
* extracts a text layer from a PDF. | ||
*/ | ||
private def extractText(document: => PDDocument, startPage: Int, endPage: Int): Seq[String] = { | ||
val pdfTextStripper = new PDFTextStripper | ||
pdfTextStripper.setStartPage(startPage + 1) | ||
pdfTextStripper.setEndPage(endPage + 1) | ||
Seq(pdfTextStripper.getText(document)) | ||
} | ||
|
||
def pdfToText( | ||
content: Array[Byte], | ||
storeSplittedPdf: Boolean): Seq[(String, Int, Int, Array[Byte], String, Int)] = { | ||
val validPdf = checkAndFixPdf(content) | ||
val pdfDoc = PDDocument.load(validPdf) | ||
val numPages = pdfDoc.getNumberOfPages | ||
log.info(s"Number of pages ${numPages}") | ||
require(numPages >= 1, "pdf input stream cannot be empty") | ||
|
||
val result = pdfboxMethod(pdfDoc, 0, numPages - 1, content, storeSplittedPdf) | ||
pdfDoc.close() | ||
log.info("Close pdf") | ||
result | ||
} | ||
|
||
private def pdfboxMethod( | ||
pdfDoc: => PDDocument, | ||
startPage: Int, | ||
endPage: Int, | ||
content: Array[Byte], | ||
storeSplittedPdf: Boolean): Seq[(String, Int, Int, Array[Byte], String, Int)] = { | ||
val text = extractText(pdfDoc, startPage, endPage).mkString(System.lineSeparator()) | ||
val heightDimension = pdfDoc.getPage(startPage).getMediaBox.getHeight.toInt | ||
val widthDimension = pdfDoc.getPage(startPage).getMediaBox.getWidth.toInt | ||
Seq((text, heightDimension, widthDimension, if (storeSplittedPdf) content else null, null, 0)) | ||
} | ||
} |
Oops, something went wrong.