Skip to content

Commit b893e2c

Browse files
authored
Merge pull request #640 from 2pk03/feature/object-file-serialization
Ported object-file IO onto a safe codec to mitigate gadget chain attack
2 parents f05a09f + 6189abb commit b893e2c

15 files changed

Lines changed: 477 additions & 62 deletions

File tree

wayang-api/wayang-api-python/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,25 @@
7575
<scope>compile</scope>
7676
</dependency>
7777
</dependencies>
78+
79+
<build>
80+
<plugins>
81+
<plugin>
82+
<groupId>org.apache.maven.plugins</groupId>
83+
<artifactId>maven-surefire-plugin</artifactId>
84+
<configuration>
85+
<skipTests>${python.worker.tests.skip}</skipTests>
86+
</configuration>
87+
</plugin>
88+
</plugins>
89+
</build>
90+
91+
<profiles>
92+
<profile>
93+
<id>python-worker-tests</id>
94+
<properties>
95+
<python.worker.tests.skip>false</python.worker.tests.skip>
96+
</properties>
97+
</profile>
98+
</profiles>
7899
</project>
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.operators;
20+
21+
import com.fasterxml.jackson.databind.DeserializationFeature;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.type.CollectionType;
24+
import org.apache.commons.lang3.Validate;
25+
26+
import java.io.ByteArrayInputStream;
27+
import java.io.ByteArrayOutputStream;
28+
import java.io.IOException;
29+
import java.io.ObjectInputStream;
30+
import java.io.ObjectOutputStream;
31+
import java.util.ArrayList;
32+
import java.util.Arrays;
33+
import java.util.Collection;
34+
import java.util.Collections;
35+
import java.util.List;
36+
37+
/**
38+
* Utility methods that convert between Java objects and the on-disk representation used by {@link ObjectFileSink}.
39+
*/
40+
public final class ObjectFileSerialization {
41+
42+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
43+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
44+
45+
static {
46+
OBJECT_MAPPER.findAndRegisterModules();
47+
}
48+
49+
private ObjectFileSerialization() {
50+
}
51+
52+
/**
53+
* Serialize a chunk of objects using the provided {@link ObjectFileSerializationMode}.
54+
*
55+
* @param chunk buffer that contains the objects to serialize
56+
* @param validLength number of valid entries inside {@code chunk}
57+
* @param mode the serialization mode
58+
* @return serialized bytes
59+
* @throws IOException if serialization fails
60+
*/
61+
public static byte[] serializeChunk(Object[] chunk, int validLength, ObjectFileSerializationMode mode) throws IOException {
62+
Validate.notNull(mode, "Serialization mode must be provided.");
63+
switch (mode) {
64+
case JSON:
65+
return serializeJson(chunk, validLength);
66+
case LEGACY_JAVA_SERIALIZATION:
67+
return serializeLegacy(chunk, validLength);
68+
default:
69+
throw new IllegalArgumentException("Unknown serialization mode: " + mode);
70+
}
71+
}
72+
73+
/**
74+
* Deserialize a chunk of objects.
75+
*
76+
* @param payload the serialized data
77+
* @param mode the serialization mode
78+
* @param elementType the expected element type
79+
* @return list of deserialized objects (never {@code null})
80+
* @throws IOException if deserialization fails
81+
* @throws ClassNotFoundException if a class cannot be resolved in legacy mode
82+
*/
83+
public static List<Object> deserializeChunk(byte[] payload,
84+
ObjectFileSerializationMode mode,
85+
Class<?> elementType) throws IOException, ClassNotFoundException {
86+
Validate.notNull(mode, "Serialization mode must be provided.");
87+
switch (mode) {
88+
case JSON:
89+
return deserializeJson(payload, elementType);
90+
case LEGACY_JAVA_SERIALIZATION:
91+
return deserializeLegacy(payload);
92+
default:
93+
throw new IllegalArgumentException("Unknown serialization mode: " + mode);
94+
}
95+
}
96+
97+
private static byte[] serializeLegacy(Object[] chunk, int validLength) throws IOException {
98+
Object[] payload = Arrays.copyOf(chunk, validLength);
99+
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
100+
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
101+
oos.writeObject(payload);
102+
oos.flush();
103+
return bos.toByteArray();
104+
}
105+
}
106+
107+
private static List<Object> deserializeLegacy(byte[] payload) throws IOException, ClassNotFoundException {
108+
try (ByteArrayInputStream bis = new ByteArrayInputStream(payload);
109+
ObjectInputStream ois = new ObjectInputStream(bis)) {
110+
Object tmp = ois.readObject();
111+
if (tmp == null) {
112+
return Collections.emptyList();
113+
}
114+
if (tmp instanceof Collection) {
115+
return new ArrayList<>((Collection<?>) tmp);
116+
}
117+
if (tmp.getClass().isArray()) {
118+
return Arrays.asList((Object[]) tmp);
119+
}
120+
return new ArrayList<>(Collections.singletonList(tmp));
121+
}
122+
}
123+
124+
private static byte[] serializeJson(Object[] chunk, int validLength) throws IOException {
125+
Object[] payload = Arrays.copyOf(chunk, validLength);
126+
return OBJECT_MAPPER.writeValueAsBytes(payload);
127+
}
128+
129+
private static List<Object> deserializeJson(byte[] payload, Class<?> elementType) throws IOException {
130+
Validate.notNull(elementType, "Element type must be provided for JSON deserialization.");
131+
CollectionType type = OBJECT_MAPPER.getTypeFactory()
132+
.constructCollectionType(List.class, elementType);
133+
List<?> list = OBJECT_MAPPER.readValue(payload, type);
134+
if (list == null) {
135+
return Collections.emptyList();
136+
}
137+
return new ArrayList<>(list);
138+
}
139+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.wayang.basic.operators;
20+
21+
/**
22+
* Supported serialization modes for {@link ObjectFileSource} and {@link ObjectFileSink}.
23+
*/
24+
public enum ObjectFileSerializationMode {

    /**
     * Legacy Java serialization using {@link java.io.ObjectOutputStream}/{@link java.io.ObjectInputStream}.
     * This mode is deprecated and will be removed in a future release.
     *
     * @deprecated use {@link #JSON} instead.
     */
    @Deprecated
    LEGACY_JAVA_SERIALIZATION,

    /**
     * JSON-based serialization that avoids Java serialization gadget chains.
     */
    JSON
}

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package org.apache.wayang.basic.operators;
2020

2121
import java.util.Objects;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
2225
import org.apache.wayang.core.function.TransformationDescriptor;
2326
import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator;
2427
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
@@ -36,6 +39,12 @@ public class ObjectFileSink<T> extends UnarySink<T> {
3639

3740
protected final Class<T> tClass;
3841

42+
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
43+
44+
private final Logger logger = LogManager.getLogger(this.getClass());
45+
46+
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
47+
3948
/**
4049
* Creates a new instance.
4150
*
@@ -69,5 +78,35 @@ public ObjectFileSink(ObjectFileSink<T> that) {
6978
super(that);
7079
this.textFileUrl = that.textFileUrl;
7180
this.tClass = that.tClass;
81+
this.serializationMode = that.getSerializationMode();
82+
}
83+
84+
public ObjectFileSerializationMode getSerializationMode() {
85+
if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
86+
&& LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
87+
this.logger.warn("ObjectFileSink is using deprecated legacy Java serialization. "
88+
+ "Please switch to the JSON serialization mode via ObjectFileSink#useJsonSerialization().");
89+
}
90+
return this.serializationMode;
91+
}
92+
93+
public ObjectFileSink<T> withSerializationMode(ObjectFileSerializationMode serializationMode) {
94+
this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
95+
return this;
96+
}
97+
98+
/**
99+
* Configure this sink to use the deprecated legacy Java serialization.
100+
*/
101+
@Deprecated
102+
public ObjectFileSink<T> useLegacySerialization() {
103+
return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
104+
}
105+
106+
/**
107+
* Configure this sink to use the JSON-based serialization.
108+
*/
109+
public ObjectFileSink<T> useJsonSerialization() {
110+
return this.withSerializationMode(ObjectFileSerializationMode.JSON);
72111
}
73112
}

wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSource.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.wayang.basic.operators;
2020

21+
import java.util.Objects;
2122
import java.util.Optional;
2223
import java.util.OptionalDouble;
2324
import java.util.OptionalLong;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2426
import org.apache.commons.lang3.Validate;
2527
import org.apache.logging.log4j.LogManager;
2628
import org.apache.logging.log4j.Logger;
@@ -43,6 +45,10 @@ public class ObjectFileSource<T> extends UnarySource<T> {
4345

4446
private final Class<T> tClass;
4547

48+
private static final AtomicBoolean LEGACY_WARNING_EMITTED = new AtomicBoolean(false);
49+
50+
private ObjectFileSerializationMode serializationMode = ObjectFileSerializationMode.JSON;
51+
4652
public ObjectFileSource(String inputUrl, DataSetType<T> type) {
4753
super(type);
4854
this.inputUrl = inputUrl;
@@ -64,6 +70,7 @@ public ObjectFileSource(ObjectFileSource that) {
6470
super(that);
6571
this.inputUrl = that.getInputUrl();
6672
this.tClass = that.getTypeClass();
73+
this.serializationMode = that.getSerializationMode();
6774
}
6875

6976
public String getInputUrl() {
@@ -74,6 +81,35 @@ public Class<T> getTypeClass(){
7481
return this.tClass;
7582
}
7683

84+
public ObjectFileSerializationMode getSerializationMode() {
85+
if (this.serializationMode == ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION
86+
&& LEGACY_WARNING_EMITTED.compareAndSet(false, true)) {
87+
this.logger.warn("ObjectFileSource is using deprecated legacy Java serialization. "
88+
+ "Please switch to the JSON serialization mode via ObjectFileSource#useJsonSerialization().");
89+
}
90+
return this.serializationMode;
91+
}
92+
93+
public ObjectFileSource<T> withSerializationMode(ObjectFileSerializationMode serializationMode) {
94+
this.serializationMode = Objects.requireNonNull(serializationMode, "serializationMode");
95+
return this;
96+
}
97+
98+
/**
99+
* Configure this source to use the deprecated legacy Java serialization.
100+
*/
101+
@Deprecated
102+
public ObjectFileSource<T> useLegacySerialization() {
103+
return this.withSerializationMode(ObjectFileSerializationMode.LEGACY_JAVA_SERIALIZATION);
104+
}
105+
106+
/**
107+
* Configure this source to use the JSON-based serialization.
108+
*/
109+
public ObjectFileSource<T> useJsonSerialization() {
110+
return this.withSerializationMode(ObjectFileSerializationMode.JSON);
111+
}
112+
77113
@Override
78114
public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
79115
final int outputIndex,

0 commit comments

Comments
 (0)