From 3a56ce7bee50714da06f2dd16e471a328cb4a655 Mon Sep 17 00:00:00 2001 From: Silviu Calinoiu Date: Thu, 25 Feb 2016 14:11:42 -0800 Subject: [PATCH] Small fixes in BigQuery snippets and wordcount example. --- README.md | 12 +++---- .../dataflow/examples/wordcount_debugging.py | 33 ++++--------------- 2 files changed, 12 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 0b4a36c..e3fdec3 100644 --- a/README.md +++ b/README.md @@ -332,10 +332,10 @@ you can write to. ```python import google.cloud.dataflow as df -# The output table needs to point to something in your project. -output_table = 'YOUR_PROJECT:DATASET.TABLE' input_table = 'clouddataflow-readonly:samples.weather_stations' -p = df.Pipeline('DirectPipelineRunner') +project = 'YOUR-PROJECT' +output_table = '%s:DATASET.TABLENAME' % project +p = df.Pipeline(argv=['--project', project]) (p | df.Read('read', df.io.BigQuerySource(input_table)) | df.FlatMap( @@ -357,12 +357,12 @@ of using the whole table. ```python import google.cloud.dataflow as df -# The output table needs to point to something in your project. 
-output_table = 'YOUR_PROJECT:DATASET.TABLE' +project = 'YOUR-PROJECT' +output_table = '%s:DATASET.TABLENAME' % project input_query = 'SELECT month, COUNT(month) AS tornado_count ' \ 'FROM [clouddataflow-readonly:samples.weather_stations] ' \ 'WHERE tornado=true GROUP BY month' -p = df.Pipeline('DirectPipelineRunner') +p = df.Pipeline(argv=['--project', project]) (p | df.Read('read', df.io.BigQuerySource(query=input_query)) | df.Write('write', df.io.BigQuerySink( diff --git a/google/cloud/dataflow/examples/wordcount_debugging.py b/google/cloud/dataflow/examples/wordcount_debugging.py index 3ada254..aa8d0cb 100644 --- a/google/cloud/dataflow/examples/wordcount_debugging.py +++ b/google/cloud/dataflow/examples/wordcount_debugging.py @@ -78,23 +78,6 @@ def process(self, context): context.aggregate_to(self.umatched_words, 1) -class AssertEqualsIgnoringOrderFn(df.DoFn): - """A DoFn that asserts that its input is the same as the expected value. - - This DoFn is useful only for testing purposes with small data sets. It will - materialize all of its input and assumes that its input is a singleton. - """ - - def __init__(self, expected_elements): - super(AssertEqualsIgnoringOrderFn, self).__init__() - self.expected_elements = expected_elements - - def process(self, context): - assert sorted(context.element) == sorted(self.expected_elements), ( - 'AssertEqualsIgnoringOrderFn input does not match expected value.' - '%s != %s' % (context.element, self.expected_elements)) - - class CountWords(df.PTransform): """A transform to count the occurrences of each word. @@ -136,19 +119,15 @@ def run(argv=sys.argv[1:]): p | df.io.Read('read', df.io.TextFileSource(known_args.input)) | CountWords() | df.ParDo('FilterText', FilterTextFn('Flourish|stomach'))) - # AssertEqualsIgnoringOrderFn is a convenient DoFn to validate its input. - # Asserts are best used in unit tests with small data sets but is demonstrated - # here as a teaching tool. 
+ # assert_that is a convenient PTransform that checks a PCollection has an + # expected value. Asserts are best used in unit tests with small data sets but + # are demonstrated here as a teaching tool. # - # Note AssertEqualsIgnoringOrderFn does not provide any output and that - # successful completion of the Pipeline implies that the expectations were - # met. Learn more at + # Note assert_that does not provide any output and that successful completion + # of the Pipeline implies that the expectations were met. Learn more at # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to # test your pipeline. - # pylint: disable=expression-not-assigned - (filtered_words - | df.transforms.combiners.ToList('ToList') - | df.ParDo(AssertEqualsIgnoringOrderFn([('Flourish', 3), ('stomach', 1)]))) + df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)])) # Format the counts into a PCollection of strings and write the output using a # "Write" transform that has side effects.