# Source code for scribe_data.cli.convert.to_csv_or_tsv

# SPDX-License-Identifier: GPL-3.0-or-later
"""
Functions to convert data returned from the Scribe-Data CLI to CSV or TSV files.
"""

import csv
import json
from pathlib import Path

from scribe_data.utils import (
    DEFAULT_CSV_EXPORT_DIR,
    DEFAULT_JSON_EXPORT_DIR,
    DEFAULT_TSV_EXPORT_DIR,
    camel_to_snake,
    check_index_exists,
)

# MARK: CSV or TSV


[docs] def convert_to_csv_or_tsv( language: str, data_types: str | list[str], input_file: Path, output_dir: Path, output_type: str, overwrite: bool = False, identifier_case: str = "camel", ) -> None: """ Convert a JSON File to CSV/TSV file. Parameters ---------- language : str The language of the file to convert. data_types : Union[str, List[str]] The data type of the file to convert. input_file : Path The input JSON file path. output_dir : Path The output directory path for results. output_type : str The output format, should be "csv" or "tsv". overwrite : bool Whether to overwrite existing files. identifier_case : str The case format for identifiers. Default is "camel". Returns ------- None A CSV/TSV files. """ if not language: raise ValueError(f"Language '{language.capitalize()}' is not recognized.") data_types = [data_types] if isinstance(data_types, str) else data_types # Modify input file path to use the provided input_file or default JSON export path. input_file_path = ( input_file or DEFAULT_JSON_EXPORT_DIR / language.lower() / f"{data_types[0]}.json" ) for dtype in data_types: if not input_file_path.exists(): print(f"No data found for {dtype} conversion at '{input_file_path}'.") continue try: with input_file_path.open("r", encoding="utf-8") as f: data = json.load(f) except (IOError, json.JSONDecodeError) as e: print(f"Error reading '{input_file_path}': {e}") continue # Determine the delimiter based on output type. 
delimiter = "," if output_type == "csv" else "\t" if output_dir is None: output_dir = ( DEFAULT_CSV_EXPORT_DIR if output_type == "csv" else DEFAULT_TSV_EXPORT_DIR ) final_output_dir = output_dir / language.capitalize() final_output_dir.mkdir(parents=True, exist_ok=True) output_file = final_output_dir / f"{dtype}.{output_type}" if check_index_exists(output_file, overwrite): print(f"Skipping {dtype}") continue try: with output_file.open("w", newline="", encoding="utf-8") as file: writer = csv.writer(file, delimiter=delimiter) # Handle different JSON structures based on the format. if isinstance(data, dict): first_key = list(data.keys())[0] first_val = next(iter(data.values())) if data else None if isinstance(first_val, dict): # Handle case: { key: { value1: ..., value2: ... } }. columns = sorted(first_val.keys()) header = [ camel_to_snake(dtype[:-1]) if identifier_case == "snake" else dtype[:-1] ] header += [ camel_to_snake(col) if identifier_case == "snake" else col for col in columns ] writer.writerow(header) for key, value in data.items(): row = [key] + [value.get(col, "") for col in columns] writer.writerow(row) elif isinstance(data[first_key], list): if all(isinstance(item, dict) for item in data[first_key]): # Handle case: { key: [ { value1: ..., value2: ... } ] }. 
if "emoji" in data[first_key][0]: # emoji specific case columns = ["word", "emoji", "is_base", "rank"] writer.writerow( [camel_to_snake(col) for col in columns] if identifier_case == "snake" else columns ) for key, value in data.items(): for item in value: row = [ key, item.get("emoji", ""), item.get("is_base", ""), item.get("rank", ""), ] writer.writerow(row) else: if identifier_case == "snake": columns = [camel_to_snake(dtype[:-1])] + [ camel_to_snake(col) for col in data[first_key][0].keys() ] else: columns = [dtype[:-1]] + list( data[first_key][0].keys() ) writer.writerow(columns) for key, value in data.items(): for item in value: row = [key] + [ item.get(col, "") for col in columns[1:] ] writer.writerow(row) elif all(isinstance(item, str) for item in data[first_key]): # Handle case: { key: [value1, value2, ...] }. header = [ camel_to_snake(dtype[:-1]) if identifier_case == "snake" else dtype[:-1] ] header += [ f"autosuggestion_{i + 1}" for i in range(len(data[first_key])) ] writer.writerow(header) for key, value in data.items(): row = [key] + value writer.writerow(row) else: # Handle case: { key: value }. writer.writerow( [ camel_to_snake(dtype[:-1]) if identifier_case == "snake" else dtype[:-1], "value", ] ) for key, value in data.items(): writer.writerow([key, value]) except IOError as e: print(f"Error writing to '{output_file}': {e}") continue print(f"Data for {language.capitalize()} {dtype} written to '{output_file}'")