Commit 7b5d3b1

Add Dataset flag to read/write Parquet APIs and update docs to reflect the unified interface.
1 parent 599508d commit 7b5d3b1

5 files changed

Lines changed: 22 additions & 25 deletions

README.md

Lines changed: 2 additions & 2 deletions
@@ -80,8 +80,8 @@ Wayang’s Spark platform can now execute end-to-end pipelines on Spark `Dataset
 To build a Dataset-backed pipeline:

 1. **Use the Dataset-aware plan builder APIs.**
-   - `PlanBuilder.readParquetAsDataset(...)` (or the Java equivalent) reads Parquet files directly into a Dataset channel.
-   - `DataQuanta.writeParquetAsDataset(...)` writes a Dataset channel without converting it back to an RDD.
+   - `PlanBuilder.readParquet(..., preferDataset = true)` (or `JavaPlanBuilder.readParquet(..., ..., true)`) reads Parquet files directly into a Dataset channel.
+   - `DataQuanta.writeParquet(..., preferDataset = true)` writes a Dataset channel without converting it back to an RDD.
 2. **Keep operators dataset-compatible.** Most operators continue to work unchanged; if an operator explicitly prefers RDDs, Wayang will insert the necessary conversions automatically (at an additional cost). Custom operators can expose `DatasetChannel` descriptors to stay in the dataframe world.
 3. **Let the optimizer do the rest.** The optimizer now assigns a higher cost to Dataset↔RDD conversions, so once you opt into Dataset sources/sinks the plan will stay in Dataset form by default.

guides/spark-datasets.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ Wayang’s Spark backend can now run entire pipelines on Spark `Dataset[Row]` (a
3333
## Enable Dataset sources and sinks
3434

3535
1. **Plan builder APIs:**
36-
- `PlanBuilder.readParquetAsDataset(...)` (Scala/Java) loads Parquet files into a `DatasetChannel` instead of an `RddChannel`.
37-
- `DataQuanta.writeParquetAsDataset(...)` writes a dataset back to Parquet without converting to RDD first.
36+
- `PlanBuilder.readParquet(..., preferDataset = true)` (Scala) or `JavaPlanBuilder.readParquet(..., ..., true)` loads Parquet files into a `DatasetChannel` instead of an `RddChannel`.
37+
- `DataQuanta.writeParquet(..., preferDataset = true)` writes a dataset back to Parquet without converting to RDD first.
3838
2. **Prefer dataset-friendly operators:** Most unary/binary operators accept either channel type, but custom operators can advertise dataset descriptors explicitly. See `DatasetChannel` in `wayang-platforms/wayang-spark` for details.
3939
3. **Let the optimizer keep it:** The optimizer now assigns costs to Dataset↔RDD conversions, so once your plan starts with a dataset channel it will stay in dataset form unless an operator demands an RDD.
4040

wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1027,11 +1027,10 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
10271027
writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad)
10281028
}
10291029

1030-
def writeParquet(url: String, overwrite: Boolean = false)(implicit ev: Out =:= Record): Unit =
1031-
writeParquetJava(url, overwrite, preferDataset = false)
1032-
1033-
def writeParquetAsDataset(url: String, overwrite: Boolean = true)(implicit ev: Out =:= Record): Unit =
1034-
writeParquetJava(url, overwrite, preferDataset = true)
1030+
def writeParquet(url: String,
1031+
overwrite: Boolean = false,
1032+
preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit =
1033+
writeParquetJava(url, overwrite, preferDataset)
10351034

10361035
/**
10371036
* Write the data quanta in this instance to a text file. Triggers execution.
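
One detail of this hunk is easy to miss when migrating call sites: the removed `writeParquetAsDataset` defaulted `overwrite` to `true`, whereas the unified `writeParquet` defaults it to `false`. The sketch below models only the three parameter lists from the diff. `Sink`, `WriteCall`, the `...Old` method names, and the path are hypothetical stand-ins (they are not Wayang classes), and nothing is actually written.

```scala
// Hypothetical stand-in: records the arguments a write would have received.
// Only the parameter lists mirror the diff; the names are illustrative.
final case class WriteCall(url: String, overwrite: Boolean, preferDataset: Boolean)

object Sink {
  // Shapes of the two methods before the commit.
  def writeParquetOld(url: String, overwrite: Boolean = false): WriteCall =
    WriteCall(url, overwrite, preferDataset = false)

  def writeParquetAsDatasetOld(url: String, overwrite: Boolean = true): WriteCall =
    WriteCall(url, overwrite, preferDataset = true)

  // Shape of the unified method after the commit.
  def writeParquet(url: String,
                   overwrite: Boolean = false,
                   preferDataset: Boolean = false): WriteCall =
    WriteCall(url, overwrite, preferDataset)
}

object OverwriteDefaultDemo {
  private val out = "file:///tmp/out.parquet" // made-up path

  // Old call site relying on the default (overwrite = true).
  val before: WriteCall = Sink.writeParquetAsDatasetOld(out)

  // A mechanical rename keeps the Dataset preference but picks up overwrite = false.
  val naive: WriteCall = Sink.writeParquet(out, preferDataset = true)

  // Passing overwrite explicitly reproduces the earlier arguments.
  val faithful: WriteCall = Sink.writeParquet(out, overwrite = true, preferDataset = true)
}
```

Under these assumptions, callers that relied on the old default would need to pass `overwrite = true` explicitly to keep the same behavior.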

wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,22 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
6969
* @param projection the projection, if any
7070
* @return [[DataQuantaBuilder]] for the file
7171
*/
72-
def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
73-
createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record]))
72+
def readParquet(url: String,
73+
projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
74+
readParquet(url, projection, preferDataset = false)
7475

7576
/**
76-
* Read a parquet file and provide it as a dataset of [[Record]]s backed by Spark Datasets.
77+
* Read a parquet file and optionally keep it backed by Spark Datasets.
7778
*
7879
* @param url the URL of the Parquet file
7980
* @param projection the projection, if any
81+
* @param preferDataset when {@code true}, emit a Dataset-backed channel
8082
* @return [[DataQuantaBuilder]] for the file
8183
*/
82-
def readParquetAsDataset(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
83-
createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(true))(ClassTag(classOf[Record]))
84+
def readParquet(url: String,
85+
projection: Array[String],
86+
preferDataset: Boolean): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
87+
createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))(ClassTag(classOf[Record]))
8488

8589
/**
8690
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
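
The shape used in this hunk (one alternative with a default argument, delegating to a fully explicit overload) is a common fit for a Java-facing Scala API: Java callers cannot omit Scala default arguments, and Scala lets only one overloaded alternative of a method define defaults. A minimal model of that shape follows. `Builder` and `Source` are hypothetical stand-ins for the Wayang types, and the URL and column names are made up.

```scala
// Hypothetical stand-ins; only the overload structure mirrors JavaPlanBuilder.
final case class Source(url: String, projection: Option[Seq[String]], preferDataset: Boolean)

class Builder {
  // Only this alternative carries a default argument. Adding defaults to the
  // three-parameter overload as well would be rejected by the compiler.
  def readParquet(url: String, projection: Array[String] = null): Source =
    readParquet(url, projection, preferDataset = false)

  // Fully explicit overload, callable from Java as readParquet(url, projection, true).
  def readParquet(url: String, projection: Array[String], preferDataset: Boolean): Source =
    Source(url, Option(projection).map(_.toSeq), preferDataset)
}

object OverloadDemo {
  private val builder = new Builder

  // Existing call sites keep compiling and keep the non-Dataset default.
  val legacy: Source = builder.readParquet("file:///tmp/in.parquet")

  // Opting in requires all three arguments on this builder.
  val dataset: Source = builder.readParquet("file:///tmp/in.parquet", Array("id", "name"), true)
}
```

In this model, existing two-argument callers are unaffected, while the Dataset opt-in is always spelled out in full.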

wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,19 +136,13 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
136136
*
137137
* @param url the URL of the Parquet file
138138
* @param projection the projection, if any
139+
* @param preferDataset when {@code true}, keep the resulting channel backed by Spark Datasets
139140
* @return [[DataQuanta]] of [[Record]]s representing the file
140141
*/
141-
def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection))
142-
143-
/**
144-
* Read a parquet file and keep it backed by a Spark Dataset throughout execution.
145-
*
146-
* @param url the URL of the Parquet file
147-
* @param projection the projection, if any
148-
* @return [[DataQuanta]] of [[Record]]s backed by a Spark Dataset when executed on Spark
149-
*/
150-
def readParquetAsDataset(url: String, projection: Array[String] = null): DataQuanta[Record] =
151-
load(ParquetSource.create(url, projection).preferDatasetOutput(true))
142+
def readParquet(url: String,
143+
projection: Array[String] = null,
144+
preferDataset: Boolean = false): DataQuanta[Record] =
145+
load(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))
152146

153147
/**
154148
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
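
Because `preferDataset` is added as a trailing parameter with a default, existing Scala call sites should compile unchanged, and new ones can opt in by name without spelling out a projection. The sketch below models just that parameter list. `Plan` and `ParquetRead` are hypothetical stand-ins rather than Wayang types, and the URL and column names are invented for illustration.

```scala
// Hypothetical stand-in that records what a read was asked to do.
final case class ParquetRead(url: String, projection: Option[Seq[String]], preferDataset: Boolean)

object Plan {
  // Same parameter list as the unified PlanBuilder.readParquet in the diff.
  def readParquet(url: String,
                  projection: Array[String] = null,
                  preferDataset: Boolean = false): ParquetRead =
    ParquetRead(url, Option(projection).map(_.toSeq), preferDataset)
}

object ReadParquetDemo {
  private val in = "file:///tmp/in.parquet" // made-up path

  // Pre-existing call shape: both defaults apply.
  val defaultRead: ParquetRead = Plan.readParquet(in)

  // Named argument skips the projection and opts into the Dataset preference.
  val datasetRead: ParquetRead = Plan.readParquet(in, preferDataset = true)

  // Projection and Dataset preference together.
  val projected: ParquetRead = Plan.readParquet(in, Array("id", "name"), preferDataset = true)
}
```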
