class PickColumnModule(KiaraModule):
"""Pick one column from a table, returning an array."""
_module_type_name = "tables.pick.table"
_config_cls = PickTablesModuleConfig
@classmethod
def retrieve_included_operations(cls):
return {"tables.pick.column": {"module_config": {"pick_type": "column"}}}
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs: Dict[str, Any] = {
"tables": {"type": "tables", "doc": "A tables instance."}
}
pick_type = self.get_config_value("pick_type")
table_name = self.get_config_value("table_name")
if not table_name:
inputs["table_name"] = {
"type": "string",
"doc": "The name of the table to pick.",
}
if pick_type == "column":
column_name = self.get_config_value("column_name")
if not column_name:
inputs["column_name"] = {
"type": "string",
"doc": "The name of the column to pick.",
}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
pick_type = self.get_config_value("pick_type")
if pick_type == "table":
outputs: Mapping[str, Any] = {
"table": {"type": "table", "doc": "The table."}
}
elif pick_type == "column":
outputs = {"array": {"type": "array", "doc": "The column."}}
else:
# should never happen since validation would pick that up
raise KiaraException(f"Invalid pick type: {pick_type}")
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import pyarrow as pa
pick_type: Literal["table", "column"] = self.get_config_value("pick_type")
table_name: Union[str, None] = self.get_config_value("table_name")
if not table_name:
table_name = inputs.get_value_data("table_name")
tables_value: Value = inputs.get_value_obj("tables")
tables: KiaraTables = tables_value.data
if table_name not in tables.table_names:
raise KiaraProcessingException(
f"Invalid table name '{table_name}'. Available table names: {', '.join(tables.table_names)}"
)
table_value = tables.get_table(table_name)
if pick_type == "table":
outputs.set_value("table", table_value)
return
column_name: Union[str, None] = self.get_config_value("column_name")
if not column_name:
column_name = inputs.get_value_data("column_name")
if not column_name:
raise KiaraProcessingException(
"Could not cut column from table: column_name not provided or empty string."
)
if column_name not in table_value.column_names:
raise KiaraProcessingException(
f"Invalid column name '{column_name}'. Available column names: {', '.join(table_value.column_names)}"
)
table: pa.Table = table_value.arrow_table
column = table.column(column_name)
outputs.set_value("array", column)