py: support arrow_ipc format for adhoc queries#4226
Conversation
* also support case sensitive view names to listen to pipelines Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
python/feldera/pipeline.py
Outdated
|
|
||
| return self.client.query_as_pyarrow(self.name, query) | ||
|
|
||
| def query_pylist(self, query: str) -> List[Mapping[str, Any]]: |
There was a problem hiding this comment.
it seems like given this return type suffers from the same problem as the json (aliased columns will be removed?)
There was a problem hiding this comment.
Yeah, it does. It doesn't suffer from it if we return pyarrow.Table.
There was a problem hiding this comment.
Ok probably not a good return type then, can we remove it?
Its stupid but unless we change our SQL to not accept this nonsense I dont see a good alternative cc @mihaibudiu
There was a problem hiding this comment.
maybe instead if str as key you can have a Column Type that hashes using unique identifiers/position in the select stmt but displays as the column name
There was a problem hiding this comment.
it should be a list, not a mapping
There was a problem hiding this comment.
It is mostly for convenience and works in most cases.
Should I remove it?
There was a problem hiding this comment.
no need to remove but either use a key that has a unique hash like the position or list of list
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
mihaibudiu
left a comment
There was a problem hiding this comment.
I don't have any other comments
In general, the view should be treated as a list of lists instead of as a list of maps
|
Is this ready to merge? |
* instead of a `List[dict]` it now returns a list of rows, where each row is a list of (column_name, value) tuples. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
|
@gz There is a problem with the arrow_ipc format, it seems to fail non deterministically. Failure: ❯ fda query --host http://localhost:8080 test_adhoc_query_pyarrow_same_alias_name 'SELECT T.x, S.x FROM T, S WHERE T.y = S.y' --format arrow_ipc
thread 'main' panicked at crates/fda/src/adhoc.rs:208:59:
called `Result::unwrap()` on an `Err` value: ParseError("Unable to get root as message: RangeOutOfBounds { range: 1703948..1703952, error_trace: ErrorTrace([]) }")
note: run with `RUST_BACKTRACE=1` environment variable to display a backtraceSuccess: ❯ fda query --host http://localhost:8080 test_adhoc_query_pyarrow_same_alias_name 'SELECT T.x, S.x FROM T, S WHERE T.y = S.y' --format arrow_ipc
+---+---+
| x | x |
+---+---+
| 4 | 6 |
| 1 | 5 |
+---+---+Similarly the python test Success: ❯ pytest . -k "test_adhoc_query_pylist_same_alias" -s
=========================================================================== test session starts ===========================================================================
platform linux -- Python 3.12.10, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/abhizer/Development/feldera.git/py_arrowrpc/python
configfile: pyproject.toml
plugins: timeout-2.3.1
collected 51 items / 50 deselected / 1 selected
tests/test_pipeline_builder.py .
==================================================================== 1 passed, 50 deselected in 1.35s =====================================================================Failure: ❯ pytest . -k "test_adhoc_query_pylist_same_alias" -s
=========================================================================== test session starts ===========================================================================
platform linux -- Python 3.12.10, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/abhizer/Development/feldera.git/py_arrowrpc/python
configfile: pyproject.toml
plugins: timeout-2.3.1
collected 51 items / 50 deselected / 1 selected
tests/test_pipeline_builder.py F
================================================================================ FAILURES =================================================================================
_______________________________________________________ TestPipelineBuilder.test_adhoc_query_pylist_same_alias_name _______________________________________________________
self = <tests.test_pipeline_builder.TestPipelineBuilder testMethod=test_adhoc_query_pylist_same_alias_name>
def test_adhoc_query_pylist_same_alias_name(self):
dataT = [{"x": 1, "y": 2}, {"x": 4, "y": 3}]
dataS = [{"x": 5, "y": 2}, {"x": 6, "y": 3}]
name = "test_adhoc_query_pyarrow_same_alias_name"
sql = """
CREATE TABLE T(x INT, y INT) with ('materialized' = 'true');
CREATE TABLE S(x INT, y INT) with ('materialized' = 'true');
"""
pipeline = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
pipeline.start()
pipeline.input_json("T", dataT)
pipeline.input_json("S", dataS)
> resp = pipeline.query_pylist("SELECT T.x, S.x FROM T, S WHERE T.y = S.y")
tests/test_pipeline_builder.py:1268:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
feldera/pipeline.py:753: in query_pylist
table = self.query_pyarrow(query)
feldera/pipeline.py:731: in query_pyarrow
return self.client.query_as_pyarrow(self.name, query)
feldera/rest/feldera_client.py:710: in query_as_pyarrow
with pyarrow.ipc.RecordBatchStreamReader(resp.raw) as reader:
.venv/lib/python3.12/site-packages/pyarrow/ipc.py:52: in __init__
self._open(source, options=options, memory_pool=memory_pool)
pyarrow/ipc.pxi:1006: in pyarrow.lib._RecordBatchStreamReader._open
???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> ???
E OSError: Invalid flatbuffers message.
pyarrow/error.pxi:92: OSError
========================================================================= short test summary info =========================================================================
FAILED tests/test_pipeline_builder.py::TestPipelineBuilder::test_adhoc_query_pylist_same_alias_name - OSError: Invalid flatbuffers message.
==================================================================== 1 failed, 50 deselected in 1.33s =====================================================================Sometimes, it also fails with:
|
Signed-off-by: feldera-bot <feldera-bot@users.noreply.github.com>
|
can you reprodcue it reliably if you save the bytestream of a failed invocation in a file and try to parse it with e.g., pyarrow or rust arrow crates? also can you enable |
|
The same issue, along with the pattern of the error described: #4287 |
|
Can we make progress with this issue? |
|
We need either a backend fix for transport over HTTP, or python SDK and WebConsole to switch to websockets |
|
websockets sounds excessive. what kind of fix is needed for transport? |
|
There is a weird issue with serialization described here, idk if Gerd investigated it yet |
mythical-fred
left a comment
There was a problem hiding this comment.
The discussion on line 733 (dict vs. list return type for aliased columns) has been open since June 2025 with no resolution. The author's last response was nine months ago. The tests added here look good as a starting point, but the open question about whether to return a dict (with aliasing bug) or a list/typed Column needs to be resolved before this can land.
|
@abhizer Unaware of this PR I submitted this: #5814, which also adds support for pyarrow query format. Let's sync. I see three options:
|
Fixes: #3923