Skip to content

Commit 85017ca

Browse files
authored
Merge pull request #593 from mspruc/main
fix memory leak in java platform
2 parents 6db324b + 6cc033d commit 85017ca

2 files changed

Lines changed: 106 additions & 100 deletions

File tree

wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java

Lines changed: 69 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,22 @@
2626
import org.apache.wayang.core.platform.ChannelInstance;
2727
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
2828
import org.apache.wayang.core.util.Tuple;
29-
import org.apache.wayang.core.util.fs.FileSystem;
30-
import org.apache.wayang.core.util.fs.FileSystems;
3129
import org.apache.wayang.java.channels.StreamChannel;
3230
import org.apache.wayang.java.execution.JavaExecutor;
3331

3432
import java.io.BufferedReader;
3533
import java.io.IOException;
36-
import java.io.InputStream;
3734
import java.io.InputStreamReader;
38-
import java.net.*;
3935
import java.util.Arrays;
4036
import java.util.Collection;
4137
import java.util.Collections;
4238
import java.util.List;
4339
import java.util.stream.Stream;
44-
import java.io.BufferedReader;
45-
import java.io.IOException;
46-
import java.io.InputStreamReader;
40+
import java.net.HttpURLConnection;
41+
import java.net.URI;
4742
import java.net.URL;
48-
import java.util.stream.Stream;
43+
import java.nio.file.Files;
44+
import java.nio.file.Path;
4945

5046
import org.slf4j.Logger;
5147
import org.slf4j.LoggerFactory;
@@ -57,7 +53,47 @@ public class JavaTextFileSource extends TextFileSource implements JavaExecutionO
5753

5854
private static final Logger logger = LoggerFactory.getLogger(JavaTextFileSource.class);
5955

60-
public JavaTextFileSource(String inputUrl) {
56+
/**
57+
* @return Stream<String> from the provided URL
58+
*/
59+
public static Stream<String> streamFromURL(final URL sourceUrl) {
60+
try {
61+
final HttpURLConnection connection = (HttpURLConnection) sourceUrl.openConnection();
62+
connection.setRequestMethod("GET");
63+
64+
// Check if the response code indicates success (HTTP status code 200)
65+
if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
66+
logger.info(">>> Ready to stream the data from URL: " + sourceUrl.toString());
67+
// Read the data line by line and process it in the StreamChannel
68+
final BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
69+
return reader.lines().onClose(() -> {
70+
try {
71+
connection.disconnect();
72+
reader.close();
73+
} catch (final IOException e) {
74+
e.printStackTrace();
75+
}
76+
});
77+
} else {
78+
throw new WayangException("Connection with Http failed");
79+
}
80+
} catch (final Exception e) {
81+
throw new WayangException(e);
82+
}
83+
}
84+
85+
/**
86+
* @return Stream<String> from the file system
87+
*/
88+
public static Stream<String> streamFromFs(final String path) {
89+
try {
90+
return Files.lines(Path.of(URI.create(path)));
91+
} catch (final Exception e) {
92+
throw new WayangException(e);
93+
}
94+
}
95+
96+
public JavaTextFileSource(final String inputUrl) {
6197
super(inputUrl);
6298
}
6399

@@ -66,64 +102,44 @@ public JavaTextFileSource(String inputUrl) {
66102
*
67103
* @param that that should be copied
68104
*/
69-
public JavaTextFileSource(TextFileSource that) {
105+
public JavaTextFileSource(final TextFileSource that) {
70106
super(that);
71107
}
72108

73109
@Override
74110
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
75-
ChannelInstance[] inputs,
76-
ChannelInstance[] outputs,
77-
JavaExecutor javaExecutor,
78-
OptimizationContext.OperatorContext operatorContext) {
111+
final ChannelInstance[] inputs,
112+
final ChannelInstance[] outputs,
113+
final JavaExecutor javaExecutor,
114+
final OptimizationContext.OperatorContext operatorContext) {
79115

80116
assert inputs.length == this.getNumInputs();
81117
assert outputs.length == this.getNumOutputs();
82118

83-
84-
String urlStr = this.getInputUrl().trim();
85-
URL sourceUrl = null;
119+
final String urlStr = this.getInputUrl().trim();
120+
final URL sourceUrl;
86121

87122
try {
88123
sourceUrl = new URL(urlStr);
89-
String protocol = sourceUrl.getProtocol();
90-
if ( protocol.startsWith("https") || protocol.startsWith("http") ) {
91-
HttpURLConnection connection2 = (HttpURLConnection) sourceUrl.openConnection();
92-
connection2.setRequestMethod("GET");
93-
94-
// Check if the response code indicates success (HTTP status code 200)
95-
if (connection2.getResponseCode() == HttpURLConnection.HTTP_OK) {
96-
logger.info(">>> Ready to stream the data from URL: " + urlStr);
97-
// Read the data line by line and process it in the StreamChannel
98-
Stream<String> lines2 = new BufferedReader(new InputStreamReader(connection2.getInputStream())).lines();
99-
((StreamChannel.Instance) outputs[0]).accept(lines2);
100-
}
101-
}
102-
else {
103-
FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow(
104-
() -> new WayangException(String.format("Cannot access file system of %s.", urlStr))
105-
);
106-
107-
final InputStream inputStream = fs.open(urlStr);
108-
Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
109-
((StreamChannel.Instance) outputs[0]).accept(lines);
110-
}
111-
} catch (MalformedURLException e) {
112-
throw new RuntimeException(e);
113-
} catch (ProtocolException e) {
114-
throw new RuntimeException(e);
115-
} catch (IOException e) {
116-
throw new WayangException(String.format("Reading %s failed.", urlStr), e);
124+
} catch (final Exception e) {
125+
throw new WayangException("Could not create URL from string: " + urlStr, e);
117126
}
118127

119-
ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
128+
final String protocol = sourceUrl.getProtocol();
129+
130+
final Stream<String> lines = (protocol.startsWith("https") || protocol.startsWith("http"))
131+
? JavaTextFileSource.streamFromURL(sourceUrl)
132+
: JavaTextFileSource.streamFromFs(urlStr);
133+
134+
((StreamChannel.Instance) outputs[0]).accept(lines);
135+
136+
final ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
120137
prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
121-
"wayang.java.textfilesource.load.prepare", javaExecutor.getConfiguration()
122-
));
123-
ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
138+
"wayang.java.textfilesource.load.prepare", javaExecutor.getConfiguration()));
139+
140+
final ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
124141
mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
125-
"wayang.java.textfilesource.load.main", javaExecutor.getConfiguration()
126-
));
142+
"wayang.java.textfilesource.load.main", javaExecutor.getConfiguration()));
127143

128144
outputs[0].getLineage().addPredecessor(mainLineageNode);
129145

@@ -141,12 +157,12 @@ public JavaTextFileSource copy() {
141157
}
142158

143159
@Override
144-
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
160+
public List<ChannelDescriptor> getSupportedInputChannels(final int index) {
145161
throw new UnsupportedOperationException(String.format("%s does not have input channels.", this));
146162
}
147163

148164
@Override
149-
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
165+
public List<ChannelDescriptor> getSupportedOutputChannels(final int index) {
150166
assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
151167
return Collections.singletonList(StreamChannel.DESCRIPTOR);
152168
}

wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,16 @@
2020

2121
import org.apache.wayang.java.channels.JavaChannelInstance;
2222
import org.apache.wayang.java.execution.JavaExecutor;
23+
2324
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.Assumptions;
2426
import org.junit.jupiter.api.BeforeEach;
2527
import org.junit.jupiter.api.Disabled;
2628
import org.junit.jupiter.api.Test;
2729

28-
import java.io.IOException;
29-
import java.net.URISyntaxException;
3030
import java.net.URL;
31-
import java.util.*;
32-
import java.util.stream.Collectors;
33-
31+
import java.util.List;
32+
import java.util.Locale;
3433
import static org.junit.jupiter.api.Assertions.assertEquals;
3534

3635
/**
@@ -57,28 +56,23 @@ void teardownTest() {
5756
}
5857

5958
@Test
60-
void testReadLocalFile() throws IOException, URISyntaxException {
59+
void testReadLocalFile() {
6160
final String testFileName = "/banking-tx-small.csv";
6261

63-
JavaExecutor javaExecutor = null;
64-
try {
65-
// Prepare the source.
66-
final URL inputUrl = this.getClass().getResource(testFileName);
67-
System.out.println( "* " + inputUrl + " *");
68-
JavaTextFileSource source = new JavaTextFileSource(
69-
inputUrl.toString() );
62+
// Prepare the source.
63+
final URL inputUrl = this.getClass().getResource(testFileName);
64+
System.out.println("* " + inputUrl + " *");
65+
final JavaTextFileSource source = new JavaTextFileSource(
66+
inputUrl.toString());
7067

71-
// Execute.
72-
JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
73-
JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()};
74-
evaluate(source, inputs, outputs);
68+
// Execute.
69+
final JavaChannelInstance[] inputs = new JavaChannelInstance[] {};
70+
final JavaChannelInstance[] outputs = new JavaChannelInstance[] { createStreamChannelInstance() };
71+
evaluate(source, inputs, outputs);
7572

76-
// Verify the outcome.
77-
final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList());
78-
assertEquals(63, result.size());
79-
} finally {
80-
if (javaExecutor != null) javaExecutor.dispose();
81-
}
73+
// Verify the outcome.
74+
final List<String> result = outputs[0].<String>provideStream().toList();
75+
assertEquals(63, result.size());
8276
}
8377

8478
/**
@@ -89,54 +83,50 @@ void testReadLocalFile() throws IOException, URISyntaxException {
8983
*/
9084
@Disabled
9185
@Test
92-
void testReadRemoteFileHTTP() throws IOException, URISyntaxException {
86+
void testReadRemoteFileHTTP() throws Exception {
9387
final String testFileURL = "http://localhost:8000/LICENSE";
9488

95-
JavaExecutor javaExecutor = null;
89+
final JavaExecutor javaExecutor = null;
9690
try {
9791
// Prepare the source.
9892
final URL inputUrl = new URL(testFileURL);
99-
System.out.println( "** " + inputUrl + " **");
100-
JavaTextFileSource source = new JavaTextFileSource(
101-
inputUrl.toString() );
93+
System.out.println("** " + inputUrl + " **");
94+
final JavaTextFileSource source = new JavaTextFileSource(
95+
inputUrl.toString());
10296

10397
// Execute.
104-
JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
105-
JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()};
98+
final JavaChannelInstance[] inputs = new JavaChannelInstance[] {};
99+
final JavaChannelInstance[] outputs = new JavaChannelInstance[] { createStreamChannelInstance() };
106100
evaluate(source, inputs, outputs);
107101

108102
// Verify the outcome.
109-
final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList());
103+
final List<String> result = outputs[0].<String>provideStream().toList();
110104
assertEquals(225, result.size());
111105
} finally {
112-
if (javaExecutor != null) javaExecutor.dispose();
106+
if (javaExecutor != null)
107+
javaExecutor.dispose();
113108
}
114109
}
115110

116-
@Disabled
117111
@Test
118-
void testReadRemoteFileHTTPS() throws IOException, URISyntaxException {
119-
final String testFileURL = "https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile/profile2.ttl";
120-
121-
JavaExecutor javaExecutor = null;
112+
void testReadRemoteFileHTTPS() throws Exception {
122113
try {
114+
final String testFileURL = "https://downloads.apache.org/incubator/wayang/1.0.0/RELEASE_NOTES";
115+
123116
// Prepare the source.
124117
final URL inputUrl = new URL(testFileURL);
125-
System.out.println( "*** " + inputUrl + " ***");
126-
JavaTextFileSource source = new JavaTextFileSource(
127-
inputUrl.toString() );
118+
final JavaTextFileSource source = new JavaTextFileSource(inputUrl.toString());
128119

129120
// Execute.
130-
JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
131-
JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()};
121+
final JavaChannelInstance[] inputs = new JavaChannelInstance[] {};
122+
final JavaChannelInstance[] outputs = new JavaChannelInstance[] { createStreamChannelInstance() };
132123
evaluate(source, inputs, outputs);
133124

134125
// Verify the outcome.
135-
final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList());
136-
assertEquals(23, result.size());
137-
} finally {
138-
if (javaExecutor != null) javaExecutor.dispose();
126+
final List<String> result = outputs[0].<String>provideStream().toList();
127+
assertEquals(64, result.size());
128+
} catch (final Exception e) {
129+
Assumptions.assumeTrue(false, "Skipping test due to possible network error: " + e.getMessage());
139130
}
140-
141131
}
142132
}

0 commit comments

Comments
 (0)