Skip to content

Commit fe2e8cf

Browse files
authored
Merge branch 'apache:main' into main
2 parents 671252e + 5aa399b commit fe2e8cf

1 file changed

Lines changed: 51 additions & 48 deletions

File tree

  • wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/context/SqlContext.java

Lines changed: 51 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.calcite.tools.RuleSet;
2929
import org.apache.calcite.tools.RuleSets;
3030

31-
import org.apache.wayang.api.sql.calcite.utils.ModelParser;
3231
import org.apache.wayang.api.sql.calcite.convention.WayangConvention;
3332
import org.apache.wayang.api.sql.calcite.optimizer.Optimizer;
3433
import org.apache.wayang.api.sql.calcite.rules.WayangRules;
@@ -100,80 +99,83 @@ public SqlContext(final Configuration configuration, final List<Plugin> plugins)
10099
* Entry point for executing SQL statements while providing arguments.
101100
* You need to provide at least a JDBC source.
102101
*
103-
* @param args args[0] = SQL statement path, args[1] = JDBC driver, args[2] =
104-
* JDBC URL, args[3] = JDBC user,
105-
*              args[4] = JDBC password, args[5] = outputPath,
106-
* args[6...] = platforms
102+
* @param args
103+
* <ul>
104+
* <li><b>-p, --platforms</b>: Comma-separated list of execution
105+
* platforms (e.g., spark, java).</li>
106+
* <li><b>-q, --query</b>: Path to the SQL query file to be
107+
* executed.</li>
108+
* <li><b>-o, --outputPath</b>: Path where the output results will
109+
* be stored.</li>
110+
* <li><b>-c, --config</b>: Path to the configuration file.</li>
111+
* </ul>
107112
*/
108113
public static void main(final String[] args) throws Exception {
109-
if (args.length < 5)
114+
if (args.length < 4)
110115
throw new IllegalArgumentException(
111-
"Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <SQL statement path> <JDBC driver> <JDBC URL> <JDBC user> <JDBC password> <Result output path> [platforms...]");
116+
"Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <configuration path> <SQL statement path> <output path> [platforms...]");
112117

113-
//Specify the named arguments
114-
Options options = new Options();
118+
// Specify the named arguments
119+
final Options options = new Options();
115120
options.addOption("p", "platforms", true, "[platforms...]");
116-
options.addOption("s", "schema", true, "Schema path");
117121
options.addOption("q", "query", true, "SQL statement path");
118122
options.addOption("o", "outputPath", true, "Output path");
119-
options.addOption("d", "data", true, "Data path for file-based schema");
120123
options.addOption("c", "config", true, "File path for config file");
121-
options.addOption("jdbcDriver", true, "JDBC driver");
122-
options.addOption("jdbcUrl", true, "JDBC URL");
123-
options.addOption("jdbcPassword", true, "JDBC URL");
124124

125-
CommandLineParser parser = new DefaultParser();
126-
CommandLine cmd = parser.parse(options, args);
125+
final CommandLineParser parser = new DefaultParser();
126+
final CommandLine cmd = parser.parse(options, args);
127127

128128
final String queryPath = cmd.getOptionValue("q");
129-
final String jdbcDriver = cmd.getOptionValue("jdbcDriver");
130-
final String jdbcUrl = cmd.getOptionValue("jdbcUrl");
131-
final String jdbcUser = cmd.getOptionValue("jdbcUser");
132-
final String jdbcPassword = cmd.getOptionValue("jdbcPassword");
133129
final String outputPath = cmd.getOptionValue("o");
134-
final String dataPath = cmd.getOptionValue("d");
135-
final String schemaPath = cmd.getOptionValue("s");
136130

137-
final String query = StringUtils.chop(
138-
Files.readString(Paths.get(queryPath))
139-
.stripTrailing());
131+
final String query = StringUtils.chop(Files.readString(Paths.get(queryPath)).stripTrailing());
132+
final Configuration configuration = new Configuration(cmd.getOptionValue("c"));
140133

141-
final String driverPlatform = jdbcDriver.split("\\.")[0];
134+
final SqlContext context = new SqlContext(configuration,
135+
List.of(Java.channelConversionPlugin(), Postgres.conversionPlugin()));
142136

143-
final Configuration configuration = new Configuration();
137+
final List<Plugin> plugins = JavaConversions.seqAsJavaList(Parameters.loadPlugins(cmd.getOptionValue("p")));
138+
plugins.stream().forEach(context::register);
144139

145-
if (cmd.hasOption("c")) {
146-
configuration.load(cmd.getOptionValue("c"));
147-
}
140+
final Properties configProperties = Optimizer.ConfigProperties.getDefaults();
141+
final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
148142

149-
final String calciteModel = Resources.toString(
150-
new URL(schemaPath),
151-
Charset.defaultCharset()
152-
);
143+
final Optimizer optimizer = Optimizer.create(context.calciteSchema, configProperties,
144+
relDataTypeFactory);
153145

154-
configuration.setProperty("wayang.calcite.model", calciteModel);
155-
configuration.setProperty(String.format("wayang.%s.jdbc.url", driverPlatform), jdbcUrl);
156-
configuration.setProperty(String.format("wayang.%s.jdbc.user", driverPlatform), jdbcUser);
157-
configuration.setProperty(String.format("wayang.%s.jdbc.password", driverPlatform), jdbcPassword);
146+
final SqlNode sqlNode = optimizer.parseSql(query);
147+
final SqlNode validatedSqlNode = optimizer.validate(sqlNode);
148+
final RelNode relNode = optimizer.convert(validatedSqlNode);
158149

159-
final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel);
150+
PrintUtils.print("After parsing sql query", relNode);
160151

161-
final Configuration parseModel = new ModelParser(configuration, calciteModelJSON).setProperties();
152+
final RuleSet rules = RuleSets.ofList(
153+
WayangRules.WAYANG_TABLESCAN_RULE,
154+
WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE,
155+
WayangRules.WAYANG_PROJECT_RULE,
156+
WayangRules.WAYANG_FILTER_RULE,
157+
WayangRules.WAYANG_JOIN_RULE,
158+
WayangRules.WAYANG_AGGREGATE_RULE,
159+
WayangRules.WAYANG_SORT_RULE);
162160

163-
final SqlContext context = new SqlContext(parseModel,
164-
List.of(Java.channelConversionPlugin(), Postgres.conversionPlugin()));
161+
final RelNode wayangRel = optimizer.optimize(
162+
relNode,
163+
relNode.getTraitSet().plus(WayangConvention.INSTANCE),
164+
rules);
165165

166-
List<Plugin> plugins = JavaConversions.seqAsJavaList(Parameters.loadPlugins(cmd.getOptionValue("p")));
167-
plugins.stream().forEach(context::register);
166+
PrintUtils.print("After translating logical intermediate plan", wayangRel);
168167

169-
final Collection<Record> result = context.executeSql(query);
168+
final Collection<Record> collector = new ArrayList<>();
169+
final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, configuration, collector);
170+
collector.add(new Record(wayangRel.getRowType().getFieldNames().toArray()));
171+
context.execute(getJobName(), wayangPlan);
170172

171173
try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(outputPath))) {
172-
for (final Record record : result) {
173-
writer.write(record.toString());
174+
for (final Record record : collector) {
175+
writer.write(Arrays.toString(record.getValues()));
174176
writer.newLine();
175177
}
176-
} catch (IOException e) {
178+
} catch (final IOException e) {
177179
e.printStackTrace();
178180
}
179181
}
@@ -210,6 +212,7 @@ public Collection<Record> executeSql(final String sql) throws SqlParseException
210212

211213
final Collection<Record> collector = new ArrayList<>();
212214
final WayangPlan wayangPlan = optimizer.convert(wayangRel, collector);
215+
213216
this.execute(getJobName(), wayangPlan);
214217

215218
return collector;

0 commit comments

Comments
 (0)