Skip to content

Commit 19cbcab

Browse files
fix: improve CSV header validation and error messages
1 parent 56656eb commit 19cbcab

1 file changed

Lines changed: 48 additions & 4 deletions

File tree

wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> eval
116116
}
117117

118118
private Stream<Record> createStream(final String actualInputPath) {
119-
return streamLines(actualInputPath).map(this::parseLine);
119+
return this.streamLines(actualInputPath).map(this::parseLine);
120120
}
121121

122122
private Record parseLine(final String s) {
@@ -126,7 +126,12 @@ private Record parseLine(final String s) {
126126
final String[] tokens = CsvRowConverter.parseLine(s, this.separator);
127127
if (tokens.length != fieldTypes.size())
128128
throw new IllegalStateException(
129-
String.format("Error while parsing CSV file %s at line %s, using separator %s", sourcePath, s, separator));
129+
String.format(
130+
"Column count mismatch in CSV file '%s': expected %d columns but found %d "
131+
+ "(separator '%s'). Line: '%s'. "
132+
+ "Ensure the header uses 'name:type' format with commas "
133+
+ "and data rows use '%s' as delimiter.",
134+
sourcePath, fieldTypes.size(), tokens.length, separator, s, separator));
130135
// now tokens.length == fieldtypes.size
131136

132137
final Object[] objects = new Object[tokens.length];
@@ -163,19 +168,58 @@ public List<ChannelDescriptor> getSupportedOutputChannels(final int index) {
163168
* @param path of the file
164169
* @return the {@link Stream}
165170
*/
166-
private static Stream<String> streamLines(final String path) {
171+
private Stream<String> streamLines(final String path) {
167172
final FileSystem fileSystem = FileSystems.getFileSystem(path).orElseThrow(
168173
() -> new IllegalStateException(String.format("No file system found for %s", path)));
169174
try {
170175
final Iterator<String> lineIterator = createLineIterator(fileSystem, path);
171-
lineIterator.next(); // skip header row
176+
177+
if (!lineIterator.hasNext()) {
178+
throw new IllegalStateException(String.format(
179+
"CSV file '%s' is empty. Expected a header row (e.g., 'id:int,name:string').",
180+
path));
181+
}
182+
183+
final String headerLine = lineIterator.next(); // read & skip header
184+
validateHeaderLine(headerLine);
172185
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), false);
173186
} catch (final IOException e) {
174187
throw new WayangException(String.format("%s failed to read %s.", FileUtils.class, path), e);
175188
}
176189

177190
}
178191

192+
/**
193+
* Validates the CSV header for Calcite compatibility.
194+
* Checks that the header is present, uses comma separators (not the data
195+
* delimiter), and each column follows the 'name:type' format
196+
* (e.g., 'id:int,name:string,email:string'). Note that Calcite hardcodes
197+
* commas for header parsing, while data rows use Wayang's configurable
198+
* separator (default ';').
199+
*
200+
* @param path the filesystem path to the CSV file
201+
*/
202+
private void validateHeaderLine(final String headerLine) {
203+
final String[] headerColumns = headerLine.split(",");
204+
205+
if (headerColumns.length != fieldTypes.size()) {
206+
throw new IllegalStateException(String.format(
207+
"CSV file '%s': header has %d comma-separated columns but table schema expects %d. "
208+
+ "Ensure the header uses commas with typed columns "
209+
+ "(e.g., 'id:int,name:string,email:string,country:string'). Header: '%s'.",
210+
sourcePath, headerColumns.length, fieldTypes.size(), headerLine));
211+
}
212+
213+
for (final String column : headerColumns) {
214+
if (!column.trim().contains(":")) {
215+
throw new IllegalStateException(String.format(
216+
"CSV file '%s': header column '%s' missing required type. "
217+
+ "Expected 'name:type' format (e.g., 'id:int'). Full header: '%s'.",
218+
sourcePath, column.trim(), headerLine));
219+
}
220+
}
221+
}
222+
179223
/**
180224
* Creates an {@link Iterator} over the lines of a given {@code path} (that
181225
* resides in the given {@code fileSystem}).

0 commit comments

Comments
 (0)