forked from mavlink/MAVSDK-Java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.j2
138 lines (122 loc) · 7.71 KB
/
stream.j2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
{% if not is_finite %}
// Infinite stream
// Processor that process{{ name.upper_camel_case }}() pushes every response, error and completion into.
private final FlowableProcessor<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }}Processor = PublishProcessor.create();
// Backpressure-buffered, shared view of the processor; this is the instance handed out by get{{ name.upper_camel_case }}().
private final Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }} = {{ name.lower_camel_case }}Processor.onBackpressureBuffer().share();
// Flipped to true by get{{ name.upper_camel_case }}() once process{{ name.upper_camel_case }}() has been scheduled.
private volatile boolean is{{ name.upper_camel_case }}Initialized = false;
/**
 * Builds the Subscribe{{ name.upper_camel_case }} request from the given arguments and opens the
 * stream on the stub. Every response is converted and pushed into
 * {{ name.lower_camel_case }}Processor; errors and completion are forwarded to it unchanged.
 */
private void process{{ name.upper_camel_case }}({% for param in params %}@NonNull {{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
    {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request subscribeRequest = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder()
            {%- for param in params %}
                {%- if param.type_info.is_primitive %}
            .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }})
                {%- elif param.type_info.is_repeated %}
            .addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(item -> item.rpc{{ param.type_info.inner_name }}())::iterator)
                {%- else %}
            .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name }}())
                {%- endif %}
            {%- endfor %}
            .build();

    // Relays the gRPC callbacks one-to-one onto the processor.
    StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response> responseObserver = new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {
        @Override
        public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response response) {
            {%- if return_type.is_repeated and return_type.is_primitive %}
            {{ name.lower_camel_case }}Processor.onNext(response.get{{ return_name.upper_camel_case }}List());
            {%- elif return_type.is_repeated %}
            {{ name.lower_camel_case }}Processor.onNext(response.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
            {%- elif return_type.is_primitive %}
            {{ name.lower_camel_case }}Processor.onNext(response.get{{ return_name.upper_camel_case }}());
            {%- else %}
            {{ name.lower_camel_case }}Processor.onNext({{ return_type.name }}.translateFromRpc(response.get{{ return_name.upper_camel_case }}()));
            {%- endif %}
        }

        @Override
        public void onError(Throwable throwable) {
            {{ name.lower_camel_case }}Processor.onError(throwable);
        }

        @Override
        public void onCompleted() {
            {{ name.lower_camel_case }}Processor.onComplete();
        }
    };

    stub.subscribe{{ name.upper_camel_case }}(subscribeRequest, responseObserver);
}
/**
 * Returns the shared {{ name.lower_camel_case }} stream.
 *
 * The first call schedules process{{ name.upper_camel_case }}() on the MavsdkEventQueue executor and
 * marks the stream as initialized; later calls return the same Flowable without scheduling
 * anything, so only the arguments of that first call are forwarded.
 */
@CheckReturnValue
public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> get{{ name.upper_camel_case }}({% for param in params %}@NonNull {{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
    // Fast path: the subscription has already been scheduled, no locking needed.
    if (is{{ name.upper_camel_case }}Initialized) {
        return {{ name.lower_camel_case }};
    }

    synchronized (this) {
        // Re-check under the lock so that concurrent first callers schedule only once.
        if (!is{{ name.upper_camel_case }}Initialized) {
            MavsdkEventQueue.executor().execute(() -> process{{ name.upper_camel_case }}({% for param in params %}{{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}));
            is{{ name.upper_camel_case }}Initialized = true;
        }
    }
    return {{ name.lower_camel_case }};
}
{% else %}
// Finite stream
/**
 * Creates a Flowable that opens the Subscribe{{ name.upper_camel_case }} stream when subscribed to.
 *
 * On subscription the source waits once for PLUGIN_INIT_TIMEOUT_MS if the plugin is not yet
 * initialized and fails with MavsdkException("No System") if it still is not. Each response is
 * then dispatched on its result code: SUCCESS completes the stream, NEXT emits the payload, and
 * any other code fails the stream with a {{ plugin_name.upper_camel_case }}Exception.
 *
 * The returned Flowable buffers on backpressure, subscribes on Schedulers.io() and is shared.
 */
private Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> create{{ name.upper_camel_case }}({% for param in params %}{{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
    {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder()
            {%- for param in params %}
                {%- if param.type_info.is_primitive %}
            .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }})
                {%- elif param.type_info.is_repeated %}
            .addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name }}())::iterator)
                {%- else %}
            .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name }}())
                {%- endif %}
            {%- endfor %}
            .build();

    Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> flowable = Flowable.create(emitter -> {
        // Single grace period for plugin initialization before giving up.
        if (!isInitialized) {
            Thread.sleep(PLUGIN_INIT_TIMEOUT_MS);
            if (!isInitialized) {
                throw new MavsdkException("No System");
            }
        }

        // NOTE(review): no cancellable is registered on the emitter, so disposing the Flowable
        // does not appear to cancel the underlying gRPC call - confirm whether that is intended.
        stub.subscribe{{ name.upper_camel_case }}(request, new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {
            @Override
            public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) {
                {{ plugin_name.upper_camel_case }}Result result = {{ plugin_name.upper_camel_case }}Result.translateFromRpc(value.get{{ plugin_name.upper_camel_case }}Result());

                switch (result.result) {
                    case SUCCESS:
                        emitter.onComplete();
                        break;
                    case NEXT:
                        {%- if return_type.is_repeated %}
                            {%- if return_type.is_primitive %}
                        emitter.onNext(value.get{{ return_name.upper_camel_case }}List());
                            {%- else %}
                        emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name }}::translateFromRpc).collect(Collectors.toList()));
                            {%- endif %}
                        {%- else %}
                            {%- if return_type.is_primitive %}
                        emitter.onNext(value.get{{ return_name.upper_camel_case }}());
                            {%- else %}
                        emitter.onNext({{ return_type.name }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
                            {%- endif %}
                        {%- endif %}
                        break;
                    default:
                        // tryOnError: if downstream already cancelled, drop the error instead of
                        // letting RxJava route it to the global error handler.
                        emitter.tryOnError(new {{ plugin_name.upper_camel_case }}Exception(result.result, result.resultStr));
                        break;
                }
            }

            @Override
            public void onError(Throwable t) {
                // The gRPC stream may fail after the subscriber has gone away; see above.
                emitter.tryOnError(t);
            }

            @Override
            public void onCompleted() {
                emitter.onComplete();
            }
        });
    }, BackpressureStrategy.BUFFER);

    return flowable.subscribeOn(Schedulers.io()).share();
}
/**
 * Public accessor for the finite {{ name.lower_camel_case }} stream; delegates to
 * create{{ name.upper_camel_case }}() with the given arguments and returns its result.
 */
@CheckReturnValue
public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> {{ name.lower_camel_case }}({% for param in params %}@NonNull {{ param.type_info.name }} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
    Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name }}{% endif %}> stream = create{{ name.upper_camel_case }}({% for param in params %}{{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %});
    return stream;
}
{% endif %}