Skip to content

Commit 23033b2

Browse files
authored
Merge pull request #708 from mohit-devlogs/jdbc-join-operator
Add Join operator support for Generic JDBC platform
2 parents 83d281d + dbb9208 commit 23033b2

12 files changed

Lines changed: 540 additions & 100 deletions

File tree

wayang-platforms/wayang-generic-jdbc/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@
7474
<version>1.1.2-SNAPSHOT</version>
7575
<scope>compile</scope>
7676
</dependency>
77+
78+
<dependency>
79+
<groupId>org.hsqldb</groupId>
80+
<artifactId>hsqldb</artifactId>
81+
<version>2.7.2</version>
82+
<scope>test</scope>
83+
</dependency>
7784
</dependencies>
7885

7986

wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
4444
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
4545

46-
4746
import java.io.IOException;
4847
import java.io.OutputStreamWriter;
4948
import java.io.UncheckedIOException;
@@ -142,8 +141,14 @@ public void execute(ExecutionStage stage, OptimizationContext optimizationContex
142141
private ExecutionTask findGenericJdbcExecutionOperatorTaskInStage(ExecutionTask task, ExecutionStage stage) {
143142
// Only tasks with exactly one output channel are expected here.
assert task.getNumOuputChannels() == 1;
144143
final Channel outputChannel = task.getOutputChannel(0);
144+
145+
// Guard added by this change: a channel without consumers has no follow-up task, so answer
// "none" (null) instead of handing an empty collection to WayangCollections.getSingle(...) below.
if (outputChannel.getConsumers().isEmpty()) {
146+
return null;
147+
}
148+
145149
// Presumably getSingle(...) yields the sole element of the collection - confirm against WayangCollections.
final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
146-
return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator ?
150+
151+
// The consumer is returned only if it belongs to the given stage AND runs a
// GenericJdbcExecutionOperator; otherwise the result is null.
// (The return line appears twice because the diff shows it as removed and then re-added.)
return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator ?
147152
consumer :
148153
null;
149154
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.genericjdbc.mapping;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.basic.operators.JoinOperator;
23+
import org.apache.wayang.core.mapping.Mapping;
24+
import org.apache.wayang.core.mapping.OperatorPattern;
25+
import org.apache.wayang.core.mapping.PlanTransformation;
26+
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
27+
import org.apache.wayang.core.mapping.SubplanPattern;
28+
import org.apache.wayang.core.types.DataSetType;
29+
import org.apache.wayang.genericjdbc.operators.GenericJdbcJoinOperator;
30+
import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
31+
32+
import java.util.Collection;
33+
import java.util.Collections;
34+
35+
/**
36+
* Mapping from {@link JoinOperator} to {@link GenericJdbcJoinOperator}.
37+
* <p>
* Exposes exactly one {@link PlanTransformation}, built from a single-operator pattern and a
* single-operator replacement factory, and bound to {@link GenericJdbcPlatform#getInstance()}.
*/
38+
@SuppressWarnings("unchecked")
39+
public class JoinMapping implements Mapping {
40+
41+
/**
* Returns the one transformation this mapping contributes.
*
* @return a singleton collection holding the join transformation for the generic JDBC platform
*/
@Override
42+
public Collection<PlanTransformation> getTransformations() {
43+
return Collections.singleton(new PlanTransformation(
44+
this.createSubplanPattern(),
45+
this.createReplacementSubplanFactory(),
46+
GenericJdbcPlatform.getInstance()
47+
));
48+
}
49+
50+
/**
* Builds the pattern, named "join", that matches a single {@link JoinOperator} whose two
* inputs are default {@link Record} data sets.
*
* @return a singleton subplan pattern wrapping that operator pattern
*/
private SubplanPattern createSubplanPattern() {
51+
52+
// The template operator is created with null key descriptors; only the two Record input
// types are specified.
// NOTE(review): the trailing `false` is presumably the "allow broadcasts" flag of
// OperatorPattern - confirm against its constructor.
final OperatorPattern<JoinOperator<Record, Record, ?>> operatorPattern =
53+
new OperatorPattern<>(
54+
"join",
55+
new JoinOperator<>(null, null,
56+
DataSetType.createDefault(Record.class),
57+
DataSetType.createDefault(Record.class)),
58+
false
59+
);
60+
61+
return SubplanPattern.createSingleton(operatorPattern);
62+
}
63+
64+
/**
* Builds the factory that replaces a matched {@link JoinOperator} with a
* {@link GenericJdbcJoinOperator} copy of it, placed at the given epoch.
*
* @return the replacement factory for single join operators
*/
private ReplacementSubplanFactory createReplacementSubplanFactory() {
65+
// NOTE(review): JoinOperator is used as a raw type argument here; presumably this is what
// the class-level @SuppressWarnings("unchecked") is covering - confirm.
return new ReplacementSubplanFactory.OfSingleOperators<JoinOperator>(
66+
(matchedOperator, epoch) ->
67+
new GenericJdbcJoinOperator<>(matchedOperator).at(epoch)
68+
);
69+
}
70+
}

wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/mapping/Mappings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ public class Mappings {
3131
public static final Collection<Mapping> ALL = Arrays.asList(
3232
new FilterMapping(),
3333
new ProjectionMapping(),
34+
35+
// Join support: JoinOperator -> GenericJdbcJoinOperator (see JoinMapping).
new JoinMapping(),
3436
new TableSinkMapping()
37+
3538
);
3639

3740
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.genericjdbc.operators;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.basic.operators.JoinOperator;
23+
import org.apache.wayang.core.function.TransformationDescriptor;
24+
import org.apache.wayang.jdbc.operators.JdbcJoinOperator;
25+
26+
/**
27+
* Generic JDBC implementation of the {@link JoinOperator}.
28+
* <p>
* All behaviour is inherited from {@link JdbcJoinOperator}; this subclass only adds the
* {@link GenericJdbcExecutionOperator} marker and a platform-specific {@link #createCopy()}.
*
* @param <KeyType> type of the join key extracted from each {@link Record}
*/
29+
public class GenericJdbcJoinOperator<KeyType>
30+
extends JdbcJoinOperator<KeyType>
31+
implements GenericJdbcExecutionOperator {
32+
33+
/**
* Creates a new instance from the two key descriptors, which are passed straight to the superclass.
*
* @param keyDescriptor0 key extractor for the first input
* @param keyDescriptor1 key extractor for the second input
*/
public GenericJdbcJoinOperator(
34+
TransformationDescriptor<Record, KeyType> keyDescriptor0,
35+
TransformationDescriptor<Record, KeyType> keyDescriptor1) {
36+
super(keyDescriptor0, keyDescriptor1);
37+
}
38+
39+
/**
* Copies an existing join operator by delegating to the superclass copy constructor.
*
* @param that the {@link JoinOperator} to copy
*/
public GenericJdbcJoinOperator(JoinOperator<Record, Record, KeyType> that) {
40+
super(that);
41+
}
42+
43+
/**
* Creates a copy of this operator via the copy constructor.
*
* @return a new {@link GenericJdbcJoinOperator} copied from this instance
*/
@Override
44+
protected GenericJdbcJoinOperator<KeyType> createCopy() {
45+
return new GenericJdbcJoinOperator<>(this);
46+
}
47+
}

wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericJdbcTableSource.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,28 +33,29 @@
3333
import java.sql.SQLException;
3434
import java.util.List;
3535

36-
3736
public class GenericJdbcTableSource extends JdbcTableSource implements GenericJdbcExecutionOperator {
3837

38+
/**
39+
* Name of the JDBC configuration to use.
40+
*/
41+
public String jdbcName;
42+
3943
/**
4044
* Creates a new instance.
4145
*
42-
* @see TableSource#TableSource(String, String...)
43-
* @param jdbcName on which table resides
44-
*
45-
*
46+
* @param tableName the table to read from
47+
* @param jdbcName the JDBC configuration name
48+
* @param columnNames the columns to read
* <p>
* NOTE(review): the added signature below takes no {@code jdbcName} argument; the field is
* assigned the literal "genericjdbc" instead, so the {@code @param jdbcName} tag above is stale.
* <p>
* NOTE(review): callers written against the removed (jdbcName, tableName, columnNames...)
* signature still compile against (String, String...), but their first argument is now passed
* to the superclass as the table name - confirm that every call site was updated.
4649
*/
47-
48-
public String jdbcName;
49-
public GenericJdbcTableSource(String jdbcName, String tableName, String... columnNames) {
50+
public GenericJdbcTableSource(String tableName, String... columnNames) {
5051
super(tableName, columnNames);
51-
this.jdbcName = jdbcName;
52+
// Hard-coded JDBC configuration name; it is later handed to createDatabaseDescriptor(...).
this.jdbcName = "genericjdbc";
5253
}
5354

5455
/**
5556
* Copies an instance (exclusive of broadcasts).
5657
*
57-
* @param that that should be copied
58+
* @param that the instance that should be copied
5859
*/
5960
public GenericJdbcTableSource(GenericJdbcTableSource that) {
6061
super(that);
@@ -66,42 +67,45 @@ public List<ChannelDescriptor> getSupportedInputChannels(int index) {
6667
throw new UnsupportedOperationException("This operator has no input channels.");
6768
}
6869

70+
/**
* Provides a {@link CardinalityEstimator} for the single output (index 0) that asks the
* database for the table's row count.
* <p>
* The estimator opens a JDBC connection for the configuration named by {@code jdbcName}, runs
* {@code SELECT count(*)} on this source's table and returns that count as both bounds of the
* estimate. If anything throws, the error is logged and a wide fallback estimate is returned
* instead. The whole estimation is timed via the job's stop watch.
*
* @param outputIndex index of the output; asserted to be 0
* @return the estimator described above
*/
@Override
6971
public CardinalityEstimator getCardinalityEstimator(int outputIndex) {
7072
assert outputIndex == 0;
7173
return new CardinalityEstimator() {
7274
@Override
7375
public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
74-
// see Job for StopWatch measurements
76+
7577
final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
7678
"Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
7779
);
7880

79-
// Establish a DB connection.
8081
try (Connection connection = GenericJdbcPlatform.getInstance()
81-
.createDatabaseDescriptor(optimizationContext.getConfiguration(),jdbcName)
82+
.createDatabaseDescriptor(optimizationContext.getConfiguration(), jdbcName)
8283
.createJdbcConnection()) {
8384

84-
// Query the table cardinality.
85-
final String sql = String.format("SELECT count(*) FROM %s;", GenericJdbcTableSource.this.getTableName());
85+
// NOTE(review): the table name is formatted directly into the SQL text; this assumes
// getTableName() is a trusted identifier - confirm.
final String sql = String.format(
86+
"SELECT count(*) FROM %s;", GenericJdbcTableSource.this.getTableName()
87+
);
88+
8689
// NOTE(review): only the Connection is managed by try-with-resources; the Statement and
// ResultSet created here are never closed explicitly.
final ResultSet resultSet = connection.createStatement().executeQuery(sql);
90+
8791
if (!resultSet.next()) {
8892
throw new SQLException("No query result for \"" + sql + "\".");
8993
}
9094
long cardinality = resultSet.getLong(1);
9195
// Exact count: same value for both bounds; the last argument is presumably the
// confidence (1 = certain) - confirm against CardinalityEstimate.
return new CardinalityEstimate(cardinality, cardinality, 1d);
9296

9397
} catch (Exception e) {
98+
9499
LogManager.getLogger(this.getClass()).error(
95100
"Could not estimate cardinality for {}.", GenericJdbcTableSource.this, e
96101
);
97102

98-
// If we could not load the cardinality, let's use a very conservative estimate.
99103
return new CardinalityEstimate(10, 10000000, 0.9);
104+
100105
} finally {
101106
// Always stop the timer, whether the query succeeded or the fallback was used.
timeMeasurement.stop();
102107
}
103108
}
104109
};
105110
}
106-
}
107-
111+
}

0 commit comments

Comments
 (0)