Skip to content

Commit 560fcfc

Browse files
msprucjuripetersen
andauthored
ML cost model integration for Wayang (#765)
* add one hot encoding schemes * add onnx model integration * add complexity class for function descriptor * utils for determining the complexity of an operator Co-authored-by: Juri Petersen <juri@hey.com> * add test resources for ml cost model --------- Co-authored-by: mspruc <mspruc@users.noreply.github.com> Co-authored-by: Juri Petersen <juri@hey.com>
1 parent be25631 commit 560fcfc

41 files changed

Lines changed: 4205 additions & 2 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.util;
20+
21+
import org.apache.wayang.basic.operators.CoGroupOperator;
22+
import org.apache.wayang.basic.operators.FilterOperator;
23+
import org.apache.wayang.basic.operators.FlatMapOperator;
24+
import org.apache.wayang.basic.operators.GlobalReduceOperator;
25+
import org.apache.wayang.basic.operators.GroupByOperator;
26+
import org.apache.wayang.basic.operators.JoinOperator;
27+
import org.apache.wayang.basic.operators.LoopOperator;
28+
import org.apache.wayang.basic.operators.MapOperator;
29+
import org.apache.wayang.basic.operators.MapPartitionsOperator;
30+
import org.apache.wayang.basic.operators.MaterializedGroupByOperator;
31+
import org.apache.wayang.basic.operators.ReduceByOperator;
32+
import org.apache.wayang.basic.operators.ReduceOperator;
33+
import org.apache.wayang.basic.operators.SortOperator;
34+
import org.apache.wayang.core.optimizer.ComplexityClass;
35+
import org.apache.wayang.core.plan.wayangplan.Operator;
36+
37+
public class ComplexityUtils {
38+
/**
39+
* Infer complexity class from a given operator's descriptors.
40+
* @param operator
41+
* @return {@link ComplexityClass#LOGARITHMIC}, {@link ComplexityClass#LINEAR}, {@link ComplexityClass#QUADRATIC} or {@link ComplexityClass#SUPERQUADRATIC}. {@link ComplexityClass#LINEAR} on default
42+
*/
43+
public static ComplexityClass inferFromOperator(final Operator operator) {
44+
if (operator instanceof final ReduceByOperator reduceBy) {
45+
return reduceBy.getReduceDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
46+
} else if (operator instanceof final ReduceOperator reduce) {
47+
return reduce.getReduceDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
48+
} else if (operator instanceof final GlobalReduceOperator globalReduce) {
49+
return globalReduce.getReduceDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
50+
} else if (operator instanceof final CoGroupOperator coGroup) {
51+
return coGroup.getKeyDescriptor0().getComplexityClass().orElse(ComplexityClass.LINEAR);
52+
} else if (operator instanceof final GroupByOperator groupBy) {
53+
return groupBy.getKeyDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
54+
} else if (operator instanceof final MaterializedGroupByOperator matGroupBy) {
55+
return matGroupBy.getKeyDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
56+
} else if (operator instanceof final SortOperator sort) {
57+
return sort.getKeyDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
58+
} else if (operator instanceof final JoinOperator join) {
59+
return join.getKeyDescriptor0().getComplexityClass().orElse(ComplexityClass.LINEAR);
60+
} else if (operator instanceof final MapOperator map) {
61+
return map.getFunctionDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
62+
} else if (operator instanceof final FlatMapOperator flatMap) {
63+
return flatMap.getFunctionDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
64+
} else if (operator instanceof final MapPartitionsOperator mapPartitions) {
65+
return mapPartitions.getFunctionDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
66+
} else if (operator instanceof final FilterOperator filter) {
67+
return filter.getPredicateDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
68+
} else if (operator instanceof final LoopOperator loop) {
69+
return loop.getCriterionDescriptor().getComplexityClass().orElse(ComplexityClass.LINEAR);
70+
}
71+
72+
return ComplexityClass.LINEAR;
73+
}
74+
}

wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/FunctionDescriptor.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.wayang.core.function;
2020

21+
import org.apache.wayang.core.optimizer.ComplexityClass;
2122
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval;
2223
import org.apache.wayang.core.optimizer.costs.LoadEstimator;
2324
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
@@ -32,10 +33,13 @@
3233
*/
3334
public abstract class FunctionDescriptor implements Serializable {
3435

35-
public FunctionDescriptor() {}
36+
public FunctionDescriptor() {
37+
}
3638

3739
private LoadProfileEstimator loadProfileEstimator;
3840

41+
private ComplexityClass complexityClass = null;
42+
3943
public FunctionDescriptor(LoadProfileEstimator loadProfileEstimator) {
4044
this.setLoadProfileEstimator(loadProfileEstimator);
4145
}
@@ -48,6 +52,14 @@ public Optional<LoadProfileEstimator> getLoadProfileEstimator() {
4852
return Optional.ofNullable(this.loadProfileEstimator);
4953
}
5054

55+
public Optional<ComplexityClass> getComplexityClass(){
56+
return Optional.ofNullable(complexityClass);
57+
}
58+
59+
public void setComplexityClass(final ComplexityClass complexityClass){
60+
this.complexityClass = complexityClass;
61+
}
62+
5163
/**
5264
* Utility method to retrieve the selectivity of a {@link FunctionDescriptor}
5365
*
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.core.optimizer;
20+
21+
public enum ComplexityClass {
22+
LINEAR,
23+
LOGARITHMIC,
24+
QUADRATIC,
25+
SUPERQUADRATIC
26+
}

wayang-plugins/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@
3939
<modules>
4040
<module>wayang-iejoin</module>
4141
<module>wayang-spatial</module>
42+
<module>wayang-ml</module>
4243
</modules>
43-
4444
</project>

wayang-plugins/wayang-ml/pom.xml

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one
4+
~ or more contributor license agreements. See the NOTICE file
5+
~ distributed with this work for additional information
6+
~ regarding copyright ownership. The ASF licenses this file
7+
~ to you under the Apache License, Version 2.0 (the
8+
~ "License"); you may not use this file except in compliance
9+
~ with the License. You may obtain a copy of the License at
10+
~
11+
~ http://www.apache.org/licenses/LICENSE-2.0
12+
~
13+
~ Unless required by applicable law or agreed to in writing,
14+
~ software distributed under the License is distributed on an
15+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
~ KIND, either express or implied. See the License for the
17+
~ specific language governing permissions and limitations
18+
~ under the License.
19+
~
20+
-->
21+
22+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<modelVersion>4.0.0</modelVersion>
24+
25+
<parent>
26+
<groupId>org.apache.wayang</groupId>
27+
<artifactId>wayang-plugins</artifactId>
28+
<version>1.1.2-SNAPSHOT</version>
29+
</parent>
30+
31+
<artifactId>wayang-ml</artifactId>
32+
<version>1.1.2-SNAPSHOT</version>
33+
34+
<properties>
35+
<java-module-name>org.apache.wayang.extensions.ml</java-module-name>
36+
</properties>
37+
38+
<dependencies>
39+
<dependency>
40+
<groupId>org.apache.wayang</groupId>
41+
<artifactId>wayang-api-sql</artifactId>
42+
<version>1.1.2-SNAPSHOT</version>
43+
</dependency>
44+
<dependency>
45+
<groupId>com.microsoft.onnxruntime</groupId>
46+
<artifactId>onnxruntime</artifactId>
47+
<version>1.21.1</version>
48+
</dependency>
49+
<!--dependency>
50+
<groupId>com.microsoft.onnxruntime</groupId>
51+
<artifactId>onnxruntime_gpu</artifactId>
52+
<version>1.18.0</version>
53+
</dependency-->
54+
<dependency>
55+
<groupId>org.apache.wayang</groupId>
56+
<artifactId>wayang-core</artifactId>
57+
<version>1.1.2-SNAPSHOT</version>
58+
</dependency>
59+
<dependency>
60+
<groupId>org.apache.wayang</groupId>
61+
<artifactId>wayang-basic</artifactId>
62+
<version>1.1.2-SNAPSHOT</version>
63+
</dependency>
64+
<dependency>
65+
<groupId>org.apache.wayang</groupId>
66+
<artifactId>wayang-java</artifactId>
67+
<version>1.1.2-SNAPSHOT</version>
68+
</dependency>
69+
<dependency>
70+
<groupId>org.apache.wayang</groupId>
71+
<artifactId>wayang-spark</artifactId>
72+
<version>1.1.2-SNAPSHOT</version>
73+
</dependency>
74+
<dependency>
75+
<groupId>org.apache.wayang</groupId>
76+
<artifactId>wayang-flink</artifactId>
77+
<version>1.1.2-SNAPSHOT</version>
78+
</dependency>
79+
<dependency>
80+
<groupId>org.apache.flink</groupId>
81+
<artifactId>flink-java</artifactId>
82+
<version>${flink.version}</version>
83+
</dependency>
84+
<dependency>
85+
<groupId>org.apache.wayang</groupId>
86+
<artifactId>wayang-giraph</artifactId>
87+
<version>1.1.2-SNAPSHOT</version>
88+
</dependency>
89+
<dependency>
90+
<groupId>org.apache.wayang</groupId>
91+
<artifactId>wayang-generic-jdbc</artifactId>
92+
<version>1.1.2-SNAPSHOT</version>
93+
</dependency>
94+
<dependency>
95+
<groupId>org.reflections</groupId>
96+
<artifactId>reflections</artifactId>
97+
<version>0.10.2</version>
98+
</dependency>
99+
<dependency>
100+
<groupId>org.apache.wayang</groupId>
101+
<artifactId>wayang-benchmark</artifactId>
102+
<version>1.1.2-SNAPSHOT</version>
103+
</dependency>
104+
<dependency>
105+
<groupId>org.apache.wayang</groupId>
106+
<artifactId>wayang-api-python</artifactId>
107+
<version>1.1.2-SNAPSHOT</version>
108+
</dependency>
109+
<dependency>
110+
<groupId>org.apache.commons</groupId>
111+
<artifactId>commons-dbcp2</artifactId>
112+
<version>2.7.0</version>
113+
</dependency>
114+
<dependency>
115+
<groupId>org.apache.spark</groupId>
116+
<artifactId>spark-core_2.12</artifactId>
117+
<version>${spark.version}</version>
118+
</dependency>
119+
<dependency>
120+
<groupId>org.apache.spark</groupId>
121+
<artifactId>spark-graphx_2.12</artifactId>
122+
<version>${spark.version}</version>
123+
</dependency>
124+
<dependency>
125+
<groupId>org.apache.spark</groupId>
126+
<artifactId>spark-mllib_2.12</artifactId>
127+
<version>${spark.version}</version>
128+
</dependency>
129+
<dependency>
130+
<groupId>com.google.protobuf</groupId>
131+
<artifactId>protobuf-java</artifactId>
132+
<version>3.16.3</version>
133+
</dependency>
134+
<dependency>
135+
<groupId>org.apache.calcite</groupId>
136+
<artifactId>calcite-core</artifactId>
137+
<version>${calcite.version}</version>
138+
</dependency>
139+
<dependency>
140+
<groupId>org.apache.calcite</groupId>
141+
<artifactId>calcite-linq4j</artifactId>
142+
<version>${calcite.version}</version>
143+
</dependency>
144+
<dependency>
145+
<groupId>org.apache.calcite</groupId>
146+
<artifactId>calcite-file</artifactId>
147+
<version>${calcite.version}</version>
148+
</dependency>
149+
</dependencies>
150+
<build>
151+
<resources>
152+
<resource>
153+
<directory>src/main/resources</directory>
154+
</resource>
155+
</resources>
156+
</build>
157+
</project>
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.ml;
20+
21+
import java.util.Optional;
22+
23+
import org.apache.logging.log4j.Level;
24+
import org.apache.wayang.core.api.Configuration;
25+
import org.apache.wayang.core.api.Job;
26+
import org.apache.wayang.core.api.WayangContext;
27+
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
28+
import org.apache.wayang.core.util.ReflectionUtils;
29+
import org.apache.wayang.ml.encoding.OneHotMappings;
30+
import org.apache.wayang.ml.encoding.TreeEncoder;
31+
import org.apache.wayang.ml.util.Logging;
32+
33+
/**
34+
* This is the entry point for users to work with Wayang ML.
35+
*/
36+
public class MLContext extends WayangContext {
37+
public MLContext() {
38+
super();
39+
}
40+
41+
public MLContext(final Configuration configuration) {
42+
super(configuration);
43+
}
44+
45+
/**
46+
* Execute a plan.
47+
*
48+
* @param wayangPlan the plan to execute
49+
* @param udfJars JARs that declare the code for the UDFs
50+
* @see ReflectionUtils#getDeclaringJar(Class)
51+
*/
52+
@Override
53+
public void execute(final WayangPlan wayangPlan, final String... udfJars) {
54+
this.setLogLevel(Level.ERROR);
55+
final Job wayangJob = this.createJob("", wayangPlan, udfJars);
56+
57+
final Configuration config = this.getConfiguration();
58+
final Configuration jobConfig = wayangJob.getConfiguration();
59+
60+
wayangJob.execute();
61+
62+
if (config.getBooleanProperty("wayang.ml.experience.enabled")) {
63+
final Optional<String> originalOption = config.getOptionalStringProperty("wayang.ml.experience.original");
64+
65+
final OneHotMappings mappings = new OneHotMappings();
66+
final TreeEncoder encoder = new TreeEncoder(mappings);
67+
final String original = originalOption.orElse(encoder.encode(wayangPlan, wayangJob.getOptimizationContext(), false).toString());
68+
69+
final Optional<String> choicesOption = config
70+
.getOptionalStringProperty("wayang.ml.experience.with-platforms");
71+
final String withChoices = choicesOption
72+
.orElse(jobConfig.getStringProperty("wayang.ml.experience.with-platforms"));
73+
74+
final long execTime = jobConfig.getLongProperty("wayang.ml.experience.exec-time");
75+
76+
this.logExperience(original, withChoices, execTime);
77+
}
78+
}
79+
80+
private void logExperience(final String original, final String withChoices, final long execTime) {
81+
if (!this.getConfiguration().getBooleanProperty("wayang.ml.experience.enabled")) {
82+
return;
83+
}
84+
85+
final String content = String.format("%s:%s:%d", original, withChoices, execTime);
86+
Logging.writeToFile(content, this.getConfiguration().getStringProperty("wayang.ml.experience.file"));
87+
}
88+
}

0 commit comments

Comments
 (0)