@@ -267,8 +267,7 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat
267267 final Collection <?> startTasks = stage .getStartTasks ();
268268 final Collection <?> termTasks = stage .getTerminalTasks ();
269269
270- assert startTasks .size () == 1 : "Invalid JDBC stage: multiple sources are not currently supported" ;
271- final ExecutionTask startTask = (ExecutionTask ) startTasks .toArray ()[0 ];
270+ final ExecutionTask startTask = JdbcExecutor .selectStartTask (startTasks , stage );
272271 assert termTasks .size () == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported." ;
273272 final ExecutionTask termTask = (ExecutionTask ) termTasks .toArray ()[0 ];
274273 assert startTask .getOperator () instanceof TableSource
@@ -281,6 +280,9 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat
281280 final JdbcTableSinkOperator sinkOp = (JdbcTableSinkOperator ) termTask .getOperator ();
282281 final Collection <JdbcExecutionOperator > filterTasks = new ArrayList <>(4 );
283282 JdbcProjectionOperator projectionTask = null ;
283+ JdbcGlobalReduceOperator globalReduceTask = null ;
284+ JdbcReduceByOperator reduceByTask = null ;
285+ JdbcSortOperator sortTask = null ;
284286 final Collection <JdbcExecutionOperator > joinTasks = new ArrayList <>();
285287
286288 // Walk through intermediate operators, stopping at the sink
@@ -293,6 +295,18 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat
293295 final JdbcProjectionOperator projectionOperator = (JdbcProjectionOperator ) nextTask .getOperator ();
294296 assert projectionTask == null ;
295297 projectionTask = projectionOperator ;
298+ } else if (nextTask .getOperator () instanceof JdbcGlobalReduceOperator ) {
299+ final JdbcGlobalReduceOperator globalReduceOperator = (JdbcGlobalReduceOperator ) nextTask .getOperator ();
300+ assert globalReduceTask == null ;
301+ globalReduceTask = globalReduceOperator ;
302+ } else if (nextTask .getOperator () instanceof JdbcReduceByOperator ) {
303+ final JdbcReduceByOperator reduceByOperator = (JdbcReduceByOperator ) nextTask .getOperator ();
304+ assert reduceByTask == null ;
305+ reduceByTask = reduceByOperator ;
306+ } else if (nextTask .getOperator () instanceof JdbcSortOperator ) {
307+ final JdbcSortOperator sortOperator = (JdbcSortOperator ) nextTask .getOperator ();
308+ assert sortTask == null ;
309+ sortTask = sortOperator ;
296310 } else if (nextTask .getOperator () instanceof JdbcJoinOperator ) {
297311 final JdbcJoinOperator joinOperator = (JdbcJoinOperator ) nextTask .getOperator ();
298312 joinTasks .add (joinOperator );
@@ -303,8 +317,8 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat
303317 }
304318
305319 // Compose the SELECT query
306- final StringBuilder selectQuery = createSqlString (jdbcExecutor , tableOp , filterTasks , projectionTask , null , null , null ,
307- joinTasks );
320+ final StringBuilder selectQuery = createSqlString (jdbcExecutor , tableOp , filterTasks , projectionTask ,
321+ globalReduceTask , reduceByTask , sortTask , joinTasks );
308322
309323 // Remove trailing semicolon from SELECT
310324 String selectSql = selectQuery .toString ();
0 commit comments