feat: add __arrow_c_stream__ function#11338
Conversation
bc5c796 to
e5e19d6
Compare
| if not values.flags.c_contiguous: | ||
| values = np.ascontiguousarray(values) |
There was a problem hiding this comment.
I think we can only use values.ravel down there to ensure contiguous array.
|
Thought For It will be sparse if dataarrays does not have the same coords, but that's another PR altogether. Edit:Well that worked better than I thought it would: >>> pl.DataFrame(ds)
shape: (694_080, 7)
┌───────┬───────┬──────────┬───────────┬──────────┬──────────┬──────────┐
│ month ┆ level ┆ latitude ┆ longitude ┆ z ┆ u ┆ v │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ i32 ┆ i32 ┆ f32 ┆ f32 ┆ f64 ┆ f64 ┆ f64 │
╞═══════╪═══════╪══════════╪═══════════╪══════════╪══════════╪══════════╡
│ 1 ┆ 850 ┆ -60.0 ┆ 60.0 ┆ 11536.64 ┆ 4.968181 ┆ 0.351744 │
│ ┆ ┆ ┆ ┆ 4639 ┆ ┆ │
│ 1 ┆ 850 ┆ -60.0 ┆ 60.75 ┆ 11534.91 ┆ 5.015362 ┆ 0.359389 │
│ ┆ ┆ ┆ ┆ 9612 ┆ ┆ │
│ 1 ┆ 850 ┆ -60.0 ┆ 61.5 ┆ 11531.46 ┆ 5.015362 ┆ 0.367034 │
│ ┆ ┆ ┆ ┆ 9557 ┆ ┆ │
│ 1 ┆ 850 ┆ -60.0 ┆ 62.25 ┆ 11529.74 ┆ 5.046816 ┆ 0.359389 │
│ ┆ ┆ ┆ ┆ 4529 ┆ ┆ │
│ 1 ┆ 850 ┆ -60.0 ┆ 63.0 ┆ 11528.01 ┆ 5.046816 ┆ 0.328331 │
│ ┆ ┆ ┆ ┆ 9502 ┆ ┆ │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 7 ┆ 500 ┆ -57.0 ┆ 8.25 ┆ 49749.45 ┆ 17.24943 ┆ 0.429629 │
│ ┆ ┆ ┆ ┆ 3099 ┆ 3 ┆ │
│ 7 ┆ 500 ┆ -57.0 ┆ 9.0 ┆ 49747.72 ┆ 17.24943 ┆ 0.375157 │
│ ┆ ┆ ┆ ┆ 8072 ┆ 3 ┆ │
│ 7 ┆ 500 ┆ -57.0 ┆ 9.75 ┆ 49746.00 ┆ 17.37525 ┆ 0.328331 │
│ ┆ ┆ ┆ ┆ 3044 ┆ ┆ │
│ 7 ┆ 500 ┆ -57.0 ┆ 10.5 ┆ 49744.27 ┆ 17.37525 ┆ 0.281027 │
│ ┆ ┆ ┆ ┆ 8017 ┆ ┆ │
│ 7 ┆ 500 ┆ -57.0 ┆ 11.25 ┆ 49742.55 ┆ 17.56240 ┆ 0.2342 │
│ ┆ ┆ ┆ ┆ 2989 ┆ 2 ┆ │
└───────┴───────┴──────────┴───────────┴──────────┴──────────┴──────────┘Will make an another PR if this one get merged. |
|
I am going to need guidance to add the polars dependency to pixi, there are quite a lot of groups and I am not very familiar with what is used for the CI testing so that we can have some testing or we only support the pyarrow conversion and we do not test the |
Add pyarrow capsule method to quickly convert datarray to polars The function is mostly zero copy, only the coordinates grid need to be computed
6efce04 to
276c394
Compare
|
I think the implementation is near ready here, the naive implem was not working with coordinates with mutliple dimensions, I fixed that with manual broadcasting which is mostly views at memory level until copy is needed for coordinates values. On a small benchmark, the conversion to polars is around 10x faster than a bare: df = pl.DataFrame(da.to_dataframe().reset_index()) |
| # Order axes based on Variable dims | ||
| dim_order = [coord.dims.index(dim) for dim in dims if dim in coord.dims] | ||
|
|
||
| # Reorder coords values to variable dim order | ||
| ordered_coords = coord.values.transpose(dim_order) | ||
|
|
||
| # Expand coord dims | ||
| # coord dims (x, y) variable dims (x,y,z) -> (x, y, 1) | ||
| # NOTE: Insert a length-1 axis for each data dim missing for coordinates | ||
| # (slice(None) keeps an existing axis, np.newaxis adds one) | ||
| indexer = tuple( | ||
| slice(None) if dim in coord.dims else np.newaxis for dim in dims | ||
| ) | ||
| expanded_coords = ordered_coords[indexer] | ||
|
|
||
| # Broadcast to full flattened shape (x, y, 1) -> (x, y, z) | ||
| broadcasted = np.broadcast_to(expanded_coords, shape) |
There was a problem hiding this comment.
can we share this with the to_dataframe path? alternatively, can we rewrite the to_dataframe path to use this function? (that would be a different PR)
There was a problem hiding this comment.
I am a bit fearful of using this for now for theto_dataframe function, the path seems similar but the to Dataframe is using dataset conversion and then conversion to dataframe converting the coords to pandas MultiIndex.
I think another PR is the way to go, and I'm not sure we can factor everything here.
|
|
||
| @requires_dask | ||
| @requires_pyarrow | ||
| def test_dask_dataarray(self): |
There was a problem hiding this comment.
what is the intent here? Does it load to memory?
There was a problem hiding this comment.
I just wanted to have a test for dask too, to ensure everything works as intented.
Also in a following PR we can implement chunking with RecordBatch for streaming dask chunks as RecordBatch for native arrow streaming.
I can remove it, if you find it out of scope.
There was a problem hiding this comment.
Yes for now it loads into memory with the casting to numpy values and then conversion to pyarrow array before constructing the table with it.
There was a problem hiding this comment.
for now it loads into memory
We should error and ask the user to compute instead. Xarray's does not compute by default.
There was a problem hiding this comment.
You want explicit compute that's right ? to_dataframe also load data when using a dask backed-DataArray:
import dask.array as da
import pyarrow as pa
dask_da = xr.DataArray(
da.from_array(np.arange(6, dtype=float).reshape(2, 3)),
dims=["x", "y"],
coords={"x": [0, 1], "y": [10, 20, 30]},
name="data",
)
print(dask_da.to_dataframe())I can raise an error if you want.
There was a problem hiding this comment.
yep I prefer the error. That behaviour seems like a bug and we should deprecate it
Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
Description
Add pyarrow capsule method to quickly convert datarray to polars
The function is mostly zero copy, only the coordinates grid need to be computed.
I wanted to implement the
__arrow_c_array__function to return a fixed_shape_tensor but somehow polars prioritize this over__arrow_c_stream__method.So for convenience I leave this here for now.
Feel free to close this PR and discuss this further in a dedicated issue if you want.
We can go one step further to save memory with
pa.DictionaryArrayto use the indice encoding that pyarrow supports out of the box we just need to create the indices using numpy before.This enable :
Checklist
.to_polars_df()method (very similar to.to_dataframe(), which implicitly uses pandas) #10135whats-new.rstapi.rstAI Disclosure