Skip to content

Commit 54f2de7

Browse files
authored
Merge pull request #709 from zkaoudi/main
Unit test for Java Parquet source operator
2 parents c9f3702 + 55d9e25 commit 54f2de7

3 files changed

Lines changed: 120 additions & 13 deletions

File tree

wayang-platforms/wayang-java/pom.xml

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,13 @@
5050
<version>1.1.2-SNAPSHOT</version>
5151
</dependency>
5252
<dependency>
53-
<groupId>org.apache.hadoop</groupId>
54-
<artifactId>hadoop-common</artifactId>
55-
</dependency>
56-
<dependency>
57-
<groupId>org.apache.hadoop</groupId>
58-
<artifactId>hadoop-hdfs</artifactId>
53+
<groupId>org.apache.parquet</groupId>
54+
<artifactId>parquet-avro</artifactId>
55+
<version>1.15.2</version>
5956
</dependency>
6057
<dependency>
6158
<groupId>org.apache.parquet</groupId>
62-
<artifactId>parquet-avro</artifactId>
59+
<artifactId>parquet-hadoop</artifactId>
6360
<version>1.15.2</version>
6461
</dependency>
6562
<dependency>
@@ -72,11 +69,6 @@
7269
<artifactId>slf4j-api</artifactId>
7370
<version>2.0.6</version>
7471
</dependency>
75-
<dependency>
76-
<groupId>org.apache.logging.log4j</groupId>
77-
<artifactId>log4j-slf4j-impl</artifactId>
78-
<version>2.20.0</version>
79-
</dependency>
8072
<!-- Mockito for mocking -->
8173
<dependency>
8274
<groupId>org.mockito</groupId>
@@ -86,4 +78,4 @@
8678
</dependency>
8779
</dependencies>
8880

89-
</project>
81+
</project>
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.java.operators;
20+
21+
import org.apache.wayang.basic.data.Record;
22+
import org.apache.wayang.core.platform.ChannelInstance;
23+
import org.apache.wayang.java.channels.StreamChannel;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.nio.file.Path;
27+
import java.nio.file.Paths;
28+
import java.util.List;
29+
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
32+
/**
33+
* Test suite for {@link JavaParquetSource}.
34+
*/
35+
class JavaParquetSourceTest extends JavaExecutionOperatorTestBase {
36+
37+
@Test
38+
void testReadAllColumns() {
39+
Path parquetFile = Paths.get("src/test/resources/data.parquet").toAbsolutePath();
40+
JavaParquetSource source = new JavaParquetSource(parquetFile.toUri().toString(), null);
41+
42+
ChannelInstance[] inputs = new ChannelInstance[0];
43+
ChannelInstance[] outputs = new ChannelInstance[]{createStreamChannelInstance()};
44+
evaluate(source, inputs, outputs);
45+
46+
List<Record> result = ((StreamChannel.Instance) outputs[0]).<Record>provideStream().toList();
47+
48+
assertEquals(3, result.size());
49+
assertEquals("alice", result.get(0).getString(0));
50+
assertEquals(30, result.get(0).getInt(1));
51+
assertEquals("bob", result.get(1).getString(0));
52+
assertEquals(25, result.get(1).getInt(1));
53+
assertEquals("carol", result.get(2).getString(0));
54+
assertEquals(41, result.get(2).getInt(1));
55+
}
56+
57+
@Test
58+
void testReadWithProjection() {
59+
Path parquetFile = Paths.get("src/test/resources/data.parquet").toAbsolutePath();
60+
JavaParquetSource source = new JavaParquetSource(parquetFile.toUri().toString(), new String[]{"name"});
61+
62+
ChannelInstance[] inputs = new ChannelInstance[0];
63+
ChannelInstance[] outputs = new ChannelInstance[]{createStreamChannelInstance()};
64+
evaluate(source, inputs, outputs);
65+
66+
List<Record> result = ((StreamChannel.Instance) outputs[0]).<Record>provideStream().toList();
67+
68+
assertEquals(3, result.size());
69+
// Each projected record contains only the "name" column
70+
assertEquals(1, result.get(0).size());
71+
assertEquals("alice", result.get(0).getString(0));
72+
assertEquals("bob", result.get(1).getString(0));
73+
assertEquals("carol", result.get(2).getString(0));
74+
}
75+
76+
/* The following lines were used to create the sample Parquet file.
77+
We keep it here for reference, but we don't want to run it in the test suite as it adds a dependency on Hadoop and Parquet libraries and complicates the test setup.
78+
*/
79+
80+
/*
81+
private static final Schema SCHEMA = SchemaBuilder.record("TestRecord")
82+
.namespace("org.apache.wayang.test")
83+
.fields()
84+
.requiredString("name")
85+
.requiredInt("age")
86+
.endRecord();
87+
88+
private static Path writeSampleParquet(Path dir) throws IOException {
89+
Path file = dir.resolve("data.parquet");
90+
org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
91+
List<GenericRecord> records = Arrays.asList(
92+
makeRecord("alice", 30),
93+
makeRecord("bob", 25),
94+
makeRecord("carol", 41)
95+
);
96+
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(hadoopPath)
97+
.withSchema(SCHEMA)
98+
.withConf(new Configuration())
99+
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
100+
.build()) {
101+
for (GenericRecord record : records) {
102+
writer.write(record);
103+
}
104+
}
105+
return file;
106+
}
107+
108+
private static GenericRecord makeRecord(String name, int age) {
109+
GenericRecord record = new GenericData.Record(SCHEMA);
110+
record.put("name", name);
111+
record.put("age", age);
112+
return record;
113+
}
114+
*/
115+
}
706 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)