Skip to content

Commit

Permalink
Finish yapf upgrade to 0.43.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmccluskey committed Jan 9, 2025
1 parent b4c3a4f commit f4dc6c6
Show file tree
Hide file tree
Showing 768 changed files with 5,092 additions and 2,017 deletions.
1 change: 1 addition & 0 deletions sdks/python/apache_beam/coders/avro_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

class AvroRecord(object):
"""Simple wrapper class for dictionary records."""

def __init__(self, value):
self.record = value

Expand Down
48 changes: 42 additions & 6 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@

class CoderImpl(object):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, stream, nested):
# type: (Any, create_OutputStream, bool) -> None

Expand Down Expand Up @@ -211,6 +212,7 @@ class SimpleCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
Subclass of CoderImpl implementing stream methods using encode/decode."""

def encode_to_stream(self, value, stream, nested):
# type: (Any, create_OutputStream, bool) -> None

Expand All @@ -228,6 +230,7 @@ class StreamCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
Subclass of CoderImpl implementing encode/decode using stream methods."""

def encode(self, value):
# type: (Any) -> bytes
out = create_OutputStream()
Expand Down Expand Up @@ -255,6 +258,7 @@ class CallbackCoderImpl(CoderImpl):
This is the default implementation used if Coder._get_impl()
is not overwritten.
"""

def __init__(self, encoder, decoder, size_estimator=None):
self._encoder = encoder
self._decoder = decoder
Expand Down Expand Up @@ -297,6 +301,7 @@ def __repr__(self):

class ProtoCoderImpl(SimpleCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, proto_message_type):
self.proto_message_type = proto_message_type

Expand All @@ -311,12 +316,14 @@ def decode(self, encoded):

class DeterministicProtoCoderImpl(ProtoCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode(self, value):
return value.SerializePartialToString(deterministic=True)


class ProtoPlusCoderImpl(SimpleCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, proto_plus_type):
# type: (Type[proto.Message]) -> None
self.proto_plus_type = proto_plus_type
Expand Down Expand Up @@ -356,6 +363,7 @@ def decode(self, value):

class FastPrimitivesCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(
self, fallback_coder_impl, requires_deterministic_step_label=None):
self.fallback_coder_impl = fallback_coder_impl
Expand Down Expand Up @@ -610,6 +618,7 @@ class BytesCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
A coder for bytes/str objects."""

def encode_to_stream(self, value, out, nested):
# type: (bytes, create_OutputStream, bool) -> None

Expand All @@ -636,6 +645,7 @@ class BooleanCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
A coder for bool objects."""

def encode_to_stream(self, value, out, nested):
out.write_byte(1 if value else 0)

Expand Down Expand Up @@ -675,12 +685,12 @@ class MapCoderImpl(StreamCoderImpl):
attribute values.
A coder for typing.Mapping objects."""

def __init__(
self,
key_coder, # type: CoderImpl
value_coder, # type: CoderImpl
is_deterministic = False
):
is_deterministic=False):
self._key_coder = key_coder
self._value_coder = value_coder
self._is_deterministic = is_deterministic
Expand Down Expand Up @@ -760,6 +770,7 @@ def estimate_size(self, unused_value, nested=False):

class BigEndianShortCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, out, nested):
# type: (int, create_OutputStream, bool) -> None
out.write_bigendian_int16(value)
Expand All @@ -776,6 +787,7 @@ def estimate_size(self, unused_value, nested=False):

class SinglePrecisionFloatCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, out, nested):
# type: (float, create_OutputStream, bool) -> None
out.write_bigendian_float(value)
Expand All @@ -792,6 +804,7 @@ def estimate_size(self, unused_value, nested=False):

class FloatCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, out, nested):
# type: (float, create_OutputStream, bool) -> None
out.write_bigendian_double(value)
Expand Down Expand Up @@ -863,6 +876,7 @@ class TimestampCoderImpl(StreamCoderImpl):
that of the Java SDK InstantCoder.
https://github.com/apache/beam/blob/f5029b4f0dfff404310b2ef55e2632bbacc7b04f/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java#L79
"""

def encode_to_stream(self, value, out, nested):
# type: (Timestamp, create_OutputStream, bool) -> None
millis = value.micros // 1000
Expand All @@ -889,6 +903,7 @@ def estimate_size(self, unused_value, nested=False):

class TimerCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, key_coder_impl, window_coder_impl):
self._timestamp_coder_impl = TimestampCoderImpl()
self._boolean_coder_impl = BooleanCoderImpl()
Expand Down Expand Up @@ -947,6 +962,7 @@ class VarIntCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
A coder for int objects."""

def encode_to_stream(self, value, out, nested):
# type: (int, create_OutputStream, bool) -> None
out.write_var_int64(value)
Expand Down Expand Up @@ -978,6 +994,7 @@ class SingletonCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
A coder that always encodes exactly one value."""

def __init__(self, value):
self._value = value

Expand Down Expand Up @@ -1005,6 +1022,7 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
CoderImpl for coders that are comprised of several component coders."""

def __init__(self, coder_impls):
for c in coder_impls:
assert isinstance(c, CoderImpl), c
Expand All @@ -1030,8 +1048,8 @@ def decode_from_stream(self, in_stream, nested):
# type: (create_InputStream, bool) -> Any
return self._construct_from_components([
c.decode_from_stream(
in_stream, nested or i + 1 < len(self._coder_impls)) for i,
c in enumerate(self._coder_impls)
in_stream, nested or i + 1 < len(self._coder_impls))
for i, c in enumerate(self._coder_impls)
])

def estimate_size(self, value, nested=False):
Expand Down Expand Up @@ -1061,6 +1079,7 @@ def get_estimated_size_and_observables(self, value, nested=False):

class AvroCoderImpl(SimpleCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, schema):
self.parsed_schema = parse_schema(json.loads(schema))

Expand All @@ -1077,6 +1096,7 @@ def decode(self, encoded):

class TupleCoderImpl(AbstractComponentCoderImpl):
"""A coder for tuple objects."""

def _extract_components(self, value):
return tuple(value)

Expand All @@ -1085,6 +1105,7 @@ def _construct_from_components(self, components):


class _ConcatSequence(object):

def __init__(self, head, tail):
# type: (Iterable[Any], Iterable[Any]) -> None
self._head = head
Expand Down Expand Up @@ -1277,12 +1298,14 @@ class TupleSequenceCoderImpl(SequenceCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
A coder for homogeneous tuple objects."""

def _construct_from_sequence(self, components):
return tuple(components)


class _AbstractIterable(object):
"""Wraps an iterable hiding methods that might not always be available."""

def __init__(self, contents):
self._contents = contents

Expand Down Expand Up @@ -1315,6 +1338,7 @@ class IterableCoderImpl(SequenceCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
A coder for homogeneous iterable objects."""

def __init__(self, *args, use_abstract_iterable=None, **kwargs):
super().__init__(*args, **kwargs)
if use_abstract_iterable is None:
Expand All @@ -1332,6 +1356,7 @@ class ListCoderImpl(SequenceCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
A coder for homogeneous list objects."""

def _construct_from_sequence(self, components):
return components if isinstance(components, list) else list(components)

Expand Down Expand Up @@ -1360,6 +1385,7 @@ class PaneInfoCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
Coder for a PaneInfo descriptor."""

def _choose_encoding(self, value):
if ((value._index == 0 and value._nonspeculative_index == 0) or
value._timing == PaneInfoTiming_UNKNOWN):
Expand Down Expand Up @@ -1422,6 +1448,7 @@ def estimate_size(self, value, nested=False):


class _OrderedUnionCoderImpl(StreamCoderImpl):

def __init__(self, coder_impl_types, fallback_coder_impl):
assert len(coder_impl_types) < 128
self._types, self._coder_impls = zip(*coder_impl_types)
Expand Down Expand Up @@ -1555,6 +1582,7 @@ class ParamWindowedValueCoderImpl(WindowedValueCoderImpl):
encoding, and uses the supplied parameterized timestamp, windows
and pane info values during decoding when reconstructing the windowed
value."""

def __init__(self, value_coder, window_coder, payload):
super().__init__(value_coder, TimestampCoderImpl(), window_coder)
self._timestamp, self._windows, self._pane_info = self._from_proto(
Expand Down Expand Up @@ -1595,6 +1623,7 @@ class LengthPrefixCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
Coder which prefixes the length of the encoded object in the stream."""

def __init__(self, value_coder):
# type: (CoderImpl) -> None
self._value_coder = value_coder
Expand Down Expand Up @@ -1625,6 +1654,7 @@ class ShardedKeyCoderImpl(StreamCoderImpl):
shard id byte string
encoded user key
"""

def __init__(self, key_coder_impl):
self._shard_id_coder_impl = BytesCoderImpl()
self._key_coder_impl = key_coder_impl
Expand Down Expand Up @@ -1660,6 +1690,7 @@ class TimestampPrefixingWindowCoderImpl(StreamCoderImpl):
window's max_timestamp()
encoded window using it's own coder.
"""

def __init__(self, window_coder_impl: CoderImpl) -> None:
self._window_coder_impl = window_coder_impl

Expand Down Expand Up @@ -1687,6 +1718,7 @@ def _create_opaque_window(end, encoded_window):
from apache_beam.transforms.window import BoundedWindow

class _OpaqueWindow(BoundedWindow):

def __init__(self, end, encoded_window):
super().__init__(end)
self.encoded_window = encoded_window
Expand Down Expand Up @@ -1715,6 +1747,7 @@ class TimestampPrefixingOpaqueWindowCoderImpl(StreamCoderImpl):
window's max_timestamp()
length prefixed encoded window
"""

def __init__(self) -> None:
pass

Expand Down Expand Up @@ -1770,6 +1803,7 @@ def finalize_write(self):


class GenericRowColumnEncoder(RowColumnEncoder):

def __init__(self, coder_impl, column):
self.coder_impl = coder_impl
self.column = column
Expand All @@ -1790,6 +1824,7 @@ def finalize_write(self):

class RowCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, schema, components):
self.schema = schema
self.num_fields = len(self.schema.fields)
Expand Down Expand Up @@ -1859,8 +1894,7 @@ def _row_column_encoders(self, columns):
RowColumnEncoder.create(
self.schema.fields[i].type.atomic_type,
self.components[i],
columns[name]) for i,
name in enumerate(self.field_names)
columns[name]) for i, name in enumerate(self.field_names)
]

def encode_batch_to_stream(self, columns: Dict[str, np.ndarray], out):
Expand Down Expand Up @@ -1964,6 +1998,7 @@ def decode_batch_from_stream(self, dest: Dict[str, np.ndarray], in_stream):


class LogicalTypeCoderImpl(StreamCoderImpl):

def __init__(self, logical_type, representation_coder):
self.logical_type = logical_type
self.representation_coder = representation_coder.get_impl()
Expand All @@ -1982,6 +2017,7 @@ class BigIntegerCoderImpl(StreamCoderImpl):
For interoperability with Java SDK, encoding needs to match that of the Java
SDK BigIntegerCoder."""

def encode_to_stream(self, value, out, nested):
# type: (int, create_OutputStream, bool) -> None
if value < 0:
Expand Down
Loading

0 comments on commit f4dc6c6

Please sign in to comment.