class CreateTablesModule(CreateFromModule):
    """Module (type name ``create.tables``) that builds a set of tables from a source value."""

    _module_type_name = "create.tables"
    _config_cls = CreateTablesModuleConfig

    def create__tables__from__file_bundle(
        self, source_value: Value, job_log: JobLog
    ) -> Any:
        """Create one table per csv file contained in a file_bundle value.

        Only files whose relative path ends with ``.csv`` are read; every other
        file is skipped and recorded as "Not a csv file.". Files are processed
        in sorted order of their relative path, and each table is named after
        the file name without extension (made unique via ``find_free_id``).

        NOTE(review): the module configuration documents a
        'merge_into_single_table' option; this method does not consult it, so
        merging is presumably handled elsewhere (or not implemented) -- confirm.

        Config values read by this method:
            include_source_metadata: if ``None`` or ``True``, an extra
                ``file_items`` table describing the bundle is added.
            include_source_file_content: forwarded as ``include_content`` when
                the ``file_items`` table is built.
            ignore_errors: if ``True``, a csv file that fails to load is logged
                and skipped; otherwise the failure is raised.

        Args:
            source_value: value whose ``data`` is the file bundle to import.
            job_log: job log that receives a message for every ignored file.

        Returns:
            A dict mapping table name to the table read from the matching csv
            file, plus an optional ``file_items`` entry.

        Raises:
            KiaraProcessingException: if a csv file cannot be read and
                ``ignore_errors`` is not ``True``, or if a csv file already
                produced a table named ``file_items`` while source metadata is
                requested.
        """
        from pyarrow import csv as pa_csv

        include_source_metadata: Union[bool, None] = self.get_config_value(
            "include_source_metadata"
        )

        tables = {}
        bundle: KiaraFileBundle = source_value.data
        table_names: List[str] = []
        # Per-file bookkeeping, later handed to 'create_table_from_file_bundle'.
        included_files: Dict[str, bool] = {}
        errors: Dict[str, Union[None, str]] = {}

        for rel_path in sorted(bundle.included_files.keys()):
            if not rel_path.endswith(".csv"):
                job_log.add_log(
                    f"Ignoring file (not csv): {rel_path}", log_level=logging.INFO
                )
                included_files[rel_path] = False
                errors[rel_path] = "Not a csv file."
                continue

            file_item = bundle.included_files[rel_path]
            table_name = find_free_id(
                stem=file_item.file_name_without_extension, current_ids=table_names
            )
            # The name is reserved before reading, so it stays taken even if
            # the file turns out to be unreadable.
            table_names.append(table_name)

            try:
                table = pa_csv.read_csv(file_item.path)
            except Exception as e:
                included_files[rel_path] = False
                errors[rel_path] = KiaraException.get_root_details(e)
                # Previously this condition ended in 'or True', which made the
                # option a no-op and the raise below unreachable.
                if self.get_config_value("ignore_errors") is True:
                    log_message("ignore.import_file", file=rel_path, reason=str(e))
                    continue
                raise KiaraProcessingException(e) from e

            tables[table_name] = table
            included_files[rel_path] = True

        if include_source_metadata in [None, True]:
            include_content: bool = self.get_config_value("include_source_file_content")

            # 'file_items' is the reserved name of the metadata table; a csv
            # file with that stem would otherwise be silently overwritten.
            if "file_items" in tables:
                raise KiaraProcessingException(
                    "Can't create table: 'file_items' columns already exists."
                )

            tables["file_items"] = create_table_from_file_bundle(
                file_bundle=bundle,
                include_content=include_content,
                included_files=included_files,
                errors=errors,
            )

        return tables