Skip to content

Commit 36fe918

Browse files
authored
Merge pull request #573 from mspruc/main
Change main execution in sql-api
2 parents 990fe21 + 3653f76 commit 36fe918

1 file changed

Lines changed: 37 additions & 37 deletions

File tree

  • wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.calcite.tools.RuleSet;
2929
import org.apache.calcite.tools.RuleSets;
3030

31-
import org.apache.wayang.api.sql.calcite.utils.ModelParser;
3231
import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
3332
import org.apache.wayang.api.sql.calcite.optimizer.Optimizer;
3433
import org.apache.wayang.api.sql.calcite.rules.WayangRules;
@@ -106,71 +105,71 @@ public SqlContext(final Configuration configuration, final List<Plugin> plugins)
106105
* args[6...] = platforms
107106
*/
108107
public static void main(final String[] args) throws Exception {
109-
if (args.length < 5)
108+
if (args.length < 4)
110109
throw new IllegalArgumentException(
111-
"Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <SQL statement path> <JDBC driver> <JDBC URL> <JDBC user> <JDBC password> <Result output path> [platforms...]");
110+
"Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <configuration path> <SQL statement path> <output path> [platforms...]");
112111

113112
//Specify the named arguments
114113
Options options = new Options();
115114
options.addOption("p", "platforms", true, "[platforms...]");
116-
options.addOption("s", "schema", true, "Schema path");
117115
options.addOption("q", "query", true, "SQL statement path");
118116
options.addOption("o", "outputPath", true, "Output path");
119-
options.addOption("d", "data", true, "Data path for file-based schema");
120117
options.addOption("c", "config", true, "File path for config file");
121-
options.addOption("jdbcDriver", true, "JDBC driver");
122-
options.addOption("jdbcUrl", true, "JDBC URL");
123-
options.addOption("jdbcPassword", true, "JDBC URL");
124118

125119
CommandLineParser parser = new DefaultParser();
126120
CommandLine cmd = parser.parse(options, args);
127121

128122
final String queryPath = cmd.getOptionValue("q");
129-
final String jdbcDriver = cmd.getOptionValue("jdbcDriver");
130-
final String jdbcUrl = cmd.getOptionValue("jdbcUrl");
131-
final String jdbcUser = cmd.getOptionValue("jdbcUser");
132-
final String jdbcPassword = cmd.getOptionValue("jdbcPassword");
133123
final String outputPath = cmd.getOptionValue("o");
134-
final String dataPath = cmd.getOptionValue("d");
135-
final String schemaPath = cmd.getOptionValue("s");
136-
137-
final String query = StringUtils.chop(
138-
Files.readString(Paths.get(queryPath))
139-
.stripTrailing());
140-
141-
final String driverPlatform = jdbcDriver.split("\\.")[0];
142124

125+
final String query = StringUtils.chop(Files.readString(Paths.get(queryPath)).stripTrailing());
143126
final Configuration configuration = new Configuration();
144127

145128
if (cmd.hasOption("c")) {
146129
configuration.load(cmd.getOptionValue("c"));
147130
}
148131

149-
final String calciteModel = Resources.toString(
150-
new URL(schemaPath),
151-
Charset.defaultCharset()
152-
);
132+
final SqlContext context = new SqlContext(configuration, List.of(Java.channelConversionPlugin(), Postgres.conversionPlugin()));
153133

154-
configuration.setProperty("wayang.calcite.model", calciteModel);
155-
configuration.setProperty(String.format("wayang.%s.jdbc.url", driverPlatform), jdbcUrl);
156-
configuration.setProperty(String.format("wayang.%s.jdbc.user", driverPlatform), jdbcUser);
157-
configuration.setProperty(String.format("wayang.%s.jdbc.password", driverPlatform), jdbcPassword);
134+
final List<Plugin> plugins = JavaConversions.seqAsJavaList(Parameters.loadPlugins(cmd.getOptionValue("p")));
135+
plugins.stream().forEach(context::register);
158136

159-
final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel);
137+
final Properties configProperties = Optimizer.ConfigProperties.getDefaults();
138+
final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
139+
140+
final Optimizer optimizer = Optimizer.create(context.calciteSchema, configProperties,
141+
relDataTypeFactory);
160142

161-
final Configuration parseModel = new ModelParser(configuration, calciteModelJSON).setProperties();
143+
final SqlNode sqlNode = optimizer.parseSql(query);
144+
final SqlNode validatedSqlNode = optimizer.validate(sqlNode);
145+
final RelNode relNode = optimizer.convert(validatedSqlNode);
162146

163-
final SqlContext context = new SqlContext(parseModel,
164-
List.of(Java.channelConversionPlugin(), Postgres.conversionPlugin()));
147+
PrintUtils.print("After parsing sql query", relNode);
165148

166-
List<Plugin> plugins = JavaConversions.seqAsJavaList(Parameters.loadPlugins(cmd.getOptionValue("p")));
167-
plugins.stream().forEach(context::register);
149+
final RuleSet rules = RuleSets.ofList(
150+
WayangRules.WAYANG_TABLESCAN_RULE,
151+
WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE,
152+
WayangRules.WAYANG_PROJECT_RULE,
153+
WayangRules.WAYANG_FILTER_RULE,
154+
WayangRules.WAYANG_JOIN_RULE,
155+
WayangRules.WAYANG_AGGREGATE_RULE,
156+
WayangRules.WAYANG_SORT_RULE);
157+
158+
final RelNode wayangRel = optimizer.optimize(
159+
relNode,
160+
relNode.getTraitSet().plus(WayangConvention.INSTANCE),
161+
rules);
168162

169-
final Collection<Record> result = context.executeSql(query);
163+
PrintUtils.print("After translating logical intermediate plan", wayangRel);
164+
165+
final Collection<Record> collector = new ArrayList<>();
166+
final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, configuration, collector);
167+
collector.add(new Record(wayangRel.getRowType().getFieldNames().toArray()));
168+
context.execute(getJobName(), wayangPlan);
170169

171170
try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(outputPath))) {
172-
for (final Record record : result) {
173-
writer.write(record.toString());
171+
for (final Record record : collector) {
172+
writer.write(Arrays.toString(record.getValues()));
174173
writer.newLine();
175174
}
176175
} catch (IOException e) {
@@ -210,6 +209,7 @@ public Collection<Record> executeSql(final String sql) throws SqlParseException
210209

211210
final Collection<Record> collector = new ArrayList<>();
212211
final WayangPlan wayangPlan = optimizer.convert(wayangRel, collector);
212+
213213
this.execute(getJobName(), wayangPlan);
214214

215215
return collector;

0 commit comments

Comments
 (0)