class CreateTableModule(CreateFromModule):
    # Module that creates table values from 'file' and 'file_bundle' sources.
    #
    # NOTE(review): the `create__table__from__<source_type>` methods are
    # presumably looked up by name by `CreateFromModule` -- confirm against the
    # base class before renaming any of them. A `#` comment (not a docstring)
    # is used here on purpose, in case the framework reads class docstrings.

    _module_type_name = "create.table"
    _config_cls = CreateTableModuleConfig

    def create_optional_inputs(
        self, source_type: str, target_type
    ) -> Union[Mapping[str, Mapping[str, Any]], None]:
        """Return the schema of the optional inputs for a given source type.

        Only the 'file' source type has an optional input
        ('first_row_is_header'); for every other source type `None` is
        returned. `target_type` is not used by this implementation.
        """

        if source_type == "file":
            return {
                "first_row_is_header": {
                    "type": "boolean",
                    "optional": True,
                    "doc": "Whether the first row of a (csv) file is a header row. If not provided, kiara will try to auto-determine. Ignored if not a csv file.",
                }
            }

        return None

    def create__table__from__file(self, source_value: Value, optional: ValueMap) -> Any:
        """Create a table from a file, trying to auto-determine the format of said file.

        Currently supported input file types:

        - csv
        - parquet
        """

        input_file: KiaraFile = source_value.data
        file_name = input_file.file_name
        # Compare case-insensitively so that e.g. 'DATA.CSV' is recognized too.
        normalized_name = file_name.lower()

        if normalized_name.endswith(".csv"):
            return self.import_csv_file(source_value, optional)
        if normalized_name.endswith(".parquet"):
            return self.import_parquet_file(source_value, optional)

        # Fail loudly instead of silently returning `None` for unknown formats.
        raise KiaraProcessingException(
            f"Can't create table from file '{file_name}': unsupported file type (supported: csv, parquet)."
        )

    def import_parquet_file(
        self, source_value: Value, optional: ValueMap
    ) -> KiaraTable:
        """Create a table from a parquet file value.

        The file at `source_value.data.path` is read with
        `pyarrow.parquet.read_table`. `optional` is accepted for signature
        parity with `import_csv_file` but is not used.

        Raises:
            KiaraProcessingException: if the file can't be read; the original
                error is attached as the exception's cause.
        """

        import pyarrow.parquet as pq

        # TODO: use memory mapping to optimize memory usage?

        input_file: KiaraFile = source_value.data

        try:
            imported_data = pq.read_table(input_file.path)
        except Exception as e:
            # Keep the root cause visible instead of dropping it.
            raise KiaraProcessingException(
                f"Failed to import parquet file '{input_file.path}': {e}"
            ) from e

        return KiaraTable.create_table(imported_data)

    def import_csv_file(self, source_value: Value, optional: ValueMap) -> KiaraTable:
        """Create a table from a csv file value.

        If the optional input 'first_row_is_header' is not set, the standard
        library csv sniffer is run on the first 2048 characters of the file to
        guess whether there is a header row. If sniffing fails, the failure is
        logged and a header row is assumed.

        Raises:
            KiaraProcessingException: if pyarrow can't read the file; the
                original error is attached as the exception's cause.
        """

        import csv as py_csv

        from pyarrow import csv

        input_file: KiaraFile = source_value.data

        has_header = optional.get_value_data("first_row_is_header")
        if has_header is None:
            # Default used when sniffing fails for any reason.
            has_header = True
            try:
                with open(input_file.path, "rt") as csvfile:
                    sniffer = py_csv.Sniffer()
                    has_header = sniffer.has_header(csvfile.read(2048))
            except Exception as e:
                # TODO: add this to the process log
                log_message(
                    "csv_sniffer.error",
                    file=input_file.path,
                    error=str(e),
                    details="assuming csv file has header",
                )

        try:
            if has_header:
                imported_data = csv.read_csv(input_file.path)
            else:
                # No header row: let pyarrow generate the column names.
                read_options = csv.ReadOptions(autogenerate_column_names=True)
                imported_data = csv.read_csv(input_file.path, read_options=read_options)
        except Exception as e:
            # Keep the root cause visible instead of dropping it.
            raise KiaraProcessingException(
                f"Failed to import csv file '{input_file.path}': {e}"
            ) from e

        # import pandas as pd
        # df = pd.read_csv(input_file.path)
        # imported_data = pa.Table.from_pandas(df)

        return KiaraTable.create_table(imported_data)

    # def create__table__from__csv_file(self, source_value: Value) -> Any:
    #     """Create a table from a csv_file value."""
    #
    #     from pyarrow import csv
    #
    #     input_file: FileModel = source_value.data
    #     imported_data = csv.read_csv(input_file.path)
    #
    #     # import pandas as pd
    #     # df = pd.read_csv(input_file.path)
    #     # imported_data = pa.Table.from_pandas(df)
    #
    #     return KiaraTable.create_table(imported_data)

    def create__table__from__file_bundle(self, source_value: Value) -> Any:
        """Create a table value from a text file_bundle.

        The resulting table will have (at a minimum) the following columns:
        - id: an auto-assigned index
        - rel_path: the relative path of the file (from the provided base path)
        - content: the text file content
        """

        import pyarrow as pa

        bundle: KiaraFileBundle = source_value.data

        columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

        ignore_errors = self.get_config_value("ignore_errors")
        file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

        # Sort once; the position in this list is the value of the 'id' column.
        rel_paths = sorted(file_dict.keys())

        # TODO: use chunks to save on memory
        tabular: Dict[str, List[Any]] = {}
        for column in columns:
            for index, rel_path in enumerate(rel_paths):
                if column == "content":
                    _value: Any = file_dict[rel_path]
                elif column == "id":
                    _value = index
                elif column == "rel_path":
                    _value = rel_path
                else:
                    # Any other column is read as an attribute of the file
                    # model registered for that path in the bundle.
                    file_model = bundle.included_files[rel_path]
                    _value = getattr(file_model, column)

                tabular.setdefault(column, []).append(_value)

        table = pa.Table.from_pydict(tabular)
        return KiaraTable.create_table(table)