Skip to content

Commit

Permalink
explicit arg names
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmccluskey committed Jan 9, 2025
1 parent 3aab6c6 commit ddc0a86
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def write_to_text(pcoll, path: str):
"""
try:
field_names = [
name for name,
_ in schemas.named_fields_from_element_type(pcoll.element_type)
name for name, _ in schemas.named_fields_from_element_type(
pcoll.element_type)
]
except Exception as exn:
raise ValueError(
Expand Down Expand Up @@ -167,7 +167,9 @@ def write_to_bigquery(
described at https://beam.apache.org/documentation/sdks/yaml-errors/
Otherwise permanently failing records will cause pipeline failure.
"""

class WriteToBigQueryHandlingErrors(beam.PTransform):

def default_label(self):
return 'WriteToBigQuery'

Expand Down Expand Up @@ -236,9 +238,9 @@ def _validate_schema():
beam_schema = avroio.avro_schema_to_beam_schema(schema)
covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema)
return (
beam_schema,
lambda record: covert_to_row(
fastavro.schemaless_reader(io.BytesIO(record), schema)))
beam_schema, lambda record: covert_to_row(
fastavro.schemaless_reader(
fo=io.BytesIO(record), writer_schema=schema)))
else:
raise ValueError(f'Unknown format: {format}')

Expand Down

0 comments on commit ddc0a86

Please sign in to comment.