Skip to content

Commit 56b9bc2

Browse files
committed
Address review comments
1 parent 921d3f8 commit 56b9bc2

3 files changed

Lines changed: 47 additions & 109 deletions

File tree

wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/execution/GenericJdbcExecutor.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;
4444
import org.apache.wayang.jdbc.compiler.FunctionCompiler;
4545

46-
4746
import java.io.IOException;
4847
import java.io.OutputStreamWriter;
4948
import java.io.UncheckedIOException;
@@ -140,19 +139,19 @@ public void execute(ExecutionStage stage, OptimizationContext optimizationContex
140139
* @return the said follow-up {@link ExecutionTask} or {@code null} if none
141140
*/
142141
private ExecutionTask findGenericJdbcExecutionOperatorTaskInStage(ExecutionTask task, ExecutionStage stage) {
143-
assert task.getNumOuputChannels() == 1;
144-
final Channel outputChannel = task.getOutputChannel(0);
142+
assert task.getNumOuputChannels() == 1;
143+
final Channel outputChannel = task.getOutputChannel(0);
145144

146-
if (outputChannel.getConsumers().isEmpty()) {
147-
return null;
148-
}
145+
if (outputChannel.getConsumers().isEmpty()) {
146+
return null;
147+
}
149148

150-
final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
149+
final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers());
151150

152-
return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator
153-
? consumer
154-
: null;
155-
}
151+
return consumer.getStage() == stage && consumer.getOperator() instanceof GenericJdbcExecutionOperator ?
152+
consumer :
153+
null;
154+
}
156155

157156
/**
158157
* Instantiates the outbound {@link SqlQueryChannel} of an {@link ExecutionTask}.

wayang-platforms/wayang-generic-jdbc/src/main/java/org/apache/wayang/genericjdbc/operators/GenericSqlToStreamOperator.java

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@
5050
/**
5151
* Converts {@link SqlQueryChannel}s into {@link StreamChannel}s.
5252
*/
53-
public class GenericSqlToStreamOperator extends UnaryToUnaryOperator<Record, Record>
54-
implements JavaExecutionOperator, JsonSerializable {
53+
public class GenericSqlToStreamOperator extends UnaryToUnaryOperator<Record, Record> implements JavaExecutionOperator, JsonSerializable {
5554

5655
private final GenericJdbcPlatform jdbcPlatform;
5756

@@ -82,7 +81,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
8281
GenericJdbcPlatform producerPlatform =
8382
(GenericJdbcPlatform) input.getChannel().getProducer().getPlatform();
8483

85-
// Fix: safely resolve JDBC name
8684
String jdbcName = input.getJdbcName();
8785
if (jdbcName == null || jdbcName.trim().isEmpty()) {
8886
jdbcName = producerPlatform.getPlatformId();
@@ -111,22 +109,18 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
111109
queryLineageNode.addPredecessor(input.getLineage());
112110

113111
ExecutionLineageNode outputLineageNode = new ExecutionLineageNode(operatorContext);
114-
outputLineageNode.add(
115-
LoadProfileEstimators.createFromSpecification(
116-
String.format("wayang.%s.sqltostream.load.output",
117-
this.jdbcPlatform.getPlatformId()),
112+
outputLineageNode.add(LoadProfileEstimators.createFromSpecification(
113+
String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId()),
118114
executor.getConfiguration()
119-
)
120-
);
115+
));
121116
output.getLineage().addPredecessor(outputLineageNode);
122117

123118
return queryLineageNode.collectAndMark();
124119
}
125120

126121
@Override
127122
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
128-
return Collections.singletonList(
129-
this.jdbcPlatform.getGenericSqlQueryChannelDescriptor());
123+
return Collections.singletonList(this.jdbcPlatform.getGenericSqlQueryChannelDescriptor());
130124
}
131125

132126
@Override
@@ -137,10 +131,8 @@ public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
137131
@Override
138132
public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
139133
return Arrays.asList(
140-
String.format("wayang.%s.sqltostream.load.query",
141-
this.jdbcPlatform.getPlatformId()),
142-
String.format("wayang.%s.sqltostream.load.output",
143-
this.jdbcPlatform.getPlatformId())
134+
String.format("wayang.%s.sqltostream.load.query", this.jdbcPlatform.getPlatformId()),
135+
String.format("wayang.%s.sqltostream.load.output", this.jdbcPlatform.getPlatformId())
144136
);
145137
}
146138

@@ -169,8 +161,7 @@ private void moveToNext() {
169161
this.next = null;
170162
this.close();
171163
} else {
172-
final int recordWidth =
173-
this.resultSet.getMetaData().getColumnCount();
164+
final int recordWidth = this.resultSet.getMetaData().getColumnCount();
174165

175166
Object[] values = new Object[recordWidth];
176167

@@ -183,8 +174,7 @@ private void moveToNext() {
183174
} catch (SQLException e) {
184175
this.next = null;
185176
this.close();
186-
throw new WayangException(
187-
"Exception while iterating the result set.", e);
177+
throw new WayangException("Exception while iterating the result set.", e);
188178
}
189179
}
190180

@@ -206,8 +196,7 @@ public void close() {
206196
try {
207197
this.resultSet.close();
208198
} catch (Throwable t) {
209-
LogManager.getLogger(this.getClass())
210-
.error("Could not close result set.", t);
199+
LogManager.getLogger(this.getClass()).error("Could not close result set.", t);
211200
} finally {
212201
this.resultSet = null;
213202
}
@@ -217,15 +206,13 @@ public void close() {
217206

218207
@Override
219208
public WayangJsonObj toJson() {
220-
return new WayangJsonObj()
221-
.put("platform", this.jdbcPlatform.getClass().getCanonicalName());
209+
return new WayangJsonObj().put("platform", this.jdbcPlatform.getClass().getCanonicalName());
222210
}
223211

224212
@SuppressWarnings("unused")
225213
public static GenericSqlToStreamOperator fromJson(WayangJsonObj wayangJsonObj) {
226214
final String platformClassName = wayangJsonObj.getString("platform");
227-
GenericJdbcPlatform jdbcPlatform =
228-
ReflectionUtils.evaluate(platformClassName + ".getInstance()");
215+
GenericJdbcPlatform jdbcPlatform = ReflectionUtils.evaluate(platformClassName + ".getInstance()");
229216
return new GenericSqlToStreamOperator(jdbcPlatform);
230217
}
231218
}

wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java

Lines changed: 25 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,8 @@ public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) {
6363
}
6464

6565
@Override
66-
public void execute(final ExecutionStage stage,
67-
final OptimizationContext optimizationContext,
68-
final ExecutionState executionState) {
69-
70-
final Tuple2<String, SqlQueryChannel.Instance> pair =
71-
JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
66+
public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) {
67+
final Tuple2<String, SqlQueryChannel.Instance> pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
7268

7369
final String query = pair.field0;
7470
final SqlQueryChannel.Instance queryChannel = pair.field1;
@@ -80,9 +76,7 @@ public void execute(final ExecutionStage stage,
8076
/**
8177
* Safe version (removes WayangCollections.getSingle crash)
8278
*/
83-
private static ExecutionTask findJdbcExecutionOperatorTaskInStage(
84-
final ExecutionTask task,
85-
final ExecutionStage stage) {
79+
private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) {
8680

8781
assert task.getNumOuputChannels() == 1;
8882

@@ -94,34 +88,26 @@ private static ExecutionTask findJdbcExecutionOperatorTaskInStage(
9488

9589
final ExecutionTask consumer = outputChannel.getConsumers().iterator().next();
9690

97-
return consumer.getStage() == stage &&
98-
consumer.getOperator() instanceof JdbcExecutionOperator
99-
? consumer
100-
: null;
91+
return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer
92+
: null;
10193
}
10294

103-
private static SqlQueryChannel.Instance instantiateOutboundChannel(
104-
final ExecutionTask task,
105-
final OptimizationContext optimizationContext,
106-
final JdbcExecutor jdbcExecutor) {
107-
95+
private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task,
96+
final OptimizationContext optimizationContext, final JdbcExecutor jdbcExecutor) {
10897
assert task.getNumOuputChannels() == 1;
10998
assert task.getOutputChannel(0) instanceof SqlQueryChannel;
11099

111100
final SqlQueryChannel outputChannel = (SqlQueryChannel) task.getOutputChannel(0);
112101

113-
final OptimizationContext.OperatorContext operatorContext =
114-
optimizationContext.getOperatorContext(task.getOperator());
102+
final OptimizationContext.OperatorContext operatorContext = optimizationContext
103+
.getOperatorContext(task.getOperator());
115104

116105
return outputChannel.createInstance(jdbcExecutor, operatorContext, 0);
117106
}
118107

119-
private static SqlQueryChannel.Instance instantiateOutboundChannel(
120-
final ExecutionTask task,
108+
private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task,
121109
final OptimizationContext optimizationContext,
122-
final SqlQueryChannel.Instance predecessorChannelInstance,
123-
final JdbcExecutor jdbcExecutor) {
124-
110+
final SqlQueryChannel.Instance predecessorChannelInstance,final JdbcExecutor jdbcExecutor) {
125111
final SqlQueryChannel.Instance newInstance =
126112
instantiateOutboundChannel(task, optimizationContext, jdbcExecutor);
127113

@@ -133,10 +119,8 @@ private static SqlQueryChannel.Instance instantiateOutboundChannel(
133119
/**
134120
* Main SQL builder
135121
*/
136-
protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(
137-
final ExecutionStage stage,
138-
final OptimizationContext context,
139-
final JdbcExecutor jdbcExecutor) {
122+
protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(final ExecutionStage stage,
123+
final OptimizationContext context, final JdbcExecutor jdbcExecutor) {
140124

141125
final Collection<?> startTasks = stage.getStartTasks();
142126
final Collection<?> termTasks = stage.getTerminalTasks();
@@ -152,14 +136,10 @@ protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(
152136
}
153137

154138
final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator();
155-
156-
SqlQueryChannel.Instance tipChannelInstance =
157-
instantiateOutboundChannel(startTask, context, jdbcExecutor);
158-
139+
SqlQueryChannel.Instance tipChannelInstance = instantiateOutboundChannel(startTask, context, jdbcExecutor);
159140
final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
160141
JdbcProjectionOperator projectionTask = null;
161142
final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();
162-
163143
final Set<ExecutionTask> allTasks = stage.getAllTasks();
164144

165145
ExecutionTask nextTask =
@@ -184,45 +164,27 @@ protected static Tuple2<String, SqlQueryChannel.Instance> createSqlQuery(
184164
findJdbcExecutionOperatorTaskInStage(nextTask, stage);
185165
}
186166

187-
final StringBuilder query =
188-
createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);
189-
167+
final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);
190168
return new Tuple2<>(query.toString(), tipChannelInstance);
191169
}
192170

193-
public static StringBuilder createSqlString(
194-
final JdbcExecutor jdbcExecutor,
195-
final JdbcTableSource tableOp,
196-
final Collection<JdbcFilterOperator> filterTasks,
197-
JdbcProjectionOperator projectionTask,
171+
public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp,
172+
final Collection<JdbcFilterOperator> filterTasks, JdbcProjectionOperator projectionTask,
198173
final Collection<JdbcJoinOperator<?>> joinTasks) {
199-
200-
final String tableName =
201-
tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
202-
203-
final Collection<String> conditions =
204-
filterTasks.stream()
174+
final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
175+
final Collection<String> conditions = filterTasks.stream()
205176
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
206177
.collect(Collectors.toList());
207-
208-
final String projection =
209-
projectionTask == null
210-
? "*"
211-
: projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
212-
213-
final Collection<String> joins =
214-
joinTasks.stream()
178+
final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);
179+
final Collection<String> joins = joinTasks.stream()
215180
.map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler))
216181
.collect(Collectors.toList());
217182

218183
final StringBuilder sb = new StringBuilder(1000);
219-
220184
sb.append("SELECT ").append(projection).append(" FROM ").append(tableName);
221-
222185
for (final String join : joins) {
223186
sb.append(" ").append(join);
224187
}
225-
226188
if (!conditions.isEmpty()) {
227189
sb.append(" WHERE ");
228190
String separator = "";
@@ -231,9 +193,7 @@ public static StringBuilder createSqlString(
231193
separator = " AND ";
232194
}
233195
}
234-
235196
sb.append(';');
236-
237197
return sb;
238198
}
239199

@@ -251,28 +211,20 @@ public Platform getPlatform() {
251211
return this.platform;
252212
}
253213

254-
private void saveResult(final FileChannel.Instance outputFileChannelInstance,
255-
final ResultSet rs)
214+
private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs)
256215
throws IOException, SQLException {
257216

258-
final FileSystem outFs =
259-
FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();
260-
261-
try (final OutputStreamWriter writer =
262-
new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) {
263-
217+
final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get();
218+
try (final OutputStreamWriter writer =new OutputStreamWriter(
219+
outFs.create(outputFileChannelInstance.getSinglePath()))) {
264220
while (rs.next()) {
265-
266221
final ResultSetMetaData rsmd = rs.getMetaData();
267-
268222
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
269223
writer.write(rs.getString(i));
270224
if (i < rsmd.getColumnCount()) writer.write('\t');
271225
}
272-
273226
if (!rs.isLast()) writer.write('\n');
274227
}
275-
276228
} catch (final UncheckedIOException e) {
277229
throw e.getCause();
278230
}

0 commit comments

Comments
 (0)