3030import org .apache .wayang .java .execution .JavaExecutor ;
3131
3232import java .io .BufferedReader ;
33- import java .io .File ;
34- import java .io .FileReader ;
3533import java .io .IOException ;
3634import java .io .InputStreamReader ;
3735import java .util .Arrays ;
@@ -55,6 +53,46 @@ public class JavaTextFileSource extends TextFileSource implements JavaExecutionO
5553
5654 private static final Logger logger = LoggerFactory .getLogger (JavaTextFileSource .class );
5755
56+ /**
57+ * @return Stream<String> from the provided URL
58+ */
59+ public static Stream <String > streamFromURL (final URL sourceUrl ) {
60+ try {
61+ final HttpURLConnection connection = (HttpURLConnection ) sourceUrl .openConnection ();
62+ connection .setRequestMethod ("GET" );
63+
64+ // Check if the response code indicates success (HTTP status code 200)
65+ if (connection .getResponseCode () == HttpURLConnection .HTTP_OK ) {
66+ logger .info (">>> Ready to stream the data from URL: " + sourceUrl .toString ());
67+ // Read the data line by line and process it in the StreamChannel
68+ final BufferedReader reader = new BufferedReader (new InputStreamReader (connection .getInputStream ()));
69+ return reader .lines ().onClose (() -> {
70+ try {
71+ connection .disconnect ();
72+ reader .close ();
73+ } catch (final IOException e ) {
74+ e .printStackTrace ();
75+ }
76+ });
77+ } else {
78+ throw new WayangException ("Connection with Http failed" );
79+ }
80+ } catch (final Exception e ) {
81+ throw new WayangException (e );
82+ }
83+ }
84+
85+ /**
86+ * @return Stream<String> from the file system
87+ */
88+ public static Stream <String > streamFromFs (final String path ) {
89+ try {
90+ return Files .lines (Path .of (URI .create (path )));
91+ } catch (final Exception e ) {
92+ throw new WayangException (e );
93+ }
94+ }
95+
5896 public JavaTextFileSource (final String inputUrl ) {
5997 super (inputUrl );
6098 }
@@ -90,8 +128,8 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
90128 final String protocol = sourceUrl .getProtocol ();
91129
92130 final Stream <String > lines = (protocol .startsWith ("https" ) || protocol .startsWith ("http" ))
93- ? this .streamFromURL (sourceUrl )
94- : this .streamFromFs (urlStr );
131+ ? JavaTextFileSource .streamFromURL (sourceUrl )
132+ : JavaTextFileSource .streamFromFs (urlStr );
95133
96134 ((StreamChannel .Instance ) outputs [0 ]).accept (lines );
97135
@@ -108,45 +146,6 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
108146 return prepareLineageNode .collectAndMark ();
109147 }
110148
111- /**
112- * @return Stream<String> from the provided URL
113- */
114- public static Stream <String > streamFromURL (final URL sourceUrl ) {
115- try {
116- final HttpURLConnection connection = (HttpURLConnection ) sourceUrl .openConnection ();
117- connection .setRequestMethod ("GET" );
118-
119- // Check if the response code indicates success (HTTP status code 200)
120- if (connection .getResponseCode () == HttpURLConnection .HTTP_OK ) {
121- logger .info (">>> Ready to stream the data from URL: " + sourceUrl .toString ());
122- // Read the data line by line and process it in the StreamChannel
123- final BufferedReader reader = new BufferedReader (new InputStreamReader (connection .getInputStream ()));
124- return reader .lines ().onClose (() -> {
125- try {
126- reader .close ();
127- } catch (final IOException e ) {
128- e .printStackTrace ();
129- }
130- });
131- } else {
132- throw new WayangException ("Connection with Http failed" );
133- }
134- } catch (final Exception e ) {
135- throw new WayangException (e );
136- }
137- }
138-
139- /**
140- * @return Stream<String> from the file system
141- */
142- public static Stream <String > streamFromFs (final String path ) {
143- try {
144- return Files .lines (Path .of (URI .create (path )));
145- } catch (final Exception e ) {
146- throw new WayangException (e );
147- }
148- }
149-
150149 @ Override
151150 public Collection <String > getLoadProfileEstimatorConfigurationKeys () {
152151 return Arrays .asList ("wayang.java.textfilesource.load.prepare" , "wayang.java.textfilesource.load.main" );
0 commit comments