Skip to content

Commit 14d9d7f

Browse files
committed
Merge remote-tracking branch 'upstream/main' into jdbc-join-operator
2 parents ebe526b + fd208c7 commit 14d9d7f

17 files changed

Lines changed: 1138 additions & 235 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
package org.apache.wayang.api.sql.calcite.converter;
2020

21+
import java.util.Arrays;
2122
import java.util.HashSet;
2223
import java.util.List;
24+
import java.util.stream.Collectors;
2325

2426
import org.apache.calcite.rel.core.AggregateCall;
25-
27+
import org.apache.calcite.rel.type.RelDataTypeField;
2628
import org.apache.wayang.api.sql.calcite.converter.functions.AggregateAddCols;
2729
import org.apache.wayang.api.sql.calcite.converter.functions.AggregateFunction;
2830
import org.apache.wayang.api.sql.calcite.converter.functions.AggregateKeyExtractor;
@@ -57,15 +59,40 @@ Operator visit(final WayangAggregate wayangRelNode) {
5759
Record.class);
5860
childOp.connectTo(0, mapOperator, 0);
5961

60-
final Operator aggregateOperator = wayangRelNode.getGroupCount() > 0 ? new ReduceByOperator<>(
61-
new TransformationDescriptor<>(new AggregateKeyExtractor(groupingFields), Record.class, Object.class),
62-
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
63-
DataUnitType.createGrouped(Record.class),
64-
DataUnitType.createBasicUnchecked(Record.class)))
65-
: new GlobalReduceOperator<>(
66-
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
67-
DataUnitType.createGrouped(Record.class),
68-
DataUnitType.createBasicUnchecked(Record.class)));
62+
63+
final Operator aggregateOperator;
64+
65+
if (wayangRelNode.getGroupCount() > 0) {
66+
aggregateOperator = new ReduceByOperator<>(
67+
new TransformationDescriptor<>(
68+
new AggregateKeyExtractor(groupingFields), Record.class,
69+
Object.class),
70+
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
71+
DataUnitType.createGrouped(Record.class),
72+
DataUnitType.createBasicUnchecked(Record.class)));
73+
} else {
74+
final List<String> reductionFunctions = wayangRelNode.getNamedAggCalls().stream()
75+
.map(agg -> agg.left.getAggregation().getName()).toList();
76+
77+
final List<String> fields = wayangRelNode.getInput().getRowType().getFieldList().stream()
78+
.map(RelDataTypeField::getName).toList();
79+
80+
final List<String> aliases = wayangRelNode.getRowType().getFieldList().stream()
81+
.map(RelDataTypeField::getName).toList();
82+
83+
final String[] reductionStatements = new String[reductionFunctions.size()];
84+
85+
for (int i = 0; i < reductionStatements.length; i++) {
86+
reductionStatements[i] = reductionFunctions.get(i) + "(" + fields.get(i) + ") AS " + aliases.get(i);
87+
}
88+
89+
aggregateOperator = new GlobalReduceOperator<>(
90+
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
91+
DataUnitType.createGrouped(Record.class),
92+
DataUnitType.createBasicUnchecked(Record.class))
93+
.withSqlImplementation(
94+
Arrays.stream(reductionStatements).collect(Collectors.joining(","))));
95+
}
6996

7097
mapOperator.connectTo(0, aggregateOperator, 0);
7198

@@ -74,6 +101,7 @@ Operator visit(final WayangAggregate wayangRelNode) {
74101
Record.class,
75102
Record.class);
76103
aggregateOperator.connectTo(0, mapOperator2, 0);
104+
77105
return mapOperator2;
78106
}
79107
}

wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java

Lines changed: 149 additions & 66 deletions
Large diffs are not rendered by default.

wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/ReduceDescriptor.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818

1919
package org.apache.wayang.core.function;
2020

21+
import java.util.function.BinaryOperator;
22+
2123
import org.apache.wayang.core.optimizer.costs.LoadEstimator;
2224
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
2325
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
2426
import org.apache.wayang.core.types.BasicDataUnitType;
2527
import org.apache.wayang.core.types.DataUnitGroupType;
2628
import org.apache.wayang.core.types.DataUnitType;
2729

28-
import java.util.function.BinaryOperator;
29-
3030
/**
31-
* This descriptor pertains to functions that take multiple data units and aggregate them into a single data unit
32-
* by means of a tree-like fold, i.e., using a commutative, associative function..
31+
* This descriptor pertains to functions that take multiple data units and
32+
* aggregate them into a single data unit by means of a tree-like fold, i.e.,
33+
* using a commutative, associative function..
3334
*/
3435
public class ReduceDescriptor<Type> extends FunctionDescriptor {
3536

@@ -39,43 +40,67 @@ public class ReduceDescriptor<Type> extends FunctionDescriptor {
3940

4041
private final SerializableBinaryOperator<Type> javaImplementation;
4142

42-
public ReduceDescriptor(SerializableBinaryOperator<Type> javaImplementation,
43-
DataUnitGroupType<Type> inputType,
44-
BasicDataUnitType<Type> outputType) {
43+
/**
44+
* sql implementation of the reduce operator
45+
*/
46+
private String sqlImplementation;
47+
48+
public ReduceDescriptor(final SerializableBinaryOperator<Type> javaImplementation, final DataUnitGroupType<Type> inputType,
49+
final BasicDataUnitType<Type> outputType) {
4550
this(javaImplementation, inputType, outputType, new NestableLoadProfileEstimator(
46-
LoadEstimator.createFallback(1, 1),
47-
LoadEstimator.createFallback(1, 1)
48-
));
51+
LoadEstimator.createFallback(1, 1), LoadEstimator.createFallback(1, 1)));
4952
}
5053

51-
public ReduceDescriptor(SerializableBinaryOperator<Type> javaImplementation,
52-
DataUnitGroupType<Type> inputType, BasicDataUnitType<Type> outputType,
53-
LoadProfileEstimator loadProfileEstimator) {
54+
public ReduceDescriptor(final SerializableBinaryOperator<Type> javaImplementation, final DataUnitGroupType<Type> inputType,
55+
final BasicDataUnitType<Type> outputType, final LoadProfileEstimator loadProfileEstimator) {
5456
super(loadProfileEstimator);
5557
this.inputType = inputType;
5658
this.outputType = outputType;
5759
this.javaImplementation = javaImplementation;
5860
}
5961

60-
public ReduceDescriptor(SerializableBinaryOperator<Type> javaImplementation, Class<Type> inputType) {
62+
public ReduceDescriptor(final SerializableBinaryOperator<Type> javaImplementation, final Class<Type> inputType) {
6163
this(javaImplementation, DataUnitType.createGroupedUnchecked(inputType),
62-
DataUnitType.createBasicUnchecked(inputType)
63-
);
64+
DataUnitType.createBasicUnchecked(inputType));
6465
}
6566

67+
/**
68+
* This is function is not built to last. It is thought to help out devising
69+
* programs while we are still figuring
70+
* out how to express functions in a platform-independent way.
71+
*
72+
* @return a function that can perform the reduce
73+
*/
74+
public String getSqlImplementation() {
75+
return this.sqlImplementation;
76+
}
6677

6778
/**
68-
* This is function is not built to last. It is thought to help out devising programs while we are still figuring
79+
* This is function is not built to last. It is thought to help out devising
80+
* programs while we are still figuring
6981
* out how to express functions in a platform-independent way.
7082
*
7183
* @return a function that can perform the reduce
7284
*/
85+
public ReduceDescriptor<Type> withSqlImplementation(final String sqlImplementation) {
86+
this.sqlImplementation = sqlImplementation;
87+
return this;
88+
}
89+
90+
/**
91+
* This is function is not built to last. It is thought to help out devising
92+
* programs while we are still figuring out how to express functions in a
93+
* platform-independent way.
94+
*
95+
* @return a function that can perform the reduce
96+
*/
7397
public BinaryOperator<Type> getJavaImplementation() {
7498
return this.javaImplementation;
7599
}
76100

77101
/**
78-
* In generic code, we do not have the type parameter values of operators, functions etc. This method avoids casting issues.
102+
* In generic code, we do not have the type parameter values of operators,
103+
* functions etc. This method avoids casting issues.
79104
*
80105
* @return this instance with type parameters set to {@link Object}
81106
*/

0 commit comments

Comments
 (0)