|
36 | 36 | import org.apache.wayang.basic.data.Record; |
37 | 37 | import org.apache.wayang.core.api.Configuration; |
38 | 38 | import org.apache.wayang.core.plugin.Plugin; |
| 39 | +import org.apache.wayang.api.utils.Parameters; |
39 | 40 | import org.apache.wayang.core.api.WayangContext; |
| 41 | +import org.apache.wayang.core.util.ReflectionUtils; |
40 | 42 | import org.apache.wayang.core.plan.wayangplan.WayangPlan; |
41 | 43 | import org.apache.wayang.java.Java; |
42 | 44 | import org.apache.wayang.postgres.Postgres; |
43 | 45 | import org.apache.wayang.spark.Spark; |
| 46 | +import org.apache.commons.cli.*; |
44 | 47 |
|
| 48 | +import com.google.common.io.Resources; |
| 49 | + |
| 50 | +import org.json.simple.JSONObject; |
| 51 | +import org.json.simple.parser.JSONParser; |
| 52 | + |
| 53 | +import scala.collection.JavaConversions; |
45 | 54 | import java.io.BufferedWriter; |
46 | 55 | import java.io.IOException; |
47 | 56 | import java.nio.file.Files; |
| 57 | +import java.nio.charset.Charset; |
48 | 58 | import java.nio.file.Paths; |
| 59 | +import java.net.URL; |
49 | 60 | import java.sql.SQLException; |
50 | 61 | import java.util.ArrayList; |
51 | 62 | import java.util.Arrays; |
@@ -87,43 +98,60 @@ public SqlContext(final Configuration configuration, final List<Plugin> plugins) |
87 | 98 | /** |
88 | 99 | * Entry point for executing SQL statements while providing arguments. |
89 | 100 | * You need to provide at least a JDBC source. |
90 | | - * Sample: |
91 | | - * @param args args[0] = config file path, args[1] = query file path, args[2] = output path |
| 101 | + * |
| 102 | + * @param args args[0] = SQL statement path, args[1] = JDBC driver, args[2] = |
| 103 | + * JDBC URL, args[3] = JDBC user, |
| 104 | + * args[4] = JDBC password, args[5] = outputPath, |
| 105 | + * args[6...] = platforms |
92 | 106 | */ |
93 | 107 | public static void main(final String[] args) throws Exception { |
94 | 108 | if (args.length < 3) |
95 | 109 | throw new IllegalArgumentException( |
96 | 110 | "Usage: ./bin/wayang-submit org.apache.wayang.api.sql.SqlContext <configuration path> <SQL statement path> <output path> [platforms...]"); |
97 | 111 |
|
98 | | - final String configurationPath = args[0]; |
99 | | - final String queryPath = args[1]; |
100 | | - final String outputPath = args[2]; |
| 112 | + //Specify the named arguments |
| 113 | + Options options = new Options(); |
| 114 | + options.addOption("p", "platforms", true, "[platforms...]"); |
| 115 | + options.addOption("s", "schema", true, "Schema path"); |
| 116 | + options.addOption("q", "query", true, "SQL statement path"); |
| 117 | + options.addOption("o", "outputPath", true, "Output path"); |
| 118 | + options.addOption("d", "data", true, "Data path for file-based schema"); |
| 119 | + options.addOption("c", "config", true, "File path for config file"); |
| 120 | + options.addOption("jdbcDriver", true, "JDBC driver"); |
| 121 | + options.addOption("jdbcUrl", true, "JDBC URL"); |
| 122 | + options.addOption("jdbcPassword", true, "JDBC URL"); |
| 123 | + |
| 124 | + CommandLineParser parser = new DefaultParser(); |
| 125 | + CommandLine cmd = parser.parse(options, args); |
| 126 | + |
| 127 | + final String queryPath = cmd.getOptionValue("q"); |
| 128 | + final String jdbcDriver = cmd.getOptionValue("jdbcDriver"); |
| 129 | + final String jdbcUrl = cmd.getOptionValue("jdbcUrl"); |
| 130 | + final String jdbcUser = cmd.getOptionValue("jdbcUser"); |
| 131 | + final String jdbcPassword = cmd.getOptionValue("jdbcPassword"); |
| 132 | + final String outputPath = cmd.getOptionValue("o"); |
| 133 | + final String dataPath = cmd.getOptionValue("d"); |
| 134 | + final String schemaPath = cmd.getOptionValue("s"); |
101 | 135 |
|
102 | 136 | final String query = StringUtils.chop(Files.readString(Paths.get(queryPath)).stripTrailing()); |
| 137 | + final Configuration configuration = new Configuration(); |
103 | 138 |
|
104 | | - final Configuration configuration = new Configuration(configurationPath); |
105 | | - |
106 | | - final SqlContext context = new SqlContext(configuration, |
107 | | - List.of(Java.channelConversionPlugin(), |
108 | | - Postgres.conversionPlugin())); |
109 | | - |
110 | | - for (int i = 3; i < args.length; i++) { |
111 | | - final String platform = args[i]; |
112 | | - switch (platform.toLowerCase()) { |
113 | | - case "spark": |
114 | | - context.withPlugin(Spark.basicPlugin()); |
115 | | - break; |
116 | | - case "java": |
117 | | - context.withPlugin(Java.basicPlugin()); |
118 | | - break; |
119 | | - case "postgres": |
120 | | - context.withPlugin(Postgres.plugin()); |
121 | | - break; |
122 | | - default: |
123 | | - throw new IllegalArgumentException("platform not supported " + platform); |
124 | | - } |
| 139 | + if (cmd.hasOption("c")) { |
| 140 | + configuration.load(cmd.getOptionValue("c")); |
125 | 141 | } |
126 | 142 |
|
| 143 | + final String calciteModel = Resources.toString( |
| 144 | + new URL(schemaPath), |
| 145 | + Charset.defaultCharset() |
| 146 | + ); |
| 147 | + |
| 148 | + final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel); |
| 149 | + |
| 150 | + final SqlContext context = new SqlContext(configuration, List.of(Java.channelConversionPlugin(), Postgres.conversionPlugin())); |
| 151 | + |
| 152 | + final List<Plugin> plugins = JavaConversions.seqAsJavaList(Parameters.loadPlugins(cmd.getOptionValue("p"))); |
| 153 | + plugins.stream().forEach(context::register); |
| 154 | + |
127 | 155 | final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); |
128 | 156 | final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); |
129 | 157 |
|
|
0 commit comments