Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go SDK + Protos] Fix Proto Spec for Pane encoding + Go SDK implementation. #33840

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
* Fixed session window aggregation, which wasn't being performed per-key. ([#33542](https://github.com/apache/beam/issues/33542)).)
* [Dataflow Streaming Appliance] Fixed commits failing with KeyCommitTooLargeException when a key outputs >180MB of results. [#33588](https://github.com/apache/beam/issues/33588).
* Fixed a Dataflow template creation issue that ignores template file creation errors (Java) ([#33636](https://github.com/apache/beam/issues/33636))

* Correctly documented Pane Encodings in the portability protocols ([#33840](https://github.com/apache/beam/issues/33840)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,9 @@ message BagStateSpec {
string element_coder_id = 1;
}

// OrderedListState values are encoded with the var int encoded
// millis-since-epoch followed by the value encoded by the provided coder.
// Be aware this is not the standard timestamp value encoding.
message OrderedListStateSpec {
string element_coder_id = 1;
}
Expand Down Expand Up @@ -946,15 +949,15 @@ message StandardCoders {
// coder.
//
// pane - The first byte of the pane info determines which type of
// encoding is used, as well as the is_first, is_last, and timing
// encoding is used, as well as the is_first, is_last and timing
// fields. If this byte is bits [0 1 2 3 4 5 6 7], then:
// * bits [0 1 2 3] determine the encoding as follows:
// 0000 - The entire pane info is encoded as a single byte.
// The is_first, is_last, and timing fields are encoded
// as below, and the index and non-speculative index are
// both zero (and hence are not encoded here).
// 0001 - The pane info is encoded as this byte plus a single
// VarInt encoed integer representing the pane index. The
// VarInt encoded integer representing the pane index. The
// non-speculative index can be derived as follows:
// -1 if the pane is early, otherwise equal to index.
// 0010 - The pane info is encoded as this byte plus two VarInt
Expand All @@ -965,8 +968,10 @@ message StandardCoders {
// 01 - on time
// 10 - late
// 11 - unknown
// * bit 6 is 1 if this is the first pane, 0 otherwise.
// * bit 7 is 1 if this is the last pane, 0 otherwise.
// * bit 6 is 1 if this is the last pane, 0 otherwise.
// Commonly set with `byte |= 0x02`
// * bit 7 is 1 if this is the first pane, 0 otherwise.
// Commonly set with `byte |= 0x01`
//
// element - The element incoded using the supplied element coder.
//
Expand Down Expand Up @@ -1329,7 +1334,7 @@ message Trigger {
message AfterSynchronizedProcessingTime {
}

// The default trigger. Equivalent to Repeat { AfterEndOfWindow } but
// The default trigger. Equivalent to AfterEndOfWindow { Late: Always }} but
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are equivalent, but I do prefer the phrasing you have altered it to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT They are not.

AfterEndOfWindow is defined as having the sub triggers as implicitly repeating with no default behavior if a sub trigger isn't defined (technically, it defaults to the never trigger). So, Repeat { AfterEndOfWindow } has only an ontime, and maybe a closing firing. The repeat isn't semantically useful.

// specially denoted to indicate the user did not alter the triggering.
message Default {
}
Expand All @@ -1339,12 +1344,12 @@ message Trigger {
int32 element_count = 1;
}

// Never ready. There will only be an ON_TIME output and a final
// output at window expiration.
// Never ready. There will only be an ON_TIME final output at window
// expiration.
message Never {
}

// Always ready. This can also be expressed as ElementCount(1) but
// Always ready. This can also be expressed as Repeat{ ElementCount(1) } but
// is more explicit.
message Always {
}
Expand Down
8 changes: 4 additions & 4 deletions sdks/go/pkg/beam/core/graph/coder/panes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {

pane := byte(0)
if v.IsFirst {
pane |= 0x02
pane |= 0x01
}
if v.IsLast {
pane |= 0x01
pane |= 0x02
}
pane |= byte(v.Timing << 2)

Expand Down Expand Up @@ -64,10 +64,10 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
func NewPane(b byte) typex.PaneInfo {
pn := typex.NoFiringPane()

if !(b&0x02 == 2) {
if !(b&0x01 == 1) {
pn.IsFirst = false
}
if !(b&0x01 == 1) {
if !(b&0x02 == 2) {
pn.IsLast = false
}

Expand Down
54 changes: 48 additions & 6 deletions sdks/go/pkg/beam/core/graph/coder/panes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ func equalPanes(left, right typex.PaneInfo) bool {

func TestPaneCoder(t *testing.T) {
tests := []struct {
name string
timing typex.PaneTiming
first bool
last bool
index int64
nsIndex int64
name string
timing typex.PaneTiming
first bool
last bool
index int64
nsIndex int64
firstByte byte
}{
{
"false bools",
Expand All @@ -47,6 +48,7 @@ func TestPaneCoder(t *testing.T) {
false,
0,
0,
0b00001100,
},
{
"true bools",
Expand All @@ -55,6 +57,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
0,
0b00001111,
},
{
"first pane",
Expand All @@ -63,6 +66,7 @@ func TestPaneCoder(t *testing.T) {
false,
0,
0,
0b00001101,
},
{
"last pane",
Expand All @@ -71,6 +75,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
0,
0b00001110,
},
{
"on time, different index and non-speculative",
Expand All @@ -79,6 +84,7 @@ func TestPaneCoder(t *testing.T) {
false,
1,
2,
0b00100100,
},
{
"valid early pane",
Expand All @@ -87,6 +93,7 @@ func TestPaneCoder(t *testing.T) {
false,
math.MaxInt64,
-1,
0b00010001,
},
{
"on time, max non-speculative index",
Expand All @@ -95,6 +102,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
math.MaxInt64,
0b00100110,
},
{
"late pane, max index",
Expand All @@ -103,6 +111,7 @@ func TestPaneCoder(t *testing.T) {
false,
math.MaxInt64,
0,
0b00101000,
},
{
"on time, min non-speculative index",
Expand All @@ -111,6 +120,7 @@ func TestPaneCoder(t *testing.T) {
true,
0,
math.MinInt64,
0b00100110,
},
{
"late, min index",
Expand All @@ -119,6 +129,34 @@ func TestPaneCoder(t *testing.T) {
false,
math.MinInt64,
0,
0b00101000,
},
{
"last late firing",
typex.PaneLate,
false,
true,
2,
1,
0b00101010,
},
{
"encodeByte 41",
typex.PaneLate,
true,
false,
2,
1,
0b00101001, // 41
},
{
"encodeByte 18",
typex.PaneEarly,
false,
true,
0,
-1,
0b00010010, // 18
},
}
for _, test := range tests {
Expand All @@ -129,6 +167,10 @@ func TestPaneCoder(t *testing.T) {
if err != nil {
t.Fatalf("failed to encode pane %v, got %v", input, err)
}
first := buf.Bytes()[0]
if got, want := first, test.firstByte; got != want {
t.Errorf("Unexpected First Byte: got %#08b, want %#08b, for %v ", got, want, input)
}
got, err := DecodePane(&buf)
if err != nil {
t.Fatalf("failed to decode pane from buffer %v, got %v", &buf, err)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (t *NeverTrigger) String() string {
func (t NeverTrigger) trigger() {}

// Never creates a Never Trigger that is never ready to fire.
// There will only be an ON_TIME output and a final output at window expiration.
// There will only be a single ON_TIME final output at window expiration + allowed lateness.
func Never() *NeverTrigger {
return &NeverTrigger{}
}
Expand Down
13 changes: 13 additions & 0 deletions sdks/go/pkg/beam/core/typex/special.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ const (
PaneUnknown PaneTiming = 3
)

func (t PaneTiming) String() string {
switch t {
case PaneEarly:
return "early"
case PaneOnTime:
return "ontime"
case PaneLate:
return "late"
default:
return "unknown"
}
}

// PaneInfo represents the output pane.
type PaneInfo struct {
Timing PaneTiming
Expand Down
Loading