diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java index 6d940f7d96d9..04141e5c677a 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java @@ -74,6 +74,9 @@ *
  • {@code boolean} ignoreHeaderCase * - must be false. + *
  • {@code boolean} skipHeaderRecord + * - must be false. The header is already accounted for during parsing. * * *

    Ignored CSVFormat parameters

    diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java index 856ccf42d84a..4e4102f0efb7 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpers.java @@ -62,6 +62,10 @@ static void validateCsvFormat(CSVFormat format) { "Illegal %s: column name is required", CSVFormat.class); } + checkArgument( + !format.getSkipHeaderRecord(), + "Illegal %s: cannot skip header record because the header is already accounted for", + CSVFormat.class); } /** diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java index c92961f94a95..b5ce6a0fec22 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -47,14 +50,21 @@ final class CsvIOStringToCsvRecord */ @Override public PCollection> expand(PCollection input) { - return input.apply(ParDo.of(new ProcessLineToRecordFn())); + return input + .apply(ParDo.of(new ProcessLineToRecordFn())) + .setCoder(ListCoder.of(NullableCoder.of(StringUtf8Coder.of()))); } /** Processes each line in order to convert it to a {@link CSVRecord}. */ private class ProcessLineToRecordFn extends DoFn> { + private final String headerLine = headerLine(csvFormat); + @ProcessElement public void process(@Element String line, OutputReceiver> receiver) throws IOException { + if (headerLine.equals(line)) { + return; + } for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) { receiver.output(csvRecordtoList(record)); } @@ -69,4 +79,9 @@ private static List csvRecordtoList(CSVRecord record) { } return cells; } + + /** Returns a formatted line of the CSVFormat header. */ + static String headerLine(CSVFormat csvFormat) { + return String.join(String.valueOf(csvFormat.getDelimiter()), csvFormat.getHeader()); + } } diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java index 5a387652022b..97374cf52fe6 100644 --- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseHelpersTest.java @@ -121,6 +121,18 @@ public void givenCSVFormatThatAllowsDuplicateHeaderNames_throwsException() { gotMessage); } + @Test + public void givenCSVFormatThatSkipsHeaderRecord_throwsException() { + CSVFormat format = csvFormatWithHeader().withSkipHeaderRecord(true); + String gotMessage = + assertThrows( + IllegalArgumentException.class, () -> CsvIOParseHelpers.validateCsvFormat(format)) + .getMessage(); + assertEquals( + "Illegal class org.apache.commons.csv.CSVFormat: cannot skip header record because the header is already accounted for", + gotMessage); + } + /** End of tests for {@link CsvIOParseHelpers#validateCsvFormat(CSVFormat)}. */ ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java index 44db791cbee5..1b81391c4fb0 100644 --- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.csv; +import static org.apache.beam.sdk.io.csv.CsvIOStringToCsvRecord.headerLine; + import java.util.Arrays; import java.util.Collections; import org.apache.beam.sdk.testing.PAssert; @@ -24,63 +26,97 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.QuoteMode; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; /** Tests for {@link CsvIOStringToCsvRecord}. */ +@RunWith(JUnit4.class) public class CsvIOStringToCsvRecordTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); + private static final String[] header = {"a_string", "an_integer", "a_double"}; + @Test - public void testSingleLineCsvRecord() { - String csvRecord = "a,1"; - PCollection input = pipeline.apply(Create.of(csvRecord)); + public void givenCommentMarker_skipsLine() { + CSVFormat csvFormat = csvFormat().withCommentMarker('#'); + PCollection input = + pipeline.apply( + Create.of(headerLine(csvFormat), "#should skip me", "a,1,1.1", "b,2,2.2", "c,3,3.3")); - CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat()); + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); PAssert.that(input.apply(underTest)) - .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "1"))); + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); pipeline.run(); } @Test - public void testMultiLineCsvRecord() { - String csvRecords = - "\"a\r\n1\",\"a\r\n2\"" + "\n" + "\"b\r\n1\",\"b\r\n2\"" + "\n" + "\"c\r\n1\",\"c\r\n2\""; - PCollection input = pipeline.apply(Create.of(csvRecords)); + public void givenNoCommentMarker_doesntSkipLine() { + CSVFormat csvFormat = csvFormat(); + PCollection input = + pipeline.apply( + Create.of(headerLine(csvFormat), "#comment", "a,1,1.1", "b,2,2.2", "c,3,3.3")); - CsvIOStringToCsvRecord underTest = - new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n')); + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); PAssert.that(input.apply(underTest)) .containsInAnyOrder( Arrays.asList( - Arrays.asList("a\r\n1", "a\r\n2"), - Arrays.asList("b\r\n1", "b\r\n2"), - Arrays.asList("c\r\n1", "c\r\n2"))); + Collections.singletonList("#comment"), + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); pipeline.run(); } @Test - public void testCsvRecordsWithSkipHeaderRecord() { - String csvRecords = "a_string,an_integer\na,1\nb,2\n"; - PCollection input = pipeline.apply(Create.of(csvRecords)); + public void givenCustomDelimiter_splitsCells() { + CSVFormat csvFormat = csvFormat().withDelimiter(';'); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a;1;1.1", "b;2;2.2", "c;3;3.3")); - CsvIOStringToCsvRecord underTest = - new CsvIOStringToCsvRecord(csvFormat().withSkipHeaderRecord()); + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + + pipeline.run(); + } + + @Test + public void givenEscapeCharacter_includeInCell() { + CSVFormat csvFormat = csvFormat().withEscape('$'); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a$,b,1,1.1", "b,2,2.2", "c,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); PAssert.that(input.apply(underTest)) - .containsInAnyOrder(Arrays.asList(Arrays.asList("a", "1"), Arrays.asList("b", "2"))); + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a,b", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); pipeline.run(); } @Test - public void testCsvRecordsWithCommentMarker() { - String csvRecords = "#leaving a comment\n" + "a,1,1.1\nb,2,2.2\nc,3,3.3"; - PCollection input = pipeline.apply(Create.of(csvRecords)); + public void givenHeaderComment_isNoop() { + CSVFormat csvFormat = csvFormat().withHeaderComments("abc", "def", "xyz"); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", "c,3,3.3")); - CsvIOStringToCsvRecord underTest = - new CsvIOStringToCsvRecord(csvFormat().withCommentMarker('#')); + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); PAssert.that(input.apply(underTest)) .containsInAnyOrder( Arrays.asList( @@ -92,52 +128,367 @@ public void testCsvRecordsWithCommentMarker() { } @Test - public void testCsvRecordsWithIgnoreEmptyLines() { - String csvRecords = "line1\nline2\nline3\nline4\nline5\n\n\nline6"; - PCollection input = pipeline.apply(Create.of(csvRecords)); + public void givenIgnoreEmptyLines_shouldSkip() { + CSVFormat csvFormat = csvFormat().withIgnoreEmptyLines(true); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", "b,2,2.2", "", "c,3,3.3")); - CsvIOStringToCsvRecord underTest = - new CsvIOStringToCsvRecord(csvFormat().withIgnoreEmptyLines()); + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); PAssert.that(input.apply(underTest)) .containsInAnyOrder( Arrays.asList( - Collections.singletonList("line1"), - Collections.singletonList("line2"), - Collections.singletonList("line3"), - Collections.singletonList("line4"), - Collections.singletonList("line5"), - Collections.singletonList("line6"))); + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); pipeline.run(); } @Test - public void testCsvRecordWithIgnoreSurroundingSpaces() { - String csvRecord = " Seattle , WA "; - PCollection input = pipeline.apply(Create.of(csvRecord)); + public void givenNoIgnoreEmptyLines_isNoop() { + CSVFormat csvFormat = csvFormat().withIgnoreEmptyLines(false); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", "b,2,2.2", "", "c,3,3.3")); - CsvIOStringToCsvRecord underTest = - new CsvIOStringToCsvRecord(csvFormat().withIgnoreSurroundingSpaces()); + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + + pipeline.run(); + } + + @Test + public void givenIgnoreSurroundingSpaces_removesSpaces() { + CSVFormat csvFormat = csvFormat().withIgnoreSurroundingSpaces(true); + PCollection input = + pipeline.apply( + Create.of( + headerLine(csvFormat), + " a ,1,1.1", + "b, 2 ,2.2", + "c,3, 3.3 ")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + + pipeline.run(); + } + + @Test + public void givenNotIgnoreSurroundingSpaces_keepsSpaces() { + CSVFormat csvFormat = csvFormat().withIgnoreSurroundingSpaces(false); + PCollection input = + pipeline.apply( + Create.of( + headerLine(csvFormat), + " a ,1,1.1", + "b, 2 ,2.2", + "c,3, 3.3 ")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList(" a ", "1", "1.1"), + Arrays.asList("b", " 2 ", "2.2"), + Arrays.asList("c", "3", " 3.3 "))); + + pipeline.run(); + } + + @Test + public void givenNullString_parsesNullCells() { + CSVFormat csvFormat = csvFormat().withNullString("🐼"); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", "🐼,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", null), + Arrays.asList("b", null, "2.2"), + Arrays.asList(null, "3", "3.3"))); + + pipeline.run(); + } + + @Test + public void givenNoNullString_isNoop() { + CSVFormat csvFormat = csvFormat(); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", "🐼,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "🐼"), + Arrays.asList("b", "🐼", "2.2"), + Arrays.asList("🐼", "3", "3.3"))); + + pipeline.run(); + } + + @Test + public void givenCustomQuoteCharacter_includesSpecialCharacters() { + CSVFormat csvFormat = csvFormat().withQuote(':'); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), ":a,:,1,1.1", "b,2,2.2", "c,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a,", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + pipeline.run(); + } + + @Test + public void givenQuoteModeAll_isNoop() { + CSVFormat csvFormat = csvFormat().withQuoteMode(QuoteMode.ALL); + PCollection input = + pipeline.apply( + Create.of( + headerLine(csvFormat), + "\"a\",\"1\",\"1.1\"", + "\"b\",\"2\",\"2.2\"", + "\"c\",\"3\",\"3.3\"")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); PAssert.that(input.apply(underTest)) - .containsInAnyOrder(Collections.singletonList(Arrays.asList("Seattle", "WA"))); + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); pipeline.run(); } @Test - public void testCsvRecordWithTrailingDelimiter() { - String csvRecord = "a,b,c,"; + public void givenQuoteModeAllNonNull_isNoop() { + CSVFormat csvFormat = csvFormat().withNullString("N/A").withQuoteMode(QuoteMode.ALL_NON_NULL); + PCollection input = + pipeline.apply( + Create.of( + headerLine(csvFormat), + "\"a\",\"1\",N/A", + "\"b\",\"2\",\"2.2\"", + "\"c\",\"3\",\"3.3\"")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", null), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + + pipeline.run(); + } + + @Test + public void givenQuoteModeMinimal_isNoop() { + CSVFormat csvFormat = csvFormat().withQuoteMode(QuoteMode.MINIMAL); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "\"a,\",1,1.1", "b,2,2.2", "c,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a,", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + pipeline.run(); + } + + @Test + public void givenQuoteModeNonNumeric_isNoop() { + CSVFormat csvFormat = csvFormat().withQuoteMode(QuoteMode.NON_NUMERIC); + PCollection input = + pipeline.apply( + Create.of(headerLine(csvFormat), "\"a\",1,1.1", "\"b\",2,2.2", "\"c\",3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + pipeline.run(); + } + + @Test + public void givenQuoteModeNone_isNoop() { + CSVFormat csvFormat = csvFormat().withEscape('$').withQuoteMode(QuoteMode.NONE); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", "c,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + pipeline.run(); + } + + @Test + public void givenCustomRecordSeparator_isNoop() { + CSVFormat csvFormat = csvFormat().withRecordSeparator("😆"); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1😆b,2,2.2😆c,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Collections.singletonList( + Arrays.asList("a", "1", "1.1😆b", "2", "2.2😆c", "3", "3.3"))); + pipeline.run(); + } + + @Test + public void givenSystemRecordSeparator_isNoop() { + CSVFormat csvFormat = csvFormat().withSystemRecordSeparator(); + String systemRecordSeparator = csvFormat.getRecordSeparator(); + PCollection input = + pipeline.apply( + Create.of( + headerLine(csvFormat), + "a,1,1.1" + systemRecordSeparator + "b,2,2.2" + systemRecordSeparator + "c,3,3.3")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + pipeline.run(); + } + + @Test + public void givenTrailingDelimiter_skipsEndingDelimiter() { + CSVFormat csvFormat = csvFormat().withTrailingDelimiter(true); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", "b,2,2.2,", "c,3,3.3,")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + pipeline.run(); + } + + @Test + public void givenNoTrailingDelimiter_includesEndingCell() { + CSVFormat csvFormat = csvFormat().withTrailingDelimiter(false); + PCollection input = + pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", "b,2,2.2,", "c,3,3.3,")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1", ""), + Arrays.asList("b", "2", "2.2", ""), + Arrays.asList("c", "3", "3.3", ""))); + pipeline.run(); + } + + @Test + public void givenTrim_removesSpaces() { + CSVFormat csvFormat = csvFormat().withTrim(true); + PCollection input = + pipeline.apply( + Create.of( + headerLine(csvFormat), + " a ,1,1.1", + "b, 2 ,2.2", + "c,3, 3.3 ")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a", "1", "1.1"), + Arrays.asList("b", "2", "2.2"), + Arrays.asList("c", "3", "3.3"))); + + pipeline.run(); + } + + @Test + public void givenNoTrim_keepsSpaces() { + CSVFormat csvFormat = csvFormat().withTrim(false); + PCollection input = + pipeline.apply( + Create.of( + headerLine(csvFormat), + " a ,1,1.1", + "b, 2 ,2.2", + "c,3, 3.3 ")); + + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder( + Arrays.asList( + Arrays.asList(" a ", "1", "1.1"), + Arrays.asList("b", " 2 ", "2.2"), + Arrays.asList("c", "3", " 3.3 "))); + + pipeline.run(); + } + + @Test + public void testSingleLineCsvRecord() { + String csvRecord = "a,1"; PCollection input = pipeline.apply(Create.of(csvRecord)); + CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat()); + PAssert.that(input.apply(underTest)) + .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "1"))); + + pipeline.run(); + } + + @Test + public void testMultiLineCsvRecord() { + String csvRecords = + "\"a\r\n1\",\"a\r\n2\"" + "\n" + "\"b\r\n1\",\"b\r\n2\"" + "\n" + "\"c\r\n1\",\"c\r\n2\""; + PCollection input = pipeline.apply(Create.of(csvRecords)); + CsvIOStringToCsvRecord underTest = - new CsvIOStringToCsvRecord(csvFormat().withTrailingDelimiter()); + new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n')); PAssert.that(input.apply(underTest)) - .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "b", "c"))); + .containsInAnyOrder( + Arrays.asList( + Arrays.asList("a\r\n1", "a\r\n2"), + Arrays.asList("b\r\n1", "b\r\n2"), + Arrays.asList("c\r\n1", "c\r\n2"))); pipeline.run(); } private static CSVFormat csvFormat() { - return CSVFormat.DEFAULT.withHeader("a_string", "an_integer", "a_double"); + return CSVFormat.DEFAULT.withAllowDuplicateHeaderNames(false).withHeader(header); } }