Source code for scribe_data.cli.convert.to_json

# SPDX-License-Identifier: GPL-3.0-or-later
"""
Functions to convert data returned from the Scribe-Data CLI to JSON files.
"""

import csv
import json
from pathlib import Path

from scribe_data.utils import (
    DEFAULT_JSON_EXPORT_DIR,
    camel_to_snake,
    check_index_exists,
)

# MARK: JSON


[docs] def convert_to_json( language: str, data_types: str | list[str] | None, input_file: Path, output_dir: Path, output_type: str, overwrite: bool = False, identifier_case: str = "camel", ) -> None: """ Convert a CSV/TSV file to JSON. Parameters ---------- language : str The language of the file to convert. data_types : Union[str, List[str]] The data type of the file to convert. input_file : Path The input CSV/TSV file path. output_dir : Path The output directory path for results. output_type : str The output format, should be "json". overwrite : bool Whether to overwrite existing files. identifier_case : str The case format for identifiers. Default is "camel". Returns ------- None A JSON file. """ if not language: raise ValueError(f"Language '{language.capitalize()}' is not recognized.") data_types = [data_types] if isinstance(data_types, str) else data_types if not data_types: return if output_dir is None: output_dir = DEFAULT_JSON_EXPORT_DIR json_output_dir = Path(output_dir) / language.capitalize() json_output_dir.mkdir(parents=True, exist_ok=True) for dtype in data_types: if not input_file.exists(): print(f"No data found for {dtype} conversion at '{input_file}'.") continue delimiter = {".csv": ",", ".tsv": "\t"}.get(input_file.suffix.lower()) if not delimiter: raise ValueError( f"Unsupported file extension '{input_file.suffix}' for {str(input_file)}. Please provide a '.csv' or '.tsv' file." ) try: with input_file.open("r", encoding="utf-8") as file: reader = csv.DictReader(file, delimiter=delimiter) rows = list(reader) if not rows: print(f"No data found in '{input_file}'.") continue # Use the first row to inspect column headers. first_row = rows[0] keys = list(first_row.keys()) data: dict = {} if len(keys) == 1: # Handle Case: { key: None }. for row in rows: data[row[keys[0]]] = None elif len(keys) == 2: # Handle Case: { key: value }. for row in rows: key = ( camel_to_snake(row[keys[0]]) if identifier_case == "snake" else row[keys[0]] ) value = row[keys[1]] data[key] = value elif len(keys) > 2: if all(col in first_row for col in ["emoji", "is_base", "rank"]): # Handle Case: { key: [ { emoji: ..., is_base: ..., rank: ... }, { emoji: ..., is_base: ..., rank: ... } ] }. for row in rows: if reader.fieldnames and len(reader.fieldnames) > 0: if identifier_case == "snake": raw_value = row.get(reader.fieldnames[0]) key = camel_to_snake(raw_value or "") else: key = row.get(reader.fieldnames[0]) emoji = row.get("emoji", "").strip() is_base = ( row.get("is_base", "false").strip().lower() == "true" ) rank = row.get("rank", None) rank = int(rank) if rank and rank.isdigit() else None entry = {"emoji": emoji, "is_base": is_base, "rank": rank} if key is None: continue data.setdefault(key, []).append(entry) else: # Handle Case: { key: { value1: ..., value2: ... } }. for row in rows: data[row[keys[0]]] = { ( camel_to_snake(k) if identifier_case == "snake" else k ): row[k] for k in keys[1:] } except (IOError, csv.Error) as e: print(f"Error reading '{input_file}': {e}") continue # Define output file path output_file = json_output_dir / f"{dtype}.{output_type}" if check_index_exists(output_file, overwrite): print(f"Skipping {dtype}") continue try: with output_file.open("w", encoding="utf-8") as file: json.dump(data, file, ensure_ascii=False, indent=2) except IOError as e: print(f"Error writing to '{output_file}': {e}") continue print(f"Data for {language.capitalize()} {dtype} written to {output_file}")